While writing data-processing scripts recently, I ran into very long processing times because of the sheer volume of data, so here is a summary of the speed-up techniques I used.

Multiprocessing

Earlier posts have also covered this kind of parallel processing, for example splitting a large DataFrame into as many parts as there are CPU cores, processing each part separately, and then merging the results:

import numpy as np
import pandas as pd
from multiprocessing import cpu_count, Pool


def parallelize(func, df):
    """Split df into one partition per CPU core and run func on them in parallel.

    https://www.machinelearningplus.com/python/parallel-processing-python/

    Parameters
    ----------
    func : callable
        Function applied to each DataFrame partition.
    df : pandas DataFrame
        Data to split across cores.

    Returns
    -------
    data : pandas DataFrame
        Concatenation of the DataFrames returned by func.
    """
    cores = cpu_count()
    data_split = np.array_split(df, cores)  # one chunk per core
    pool = Pool(cores)
    data = pd.concat(pool.map(func, data_split), ignore_index=True)
    pool.close()
    pool.join()
    return data
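
For reference, a minimal usage sketch (add_one below is a hypothetical per-chunk function, not part of the original code):

def add_one(chunk):
    # Hypothetical per-chunk function: anything that maps a
    # DataFrame partition to a DataFrame works here.
    return chunk + 1

if __name__ == '__main__':
    df = pd.DataFrame({'x': range(100)})
    result = parallelize(add_one, df)

The if __name__ == '__main__' guard matters: on platforms that spawn worker processes (Windows, and macOS by default), multiprocessing re-imports the module, and without the guard the pool would be created recursively.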

If the input is not a DataFrame but a large file object, the corresponding approach is:

import pandas as pd
from multiprocessing import cpu_count, Pool

pool = Pool(processes=cpu_count())
with open('test.txt', 'r') as file:
    # map over the file line by line; the values returned by
    # preprocess are collected into the list rows
    rows = pool.map(preprocess, file)
pool.close()
df = pd.DataFrame(rows, columns=["timestamp", "cmdb_id", "parent_id", "span_id", "trace_id", "duration"])
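
The preprocess function itself is not shown above; as a sketch, assuming each line of test.txt is comma-separated with fields in the same order as the columns, it could look like this (the parsing logic is an assumption, not the original code):

def preprocess(line):
    # Hypothetical parser: assumes comma-separated fields matching
    # the DataFrame column order above.
    timestamp, cmdb_id, parent_id, span_id, trace_id, duration = line.strip().split(',')
    return timestamp, cmdb_id, parent_id, span_id, trace_id, float(duration)

Note that preprocess must be defined at module top level so multiprocessing can pickle it and send it to the worker processes.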

These two approaches should cover most situations.

Accelerating Pandas

The post "Python Code Optimization Tips (Part 1)" introduced the Modin library; here is another one, swifter (detailed tutorial: https://github.com/jmcarpenter2/swifter). It can be used together with Modin, and its usage is very simple. Example code:

import pandas as pd
# import modin.pandas as pd
import swifter

df = pd.DataFrame({'x': [1, 2, 3, 4], 'y': [5, 6, 7, 8]})

# runs on a single core
df['x2'] = df['x'].apply(lambda x: x**2)
# runs on multiple cores
df['x2'] = df['x'].swifter.apply(lambda x: x**2)

# use swifter apply on the whole dataframe (axis=1 applies row-wise,
# so each row yields one value for the new column)
df['agg'] = df.swifter.apply(lambda x: x.sum() - x.min(), axis=1)

# use swifter apply on specific columns (my_func, positional_arg and
# keyword_arg are placeholders for your own function and arguments)
df['outCol'] = df[['inCol1', 'inCol2']].swifter.apply(my_func)
df['outCol'] = df[['inCol1', 'inCol2', 'inCol3']].swifter.apply(
    my_func, positional_arg, keyword_arg=keyword_argval)

And every swifter apply comes with a progress bar out of the box!
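
If the progress bar gets noisy (e.g. in batch logs), swifter also documents a progress_bar toggle; a small sketch, worth verifying against the swifter version you have installed:

# disable the progress bar for a single apply
df['x2'] = df['x'].swifter.progress_bar(False).apply(lambda x: x**2)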