V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
qazwsxkevin
V2EX  ›  Python

concurrent.futures 如何从外部停止进程池里的其中一个进程,再进行启用?

  •  
  •   qazwsxkevin · 2019-11-24 23:53:19 +08:00 · 794 次点击
    这是一个创建于 1853 天前的主题,其中的信息可能已经有所发展或是发生改变。

    处理函数写得不太稳健,会吊死而导致
    FutureRetList[i].running()一直认为是 True
    粗暴做了个超时认定
    看了 concurrent.futures 的一些方法,看不出有什么方法能在外部把进程池里某个进程推倒?

    #是否进行多线程处理
    MultiProcFlag == True
    #定义进程数量
    ProcessAmount = 12
    
    
        if MultiProcFlag == True:
            FutureList = []
            FutureRetList = []
            FutureStartTimeList = []
            FutureProcSuit = []
    
            for i in range(ProcessAmount):
                FutureList.append(futures.ProcessPoolExecutor(max_workers=1))
    
            for i in range(ProcessAmount):
                FutureRetList.append(futures.Future())
    
            for i in range(ProcessAmount):
                FutureProcSuit.append('0')
    
            for i in range(ProcessAmount):
                FutureStartTimeList.append(time.time())
            
            pass
     
    
    
                # 多进程处理
                if MultiProcFlag == True:
                    insertPoolFlag = False # 在 while 循环中判断是否成功加入进程池
                    while insertPoolFlag == False:
                        for i in range(ProcessAmount):
                            Process_i = str(i + 1)
                            if FutureRetList[i].running() == True:
                                #进行时间计算,如果某进程超时,关闭进程,重新提交任务
                                tmpTime = (time.clock() - FutureStartTimeList[i])
                                # 如果进程持续时间超过 15 分钟,认定任务失败,需要重新进行
                                if tmpTime > 900:
                                    FutureList[i].shutdown() # <-恐怕不是这样乱来的?
                                    #FutureList[i].xxxx? # <-- how?
                                    time.sleep(5)
                                    FutureRetList[i] = FutureList[i].submit(SProcessFunc, str(i + 1), SomeDict,FutureProcSuit[i],countt,ErrorLogFilePath)
                                    print("进程池:[" + Process_i + "] [重新提交]了: [" + FutureProcIDs[i] + "]",f"{time.strftime('%Y-%m-%d %H:%M:%S',time.localtime())}")
                                else:
                                    print("进程池:[" + Process_i + "] 已经运行了 [" + '[%.2f] 秒' % (time.clock() - FutureStartTimeList[i]))
                                time.sleep(1)
                                pass
                            else: # 见到有空闲的进程就提交任务
                                FutureRetList[i] = FutureList[i].submit(SProcessFunc, str(i + 1), SomeDict,Task[j],countt,ErrorLogFilePath)
                                FutureStartTimeList[i] = time.time()
                                FutureProcSuit[i] = Task[j] # 记下这个任务,准备在失败的时候,再调出进行重新提交,反正是死磕到任务成功为止
                                print("进程池:[" + Process_i + "] 提交了: [" + countt + "] 是第 [" + str(countt) + "] 个任务.",f"{time.strftime('%Y-%m-%d %H:%M:%S',time.localtime())}")
                                insertPoolFlag = True
                                break
                                pass
                            time.sleep(2)
                else: # 单进程处理
                	ProcessRet = SProcessFunc("1",TaskDict,Task[j],countt,ErrorLogFilePath)
    
    8 条回复    2019-11-25 17:17:36 +08:00
    ManjusakaL
        1
    ManjusakaL  
       2019-11-25 01:11:57 +08:00 via Android
    直接记录 PID,然后 kill 不就完了。。。
    qazwsxkevin
        2
    qazwsxkevin  
    OP
       2019-11-25 08:43:05 +08:00
    @ManjusakaL 可以对线程启动的内容纪录 PID ? 假如各个进程跑的都是是 Chrome.exe ,12 个进程跑满,内存会有 N 个 python.exe ,N 个 Chrome.exe ,怎么分辨谁是谁? 能零失误杀进程?
    以前只试过用 psutil.pids()来获取 PID 杀进程。。。
    ManjusakaL
        3
    ManjusakaL  
       2019-11-25 10:45:44 +08:00
    @qazwsxkevin 如果怕误杀进程的话,可以在提交任务的时候,传入一个 unique 的参数,然后开个队列,将 unique 和 pid 回传,然后在 main process 做一个 map,直接对想杀的组合发送 SIGKILL 就行了
    qazwsxkevin
        4
    qazwsxkevin  
    OP
       2019-11-25 11:17:49 +08:00
    @ManjusakaL 明白思路了,是一个办法。。。,暂时无法理解一个“unique 的参数”?是什么,得琢磨琢磨。。。^_^

    看看有没有其它更好的办法?
    Latin
        5
    Latin  
       2019-11-25 11:28:42 +08:00
    executor = ProcessPoolExecutor(1)
    executor.submit(xxx,xxx)
    pid = list(executor._processes.keys())[0]

    就是记录 然后调用 shell 命令 Kill 找了好久,没有优雅的解决方案
    ManjusakaL
        6
    ManjusakaL  
       2019-11-25 11:35:07 +08:00
    @qazwsxkevin 就是一个唯一的标识,可以理解是任务 ID,这样你可以将具体的任务和 PID 绑定
    ClericPy
        7
    ClericPy  
       2019-11-25 14:35:34 +08:00
    qazwsxkevin
        8
    qazwsxkevin  
    OP
       2019-11-25 17:17:36 +08:00
    做了一个简单测试,情况不妙,估计不好。。。

    else: # 见到有空闲的进程就提交任务
    FutureRetList[i] = FutureList[i].submit(SProcessFunc, str(i + 1), SomeDict,Task[j],countt,ErrorLogFilePath)
    FutureStartTimeList[i] = time.time()
    FutureProcSuit[i] = Task[j] # 记下这个任务,准备在失败的时候,再调出进行重新提交,反正是死磕到任务成功为止
    print("进程池:[" + Process_i + "] 提交了: [" + countt + "] 是第 [" + str(countt) + "] 个任务.",f"{time.strftime('%Y-%m-%d %H:%M:%S',time.localtime())}")


    pid = (list(FutureList[i]._processes.keys()))[0]
    print(pid)
    time.sleep(20)
    #进行 20 秒左右后杀进程
    exeCstr = "taskkill -f -pid " + str(pid)
    os.system(exeCstr)
    time.sleep(10)
    #再次提交
    FutureRetList[i] = FutureList[i].submit(SProcessFunc, str(i + 1), SomeDict,Task[j],countt,ErrorLogFilePath)




    在杀进程后,直接就抛出异常了,再次提交也是不行的,直接报 1 码结束了主程序,整体结束。

    outut:

    Traceback (most recent call last):
    File "D:/Work//SPFromDB.py", line 309, in <module>
    FutureRetList[i] = FutureList[i].submit(SProcessFunc, str(i + 1), SomeDict,Task[j],countt,ErrorLogFilePath)
    File "C:\Users\Administrator\AppData\Local\Programs\Python\Python36\lib\concurrent\futures\process.py", line 452, in submit
    raise BrokenProcessPool('A child process terminated '
    concurrent.futures.process.BrokenProcessPool: A child process terminated abruptly, the process pool is not usable anymore

    进程已结束,退出代码 1
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2751 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 14:21 · PVG 22:21 · LAX 06:21 · JFK 09:21
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.