1 问题描述
最近在做项目的时候遇到稍微复杂的一个场景,需要夸多组件数据处理。具体流程如下:
- 从
google
bigquery
大叔据平台将计算和聚合过的每小时结果集存在pandas
的dataframe
中。 pandas
的数据与postgresql
数据库同步, 将结果集中有些字段为空的部分从postgresq
中再回填给pandas
的dataframe
中。- 合并之后的结果在更新入
postgresq
, 同时写入hive
表中便于后期用presto
做长查询。
本来这套逻辑似乎更适用于 spark
map reduce
来做,但是由于 bigquery
的计算分布在几十个 gcp project
里面。
spark
调用 bigquery
的话配置权限很麻烦,同时结果集每小时的大小也只在三万到十万级别。
计算任务是在 bigquery
中完成, 所以这时候调用 spark
似乎有点重了。所以决定使用 python pandas
配合别的大数据主件。
bigquer
每次会计算历史10几个小时的聚合结果集,按照每小时3-10万的量算下来,最多一次合并 postgresq
的量级在百万以内。
这个量级在合并的时候从 pgsql
拉数据下来,再在内存中用 pandas
做 join
明显不是明智的选择。
从 pgsql
中 select
百万记录下来 pandas
join
完之后再上传 pqsql
,再 update
无疑增加了计算的复杂度。
所以我的选择是: 按小时将 pandas
dataframe
上传到 pgsql
临时表, 直接在数据库中做 join
并 update
pgsql
中的记录 。
因此我们需要实现的就是如何优雅的把 pandas dataframe
上传到 pgsql
的临时表,并提供接口操作临时表和已有的表做运算。
2 解决方案
虽然 pandas
自己也带了数据库连接的接口,但是过于简单,不能满足复杂的处理过程。
但是又不想用很重的库如 sqlalchemy
等支持 ORM
的库,觉得还是对程序员不友好。而且要写很多手脚架来适配。
所以这里推荐我常用的数据库连接库 records
动态解析
pandas dataframe
对应的数据库临时表的schema
def infer_pd_schema(df):
'''infer pandas dataframe schema
'''
schema = []
type_map = {
'int64': 'int8',
'object': 'varchar(512)',
'float64': 'float8'
}
for col, tp in df.dtypes.items():
t = type_map[tp.name]
item = f'''"{col}" {t}'''
schema.append(item)
return ', '.join(schema)如果要增加跟多复杂的类型作为
pandas dataframe
和pgsql
的映射,可以往type_map
里面添加对应的类型。 或者你也可以修改非pgsql
的类型数据库对应的字段,以匹配你真实的需求。上传
pandas dataframe
数据到数据库临时表def create_temp_table_with_df(conn, table, dataframe):
'''conn is a pgsql connect transaction
table is the name of the temp table and df is the pandas dataframe
'''
schema = infer_pd_schema(dataframe)
conn.query(f'drop table if exists {table}')
query = f'''create TEMPORARY table if not exists {table}
({schema});'''
logger.info(query)
conn.query(query)
# After testing, threads will not improve the uploading performance
# here is a tricky way to upload a pandas dataframe to pgsql without "insert" query.
# 600,000 records can be delivered within secs.
with tempfile.NamedTemporaryFile() as tf:
name = tf.name
logger.info(f'creating temp data file {name}')
dataframe.to_csv(name, header=True, index=False)
query = f'''
copy {table} from STDIN WITH (FORMAT CSV, HEADER TRUE)
'''
rconn = conn._conn.connection
cursor = rconn.cursor()
with open(name, 'r') as f:
cursor.copy_expert(query, f)
rconn.commit()
logger.info('upload data done')当
pandas
的数据量很大比如几百万条记录时,sql insert
操作的性能往往不会很好。即便是使用多线程操作,性能的提升也是不明显。 这里我采用了pgsql
的copy
命令, 很tricky
的深入了records
库的低层connection
调用方法。 百万级的记录上传只需要秒级就可以完成。对
pandas dataframe
的临时表与数据库中已经有的表做联合操作def query_pg_with_df(table, dataframe, sql):
'''Temp upload pandas dataframe to pgsql for interaction with tables in pg.
return a pandas dataframe
'''
pgdb = records.Database(
'postgres://{host}/{db}?user={user}&password={pwd}'.format(
host=PG_SQL_HOST,
db=PG_SQL_NAME,
user=PG_SQL_USER,
pwd=PG_SQL_PASS)
)
try:
with pgdb.transaction() as conn:
create_temp_table_with_df(conn, table, dataframe)
logger.info(sql)
res = conn.query(sql)
return pd.DataFrame(res.as_dict())
except Exception as e:
logger.error(e.msg)
finally:
pgdb.close()这里函数中的
table
变量是指后面pandas dataframe
上传至pgsql
之后的临时表名。变量sql
就可以操作这个临时表和数据库里面其他表做正常的类似join
的操作。 返回的结果是一个pandas dataframe
。 这样就可以在内存中做后续进一步的处理了。
3 总结
- 如果采用
sql insert
一条条纪录的话,性能不能符合要求,所以采用了copy csv
的方式。 - 提供了
pandas dataframe
如何与pgsql
里的表做交互比如join
等操作。 - 可以根据实际采用的
sql
服务类型,修改此代码porting
到mysql
或者别的sql service
。
Date: 2020-03-18
Created: 2022-09-24 Sat 14:46