python – multiprocessing.Pool:使用apply_async的回调选项时调用辅助函数

weixin_38061608 2019-09-12 01:27:04
apply_async的流程如何在调用iterable(?)函数和回调函数之间起作用? 设置:我正在读取2000文件目录中的所有文件的一些行,一些有数百万行,有些只有少数几行.提取一些标题/格式/日期数据以对每个文件进行特征化.这是在16 CPU机器上完成的,因此对它进行多处理是有意义的. 目前,预期的结果被发送到一个列表(ahlala),所以我可以打印出来;之后,这将被写入* .csv.这是我的代码的简化版本,最初基于this非常有用的帖子. import multiprocessing as mp def dirwalker(directory): ahlala = [] # X() reads files and grabs lines, calls helper function to calculate # info, and returns stuff to the callback function def X(f): fileinfo = Z(arr_of_lines) return fileinfo # Y() reads other types of files and does the same thing def Y(f): fileinfo = Z(arr_of_lines) return fileinfo # results() is the callback function def results(r): ahlala.extend(r) # or .append, haven't yet decided # helper function def Z(arr): return fileinfo # to X() or Y()! for _,_,files in os.walk(directory): pool = mp.Pool(mp.cpu_count() for f in files: if (filetype(f) == filetypeX): pool.apply_async(X, args=(f,), callback=results) elif (filetype(f) == filetypeY): pool.apply_async(Y, args=(f,), callback=results) pool.close(); pool.join() return ahlala 注意,如果我将所有Z()(辅助函数)放入X(),Y()或results()中,代码就可以工作,但这是重复还是可能慢?我知道每个函数调用都会调用回调函数,但是何时调用回调函数?是在pool.apply_async()之后…完成进程的所有作业?如果在第一个函数pool的范围(?)内调用这些辅助函数,那么它应该更快吗?apply_async()需要(在这种情况下,X())?如果没有,我应该把帮助函数放在results()中吗? 其他相关的想法:守护进程是否为什么没有出现?我也很困惑如何排队,如果这是问题. This seems like a place to start learning it,但是在使用apply_async时可以安全地忽略排队,或者只是在显着的时间效率低下?
...全文
2637 1 打赏 收藏 转发到动态 举报
写回复
用AI写文章
1 条回复
切换为时间正序
请发表友善的回复…
发表回复
weixin_38066613 2019-09-12
  • 打赏
  • 举报
