V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
sunhk25
V2EX  ›  Python

Celery 任务 prefork 多进程启动时,如何通过自定义类的__call__函数设置属性

  •  
  •   sunhk25 · 2022-02-16 13:09:46 +08:00 · 2578 次点击
    这是一个创建于 1011 天前的主题,其中的信息可能已经有所发展或是发生改变。

    任务被调用时__call__执行后,在调用函数(call_my_task)内部无法取得属性(_sid) 但是以-pool=solo 方式启动的话,没有问题。请教哪位同学见过类似问题吗?

    # pool 并发方式
    # worker_param = ['celery', '-A', 'app.celery', 'worker', '--pool=solo', '-l', 'INFO']
    worker_param = ['celery', '-A', 'app.celery', 'worker', '--pool=prefork', '-l', 'INFO']
    subprocess.Popen(worker_param)
    
    # task 类
    class AloneTask(celery.Task):
        def __init__(self, *args, **kwargs):
            super(AloneTask, self).__init__()
    
        def __call__(self, *args, **kwargs):
            print(f">>>self1={self}")
            self._sid = kwargs.get('sid', '1')
    
        def my_func(self):
            #  [ pool=solo ] ⇒self1==self2
            #  [ pool=prefork ] ⇒self1 !=self2
            print(f">>>self2={self}")
            return self._sid
    
    # 调用 call_my_task.delay("100")
    @celery.task(base=AloneTask)
    def call_my_task(sid):
        #  [ pool=solo ] ⇒OK
        #  [ pool=prefork ] ⇒AttributeError: 'call_my_task' object has no attribute '_sid'
        print(call_my_task.my_func())
    
    第 1 条附言  ·  2022-02-17 08:45:20 +08:00

    解决方案:通过一个全局变量来重置任务函数

    g_task = None
    
    def __call__(self, *args, **kwargs):
            print(f">>>self1={self}")
            self._sid = kwargs.get('sid', '1')
            global g_task
            g_task = self
    
    def call_my_task(sid)
        call_my_task = g_task
        print(call_my_task.my_func())
    
    2 条回复    2022-05-01 09:21:04 +08:00
    sunhk25
        1
    sunhk25  
    OP
       2022-02-16 18:53:45 +08:00
    以上代码是在 Windows 下执行的,Celery 从版本 4 之后虽然可以使用,但是不太维护。
    在 Ubuntu 上测试了一下,上面的代码没有问题。
    opengo
        2
    opengo  
       2022-05-01 09:21:04 +08:00
    @celery.task(bind=True, base=AloneTask)
    def call_my_task(self, sid):
    print(self._sid)
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   913 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 22ms · UTC 21:42 · PVG 05:42 · LAX 13:42 · JFK 16:42
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.