python – multiprocessing.Pool:使用apply_async的回调选项时调用辅助函数
apply_async的流程如何在调用iterable(?)函数和回调函数之间起作用?
设置:我正在读取2000文件目录中的所有文件的一些行,一些有数百万行,有些只有少数几行.提取一些标题/格式/日期数据以对每个文件进行特征化.这是在16 CPU机器上完成的,因此对它进行多处理是有意义的.
目前,预期的结果被发送到一个列表(ahlala),所以我可以打印出来;之后,这将被写入* .csv.这是我的代码的简化版本,最初基于this非常有用的帖子.
import multiprocessing as mp
def dirwalker(directory):
ahlala = []
# X() reads files and grabs lines, calls helper function to calculate
# info, and returns stuff to the callback function
def X(f):
fileinfo = Z(arr_of_lines)
return fileinfo
# Y() reads other types of files and does the same thing
def Y(f):
fileinfo = Z(arr_of_lines)
return fileinfo
# results() is the callback function
def results(r):
ahlala.extend(r) # or .append, haven't yet decided
# helper function
def Z(arr):
return fileinfo # to X() or Y()!
for _,_,files in os.walk(directory):
pool = mp.Pool(mp.cpu_count()
for f in files:
if (filetype(f) == filetypeX):
pool.apply_async(X, args=(f,), callback=results)
elif (filetype(f) == filetypeY):
pool.apply_async(Y, args=(f,), callback=results)
pool.close(); pool.join()
return ahlala
注意,如果我将所有Z()(辅助函数)放入X(),Y()或results()中,代码就可以工作,但这是重复还是可能慢?我知道每个函数调用都会调用回调函数,但是何时调用回调函数?是在pool.apply_async()之后…完成进程的所有作业?如果在第一个函数pool的范围(?)内调用这些辅助函数,那么它应该更快吗?apply_async()需要(在这种情况下,X())?如果没有,我应该把帮助函数放在results()中吗?
其他相关的想法:守护进程是否为什么没有出现?我也很困惑如何排队,如果这是问题. This seems like a place to start learning it,但是在使用apply_async时可以安全地忽略排队,或者只是在显着的时间效率低下?