什么是数据库连接池呢?
数据库连接池维持了多个数据库连接,当我们需要执行数据库操作时,就去这个“池”里申请一个连接,执行完后将这个连接还给这个池,并不会关闭。避免了频繁连接数据库,提高了性能。
python中实现数据库连接池需要用来两个模块:pymysql, dbutils
基于函数的数据库连接池:
import pymysql
from DBUtils.PooledDB import PooledDB
POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的链接,0表示不创建
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='127.0.0.1',
port=3306,
user='root',
password='222',
database='cmdb',
charset='utf8'
)
def fetchall(sql, *args):
""" 获取所有数据 """
conn = POOL.connection()
cursor = conn.cursor()
cursor.execute(sql, args)
result = cursor.fetchall()
cursor.close()
conn.close()
return result
def fetchone(sql, *args):
""" 获取单挑数据 """
conn = POOL.connection()
cursor = conn.cursor()
cursor.execute(sql, args)
result = cursor.fetchone()
cursor.close()
conn.close()
return result
可以看到这里有很多重复代码,需要优化,这里仅仅作为例子演示
基于类的数据库连接池:
#!usr/bin/env python
# *- coding:utf-8 -*-
# Andy
import pymysql
from DBUtils.PooledDB import PooledDB
class SqlHelper():
def __init__(self):
self.pool = PooledDB(
creator=pymysql,
maxconnections=5,
mincached=1,
blocking=True,
ping=0,
host='127.0.0.1',
port=3306,
user='root',
password='zjgisadmin',
database='flask_excel',
charset='utf8')
def open(self):
conn = self.pool.connection()
cursor = conn.cursor()
return conn, cursor
def close(self, cursor, conn):
cursor.close()
conn.close()
def fetchall(self, sql, *args):
conn, cursor = self.open()
cursor.execute(sql, args)
result = cursor.fetchall()
self.close(conn, cursor)
return result
def fetchone(self, sql, *args):
conn, cursor = self.open()
cursor.execute(sql, args)
result = cursor.fetchone()
self.close(conn, cursor)
return result
def commit_data(self, sql, *args):
conn, cursor = self.open()
cursor.execute(sql, args)
conn.commit()
self.close(conn, cursor)
这里对类的连接,关闭连接进行了封装,且实现了执行其它sql语句的commit_data功能,而不仅仅是查询 。
参考flask中上下文管理实现的数据库连接池
在实现之前我们先来看看原理:
import time
import threading
val = threading.local()
def task(i):
val.num = i
time.sleep(1)
print(threading.get_ident(), val.num)
for i in range(4):
t = threading.Thread(target=task,args=(i,))
t.start()
输出:
140716988368640 1
140716996822784 0
140716979914496 2
140716971460352 3
在每个线程执行val.num时会在内存中开辟一块独立空间,来存储这个值。每个线程的id根据get_ident()方法获取。
现在来自定义一个local, 在它内部我们维护一个storage字典,键为线程的id,值为一个字典。
赋值时:
当线程的id已经在storage字典中时,将我们的数据,即key,value存到进程id对应的字典中,如果还没有这个id,那么给这个id对应的值赋值为{key:value}.
取值时:
根据线程的id获取,如果没有对应的线程的Id返回None.
import threading
class Local(object):
def __init__(self):
object.__setattr__(self, 'storage', {})
def __setattr__(self, key, value):
ident = threading.get_ident()
if ident in self.storage:
self.storage[ident][key] = value
else:
self.storage[ident] = {key: value}
def __getattr__(self, item):
ident = threading.get_ident()
if ident not in self.storage:
return
return self.storage[ident].get(item)
local = Local()
def task(arg):
x = "arg%s" % arg
local.x = arg
print(local.__dict__)
for i in range(5):
t = threading.Thread(target=task, args=(i,))
t.start()
"""
storage = {
1111:{'x':[0,1,2,3]},
1112:{'x':1}
1113:{'x':2}
1114:{'x':3}
1115:{'x':4}
}
"""
#输出结果:
{'storage': {140584902526720: {'x': 0}, 140584894072576: {'x': 1}, 140584815625984: {'x': 2}, 140584807171840: {'x': 3},
140584798717696: {'x': 4}}}
我们对上面的代码做些优化:我们改成维护一个列表.
class Local(object):
def __init__(self):
object.__setattr__(self,'storage',{})
def __setattr__(self, key, value):
ident = threading.get_ident()
if ident in self.storage:
self.storage[ident][key].append(value)
else:
self.storage[ident] = {key:[value,]}
def __getattr__(self, item):
ident = threading.get_ident()
if ident not in self.storage:
return
return self.storage[ident][item][-1]
将这个种思路应用到数据库连接池中:
import threading
import pymysql
from DBUtils.PooledDB import PooledDB
class SqlHelper(object):
def __init__(self):
self.pool = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的链接,0表示不创建
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
ping=0,
# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='127.0.0.1',
port=3306,
user='root',
password='222',
database='cmdb',
charset='utf8'
)
self.local = threading.local()
def open(self):
conn = self.pool.connection()
cursor = conn.cursor()
return conn, cursor
def close(self, cursor, conn):
cursor.close()
conn.close()
def fetchall(self, sql, *args):
""" 获取所有数据 """
conn, cursor = self.open()
cursor.execute(sql, args)
result = cursor.fetchall()
self.close(conn, cursor)
return result
def fetchone(self, sql, *args):
""" 获取所有数据 """
conn, cursor = self.open()
cursor.execute(sql, args)
result = cursor.fetchone()
self.close(conn, cursor)
return result
def __enter__(self):
conn,cursor = self.open()
rv = getattr(self.local,'stack',None)
if not rv:
self.local.stack = [(conn,cursor),]
else:
rv.append((conn,cursor))
self.local.stack = rv
return cursor
def __exit__(self, exc_type, exc_val, exc_tb):
rv = getattr(self.local,'stack',None)
if not rv:
# del self.local.stack
return
conn,cursor = self.local.stack.pop()
cursor.close()
conn.close()
db = SqlHelper()
当使用with语句时,它会先执行__enter__方法,而执行完后会执行__exit__方法。
我们在local中维护一个列表,并通过append,pop来模拟栈的功能。如果没有这个字典时,我们将conn,cursor组成的元组放到列表中,如果已经存在,那么添加一个元组。退出时则将它销毁。
另一种更简单的实现:
import threading
import pymysql
from DBUtils.PooledDB import PooledDB
POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的链接,0表示不创建
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
ping=0,
# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='127.0.0.1',
port=3306,
user='root',
password='222',
database='cmdb',
charset='utf8'
)
class SqlHelper(object):
def __init__(self):
self.conn = None
self.cursor = None
def open(self):
conn = POOL.connection()
cursor = conn.cursor()
return conn, cursor
def close(self):
self.cursor.close()
self.conn.close()
def __enter__(self):
self.conn,self.cursor = self.open()
return self.cursor
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
with SqlHelper() as cursor:
cursor.execute('select * from table1 where id>10;')
Pool是全局变量,open时申请一个连接,而每个对象实例化后都维护一个自己的self.conn, self.cursor,也达到了隔离的目的。
以上就是四种数据库连接池的实现,其中第三种参考了flask源码,相对比较难,flask源码中其实实现了对协程的粒度,这里就不写了。