You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

46 lines
1.1 KiB

from __future__ import annotations
from hashlib import sha256
from pathlib import Path
from uuid import uuid4
from fastapi import UploadFile
from app.config import STORAGE_DIR
CHUNK_SIZE = 1024 * 1024
def ensure_storage_dirs() -> None:
STORAGE_DIR.mkdir(parents=True, exist_ok=True)
async def save_upload(file: UploadFile) -> tuple[str, int, str]:
ensure_storage_dirs()
suffix = Path(file.filename or "archive.tar.gz").suffixes
extension = "".join(suffix) if suffix else ".bin"
stored_name = f"{uuid4().hex}{extension}"
destination = STORAGE_DIR / stored_name
digest = sha256()
size = 0
with destination.open("wb") as buffer:
while chunk := await file.read(CHUNK_SIZE):
size += len(chunk)
digest.update(chunk)
buffer.write(chunk)
await file.close()
return stored_name, size, digest.hexdigest()
def delete_file(stored_name: str) -> None:
path = STORAGE_DIR / stored_name
if path.exists():
path.unlink()
def resolve_path(stored_name: str) -> Path:
return STORAGE_DIR / stored_name