数据库连接池

什么是数据库连接池呢?

数据库连接池维持了多个数据库连接,当我们需要执行数据库操作时,就去这个“池”里申请一个连接,执行完后将这个连接还给这个池,并不会关闭。避免了频繁连接数据库,提高了性能。

python中实现数据库连接池需要用来两个模块:pymysql, dbutils

基于函数的数据库连接池:

import pymysql
from DBUtils.PooledDB import PooledDB

POOL = PooledDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
    mincached=2,  # 初始化时,链接池中至少创建的链接,0表示不创建
    blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
    ping=0,  # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always

    host='127.0.0.1',
    port=3306,
    user='root',
    password='222',
    database='cmdb',
    charset='utf8'
)


def fetchall(sql, *args):
    """ 获取所有数据 """
    conn = POOL.connection()
    cursor = conn.cursor()
    cursor.execute(sql, args)
    result = cursor.fetchall()
    cursor.close()
    conn.close()

    return result


def fetchone(sql, *args):
    """ 获取单挑数据 """
    conn = POOL.connection()
    cursor = conn.cursor()
    cursor.execute(sql, args)
    result = cursor.fetchone()
    cursor.close()
    conn.close()

    return result  

可以看到这里有很多重复代码,需要优化,这里仅仅作为例子演示

 

基于类的数据库连接池:

#!usr/bin/env python
# *- coding:utf-8 -*-
# Andy
import pymysql

from DBUtils.PooledDB import PooledDB


class SqlHelper():
    def __init__(self):
        self.pool = PooledDB(
            creator=pymysql,
            maxconnections=5,
            mincached=1,
            blocking=True,
            ping=0,
            host='127.0.0.1',
            port=3306,
            user='root',
            password='zjgisadmin',
            database='flask_excel',
            charset='utf8')

    def open(self):
        conn = self.pool.connection()
        cursor = conn.cursor()
        return conn, cursor

    def close(self, cursor, conn):
        cursor.close()
        conn.close()

    def fetchall(self, sql, *args):
        conn, cursor = self.open()
        cursor.execute(sql, args)
        result = cursor.fetchall()
        self.close(conn, cursor)
        return result

    def fetchone(self, sql, *args):
        conn, cursor = self.open()
        cursor.execute(sql, args)
        result = cursor.fetchone()
        self.close(conn, cursor)
        return result

    def commit_data(self, sql, *args):
        conn, cursor = self.open()
        cursor.execute(sql, args)
        conn.commit()
        self.close(conn, cursor)

这里对类的连接,关闭连接进行了封装,且实现了执行其它sql语句的commit_data功能,而不仅仅是查询 。

参考flask中上下文管理实现的数据库连接池

在实现之前我们先来看看原理:

import time
import threading

val = threading.local()

def task(i):
    val.num = i
    time.sleep(1)
    print(threading.get_ident(), val.num)

for i in range(4):
    t = threading.Thread(target=task,args=(i,))
    t.start()

输出:

140716988368640 1
140716996822784 0
140716979914496 2
140716971460352 3

在每个线程执行val.num时会在内存中开辟一块独立空间,来存储这个值。每个线程的id根据get_ident()方法获取。

现在来自定义一个local, 在它内部我们维护一个storage字典,键为线程的id,值为一个字典。

赋值时:

当线程的id已经在storage字典中时,将我们的数据,即key,value存到进程id对应的字典中,如果还没有这个id,那么给这个id对应的值赋值为{key:value}.

取值时:

根据线程的id获取,如果没有对应的线程的Id返回None.

import threading

class Local(object):
    def __init__(self):
        object.__setattr__(self, 'storage', {})

    def __setattr__(self, key, value):
        ident = threading.get_ident()
        if ident in self.storage:
            self.storage[ident][key] = value
        else:
            self.storage[ident] = {key: value}

    def __getattr__(self, item):
        ident = threading.get_ident()
        if ident not in self.storage:
            return
        return self.storage[ident].get(item)


local = Local()


def task(arg):
    x = "arg%s" % arg
    local.x = arg
    print(local.__dict__)


for i in range(5):
    t = threading.Thread(target=task, args=(i,))
    t.start()

