Python使用adbapi创建MySQL连接池

 看这里


这里我们使用adbapi 进行MySql的连接


import pymysql
# adbapi是python的数据库连接池
from twisted.enterprise import adbapi
from twisted.internet import reactor

class DBHelper(object):
    # 初始化连接池
    def __init__(self):
        print("初始化连接池")
        dbparams = dict(
            host ='47.101.181.53', 
            # 端口号不设置默认为3306,需要其他端口号请用Number类型。否则抛出类型异常
            port = 3307,
            db='boottest',
            user='root',
            passwd='520413',
            charset='utf8',  #编码要加上,否则可能出现中文乱码问题
            cursorclass=pymysql.cursors.DictCursor, #如果使用的MySQLdb,可以修改为MySQLdb.
            use_unicode=False,
        )
        #**表示将字典扩展为关键字参数,相当于host=xxx,db=yyy....
        dbpool = adbapi.ConnectionPool('pymysql', **dbparams)
        self.dbpool = dbpool

    def connect(self):
        return self.dbpool

    def truncate(self):
        sql = "truncate table novel_tab";
        #调用插入的方法
        query = self.dbpool.runInteraction(self._truncate, sql)
        #调用异常处理方法
        query.addErrback(self._handle_error)


    def insert(self, item):
        # sql = "insert into tech_courses(title,image,brief,course_url,created_at) values(%s,%s,%s,%s,%s)"
        sql ="insert into dubbo_dog(appId,traceId,className,methodName) values(%s,%s,%s,%s)"
        #调用插入的方法
        query = self.dbpool.runInteraction(self._conditional_insert, sql, item)
        #调用异常处理方法
        query.addErrback(self._handle_error)
        return item
    
    #写入的数据
    def _conditional_insert(self,tx, sql, item):
        params = (item[0],item[1],item[2],item[3])
        tx.execute(sql, params)

    def update(self,item):
        sql = "update novel_tab set novelLastUpdateChapter = %s, novelLastUpdateChapterUrl = %s where novelUrl = %s"
        query = self.dbpool.runInteraction(self._conditional_insert, sql, item)
        #调用异常处理方法
        query.addErrback(self._handle_error)
        return item

    ## 需要修改的数据
    def _conditional_update(self, tx, sql, item):
        params = (item['lastUpdateChapter'], item['lastUpdateChapterUrl'], item['url'])
        tx.execute(sql, params)

    def _truncate(self, tx, sql):
        tx.execute(sql)

    #错误处理方法
    def _handle_error(self, failue):
        print('--------------database operation exception!!-----------------')
        print(failue)

if __name__ =='__main__':
    db = DBHelper()
    item = ['sdf',"f","fd","dfdsf"]
    db.insert(item)
    # 由于adbapi 为异步非阻塞,单独使用py文件时,
    # 必须设置异步返回的时间,否则返回结果时,由于没有回调,adbapi会进行回滚,数据库插入失败。
    # 而使用框架如Scrapy时,则不必。
    reactor.callLater(10, reactor.stop)
    reactor.run()


备注:我是在看这个爬虫项目时,发现pipelines里用的这种方式的连接池,以前自己用的PooledDB(from DBUtils.PooledDB import PooledDB),
本意是尽快熟悉下scrapy如何爬取appstore,所以并没有改成PooledDB,而且简单修改了下连接池的设置,就开始使用了。