# File service interface
import hashlib
import os
import time
from typing import Any

from flask import Request
from werkzeug.datastructures import FileStorage
from werkzeug.utils import secure_filename

from alibabacloud_sample.dataservice import DataService

dataservice = DataService()


class FileService:
    """Save uploaded files to disk and register them with the data service."""

    def __init__(self):
        """Create a file service instance; no state is kept on the instance."""
        pass

    def upload(
        self, request: Request, file_path: str
    ) -> None | str | dict[Any, Any] | Any:
        """Store the ``file`` part of a POST request under ``file_path``.

        The file is written to ``<file_path>/<YYYYMMDD>/<unix_ts>_<name>``,
        then its size and MD5 digest are looked up via
        ``dataservice.get_file``. If that lookup returns a truthy value the
        freshly written copy is removed and the lookup result is returned;
        otherwise a new record is created with
        ``dataservice.insert_file_data`` and its result is returned.

        Args:
            request: The incoming Flask request.
            file_path: Root directory under which uploads are stored.

        Returns:
            ``None`` when the request is not a POST (or the file object is
            falsy), the string ``'No file part'`` / ``'No selected file'``
            for the corresponding input errors, otherwise whatever the data
            service returns for the existing or newly inserted record.
        """
        if request.method != 'POST':
            return None

        # Make sure the request actually carries a file part.
        if 'file' not in request.files:
            return 'No file part'

        file = request.files['file']
        if file.filename == '':
            return 'No selected file'
        if not file:
            return None

        date = time.strftime("%Y%m%d", time.localtime())

        # NOTE(review): despite its name this holds the sanitized file name,
        # not an extension or MIME type; kept unchanged because it is what
        # has always been passed to insert_file_data -- confirm intent.
        filetype = secure_filename(file.filename)

        # The client-supplied name is untrusted: keep only its last path
        # component (treating both "/" and "\" as separators) so a name such
        # as "a/../../etc/passwd" cannot escape the upload directory.
        # secure_filename() is not used for the on-disk name because it
        # drops non-ASCII characters, which would alter ordinary names.
        base_name = os.path.basename(file.filename.replace("\\", "/"))
        # The timestamp prefix also guarantees the component is never "."
        # or "..".
        filename = str(int(time.time())) + "_" + base_name
        filepath = os.path.join(file_path, date, filename)

        # NOTE(review): two uploads with the same name within the same
        # second map to the same path; if they are also identical in
        # content, the duplicate branch below removes the file the existing
        # record may point to. Consider adding a unique token to the name.
        os.makedirs(os.path.dirname(filepath), exist_ok=True)
        file.save(filepath)

        file_size = os.path.getsize(filepath)
        file_hash = self.getHash(filepath)

        v_file = dataservice.get_file(file_hash, file_size)
        if v_file:
            # Same hash and size already registered: drop the new copy.
            os.remove(filepath)
            return v_file

        v_file = dataservice.insert_file_data(
            file.filename,
            filepath,
            file_size,
            filetype,
            time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
            file_hash,
        )
        print("insert v_file:", v_file)
        return v_file

    def getHash(self, file_path: str) -> str:
        """Return the hexadecimal MD5 digest of the file at ``file_path``.

        The file is read in 8192-byte chunks so it is never loaded into
        memory as a whole.
        """
        md5 = hashlib.md5()
        with open(file_path, 'rb') as f:
            while data := f.read(8192):
                md5.update(data)
        return md5.hexdigest()