1 问题描述
最近在做项目的时候遇到稍微复杂的一个场景,需要夸多组件数据处理。具体流程如下:
- 从 
googlebigquery大叔据平台将计算和聚合过的每小时结果集存在pandas的dataframe中。 pandas的数据与postgresql数据库同步, 将结果集中有些字段为空的部分从postgresq中再回填给pandas的dataframe中。- 合并之后的结果在更新入 
postgresq, 同时写入hive表中便于后期用presto做长查询。 
本来这套逻辑似乎更适用于 spark map reduce 来做,但是由于 bigquery 的计算分布在几十个 gcp project 里面。
spark 调用 bigquery 的话配置权限很麻烦,同时结果集每小时的大小也只在三万到十万级别。 
计算任务是在 bigquery 中完成, 所以这时候调用 spark 似乎有点重了。所以决定使用 python pandas  配合别的大数据主件。
bigquer 每次会计算历史10几个小时的聚合结果集,按照每小时3-10万的量算下来,最多一次合并 postgresq 的量级在百万以内。
这个量级在合并的时候从 pgsql 拉数据下来,再在内存中用 pandas 做 join 明显不是明智的选择。 
从 pgsql 中 select 百万记录下来 pandas join 完之后再上传 pqsql ,再 update 无疑增加了计算的复杂度。
所以我的选择是: 按小时将 pandas dataframe 上传到 pgsql 临时表, 直接在数据库中做 join 并 update pgsql 中的记录 。
因此我们需要实现的就是如何优雅的把 pandas dataframe 上传到 pgsql 的临时表,并提供接口操作临时表和已有的表做运算。
2 解决方案
虽然 pandas 自己也带了数据库连接的接口,但是过于简单,不能满足复杂的处理过程。
但是又不想用很重的库如 sqlalchemy 等支持 ORM 的库,觉得还是对程序员不友好。而且要写很多手脚架来适配。
所以这里推荐我常用的数据库连接库 records
动态解析
pandas dataframe对应的数据库临时表的schemadef infer_pd_schema(df):
'''infer pandas dataframe schema
'''
schema = []
type_map = {
'int64': 'int8',
'object': 'varchar(512)',
'float64': 'float8'
}
for col, tp in df.dtypes.items():
t = type_map[tp.name]
item = f'''"{col}" {t}'''
schema.append(item)
return ', '.join(schema)如果要增加跟多复杂的类型作为
pandas dataframe和pgsql的映射,可以往type_map里面添加对应的类型。 或者你也可以修改非pgsql的类型数据库对应的字段,以匹配你真实的需求。上传
pandas dataframe数据到数据库临时表def create_temp_table_with_df(conn, table, dataframe):
'''conn is a pgsql connect transaction
table is the name of the temp table and df is the pandas dataframe
'''
schema = infer_pd_schema(dataframe)
conn.query(f'drop table if exists {table}')
query = f'''create TEMPORARY table if not exists {table}
({schema});'''
logger.info(query)
conn.query(query)
# After testing, threads will not improve the uploading performance
# here is a tricky way to upload a pandas dataframe to pgsql without "insert" query.
# 600,000 records can be delivered within secs.
with tempfile.NamedTemporaryFile() as tf:
name = tf.name
logger.info(f'creating temp data file {name}')
dataframe.to_csv(name, header=True, index=False)
query = f'''
copy {table} from STDIN WITH (FORMAT CSV, HEADER TRUE)
'''
rconn = conn._conn.connection
cursor = rconn.cursor()
with open(name, 'r') as f:
cursor.copy_expert(query, f)
rconn.commit()
logger.info('upload data done')当
pandas的数据量很大比如几百万条记录时,sql insert操作的性能往往不会很好。即便是使用多线程操作,性能的提升也是不明显。 这里我采用了pgsql的copy命令, 很tricky的深入了records库的低层connection调用方法。 百万级的记录上传只需要秒级就可以完成。对
pandas dataframe的临时表与数据库中已经有的表做联合操作def query_pg_with_df(table, dataframe, sql):
'''Temp upload pandas dataframe to pgsql for interaction with tables in pg.
return a pandas dataframe
'''
pgdb = records.Database(
'postgres://{host}/{db}?user={user}&password={pwd}'.format(
host=PG_SQL_HOST,
db=PG_SQL_NAME,
user=PG_SQL_USER,
pwd=PG_SQL_PASS)
)
try:
with pgdb.transaction() as conn:
create_temp_table_with_df(conn, table, dataframe)
logger.info(sql)
res = conn.query(sql)
return pd.DataFrame(res.as_dict())
except Exception as e:
logger.error(e.msg)
finally:
pgdb.close()这里函数中的
table变量是指后面pandas dataframe上传至pgsql之后的临时表名。变量sql就可以操作这个临时表和数据库里面其他表做正常的类似join的操作。 返回的结果是一个pandas dataframe。 这样就可以在内存中做后续进一步的处理了。
3 总结
- 如果采用 
sql insert一条条纪录的话,性能不能符合要求,所以采用了copy csv的方式。 - 提供了 
pandas dataframe如何与pgsql里的表做交互比如join等操作。 - 可以根据实际采用的 
sql服务类型,修改此代码porting到mysql或者别的sql service。 
Date: 2020-03-18
Created: 2022-09-24 Sat 14:46