python – multiprocessing.Pool:使用apply_async的回调选项时调用辅助函数

weixin_38061608 2019-09-12 01:27:04
apply_async的流程如何在调用iterable(?)函数和回调函数之间起作用? 设置:我正在读取2000文件目录中的所有文件的一些行,一些有数百万行,有些只有少数几行.提取一些标题/格式/日期数据以对每个文件进行特征化.这是在16 CPU机器上完成的,因此对它进行多处理是有意义的. 目前,预期的结果被发送到一个列表(ahlala),所以我可以打印出来;之后,这将被写入* .csv.这是我的代码的简化版本,最初基于this非常有用的帖子. import multiprocessing as mp def dirwalker(directory): ahlala = [] # X() reads files and grabs lines, calls helper function to calculate # info, and returns stuff to the callback function def X(f): fileinfo = Z(arr_of_lines) return fileinfo # Y() reads other types of files and does the same thing def Y(f): fileinfo = Z(arr_of_lines) return fileinfo # results() is the callback function def results(r): ahlala.extend(r) # or .append, haven't yet decided # helper function def Z(arr): return fileinfo # to X() or Y()! for _,_,files in os.walk(directory): pool = mp.Pool(mp.cpu_count() for f in files: if (filetype(f) == filetypeX): pool.apply_async(X, args=(f,), callback=results) elif (filetype(f) == filetypeY): pool.apply_async(Y, args=(f,), callback=results) pool.close(); pool.join() return ahlala 注意,如果我将所有Z()(辅助函数)放入X(),Y()或results()中,代码就可以工作,但这是重复还是可能慢?我知道每个函数调用都会调用回调函数,但是何时调用回调函数?是在pool.apply_async()之后…完成进程的所有作业?如果在第一个函数pool的范围(?)内调用这些辅助函数,那么它应该更快吗?apply_async()需要(在这种情况下,X())?如果没有,我应该把帮助函数放在results()中吗? 其他相关的想法:守护进程是否为什么没有出现?我也很困惑如何排队,如果这是问题. This seems like a place to start learning it,但是在使用apply_async时可以安全地忽略排队,或者只是在显着的时间效率低下?
...全文
2586 1 打赏 收藏 转发到动态 举报
写回复
用AI写文章
1 条回复
切换为时间正序
请发表友善的回复…
发表回复
weixin_38066613 2019-09-12
  • 打赏
  • 举报
