# File service interface
import hashlib
import os
import time
from typing import Any

from flask import Request
from werkzeug.utils import secure_filename

from alibabacloud_sample.dataservice import DataService

dataservice = DataService()


class FileService:
    """Store uploaded files on disk and register them through ``dataservice``.

    An upload whose MD5 digest and size match an already registered file is
    discarded and the existing record is returned instead.
    """

    # Create a file service instance
    def __init__(self):
        pass

    # Upload a file
    def upload(self, request: Request, file_path: str) -> None | str | dict[Any, Any] | Any:
        """Save the ``file`` part of a POST request under ``file_path``.

        The file is written to ``<file_path>/<YYYYMMDD>/<unix_ts>_<safe name>``,
        then looked up by MD5 digest and size. If ``dataservice.get_file``
        returns a truthy record, the freshly saved copy is removed and that
        record is returned; otherwise a new record is inserted.

        Args:
            request: Incoming Flask request expected to carry a ``file`` part.
            file_path: Root directory under which uploads are stored.

        Returns:
            ``None`` when the request is not a POST (or the upload object is
            falsy), a user-facing message string when no file was supplied,
            otherwise whatever ``dataservice.get_file`` or
            ``dataservice.insert_file_data`` returns.
        """
        if request.method != 'POST':
            return None

        # Check that the request actually carries a file part.
        if 'file' not in request.files:
            return '没有文件被上传'

        file = request.files['file']
        if file.filename == '':
            return '请重新选择文件'
        if not file:
            # A falsy upload object is treated as "nothing to store".
            return None

        # Read the clock once so the date directory, the name prefix and the
        # recorded upload time can never disagree (e.g. across midnight).
        now = time.time()
        local_now = time.localtime(now)

        # The client-supplied filename is untrusted: used verbatim it could
        # contain path separators or ".." and escape ``file_path``. Only the
        # sanitized form may take part in the on-disk path. The sanitized name
        # can be empty (e.g. all non-ASCII input); the timestamp prefix keeps
        # the stored name non-empty in that case.
        safe_name = secure_filename(file.filename)
        stored_name = f"{int(now)}_{safe_name}"

        filepath = os.path.join(
            file_path, time.strftime("%Y%m%d", local_now), stored_name
        )
        os.makedirs(os.path.dirname(filepath), exist_ok=True)
        file.save(filepath)

        file_size = os.path.getsize(filepath)
        file_hash = self.get_hash(filepath)

        existing = dataservice.get_file(file_hash, file_size)
        if existing:
            # Same digest and size already registered: drop the duplicate copy.
            os.remove(filepath)
            return existing

        # NOTE(review): the value passed in the "filetype" position is the
        # sanitized filename, exactly as before this change — confirm against
        # DataService.insert_file_data whether an extension or MIME type was
        # intended instead.
        v_file = dataservice.insert_file_data(
            file.filename,
            filepath,
            file_size,
            safe_name,
            time.strftime("%Y-%m-%d %H:%M:%S", local_now),
            file_hash,
        )
        print("insert v_file:", v_file)
        return v_file

    # Get the MD5 digest of a file
    @staticmethod
    def get_hash(file_path: str) -> str:
        """Return the hexadecimal MD5 digest of the file at ``file_path``.

        The file is read in 8192-byte chunks so large uploads are never held
        in memory all at once.
        """
        md5 = hashlib.md5()
        with open(file_path, 'rb') as f:
            while chunk := f.read(8192):
                md5.update(chunk)
        return md5.hexdigest()