V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
Drahcir
V2EX  ›  Python

Python asyncio 求助,要被搞疯了

  •  
  •   Drahcir · 2023-02-26 12:30:30 +08:00 · 3129 次点击
    这是一个创建于 665 天前的主题,其中的信息可能已经有所发展或是发生改变。

    我最近在尝试使用 FastAPI + Prefect 2(2.8.3),目的是使用 FastAPI 构建的 RESTful API 触发 Prefect 构建的 Workflow 。 代码如下: main.py

    from fastapi import FastAPI
    from test_workflow import test
    
    app = FastAPI(
        title="Data Services",
        description="REST API for data services",
        version="0.1.0",
    )
    
    @app.get("/")
    async def root():
        return {"message": "Data Services"}
    
    @app.get("/test")
    def trigger():
        test()
        return "completed"
    

    test_workflow.py

    from prefect import flow, task
    import time
    
    @task
    def sleep1():
        print("sleep1")
        time.sleep(5)  //模拟需要一定运行时间的计算流
        return 1
        
    @flow()
    def test():
        task_1 = sleep1.submit()
        x = task_1.result()
        print(x)
    

    需首先运行下列命令开启服务:

    $ prefect server start

    $ uvicorn main:app --reload

    如果尝试 GET /test ,会得到如下错误:

    RuntimeError: <asyncio.locks.Event object at 0x7f0de4d19bd0 [unset]> is bound to a different event loop

    如果 FastAPI 中定义为异步函数async def trigger():, 则顺利运行不会报错,但问题是这样就丧失了并行性,FastAPI 一次只能处理一个 /test 请求,显然不符合需求。

    想了很多办法都没法解决,有没有懂这方面的帮忙看看?

    8 条回复    2023-02-26 14:25:33 +08:00
    louistayd2
        1
    louistayd2  
       2023-02-26 13:06:09 +08:00
    问问 ChatGPT:


    这个错误通常发生在应用程序中存在多个事件循环时。根据你的描述,这可能是因为 FastAPI 和 Prefect 分别使用不同的事件循环,导致了该错误。

    为了解决这个问题,你可以尝试使用一个事件循环来管理整个应用程序,这样就可以避免出现多个事件循环的情况。以下是一个可能的解决方案:

    main.py 中创建一个事件循环对象并将其作为参数传递给 test() 函数,确保 FastAPI 和 Prefect 使用相同的事件循环。

    ```python
    import asyncio

    loop = asyncio.get_event_loop()

    @app.get("/test")
    def trigger():
    loop.run_until_complete(test(loop))
    return "completed"
    ```

    test_workflow.py 中,将任务函数装饰器从 @task 改为 @task(run_on_executor=True),以确保它在一个新的线程中运行,从而不会与 FastAPI 使用的事件循环发生冲突。

    ```python
    from prefect import flow, task
    import time

    @task(run_on_executor=True)
    def sleep1():
    print("sleep1")
    time.sleep(5)
    return 1

    @flow
    def test(loop):
    with loop:
    task_1 = sleep1.submit()
    x = task_1.result()
    print(x)
    ```

    这样,通过将事件循环对象传递给 test() 函数并使用 @task(run_on_executor=True) 装饰器,你可以确保 FastAPI 和 Prefect 使用相同的事件循环,并且在运行任务时不会发生事件循环冲突。
    NCZkevin
        2
    NCZkevin  
       2023-02-26 13:19:16 +08:00   ❤️ 2
    没用过 Prefect 2(2.8.3),出现这种情况大概率是因为在异步里使用了同步。 可以考虑把 test() 放到 loop.run_in_executor 去异步调用。
    ```
    @app.get("/test")
    async def trigger():
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(None, test)
    return "completed"
    ```
    Drahcir
        3
    Drahcir  
    OP
       2023-02-26 13:40:22 +08:00
    @louistayd2 虽然 AI 给的细节不对,比如 @task 并不存在 run_on_executor 这个参数。但是我感觉思路是对的,如果能让 Prefect 2 使用 FastAPI 的事件循环或许能解决这个问题。不过我不太了解 Prefect 2 源代码,暂时无解。
    shuimugan
        4
    shuimugan  
       2023-02-26 13:48:45 +08:00 via Android   ❤️ 1
    常见错误了,异步循环里别用同步的库,time.sleep 改 asyncio.sleep
    Drahcir
        5
    Drahcir  
    OP
       2023-02-26 13:50:58 +08:00
    @NCZkevin 你好,谢谢,用这种解决方案的话,确实 FastAPI 本身不会被阻塞。比如说如果我执行了 GET /test, 在 sleep 的同时,我可以继续访问 /docs 页面。
    不过,问题是如果我想触发同一个请求的话,还是会按顺序执行。比如说,如果我同时执行两个 GET /test ,那么还是会一个接一个执行,不能并行。
    实际上,我试了试 uvicorn 指定多个 worker ,发现即使是多进程下仍有这个问题。

    这次,直接抛开 Prefect 测试:

    ```Python
    def sleep():
    print("sleep", time.time())
    time.sleep(5)

    @app.get("/test")
    async def trigger():
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(None, sleep)
    return "completed"

    ```
    Drahcir
        6
    Drahcir  
    OP
       2023-02-26 13:54:19 +08:00
    @shuimugan 我用 time.sleep 只是为了模拟一个需要一定运行时间的 blocking 计算过程。这个计算过程是阻塞的,如果用 async def 就会在这段时间内无响应。
    理论上,直接用 def 的话 FastAPI 会自动用多线程运行,但问题是出错 RuntimeError: <asyncio.locks.Event object at 0x7f0de4d19bd0 [unset]> is bound to a different event loop
    NCZkevin
        7
    NCZkevin  
       2023-02-26 14:03:07 +08:00   ❤️ 1
    @Drahcir 我试了下,就用这个代码,用 ab 测试同时发 10 个请求看了下结果是并行的呀。
    Drahcir
        8
    Drahcir  
    OP
       2023-02-26 14:25:33 +08:00
    @NCZkevin 不好意思,是我的问题😓
    我用 /docs 页面手动测试时有这个问题。不过我在命令行里面测试,发现的确是并行的。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   905 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 23ms · UTC 22:04 · PVG 06:04 · LAX 14:04 · JFK 17:04
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.