VerifVATInvoice/alibabacloud_sample/dataservice.py
liuxiaoqing 11530baefb feat(database): 将数据库从 SQLite 迁移到 MySQL
- 修改了 DBService 类,增加了 MySQL 数据库连接配置
- 重新定义了表结构,适应 MySQL 数据库
- 更新了相关的 SQL 查询语句
- 增加了创建数据库的逻辑
- 优化了表创建语句,使用 InnoDB 引擎和 UTF-8 字符集
2025-08-20 15:11:35 +08:00

273 lines
12 KiB
Python

# 数据管理服务类
from datetime import datetime
from typing import Any
from dateutil import parser
from alibabacloud_sample.dbservice import DBService
class DataService:
    """Data-access layer for the ``file_data``, ``invoice`` and ``verify_log`` tables.

    Every public method asks ``DBService`` for a connection and runs a single
    statement. All caller-supplied values are passed to the driver as ``%s``
    bound parameters; nothing is concatenated into the SQL text.
    """

    conn = None
    dbservice = DBService()

    # Invoice fields that are read from the recognition result dict and stored
    # under the same column name (insert order matters).
    _INVOICE_DATA_FIELDS = (
        "checkCode", "drawer", "formType", "invoiceAmountPreTax", "invoiceCode",
        "invoiceDate", "invoiceNumber", "invoiceTax", "invoiceType", "machineCode",
        "passwordArea", "printedInvoiceCode", "printedInvoiceNumber",
        "purchaserBankAccountInfo", "purchaserContactInfo", "purchaserName",
        "purchaserTaxNumber", "recipient", "remarks", "reviewer",
        "sellerBankAccountInfo", "sellerContactInfo", "sellerName", "sellerTaxNumber",
        "specialTag", "title", "totalAmount", "totalAmountInWords",
    )
    # Fields that are converted with float() before being stored.
    _INVOICE_AMOUNT_FIELDS = frozenset(
        {"invoiceAmountPreTax", "invoiceTax", "totalAmount"}
    )
    # Column list shared by get_invoice() and get_invoice_list() so both
    # return rows with the same keys.
    _INVOICE_SELECT_COLUMNS = (
        "id, " + ", ".join(_INVOICE_DATA_FIELDS)
        + ", file_path, verify_time, verify_status, inspectionAmount,"
        + " `desc`, desc_files, status"
    )
    # Columns matched against the free-text search value in get_invoice_list().
    _INVOICE_SEARCH_COLUMNS = (
        ("file_path",) + _INVOICE_DATA_FIELDS + ("`desc`", "desc_files")
    )
    _VERIFY_LOG_KEYS = ("inspectionAmount", "cyjgxx", "verify_time", "file_path")

    def __init__(self):
        self.conn = self.dbservice.get_conn()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _to_float(value: Any) -> float:
        """Convert an amount to float, treating ``None`` and ``''`` as 0.0."""
        if value is None or value == '':
            return 0.0
        return float(value)

    def _query(self, sql: str, params=(), keys=None) -> list[dict[Any, Any]]:
        """Run a SELECT and return every row as a dict.

        Keys come from ``keys`` when given, otherwise from the column names in
        ``cursor.description``. The cursor is always closed.
        """
        cursor = self.dbservice.get_conn().cursor()
        try:
            # Pass None instead of an empty sequence so drivers skip
            # parameter substitution entirely for parameterless statements.
            cursor.execute(sql, tuple(params) or None)
            rows = cursor.fetchall()
            names = keys if keys is not None else [col[0] for col in cursor.description]
            return [dict(zip(names, row)) for row in rows]
        finally:
            cursor.close()

    def _execute_write(self, sql: str, params) -> Any:
        """Run one INSERT/UPDATE, commit, and return ``cursor.lastrowid``.

        On any failure the transaction is rolled back and the error re-raised.
        """
        conn = self.dbservice.get_conn()
        cursor = conn.cursor()
        try:
            cursor.execute(sql, params)
            conn.commit()
            return cursor.lastrowid
        except Exception:
            conn.rollback()
            raise
        finally:
            cursor.close()

    # ------------------------------------------------------------------
    # file_data
    # ------------------------------------------------------------------
    def insert_file_data(self, file_name: str, file_path: str, file_size: int, file_type: str,
                         file_time: str, file_hash: str) -> str | dict[Any, Any]:
        """Insert a file record.

        Returns:
            ``file_path`` when the insert succeeds. When the insert fails and a
            row with the same ``file_hash`` already exists, that existing row
            is returned as a dict instead.

        Raises:
            The original insert error when the insert fails and no row with
            ``file_hash`` exists (previously this surfaced as ``IndexError``).
        """
        conn = self.dbservice.get_conn()
        cursor = conn.cursor()
        try:
            try:
                cursor.execute(
                    "INSERT INTO file_data(file_name,file_path,file_size,file_type,file_time,file_hash) "
                    "values(%s,%s,%s,%s,%s,%s)",
                    (file_name, file_path, file_size, file_type, file_time, file_hash))
                conn.commit()
            except Exception as e:
                print(e)
                conn.rollback()
                cursor.execute(
                    "select file_name,file_path,file_size,file_type,file_time,file_hash "
                    "from file_data where `file_hash` = %s",
                    (file_hash,))
                rows = cursor.fetchall()
                if not rows:
                    # Not a "record already exists" situation: surface the real error.
                    raise
                columns = [col[0] for col in cursor.description]
                return dict(zip(columns, rows[0]))
            return file_path
        finally:
            cursor.close()

    def get_file(self, file_hash: str, file_size: int) -> dict[Any, Any] | None:
        """Return the file record matching ``file_hash`` and ``file_size``, or ``None``."""
        rows = self._query(
            """
            select file_name,file_path,file_size,file_type,file_time,file_hash
            from file_data where `file_hash` = %s and file_size = %s
            """,
            (file_hash, file_size))
        return rows[0] if rows else None

    # ------------------------------------------------------------------
    # invoice
    # ------------------------------------------------------------------
    def get_invoice(self, file_path: str | None = None, invice_id: int | None = None) -> dict[Any, Any]:
        """Fetch one invoice by id (when ``invice_id`` is truthy) or by file path.

        The parameter name ``invice_id`` is kept as-is for caller compatibility.

        Returns:
            The first matching row as a dict, or an empty dict when nothing
            matches.
        """
        if invice_id:
            condition, key = "id = %s", invice_id
        else:
            condition, key = "file_path = %s", file_path
        rows = self._query(
            f"select {self._INVOICE_SELECT_COLUMNS} from invoice where {condition}",
            (key,))
        return rows[0] if rows else {}

    def insert_invoice_log(self, data: dict, file_path: str):
        """Insert an invoice row built from ``data`` and return its new id.

        Missing text fields default to ``''``. Amount fields are converted
        with ``float``; missing, ``None`` or empty amounts are stored as 0.0.
        """
        columns = self._INVOICE_DATA_FIELDS + ("file_path",)
        placeholders = ", ".join(["%s"] * len(columns))
        invoice_insert = (
            f"INSERT INTO invoice ({', '.join(columns)}) VALUES ({placeholders})"
        )
        invoice_values = tuple(
            self._to_float(data.get(field))
            if field in self._INVOICE_AMOUNT_FIELDS
            else data.get(field, '')
            for field in self._INVOICE_DATA_FIELDS
        ) + (file_path,)
        return self._execute_write(invoice_insert, invoice_values)

    def update_invoice_status(self, invoice_id: int, verify_status: str, inspectionAmount: str):
        """Record a verification result on an invoice.

        Sets ``verify_time`` to the current local time, stores
        ``verify_status`` and ``inspectionAmount``, and resets ``status``
        to ``'no'``.
        """
        self._execute_write(
            "UPDATE invoice SET verify_time = %s, "
            "verify_status = %s, "
            "inspectionAmount = %s, "
            "status = 'no' "
            "WHERE id = %s",
            (datetime.now().strftime("%Y-%m-%d %H:%M:%S"), verify_status, inspectionAmount, invoice_id))

    def update_invoice_desc(self, invoice_id: int | str, desc: str, desc_file: str, status: str) -> str:
        """Update the description, description files and status of an invoice.

        Returns:
            The literal string ``"success"``.
        """
        self._execute_write(
            "UPDATE invoice SET "
            "`desc` = %s,"
            "desc_files = %s,"
            "status = %s "
            "WHERE id = %s",
            (desc, desc_file, status, invoice_id))
        return "success"

    def get_invoice_list(self, value: str, verify_status: str, time_out='all',
                         start_date: datetime | str | None = None,
                         end_date: datetime | str | None = None) -> list[dict[Any, Any]]:
        """List invoices matching the given filters, newest verification first.

        Args:
            value: Free-text search term matched with ``LIKE %value%`` against
                the searchable invoice columns; ignored when ``None`` or ``''``.
            verify_status: Exact status to filter on, or ``'all'``. Only used
                when ``time_out`` is ``'all'``.
            time_out: ``'all'``, or a number of days N; selects invoices
                verified successfully at least N days ago.
            start_date: Optional inclusive lower bound on ``verify_time``.
            end_date: Optional inclusive upper bound on ``verify_time``.

        Raises:
            ValueError: If ``time_out`` is neither ``'all'`` nor a number.
        """
        conditions = []
        params = []

        if time_out != 'all':
            # MySQL form of the former SQLite expression date('now', '-N day').
            conditions.append(
                "DATE_SUB(CURDATE(), INTERVAL %s DAY) >= DATE(verify_time) "
                "and verify_status = 'success'")
            params.append(int(time_out))
        elif verify_status != 'all':
            conditions.append("verify_status = %s")
            params.append(verify_status)
        else:
            conditions.append("1 = 1")

        if start_date is not None:
            conditions.append("verify_time >= %s")
            params.append(start_date)
        if end_date is not None:
            conditions.append("verify_time <= %s")
            params.append(end_date)

        if value is not None and value != '':
            # LIKE wildcards inside `value` are intentionally not escaped,
            # matching the previous behaviour.
            pattern = f"%{value}%"
            like_clause = " or ".join(
                f"{column} like %s" for column in self._INVOICE_SEARCH_COLUMNS)
            conditions.append(f"({like_clause})")
            params.extend([pattern] * len(self._INVOICE_SEARCH_COLUMNS))

        sql = (
            f"select {self._INVOICE_SELECT_COLUMNS} from invoice "
            f"where {' and '.join(conditions)} order by verify_time desc"
        )
        print(sql)
        return self._query(sql, params)

    # ------------------------------------------------------------------
    # verify_log
    # ------------------------------------------------------------------
    def get_verify_log(self, file_path: str) -> dict[Any, Any] | None:
        """Return the most recent verification log entry for ``file_path``, or ``None``."""
        rows = self._query(
            """
            select inspectionAmount,cyjgxx,verify_time,file_path
            from verify_log where file_path = %s order by verify_time desc limit 1
            """,
            (file_path,))
        return rows[0] if rows else None

    def insert_verify_log(self, inspection_amount: str, cyjgxx: str, file_path: str):
        """Append a verification log entry stamped with the current local time."""
        # Second precision, same value the old strftime + parse round-trip produced.
        verify_time = datetime.now().replace(microsecond=0)
        self._execute_write(
            "INSERT INTO verify_log(inspectionAmount,cyjgxx,verify_time,file_path) values(%s,%s,%s,%s)",
            (inspection_amount, cyjgxx, verify_time, file_path))

    def get_verify_log_list(self, file_path: str) -> list[dict[Any, Any]]:
        """Return every verification log entry recorded for ``file_path``."""
        return self._query(
            """
            select inspectionAmount,cyjgxx,verify_time,file_path from verify_log where file_path = %s
            """,
            (file_path,),
            keys=self._VERIFY_LOG_KEYS)