"""
storage = {
    1111:{'x':[0,1,2,3]},
    1112:{'x':1}
    1113:{'x':2}
    1114:{'x':3}
    1115:{'x':4}
}
"""
#输出结果:
{'storage': {140584902526720: {'x': 0}, 140584894072576: {'x': 1}, 140584815625984: {'x': 2}, 140584807171840: {'x': 3},
 140584798717696: {'x': 4}}}

我们对上面的代码做些优化:我们改成维护一个列表.

class Local(object):
    def __init__(self):
        object.__setattr__(self,'storage',{})

    def __setattr__(self, key, value):
        ident = threading.get_ident()
        if ident in self.storage:
            self.storage[ident][key].append(value)
        else:
            self.storage[ident] = {key:[value,]}

    def __getattr__(self, item):
        ident = threading.get_ident()
        if ident not in self.storage:
            return
        return self.storage[ident][item][-1]

将这个种思路应用到数据库连接池中:

import threading

import pymysql
from DBUtils.PooledDB import PooledDB



class SqlHelper(object):
    def __init__(self):
        self.pool = PooledDB(
            creator=pymysql,  # 使用链接数据库的模块
            maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
            mincached=2,  # 初始化时,链接池中至少创建的链接,0表示不创建
            blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
            ping=0,
            # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
            host='127.0.0.1',
            port=3306,
            user='root',
            password='222',
            database='cmdb',
            charset='utf8'
        )
        self.local = threading.local()

    def open(self):
        conn = self.pool.connection()
        cursor = conn.cursor()
        return conn, cursor

    def close(self, cursor, conn):
        cursor.close()
        conn.close()

    def fetchall(self, sql, *args):
        """ 获取所有数据 """
        conn, cursor = self.open()
        cursor.execute(sql, args)
        result = cursor.fetchall()
        self.close(conn, cursor)
        return result

    def fetchone(self, sql, *args):
        """ 获取所有数据 """
        conn, cursor = self.open()
        cursor.execute(sql, args)
        result = cursor.fetchone()
        self.close(conn, cursor)
        return result

    def __enter__(self):
        conn,cursor = self.open()
        rv = getattr(self.local,'stack',None)
        if not rv:
            self.local.stack = [(conn,cursor),]
        else:
            rv.append((conn,cursor))
            self.local.stack = rv
        return cursor

    def __exit__(self, exc_type, exc_val, exc_tb):
        rv = getattr(self.local,'stack',None)
        if not rv:
            # del self.local.stack
            return
        conn,cursor = self.local.stack.pop()
        cursor.close()
        conn.close()

db = SqlHelper()

当使用with语句时,它会先执行__enter__方法,而执行完后会执行__exit__方法。

我们在local中维护一个列表,并通过append,pop来模拟栈的功能。如果没有这个字典时,我们将conn,cursor组成的元组放到列表中,如果已经存在,那么添加一个元组。退出时则将它销毁。

 

另一种更简单的实现:

import threading
import pymysql

from DBUtils.PooledDB import PooledDB

POOL = PooledDB(
            creator=pymysql,  # 使用链接数据库的模块
            maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
            mincached=2,  # 初始化时,链接池中至少创建的链接,0表示不创建
            blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
            ping=0,
            # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
            host='127.0.0.1',
            port=3306,
            user='root',
            password='222',
            database='cmdb',
            charset='utf8'
        )

class SqlHelper(object):
    def __init__(self):
        self.conn = None
        self.cursor = None

    def open(self):
        conn = POOL.connection()
        cursor = conn.cursor()
        return conn, cursor

    def close(self):
        self.cursor.close()
        self.conn.close()

    def __enter__(self):
        self.conn,self.cursor = self.open()
        return self.cursor

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()


with SqlHelper() as cursor:
    cursor.execute('select * from table1 where id>10;')

 

Pool是全局变量,open时申请一个连接,而每个对象实例化后都维护一个自己的self.conn, self.cursor,也达到了隔离的目的。

以上就是四种数据库连接池的实现,其中第三种参考了flask源码,相对比较难,flask源码中其实实现了对协程的粒度,这里就不写了。

上一篇:Flask 蓝图

下一篇:Django使用原生sql的三种方式