# Data management service class
from datetime import datetime
from typing import Any

from dateutil import parser

from alibabacloud_sample.dbservice import DBService


class DataService:
    """Data-access layer for the ``file_data``, ``invoice`` and ``verify_log`` tables.

    Every statement is executed with driver-side parameter binding (the
    ``%s`` placeholder style), so caller-supplied values are never spliced
    into the SQL text.
    """

    conn = None
    dbservice = DBService()

    # Invoice columns populated from the recognition result dict, in the
    # order they are written by ``insert_invoice_log``.
    _INVOICE_FIELDS = (
        "checkCode", "drawer", "formType", "invoiceAmountPreTax",
        "invoiceCode", "invoiceDate", "invoiceNumber", "invoiceTax",
        "invoiceType", "machineCode", "passwordArea", "printedInvoiceCode",
        "printedInvoiceNumber", "purchaserBankAccountInfo",
        "purchaserContactInfo", "purchaserName", "purchaserTaxNumber",
        "recipient", "remarks", "reviewer", "sellerBankAccountInfo",
        "sellerContactInfo", "sellerName", "sellerTaxNumber", "specialTag",
        "title", "totalAmount", "totalAmountInWords",
    )

    # Invoice fields that are converted with ``float()`` before insertion.
    _AMOUNT_FIELDS = frozenset(
        {"invoiceAmountPreTax", "invoiceTax", "totalAmount"}
    )

    def __init__(self):
        self.conn = self.dbservice.get_conn()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _select(self, sql: str, params=None) -> tuple[list[str], list[Any]]:
        """Run a SELECT and return ``(column_names, rows)``.

        The cursor is always closed, even when the query raises.
        """
        conn = self.dbservice.get_conn()
        cursor = conn.cursor()
        try:
            # Pass ``None`` rather than an empty sequence so statements
            # without parameters are executed exactly as written.
            cursor.execute(sql, tuple(params) if params else None)
            rows = cursor.fetchall()
            columns = [column[0] for column in cursor.description]
            return columns, rows
        finally:
            cursor.close()

    def _select_dicts(self, sql: str, params=None) -> list[dict[Any, Any]]:
        """Run a SELECT and return each row as a ``{column: value}`` dict."""
        columns, rows = self._select(sql, params)
        return [dict(zip(columns, row)) for row in rows]

    def _write(self, sql: str, params=None):
        """Execute a write statement, commit, and return ``cursor.lastrowid``.

        On failure the transaction is rolled back and the error re-raised.
        """
        conn = self.dbservice.get_conn()
        cursor = conn.cursor()
        try:
            cursor.execute(sql, tuple(params) if params else None)
            conn.commit()
            return cursor.lastrowid
        except Exception:
            conn.rollback()
            raise
        finally:
            cursor.close()

    # ------------------------------------------------------------------
    # file_data
    # ------------------------------------------------------------------
    # Insert a file record
    def insert_file_data(self, file_name: str, file_path: str, file_size: int,
                         file_type: str, file_time: str,
                         file_hash: str) -> dict[Any, Any] | str:
        """Insert a file record.

        Returns:
            ``file_path`` when the insert succeeds. When the insert fails,
            the already stored record with the same ``file_hash`` is
            returned as a dict instead.

        Raises:
            Exception: the original insert error, when the insert fails and
                no record with ``file_hash`` exists to fall back to.
        """
        try:
            self._write(
                "INSERT INTO file_data(file_name,file_path,file_size,file_type,file_time,file_hash) "
                "values(%s,%s,%s,%s,%s,%s)",
                (file_name, file_path, file_size, file_type, file_time, file_hash),
            )
        except Exception as e:
            # The driver's exception type is not visible here, so any insert
            # failure falls back to looking up an existing row by hash.
            print(e)
            existing = self._select_dicts(
                "select file_name,file_path,file_size,file_type,file_time,file_hash "
                "from file_data where `file_hash` = %s",
                (file_hash,),
            )
            if not existing:
                # Nothing to fall back to: surface the real insert error
                # instead of failing on an empty result set.
                raise
            return existing[0]
        return file_path

    # Get a file record by hash and size
    def get_file(self, file_hash: str, file_size: int) -> dict[Any, Any] | None:
        """Return the file record matching ``file_hash`` and ``file_size``.

        Returns:
            The first matching row as a dict, or ``None`` when there is none.
        """
        matches = self._select_dicts(
            "select file_name,file_path,file_size,file_type,file_time,file_hash "
            "from file_data where `file_hash` = %s and file_size = %s",
            (file_hash, file_size),
        )
        return matches[0] if matches else None

    # ------------------------------------------------------------------
    # invoice
    # ------------------------------------------------------------------
    # Get a single invoice by id or by file path
    def get_invoice(self, file_path: str = None, invice_id: int = None) -> dict[Any, Any]:
        """Return one invoice, looked up by ``invice_id`` or by ``file_path``.

        A truthy ``invice_id`` takes precedence over ``file_path``.

        Returns:
            The first matching row as a dict, or an empty dict when nothing
            matches or neither argument is given.
        """
        if invice_id:
            condition, params = "id = %s", (invice_id,)
        elif file_path is not None:
            condition, params = "file_path = %s", (file_path,)
        else:
            return {}

        columns = ", ".join((
            "id", *self._INVOICE_FIELDS, "file_path", "verify_time",
            "verify_status", "`desc`", "desc_files", "status",
        ))
        matches = self._select_dicts(
            f"select {columns} from invoice where {condition}", params
        )
        return matches[0] if matches else {}

    # Insert an invoice record
    def insert_invoice_log(self, data: dict, file_path: str):
        """Insert an invoice row built from ``data`` and return its new id.

        Fields missing from ``data`` default to ``''``; the three amount
        fields default to ``0.0`` and are converted with ``float()``.

        Returns:
            ``cursor.lastrowid`` of the inserted row.
        """
        columns = (*self._INVOICE_FIELDS, "file_path")
        placeholders = ", ".join(["%s"] * len(columns))
        values = [
            float(data.get(field, 0.0)) if field in self._AMOUNT_FIELDS
            else data.get(field, '')
            for field in self._INVOICE_FIELDS
        ]
        values.append(file_path)
        return self._write(
            f"INSERT INTO invoice ({', '.join(columns)}) VALUES ({placeholders})",
            values,
        )

    # Update the verification status of an invoice
    def update_invoice_status(self, invoice_id: int, verify_status: str,
                              inspectionAmount: str):
        """Record a verification result for an invoice.

        Sets ``verify_time`` to the current local time, stores
        ``verify_status`` and ``inspectionAmount``, and resets ``status``
        to ``'no'``.
        """
        self._write(
            "UPDATE invoice SET verify_time = %s, "
            "verify_status = %s, "
            "inspectionAmount = %s, "
            "status = 'no' "  # trailing space keeps WHERE a separate token
            "WHERE id = %s",
            (datetime.now().strftime("%Y-%m-%d %H:%M:%S"), verify_status,
             inspectionAmount, invoice_id),
        )

    # Update the description, description files and status of an invoice
    def update_invoice_desc(self, invoice_id: int | str, desc: str,
                            desc_file: str, status: str) -> str:
        """Update ``desc``, ``desc_files`` and ``status`` of an invoice.

        Returns:
            The string ``"success"``.
        """
        self._write(
            "UPDATE invoice SET "
            "`desc` = %s,"
            "desc_files = %s,"
            "status = %s "
            "WHERE id = %s",
            (desc, desc_file, status, invoice_id),
        )
        return "success"

    # ------------------------------------------------------------------
    # verify_log
    # ------------------------------------------------------------------
    # Get the latest verification record
    def get_verify_log(self, file_path: str) -> dict[Any, Any] | None:
        """Return the most recent verification record for ``file_path``.

        Returns:
            The newest row (by ``verify_time``) as a dict, or ``None``.
        """
        matches = self._select_dicts(
            "select inspectionAmount,cyjgxx,verify_time,file_path "
            "from verify_log where file_path = %s "
            "order by verify_time desc limit 1",
            (file_path,),
        )
        return matches[0] if matches else None

    # Insert a verification record
    def insert_verify_log(self, inspection_amount: str, cyjgxx: str, file_path: str):
        """Insert a verification record stamped with the current local time."""
        # Formatting and re-parsing drops the microseconds from the timestamp.
        verify_time = parser.parse(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        self._write(
            "INSERT INTO verify_log(inspectionAmount,cyjgxx,verify_time,file_path) "
            "values(%s,%s,%s,%s)",
            (inspection_amount, cyjgxx, verify_time, file_path),
        )

    # List all verification records
    def get_verify_log_list(self, file_path: str) -> list[dict[Any, Any]]:
        """Return every verification record stored for ``file_path``."""
        _, rows = self._select(
            "select inspectionAmount,cyjgxx,verify_time,file_path "
            "from verify_log where file_path = %s",
            (file_path,),
        )
        return [
            {
                'inspectionAmount': row[0],
                'cyjgxx': row[1],
                'verify_time': row[2],
                'file_path': row[3],
            }
            for row in rows
        ]

    # ------------------------------------------------------------------
    # invoice list / search
    # ------------------------------------------------------------------
    # List invoices
    def get_invoice_list(self, value: str, verify_status: str, time_out='all',
                         start_date: datetime | str | None = None,
                         end_date: datetime | str | None = None) -> list[dict[Any, Any]]:
        """Return invoices matching the given filters, newest verification first.

        Args:
            value: Free-text search term matched with ``LIKE '%value%'``
                against every searchable column; ``None`` or ``''`` disables
                the search.
            verify_status: Exact status to filter on, or ``'all'``. Ignored
                when ``time_out`` is not ``'all'``.
            time_out: ``'all'``, or a whole number of days; selects invoices
                with ``verify_status = 'success'`` whose ``verify_time`` is
                at least that many days old.
            start_date: Inclusive lower bound on ``verify_time``.
            end_date: Inclusive upper bound on ``verify_time``.

        Raises:
            ValueError: if ``time_out`` is neither ``'all'`` nor an integer.
        """
        columns = ", ".join((
            "id", *self._INVOICE_FIELDS, "file_path", "verify_time",
            "verify_status", "inspectionAmount", "`desc`", "desc_files",
            "status",
        ))
        conditions = []
        params = []

        if time_out != 'all':
            # Coerce to int so only a number can reach the SQL text; the
            # date expression itself is kept in the original form.
            days = int(time_out)
            conditions.append(
                f"date('now', '-{days} day') >= date(verify_time)"
                " and verify_status = 'success'"
            )
        elif verify_status != 'all':
            conditions.append("verify_status = %s")
            params.append(verify_status)
        else:
            conditions.append("1 = 1")

        if start_date is not None:
            conditions.append("verify_time >= %s")
            params.append(start_date)
        if end_date is not None:
            conditions.append("verify_time <= %s")
            params.append(end_date)

        if value is not None and value != '':
            searchable = (
                "file_path", *self._INVOICE_FIELDS, "`desc`", "desc_files",
            )
            conditions.append(
                "(" + " or ".join(f"{column} like %s" for column in searchable) + ")"
            )
            # One bound pattern per searchable column.
            params.extend([f"%{value}%"] * len(searchable))

        sql = (
            f"select {columns} from invoice where "
            + " and ".join(conditions)
            + " order by verify_time desc"
        )
        print(sql)
        return self._select_dicts(sql, params)