回复
你在这里询问了很多不同的东西,所以我会尽力覆盖它: 一旦工作进程返回其结果,您传递给回调的函数将在主进程(而不是worker)中执行.它在Pool对象内部创建的线程中执行.该线程使用result_queue中的对象,该对象用于从所有工作进程获取结果.线程将结果从队列中拉出后,它会执行回调.当您的回调正在执行时,不能从队列中提取其他结果,因此回调快速完成非常重要.在您的示例中,只要您通过apply_async对X或Y进行的一次调用完成,结果将由worker进程放入result_queue,然后结果处理线程将结果从result_queue中提取,并且您的回调将被执行. 其次,我怀疑你没有看到你的示例代码发生任何事情的原因是因为所有的工作函数调用都失败了.如果worker函数失败,则永远不会执行回调.除非您尝试从apply_async调用返回的AsyncResult对象中获取结果,否则根本不会报告失败.但是,由于您没有保存任何这些对象,因此您永远不会知道发生的故障.如果我是你,我会在你测试时尝试使用pool.apply,这样你就会在发生错误时立即看到错误. 工人可能失败的原因(至少在你提供的示例代码中)是因为X和Y被定义为另一个函数内的函数.多处理将函数和对象传递给工作进程,方法是在主进程中对它们进行pickle,并在工作进程中对它们进行unpickling.在其他函数内定义的函数不可选,这意味着多处理将无法在工作进程中成功取消它们.要解决此问题,请在模块的顶层定义两个函数,而不是嵌入insirwalker函数. 你肯定应该继续从X和Y调用Z,而不是结果.这样,Z可以在所有工作进程中同时运行,而不必在主进程中一次运行一个调用.请记住,您的回调函数应该尽可能快,因此您不会保留处理结果.在那里执行Z会减慢速度. 这里有一些简单的示例代码,与您正在执行的操作类似,希望能让您了解代码的外观: import multiprocessing as mp import os # X() reads files and grabs lines, calls helper function to calculate # info, and returns stuff to the callback function def X(f): fileinfo = Z(f) return fileinfo # Y() reads other types of files and does the same thing def Y(f): fileinfo = Z(f) return fileinfo # helper function def Z(arr): return arr + "zzz" def dirwalker(directory): ahlala = [] # results() is the callback function def results(r): ahlala.append(r) # or .append, haven't yet decided for _,_,files in os.walk(directory): pool = mp.Pool(mp.cpu_count()) for f in files: if len(f) > 5: # Just an arbitrary thing to split up the list with pool.apply_async(X, args=(f,), callback=results) # ,error_callback=handle_error # In Python 3, there's an error_callback you can use to handle errors. It's not available in Python 2.7 though :( else: pool.apply_async(Y, args=(f,), callback=results) pool.close() pool.join() return ahlala if __name__ == "__main__": print(dirwalker("/usr/bin")) 输出: ['ftpzzz', 'findhyphzzz', 'gcc-nm-4.8zzz', 'google-chromezzz' ... # lots more here ] 编辑: 您可以使用multiprocessing.Manager类创建在父进程和子进程之间共享的dict对象: pool = mp.Pool(mp.cpu_count()) m = multiprocessing.Manager() helper_dict = m.dict() for f in files: if len(f) > 5: pool.apply_async(X, args=(f, helper_dict), callback=results) else: pool.apply_async(Y, args=(f, helper_dict), callback=results) 然后让X和Y接受一个名为helper_dict的第二个参数(或者你想要的任何名字),然后你就完成了. 需要注意的是,这可以通过创建包含普通dict的服务器进程来工作,并且所有其他进程通过Proxy对象与该dict进行通信.因此,每当您阅读或写入字典时,您都在进行IPC.这使它比真正的字典慢很多.
【重要提示】本资源设置为0积分下载,若非0积分请勿轻易下载 亲爱的CSDN用户: 首先感谢你点进这个资源页面。我需要提前说明一个重要情况: 本资源原本已设置为“0积分下载”,即作者希望完全免费共享。但CSDN平台有会根据文件的下载热度、文件大小、用户权限等因素,自动将部分资源的积分调整为非0数值(如1积分、2积分、5积分等)。这是平台系统的自动行为,而非作者本人的设定。 因此,如果你当前看到该资源的下载所需积分不是0(例如显示为1、2、3……),请谨慎决定是否下载。 如果你按照非0积分支付并下载后发现资源内容不符合预期、链接失效,或者实际上该资源本应是免费的,作者无法为此承担积分损失或退还操作。强烈建议:仅在页面显示为0积分进行下载。 另外,本资源描述中并未直接提供具体的下载地址或外部链接,因为它本身是一个通过CSDN官方上传通道提交的文件/内容包。如果你看到描述中没有外部网盘地址,这是正常的——资源文件应通过CSDN内置的“下载”按钮获取。若因平台积分显示异常导致你支付了积分,请优先联系CSDN客服咨询积分退还政策,作者没有权限修改平台自动设定的积分值。 感谢你的理解与支持。技术分享本应开放,但受限于平台规则,特此提醒如上。祝学习进步!
内容概要:本文系统介绍了基于最小势能原理(即能量法)的物理信息神经网络(PINNs)在求解固体力学二维问题中的理论框架与应用实践,并提供了完整的PyTorch代码实现案例。该方法通过将物理系统的总势能泛函嵌入神经网络的损失函数中,利用深度学习框架直接求解满足控制方程和边界条件的位移场近似解,避免了传统数值方法对网格划分的依赖。文章重点剖析了基于变分原理的能量形式如何替代强形式偏微分方程构建损失项,提升了求解的稳定性与泛化能力。同,研究对比了不同PINNs架构与训练策略在处理复杂几何形状、非均匀材料属性及非线性力学行为的精度、收敛性与计算效率,验证了其在处理经典弹性力学问题(如平面应力/应变问题)中的有效性与潜力。配套代码便于读者复现结果并拓展至更广泛的工程应用场景。; 适合人群:具备一定深度学习基础和固体力学知识的研究生、科研人员及工程技术从业者,特别适用于从事计算力学、智能仿真、物理驱动建模、结构分析等方向的研究者。; 使用场景及目标:①掌握基于能量法的PINNs建模范式,理解其相较于传统有限元法的优势与局限;②研究物理信息神经网络在无网格求解复杂边界与非线性问题中的能力;③对比不同神经网络结构对求解精度与收敛速度的影响,推动PINNs在工程实际中的落地应用。; 阅读建议:建议读者结合所提供的PyTorch代码逐模块分析网络构建、能量泛函定义、边界条件施加及训练流程设计,深入理解物理约束与机器学习模型的融合机制,并鼓励在自定义问题中调整网络参数、采样策略与损失权重以优化性能。
【重要提示】本资源设置为0积分下载,若非0积分请勿轻易下载 亲爱的CSDN用户: 首先感谢你点进这个资源页面。我需要提前说明一个重要情况: 本资源原本已设置为“0积分下载”,即作者希望完全免费共享。但CSDN平台有会根据文件的下载热度、文件大小、用户权限等因素,自动将部分资源的积分调整为非0数值(如1积分、2积分、5积分等)。这是平台系统的自动行为,而非作者本人的设定。 因此,如果你当前看到该资源的下载所需积分不是0(例如显示为1、2、3……),请谨慎决定是否下载。 如果你按照非0积分支付并下载后发现资源内容不符合预期、链接失效,或者实际上该资源本应是免费的,作者无法为此承担积分损失或退还操作。强烈建议:仅在页面显示为0积分进行下载。 另外,本资源描述中并未直接提供具体的下载地址或外部链接,因为它本身是一个通过CSDN官方上传通道提交的文件/内容包。如果你看到描述中没有外部网盘地址,这是正常的——资源文件应通过CSDN内置的“下载”按钮获取。若因平台积分显示异常导致你支付了积分,请优先联系CSDN客服咨询积分退还政策,作者没有权限修改平台自动设定的积分值。 感谢你的理解与支持。技术分享本应开放,但受限于平台规则,特此提醒如上。祝学习进步!

477

社区成员

发帖
与我相关
我的任务
社区描述
其他技术讨论专区
其他 技术论坛(原bbs)
社区管理员
  • 其他技术讨论专区社区
加入社区
  • 近7日
  • 近30日
  • 至今
社区公告
暂无公告

试试用AI创作助手写篇文章吧