Python 基础教程

Python 高级教程

Python 相关应用

Python 笔记

python pymysql 轮询批量查询示例

Python 笔记 Python 笔记


python 的 mysql 连接组件 pymysql 如何使用轮询方式批量读取数据,以控制每次查询数据量,保护 db 的压力?

示例

思路是根据表的主键按照升序批量查询,然后下一次根据主键的偏移量范围升序批量查询,如此往复。

直接上代码:

import pymysql


def connect_db():
    return pymysql.connect(
        host='x.x.x.x',
        port=3306,
        user='xxx',
        password='xxxx',
        database='xxxxx',
        charset='utf8'
    )


def select_data_loop_cursor(conn: pymysql.Connection, table: str, id_field='id', fetch_fields='name', condition='1=1'):
    #   元组结果列表
    result_list = []

    cursor = 0
    fetch_size = 1000
    loop = True
    finished_count = 0
    cur = conn.cursor()
    while loop:
        sql = 'SELECT {id}, {fetch_fields} FROM {table} WHERE {id} > {cursor} AND {condition} ' \
              'ORDER BY {id} LIMIT {fetch_size}' \
            .format(table=table,
                    id=id_field,
                    fetch_fields=fetch_fields,
                    cursor=cursor,
                    fetch_size=fetch_size)
        cur.execute(sql)
        result = cur.fetchall()
        if result:
            size = len(result)
            if size < fetch_size:
                loop = False
            finished_count = finished_count + size
            if finished_count % 2000 == 0:
                print("finished count ======>", finished_count)
            if size > 1:
                #   返回多条记录时,两层元组
                cursor = result[size - 1][0]
                for t in result:
                    result_list.append(t)
            else:
                #   返回单条记录时,一层元组
                cursor = result[0]
                result_list.append(result)
    cur.close()
    return result_list


if __name__ == '__main__':
    con = connect_db()
    rl = select_data_loop_cursor(con, table='tb_demo_name')
    con.close()
    for r in rl:
        print(r)