- 适用于 Python 3.8 及以上版本
check_db3.py
# encoding: utf-8
# author: qbit
# date: 2022-05-10
# summary: 遍历检查 db3 文件是否正确,统计记录条数
import os
import sys
import time
import pprint
import sqlite3
import traceback
from multiprocessing import Pool, Lock
# Number of worker processes; values above 1 make the script use a process pool.
SubProcNum = 1
# Name of the table whose rows are counted in every database file.
TableName = 'my_table_name'
# Root directory that is searched recursively for *.db3 files.
Db3Dir = r'F:\tmp'
def Init4ProcOneFile(lock):
    """Worker-process initializer: publish the shared lock as the global gLock.

    Args:
        lock: the lock object handed to every worker through the pool's
            ``initargs``; it is stored in the module-level name ``gLock``.
    """
    global gLock
    gLock = lock
def _LockedPrint(msg: str) -> None:
    """Print msg, holding the shared lock when one has been installed."""
    # Init4ProcOneFile creates the global gLock only inside pool workers; in a
    # plain single-process run the name does not exist, so look it up lazily
    # instead of guessing from SubProcNum.
    lock = globals().get('gLock')
    if lock is None:
        print(msg)
    else:
        with lock:
            print(msg)


def ProcOneFile(db3file: str) -> tuple:
    """Count the rows of TableName in one SQLite file.

    Args:
        db3file: path of the SQLite database file to check.

    Returns:
        ``('ok', <row count>, db3file)`` when the count query succeeds, or
        ``('no', 0, db3file)`` when opening or querying the file raises; in
        that case the traceback is printed.
    """
    _LockedPrint(f'pid:{os.getpid()}, ProcOneFile {db3file} ...')
    sql = f'select count(*) from {TableName};'
    conn = None
    try:
        # connect() is inside the try so that an unopenable path is reported
        # as a bad file instead of propagating out of the worker.
        conn = sqlite3.connect(db3file)
        row = conn.execute(sql).fetchone()
    except Exception:
        # Deliberately broad: any failure marks the file as bad.
        _LockedPrint(f'Error file: {db3file} \n {traceback.format_exc()}')
        return 'no', 0, db3file
    finally:
        if conn is not None:
            conn.close()
    return 'ok', row[0], db3file
def _FindDb3Files(rootDir: str) -> list:
    """Return the paths of all files ending in '.db3' under rootDir (recursive)."""
    return [
        os.path.join(root, name)
        for root, _dirs, files in os.walk(rootDir)
        for name in files
        if name.endswith('.db3')
    ]


def _TallyResults(db3List, results, lock, startTime) -> None:
    """Consume per-file results, print progress per file, then the final report.

    Args:
        db3List: every path that is being checked (used for the totals).
        results: iterable of (status, num, db3file) tuples as returned by
            ProcOneFile, in any order.
        lock: lock held while printing a progress line.
        startTime: time.time() value taken when processing started.
    """
    okDb3List = []  # (record count, path) of every file that could be read
    noDb3List = []  # path of every file that failed
    recordNum = 0   # total record count over the readable files
    fileCnt = 0     # number of files processed so far
    for status, num, db3file in results:
        if status == 'ok':
            recordNum += num
            okDb3List.append((num, db3file))
        else:
            noDb3List.append(db3file)
        fileCnt += 1
        with lock:
            print(f'需处理总文件个数: {len(db3List)}, '
                  f'已处理文件个数: {fileCnt}, '
                  f'okDb3List size: {len(okDb3List)}, noDb3List size: {len(noDb3List)}',
                  f'已花费时间: {(time.time()-startTime):.2f}s')
    print(f"db3List size: {len(db3List)}, okDb3List size: {len(okDb3List)}, noDb3List size: {len(noDb3List)}")
    print(f"okDb3 record number: {recordNum}")
    print(f"okDb3List: \n {pprint.pformat(okDb3List)}")
    print(f"noDb3List: \n {pprint.pformat(noDb3List)}")


if __name__ == '__main__':
    db3List = _FindDb3Files(Db3Dir)
    print(f'db3List size: {len(db3List)}')
    if not db3List:
        sys.exit(0)

    startTime = time.time()
    lock = Lock()
    print(f'子进程数量: {SubProcNum}')
    # No worker process has been started yet, so this print needs no lock.
    print(f'{db3List=}')
    if SubProcNum > 1:  # multi-process run
        with Pool(SubProcNum, initializer=Init4ProcOneFile, initargs=(lock,)) as p:
            _TallyResults(db3List,
                          p.imap_unordered(func=ProcOneFile, iterable=db3List),
                          lock, startTime)
    else:  # single-process run; map() is lazy, so files are checked one by one
        _TallyResults(db3List, map(ProcOneFile, db3List), lock, startTime)
    print(f"Time total: {(time.time()-startTime):.2f}s")
本文出自 qbit snap