问题描述
我正在使用很少会崩溃的外部C库运行一些代码。
我已经使用multiprocessing.Pool
封装了这些代码以并行运行。
我希望能够检测池中的某个进程是否已隔离。
从出发听起来我需要使用multiprocessing.Process
重新实现池,这样我就可以检查is_alive()
但是我不确定该怎么做。
例如:
import multiprocessing
import time
def fn(arg):
if arg == 4:
# this should segfault
# http://codegolf.stackexchange.com/questions/4399/shortest-code-that-return-sigsegv
import ctypes;ctypes.string_at(0)
time.sleep(2)
return arg**2
def main():
pool = multiprocessing.Pool()
inputs = range(10)
results = pool.map_async(fn, inputs)
while True:
if results.ready():
break
time.sleep(0.5)
print "status: still running..."
# ... some async code ...
# detect failed process here?
outputs = results.get()
print outputs
if __name__ == '__main__':
main()
我该如何重写以检测段错误? (此外,我尝试使用python 3.5.0运行它,但似乎没有在另一个问题中建议抛出异常。)
1楼
pool._pool是进程列表,作为multiprocessing.Process对象。
您可以检查他们的返回码
pool._pool[0].return_code
请参阅: :
编辑后的评论
import multiprocessing
import time
def fn(arg):
if arg == 4:
# this should segfault
# http://codegolf.stackexchange.com/questions/4399/shortest-code-that-return-sigsegv
#pass
import ctypes;ctypes.string_at(0)
time.sleep(1)
return (arg, arg**2)
def main():
MAX_TOTAL_DURATION = 40
MAX_TASK_DURATION = 10
pool = multiprocessing.Pool()
inputs = range(10)
start_time = time.time()
results_iterator = pool.imap_unordered(fn, inputs, 1)
got_result_for = []
while len(got_result_for) < len(inputs) and (time.time() - start_time) < MAX_TOTAL_DURATION:
try:
argument, result = results_iterator.next(MAX_TASK_DURATION)
got_result_for.append(argument)
print("status: got results for: " + str(got_result_for))
except multiprocessing.TimeoutError:
print("one of the tasks timeouted")
if len(got_result_for) < len(inputs):
print("missing results: " + str(set(inputs).difference(set(got_result_for))))
if __name__ == '__main__':
main()