Python Pandas库如何优雅的与数据库交互

1 问题描述

最近在做项目的时候遇到稍微复杂的一个场景,需要夸多组件数据处理。具体流程如下:

  • google bigquery 大叔据平台将计算和聚合过的每小时结果集存在 pandasdataframe 中。
  • pandas 的数据与 postgresql 数据库同步, 将结果集中有些字段为空的部分从 postgresq 中再回填给 pandasdataframe 中。
  • 合并之后的结果在更新入 postgresq, 同时写入 hive 表中便于后期用 presto 做长查询。

本来这套逻辑似乎更适用于 spark map reduce 来做,但是由于 bigquery 的计算分布在几十个 gcp project 里面。 spark 调用 bigquery 的话配置权限很麻烦,同时结果集每小时的大小也只在三万到十万级别。 计算任务是在 bigquery 中完成, 所以这时候调用 spark 似乎有点重了。所以决定使用 python pandas 配合别的大数据主件。

bigquer 每次会计算历史10几个小时的聚合结果集,按照每小时3-10万的量算下来,最多一次合并 postgresq 的量级在百万以内。 这个量级在合并的时候从 pgsql 拉数据下来,再在内存中用 pandasjoin 明显不是明智的选择。 从 pgsqlselect 百万记录下来 pandas join 完之后再上传 pqsql ,再 update 无疑增加了计算的复杂度。 所以我的选择是: 按小时将 pandas dataframe 上传到 pgsql 临时表, 直接在数据库中做 joinupdate pgsql 中的记录

因此我们需要实现的就是如何优雅的把 pandas dataframe 上传到 pgsql 的临时表,并提供接口操作临时表和已有的表做运算。

2 解决方案

虽然 pandas 自己也带了数据库连接的接口,但是过于简单,不能满足复杂的处理过程。 但是又不想用很重的库如 sqlalchemy 等支持 ORM 的库,觉得还是对程序员不友好。而且要写很多手脚架来适配。

所以这里推荐我常用的数据库连接库 records

records.jpg

  • 动态解析 pandas dataframe 对应的数据库临时表的 schema

    def infer_pd_schema(df):
    '''infer pandas dataframe schema
    '''

    schema = []
    type_map = {
    'int64': 'int8',
    'object': 'varchar(512)',
    'float64': 'float8'
    }
    for col, tp in df.dtypes.items():
    t = type_map[tp.name]
    item = f'''"{col}" {t}'''
    schema.append(item)
    return ', '.join(schema)

    如果要增加跟多复杂的类型作为 pandas dataframepgsql 的映射,可以往 type_map 里面添加对应的类型。 或者你也可以修改非 pgsql 的类型数据库对应的字段,以匹配你真实的需求。

  • 上传 pandas dataframe 数据到数据库临时表

    def create_temp_table_with_df(conn, table, dataframe):
    '''conn is a pgsql connect transaction
    table is the name of the temp table and df is the pandas dataframe
    '''

    schema = infer_pd_schema(dataframe)
    conn.query(f'drop table if exists {table}')

    query = f'''create TEMPORARY table if not exists {table}
    ({schema});'''

    logger.info(query)
    conn.query(query)

    # After testing, threads will not improve the uploading performance
    # here is a tricky way to upload a pandas dataframe to pgsql without "insert" query.
    # 600,000 records can be delivered within secs.

    with tempfile.NamedTemporaryFile() as tf:
    name = tf.name
    logger.info(f'creating temp data file {name}')
    dataframe.to_csv(name, header=True, index=False)
    query = f'''
    copy {table} from STDIN WITH (FORMAT CSV, HEADER TRUE)
    '''

    rconn = conn._conn.connection
    cursor = rconn.cursor()
    with open(name, 'r') as f:
    cursor.copy_expert(query, f)
    rconn.commit()
    logger.info('upload data done')

    pandas 的数据量很大比如几百万条记录时, sql insert 操作的性能往往不会很好。即便是使用多线程操作,性能的提升也是不明显。 这里我采用了 pgsqlcopy 命令, 很 tricky 的深入了 records 库的低层 connection 调用方法。 百万级的记录上传只需要秒级就可以完成。

  • pandas dataframe 的临时表与数据库中已经有的表做联合操作

    def query_pg_with_df(table, dataframe, sql):
    '''Temp upload pandas dataframe to pgsql for interaction with tables in pg.
    return a pandas dataframe
    '''

    pgdb = records.Database(
    'postgres://{host}/{db}?user={user}&password={pwd}'.format(
    host=PG_SQL_HOST,
    db=PG_SQL_NAME,
    user=PG_SQL_USER,
    pwd=PG_SQL_PASS)
    )

    try:
    with pgdb.transaction() as conn:
    create_temp_table_with_df(conn, table, dataframe)
    logger.info(sql)
    res = conn.query(sql)
    return pd.DataFrame(res.as_dict())
    except Exception as e:
    logger.error(e.msg)
    finally:
    pgdb.close()

    这里函数中的 table 变量是指后面 pandas dataframe 上传至 pgsql 之后的临时表名。变量 sql 就可以操作这个临时表和数据库里面其他表做正常的类似 join 的操作。 返回的结果是一个 pandas dataframe 。 这样就可以在内存中做后续进一步的处理了。

3 总结

  • 如果采用 sql insert 一条条纪录的话,性能不能符合要求,所以采用了 copy csv 的方式。
  • 提供了 pandas dataframe 如何与 pgsql 里的表做交互比如 join 等操作。
  • 可以根据实际采用的 sql 服务类型,修改此代码 portingmysql 或者别的 sql service

Date: 2020-03-18

Author: shawn-win11

Created: 2022-09-24 Sat 14:46

Validate