# -*-coding:utf8;-*-
import sqlite3
import queue
import threading
import time
import re
import inspect
__peewee__ = True
try:
import peewee
except ImportError:
__peewee__ = False
"""基于python实现的sqlite队列,方便的处理sqlite并发。
讲道理,写这个库的人并不会写python。SqliteQueue是继承了threading.Thread的线程,并且维护了一个向sqlite请求的队列。
回调函数参数:lst_rowid(整数,最后一行行号)、data(数据,如select会有返回)、rowcount(行数)
"""
__author__ = "KAAAsS" # ←超帅
__version__ = '0.1.0'
__license__ = "GPL v2"
_OPERATOR_MAPPING = {
"!": "!=",
"~": "LIKE",
"!~": "NOT LIKE"
}
class SqliteQueue(threading.Thread):
def __init__(self, db, wait=5):
"""
:param db: sqlite数据库文件
:param wait: 任务等候时间,单位秒,默认5
"""
threading.Thread.__init__(self)
self.daemon = True # 默认为守护线程
self._queue = queue.Queue()
self.wait = wait
self._db = db
self._conn = None
self._cursor = None
def run(self):
self._conn = sqlite3.connect(self._db) # 链接sqlite库
self._cursor = self._conn.cursor() # 获取cursor
while True:
if self._queue.qsize() < 1: # 如果队列空则sleep线程一段时间,以等待新操作
time.sleep(self.wait)
continue
task = self._queue.get()
self._deal_task(task)
def _deal_task(self, task):
if isinstance(task['data'], tuple): # 元组即execute
self._cursor.execute(task['execute'], task['data'])
elif isinstance(task['data'], list): # 列表即executemany
self._cursor.executemany(task['execute'], task['data'])
else:
self._cursor.execute(task['execute'])
self._conn.commit()
# 取回调函数的参数
if task['callback'] is None:
return
paras = inspect.getargspec(task['callback'])[0]
kwargs = {}
for param in paras:
if param == 'data':
kwargs['data'] = self._cursor.fetchall()
elif param == 'rowcount':
kwargs['rowcount'] = self._cursor.rowcount
elif param == 'lst_rowid':
kwargs['lst_rowid'] = self._cursor.lastrowid
else:
kwargs[param] = None
task['callback'](**kwargs) # 回调
def register_execute(self, execute, data=None, callback=None):
"""
注册一个执行指定SQL命令的操作
:param execute: SQL语句
:param data: 预编译参数
:param callback: 回调
:return:
"""
if not isinstance(execute, str):
raise SqliteQueueError('Illegal param! "execute" must be string!')
elif data is not None and (not isinstance(data, tuple) and not isinstance(data, list)):
raise SqliteQueueError('Illegal param! "data" must be tuple or list!')
elif callback is not None and str(type(callback)) != "<class 'function'>":
raise SqliteQueueError('Illegal param! "callback" must be function!')
self._queue.put({
'execute': execute,
'data': data,
'callback': callback
})
def register_peewee_query(self, pw_query, callback=None):
"""
注册一个执行peewee查询的操作
:param pw_query: peewee查询对象
:param callback: 回调
:return:
"""
if not __peewee__:
raise SqliteQueueError('Module "peewee" have not been installed.')
if not isinstance(pw_query, peewee.Query):
raise SqliteQueueError('Illegal param! "pw_query" must be peewee.Query!')
sql = pw_query.sql()
self.register_execute(sql[0], tuple(sql[1]), callback=callback)
def select(self, table):
"""
构建select语句
:param table: 目标表
:return:
"""
return SqlQuery(table, obj_queue=self)
def insert(self, table, data=None):
"""
构建insert语句
:param table: 目标表
:param data: 插入的数据(dict)
:return:
"""
return SqlQuery(table, method='INSERT', params=data, obj_queue=self)
def update(self, table, data=None):
"""
构建update语句
:param table: 目标表
:param data: 插入的数据(dict)
:return:
"""
return SqlQuery(table, method='UPDATE', params=data, obj_queue=self)
def delete(self, table):
"""
构建delete语句
:param table: 目标表
:return:
"""
return SqlQuery(table, method='UPDATE', obj_queue=self)
def drop(self, table):
"""
构建drop语句
:param table: 目标表
:return:
"""
return SqlQuery(table, method='DROP', obj_queue=self)
def create(self, table, params=None):
"""
创建表
:param table: 目标表
:param params: 包含字段
:return:
"""
return SqlQuery(table, method='CREATE', params=params, obj_queue=self)
class SqlQuery:
"""
简单的sql命令封装
"""
def __init__(self, table, method='SELECT', params=None, obj_queue=None):
self._queue = obj_queue
self._sql = {'table': table, 'method': method, 'distinct': False}
self._params = params
self._data = None
def execute(self, command, data=None):
self._sql = command
self._data = data
return self
def _has_commanded(self):
"""
检查是否执行过execute方法
:return:
"""
if isinstance(self._sql, str):
raise Exception('This method cannot be invoked after using method "execute"!')
def table(self, table):
"""
设定操作的目标表
:param table:
:return:
"""
self._has_commanded()
self._sql['table'] = table
return self
def field(self, *args):
"""
表示操作对象的字段
:param args:
:return:
"""
self._has_commanded()
result = ''
if len(args) < 1:
raise ValueError('Method "field" needs at least one param!')
elif len(args) == 1:
args = args[0]
if isinstance(args, str): # 若为字符串,则视为直接输入SQL格式
result = args + ' ' # 空格占位哈哈哈哈,我有毒
elif isinstance(args, dict): # 若为字典,则可能有as
if len(args) < 1:
raise ValueError('Illegal length of param for method "field"!')
for k, v in args.items():
if v is None:
result += '`%s`,' % str(k)
else:
result += '`%s` AS %s,' % (str(k), str(v))
else:
raise ValueError('Illegal param for method "field"!')
else:
for v in args: # 直接拼接即可
result += '`%s`,' % str(v)
self._sql['field'] = result[:-1]
return self
def where(self, *args):
"""
设置where条件,重复调用会叠加并使用AND连接
:param args:
:return:
"""
self._has_commanded()
cond = _parse_condition(*args)
if 'where' in self._sql: # 已经存在where了,拼接之
self._sql['where'][0] += ' AND ' + cond[0] # 拼接条件语句
self._sql['where'][1] += cond[1] # 拼接参数
else:
self._sql['where'] = cond
return self
def or_where(self, *args):
"""
设置where条件,重复调用会叠加并使用OR连接
:param args:
:return:
"""
self._has_comman
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论


























格式:pdf 资源大小:56.5KB 页数:3

收起资源包目录











共 8 条
- 1
资源评论


Java程序员-张凯
- 粉丝: 1w+
上传资源 快速赚钱
我的内容管理 展开
我的资源 快来上传第一个资源
我的收益
登录查看自己的收益我的积分 登录查看自己的积分
我的C币 登录后查看C币余额
我的收藏
我的下载
下载帮助


最新资源
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈



安全验证
文档复制为VIP权益,开通VIP直接复制
