How to convert a for loop into parallel processing in Python?


Problem Description

I am still in the early stages of learning Python, so apologies in advance if this question sounds silly.

I have a set of data (in tabular format) to which I want to add some calculated columns. Basically I have location lon/lat pairs and destination lon/lat pairs, along with their respective timestamps, and I am calculating the average speed between each pair.

Sample data looks like this:

print(data_all.head(3))

   id    lon_evnt   lat_evnt          event_time  \
0   1 -179.942833  41.012467 2017-12-13 21:17:54   
1   2 -177.552817  41.416400 2017-12-14 03:16:00   
2   3 -175.096567  41.403650 2017-12-14 09:14:06   

  dest_data_generate_time   lat_dest    lon_dest  \
0 2017-12-13 22:33:37.980  37.798599 -121.292193   
1 2017-12-14 04:33:44.393  37.798599 -121.292193   
2 2017-12-14 10:33:51.629  37.798599 -121.292193  

                             address_fields_dest  \
0  {'address': 'Nestle Way', 'city': 'Lathrop...      
1  {'address': 'Nestle Way', 'city': 'Lathrop...      
2  {'address': 'Nestle Way', 'city': 'Lathrop...      

Then I zip the lon/lat together:

data_all['ping_location'] = list(zip(data_all.lon_evnt, data_all.lat_evnt))
data_all['destination'] = list(zip(data_all.lon_dest, data_all.lat_dest))

Then I want to calculate the distance between each pair of location pings, pull some address info out of a string (basically a substring), and then compute the velocity:

for idx, row in data_all.iterrows():
    dist = gcd.dist(row['destination'], row['ping_location'])
    data_all.loc[idx, 'gc_distance'] = dist

    temp_idx = str(row['address_fields_dest']).find(":")
    pos_start = temp_idx + 3
    pos_end = str(row['address_fields_dest']).find(",") - 2

    data_all.loc[idx, 'destination address'] = str(row['address_fields_dest'])[pos_start:pos_end]

    ##### calculate velocity which is: v = d/t
    ## time is the difference btwn destination time and the ping creation time
    timediff = abs(row['dest_data_generate_time'] - row['event_time'])
    data_all.loc[idx, 'velocity km/hr'] = 0

    ## check if the time diff btwn destination and event ping is more than a minute long
    if timediff > datetime.timedelta(minutes=1):
        data_all.loc[idx, 'velocity km/hr'] = dist / timediff.total_seconds() * 3600.0
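
(For comparison, these three columns can also be computed with vectorized pandas/NumPy operations, which often makes the row-by-row loop unnecessary. A minimal sketch, where haversine_km is an illustrative stand-in for gcd.dist; the original function's formula and units are not shown in the question:)

import numpy as np
import pandas as pd

def haversine_km(lon1, lat1, lon2, lat2):
    # vectorized great-circle distance, mean Earth radius in km
    lon1, lat1, lon2, lat2 = map(np.radians, (lon1, lat1, lon2, lat2))
    a = (np.sin((lat2 - lat1) / 2) ** 2
         + np.cos(lat1) * np.cos(lat2) * np.sin((lon2 - lon1) / 2) ** 2)
    return 6371.0 * 2 * np.arcsin(np.sqrt(a))

data_all['gc_distance'] = haversine_km(data_all.lon_evnt, data_all.lat_evnt,
                                       data_all.lon_dest, data_all.lat_dest)

# mirrors find(":") + 3 .. find(",") - 2: the text between the first ": '" and "',"
s = data_all['address_fields_dest'].astype(str)
data_all['destination address'] = s.str.extract(r": '(.*?)',", expand=False)

timediff = (data_all['dest_data_generate_time'] - data_all['event_time']).abs()
# v = d/t, zeroed out when the two timestamps are less than a minute apart
data_all['velocity km/hr'] = np.where(
    timediff > pd.Timedelta(minutes=1),
    data_all['gc_distance'] / timediff.dt.total_seconds() * 3600.0,
    0.0)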

Now, the loop above took me almost 7 hours to run over 333k rows of data! :( I have Windows 10, 2 cores, 16 GB RAM... which isn't much, but 7 hours is definitely not OK :(

How can I make the program run more efficiently? One thought I had: since the rows and their calculations are independent of each other, I could take advantage of parallel processing.

I have read many posts, but it seems most parallel-processing approaches are for the case where only one simple function is applied; here, however, I am adding multiple new columns.

Any help is much appreciated! Or tell me that it is impossible to make pandas do parallel processing (I believe I read a claim like that somewhere, but I'm not entirely sure it is still 100% true).

Examples of posts I have read:

Large Pandas DataFrame parallel processing

Python pandas dataframe to dictionary

How do I parallelize a simple Python loop?

How to do parallel programming in Python

And plenty more that are not on Stack Overflow...

https://medium.com/@ageitgey/quick-tip-speed-up-your-python-data-processing-scripts-with-process-pools-cf275350163a

https://homes.cs.washington.edu/~jmschr/lectures/Parallel_Processing_in_Python.html

Recommended Answer

Here is a quick solution - I did not try to optimize your code at all, I just fed it into a multiprocessing pool. This runs your function on each row individually, returns the row with its new attributes, and builds a new dataframe from that output.

import datetime
import multiprocessing as mp

import pandas as pd

def func(arg):
    idx, row = arg

    # great-circle distance between the ping location and the destination
    dist = gcd.dist(row['destination'], row['ping_location'])
    row['gc_distance'] = dist

    # pull the address substring out of the dict-like string
    temp_idx = str(row['address_fields_dest']).find(":")
    pos_start = temp_idx + 3
    pos_end = str(row['address_fields_dest']).find(",") - 2

    row['destination address'] = str(row['address_fields_dest'])[pos_start:pos_end]

    ##### calculate velocity which is: v = d/t
    ## time is the difference btwn destination time and the ping creation time
    timediff = abs(row['dest_data_generate_time'] - row['event_time'])
    row['velocity km/hr'] = 0

    ## check if the time diff btwn destination and event ping is more than a minute long
    if timediff > datetime.timedelta(minutes=1):
        row['velocity km/hr'] = dist / timediff.total_seconds() * 3600.0

    return row

if __name__ == '__main__':  # required on Windows, where worker processes are spawned
    with mp.Pool(processes=mp.cpu_count()) as pool:
        new_rows = pool.map(func, [(idx, row) for idx, row in data_all.iterrows()])
    # each returned row is a Series; rebuild a DataFrame rather than concatenating
    data_all_new = pd.DataFrame(new_rows)
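
Mapping the pool over single rows pickles every row as its own task; if that overhead dominates, a common refinement is to give each worker a whole chunk of the frame instead. A short sketch along those lines, reusing the func defined above (the chunk count here is illustrative):

import multiprocessing as mp

import numpy as np
import pandas as pd

def process_chunk(chunk):
    # run the per-row function over an entire chunk in a single task
    return chunk.apply(lambda row: func((row.name, row)), axis=1)

if __name__ == '__main__':
    chunks = np.array_split(data_all, mp.cpu_count())
    with mp.Pool(processes=mp.cpu_count()) as pool:
        data_all_new = pd.concat(pool.map(process_chunk, chunks))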
