算法暴露接口(xhs、dy、ks、wx、hnw)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

234 lines
7.5 KiB

import threading
import pymysql
from pymysql.cursors import DictCursor
from dbutils.pooled_db import PooledDB
class CRAWLER_DB_CONF(object):
"""
点评
"""
DBHOST = '172.18.1.181'
DBPORT = 3306
DBUSER = 'crawl'
DBPWD = 'crawl'
DBNAME = 'dianpinggfy'
DBCHAR = 'utf8'
DB_FULL_NAME = "dianpinggfy"
class CRAWLER_DB_CONF_KS(object):
"""
快手
"""
DBHOST = '172.18.1.134'
DBPORT = 3306
DBUSER = 'crawl666'
DBPWD = 'lx2a4jN1xFT96kj20LU='
DBNAME = 'KS_storage'
DBCHAR = 'utf8'
DB_FULL_NAME = "KS_storage"
class CRAWLER_DB_CONF_DY(object):
"""
抖音
"""
DBHOST = '172.18.1.181'
DBPORT = 3306
DBUSER = 'crawl'
DBPWD = 'crawl'
DBNAME = 'test'
DBCHAR = 'utf8'
DB_FULL_NAME = "test"
class MysqlPoolClient(object):
"""
MYSQL数据库对象,负责产生数据库连接 , 此类中的连接采用连接池实现获取连接对象:conn = Mysql.getConn()
释放连接对象;conn.close()或del conn
备注:单步进入
"""
# 连接池对象
__pool = {}
__lock = threading.Lock()
# TODO(YaoPeng): 反复加锁影响性能,但是爬虫场景下,可以暂时容忍
def __init__(self, db_conf):
MysqlPoolClient.__lock.acquire()
# 数据库构造函数,从连接池中取出连接,并生成操作游标
# pip install DBUtils
self._conn = MysqlPoolClient.__getConn(db_conf)
self._cursor = self._conn.cursor()
MysqlPoolClient.__lock.release()
def __del__(self):
self.dispose()
@staticmethod
def __getConn(db_conf):
pool_name = db_conf.DB_FULL_NAME
"""
@summary: 静态方法,从连接池中取出连接
@return MySQLdb.connection
"""
if pool_name not in MysqlPoolClient.__pool:
MysqlPoolClient.__pool[pool_name] = PooledDB(creator=pymysql,
mincached=1,
maxcached=20,
host=db_conf.DBHOST,
port=db_conf.DBPORT,
user=db_conf.DBUSER,
passwd=db_conf.DBPWD,
db=db_conf.DBNAME,
use_unicode=True,
charset=db_conf.DBCHAR,
cursorclass=DictCursor)
return MysqlPoolClient.__pool[pool_name].connection()
def getAll(self, sql, param=None):
"""
@summary: 执行查询,并取出所有结果集
@param sql:查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
@param param: 可选参数,条件列表值(元组/列表)
@return: result list(字典对象)/boolean 查询到的结果集
"""
if param is None:
count = self._cursor.execute(sql)
else:
count = self._cursor.execute(sql, param)
if count > 0:
query_result = self._cursor.fetchall()
else:
query_result = False
return query_result
def getOne(self, sql, param=None):
"""
@summary: 执行查询,并取出第一条
@param sql:查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
@param param: 可选参数,条件列表值(元组/列表)
@return: result list/boolean 查询到的结果集
"""
if param is None:
count = self._cursor.execute(sql)
else:
count = self._cursor.execute(sql, param)
if count > 0:
query_result = self._cursor.fetchone()
else:
query_result = False
return count,query_result
def getMany(self, sql, num, param=None):
"""
@summary: 执行查询,并取出num条结果
@param sql:查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
@param num:取得的结果条数
@param param: 可选参数,条件列表值(元组/列表)
@return: result list/boolean 查询到的结果集
"""
if param is None:
count = self._cursor.execute(sql)
else:
count = self._cursor.execute(sql, param)
if count > 0:
query_result = self._cursor.fetchmany(num)
else:
query_result = False
return query_result
def insertOne(self, sql, value=None):
"""
@summary: 向数据表插入一条记录
@param sql:要插入的SQL格式
@param value:要插入的记录数据tuple/list
@return: insertId 受影响的行数
"""
self._cursor.execute(sql, value)
return self.__getInsertId()
def insertMany(self, sql, values):
"""
@summary: 向数据表插入多条记录
@param sql:要插入的SQL格式
@param values:要插入的记录数据tuple(tuple)/list[list]
@return: count 受影响的行数
"""
count = self._cursor.executemany(sql, values)
return count
def updateMany(self, sql, values):
"""
@summary: 向数据表更新多条记录
@param sql:要插入的SQL格式
@param values:要插入的记录数据tuple(tuple)/list[list]
@return: count 受影响的行数
"""
count = self._cursor.executemany(sql, values)
return count
def __getInsertId(self):
"""
获取当前连接最后一次插入操作生成的id,如果没有则为0
"""
self._cursor.execute("SELECT @@IDENTITY AS id")
result = self._cursor.fetchall()
return result[0]['id']
def __query(self, sql, param=None, commit=True):
if param is None:
count = self._cursor.execute(sql)
else:
count = self._cursor.execute(sql, param)
if commit:
self._conn.commit()
return count
def update(self, sql, param=None):
"""
@summary: 更新数据表记录
@param sql: SQL格式及条件,使用(%s,%s)
@param param: 要更新的 值 tuple/list
@return: count 受影响的行数
"""
return self.__query(sql, param)
def delete(self, sql, param=None):
"""
@summary: 删除数据表记录
@param sql: SQL格式及条件,使用(%s,%s)
@param param: 要删除的条件 值 tuple/list
@return: count 受影响的行数
"""
return self.__query(sql, param)
def begin(self):
"""
@summary: 开启事务
"""
self._conn.autocommit(0)
def end(self, option='commit'):
"""
@summary: 结束事务
"""
if option == 'commit':
self._conn.commit()
else:
self._conn.rollback()
def dispose(self, is_end=1):
"""
@summary: 释放连接池资源
"""
MysqlPoolClient.__lock.acquire()
if is_end == 1:
self.end('commit')
else:
self.end('rollback')
self._cursor.close()
self._conn.close()
MysqlPoolClient.__lock.release()