回复
你在这里询问了很多不同的东西,所以我会尽力覆盖它: 一旦工作进程返回其结果,您传递给回调的函数将在主进程(而不是worker)中执行.它在Pool对象内部创建的线程中执行.该线程使用result_queue中的对象,该对象用于从所有工作进程获取结果.线程将结果从队列中拉出后,它会执行回调.当您的回调正在执行时,不能从队列中提取其他结果,因此回调快速完成非常重要.在您的示例中,只要您通过apply_async对X或Y进行的一次调用完成,结果将由worker进程放入result_queue,然后结果处理线程将结果从result_queue中提取,并且您的回调将被执行. 其次,我怀疑你没有看到你的示例代码发生任何事情的原因是因为所有的工作函数调用都失败了.如果worker函数失败,则永远不会执行回调.除非您尝试从apply_async调用返回的AsyncResult对象中获取结果,否则根本不会报告失败.但是,由于您没有保存任何这些对象,因此您永远不会知道发生的故障.如果我是你,我会在你测试时尝试使用pool.apply,这样你就会在发生错误时立即看到错误. 工人可能失败的原因(至少在你提供的示例代码中)是因为X和Y被定义为另一个函数内的函数.多处理将函数和对象传递给工作进程,方法是在主进程中对它们进行pickle,并在工作进程中对它们进行unpickling.在其他函数内定义的函数不可选,这意味着多处理将无法在工作进程中成功取消它们.要解决此问题,请在模块的顶层定义两个函数,而不是嵌入insirwalker函数. 你肯定应该继续从X和Y调用Z,而不是结果.这样,Z可以在所有工作进程中同时运行,而不必在主进程中一次运行一个调用.请记住,您的回调函数应该尽可能快,因此您不会保留处理结果.在那里执行Z会减慢速度. 这里有一些简单的示例代码,与您正在执行的操作类似,希望能让您了解代码的外观: import multiprocessing as mp import os # X() reads files and grabs lines, calls helper function to calculate # info, and returns stuff to the callback function def X(f): fileinfo = Z(f) return fileinfo # Y() reads other types of files and does the same thing def Y(f): fileinfo = Z(f) return fileinfo # helper function def Z(arr): return arr + "zzz" def dirwalker(directory): ahlala = [] # results() is the callback function def results(r): ahlala.append(r) # or .append, haven't yet decided for _,_,files in os.walk(directory): pool = mp.Pool(mp.cpu_count()) for f in files: if len(f) > 5: # Just an arbitrary thing to split up the list with pool.apply_async(X, args=(f,), callback=results) # ,error_callback=handle_error # In Python 3, there's an error_callback you can use to handle errors. It's not available in Python 2.7 though :( else: pool.apply_async(Y, args=(f,), callback=results) pool.close() pool.join() return ahlala if __name__ == "__main__": print(dirwalker("/usr/bin")) 输出: ['ftpzzz', 'findhyphzzz', 'gcc-nm-4.8zzz', 'google-chromezzz' ... # lots more here ] 编辑: 您可以使用multiprocessing.Manager类创建在父进程和子进程之间共享的dict对象: pool = mp.Pool(mp.cpu_count()) m = multiprocessing.Manager() helper_dict = m.dict() for f in files: if len(f) > 5: pool.apply_async(X, args=(f, helper_dict), callback=results) else: pool.apply_async(Y, args=(f, helper_dict), callback=results) 然后让X和Y接受一个名为helper_dict的第二个参数(或者你想要的任何名字),然后你就完成了. 需要注意的是,这可以通过创建包含普通dict的服务器进程来工作,并且所有其他进程通过Proxy对象与该dict进行通信.因此,每当您阅读或写入字典时,您都在进行IPC.这使它比真正的字典慢很多.
打开下面链接,直接免费下载资源: https://renmaiwang.cn/s/v34wj 在Python编程中,异步进程和回调函数是提高程序效率和并发能力的重要工具。本文将深入探讨Python3中异步进程回调函数(callback())的概念、工作原理以及如何在实际编程中运用。让我们理解什么是异步进程。在同步模式下,程序会顺序执行,如果某个操作需要等待(如I/O操作,如读写文件或网络通信),那么程序会暂停执行,直到这个操作完成。而异步进程则不同,它允许程序在等待操作完成继续执行其他任务,提高程序的并发性。异步处理通常与事件循环(event loop)和回调函数结合使用回调函数在这种情况下起到关键作用。当一个异步任务完成后,它不会直接返回结果,而是调用预先定义好的回调函数,并将结果传递给这个回调函数。这样,主程序可以在执行其他任务的同,等待异步任务的结果,并在结果可用进行处理。在给出的示例代码中,我们看到一个使用`multiprocessing.Pool`创建的进程池来执行异步任务。`Pool.apply_async`方法用于提交任务到进程池,其中`callback`参数就是回调函数。在这个例子中,`download`函数是被异步调用的任务,当它执行完毕后,返回的结果会被传递给`alterUser`回调函数。```pythonp = Pool(3)p.apply_async(func=download, callback=alterUser)```这里的`alterUser`函数接收`download`函数的返回值并打印出来。可以看到,当`download`函数运行结束后,控制权回到了主进程,然后调用`alterUser`回调函数,处理`download`的返回结果。此外,代码还展示了如何在多线程环境中使用回调函数。`thread.start_new_thread

476

社区成员

发帖
与我相关
我的任务
社区描述
其他技术讨论专区
其他 技术论坛(原bbs)
社区管理员
  • 其他技术讨论专区社区
加入社区
  • 近7日
  • 近30日
  • 至今
社区公告
暂无公告

试试用AI创作助手写篇文章吧