我最近在尝试使用 FastAPI + Prefect 2(2.8.3),目的是使用 FastAPI 构建的 RESTful API 触发 Prefect 构建的 Workflow 。 代码如下: main.py
from fastapi import FastAPI
from test_workflow import test
app = FastAPI(
title="Data Services",
description="REST API for data services",
version="0.1.0",
)
@app.get("/")
async def root():
return {"message": "Data Services"}
@app.get("/test")
def trigger():
test()
return "completed"
from prefect import flow, task
import time
@task
def sleep1():
print("sleep1")
time.sleep(5) //模拟需要一定运行时间的计算流
return 1
@flow()
def test():
task_1 = sleep1.submit()
x = task_1.result()
print(x)
需首先运行下列命令开启服务:
$ prefect server start
$ uvicorn main:app --reload
如果尝试 GET /test ,会得到如下错误:
RuntimeError: <asyncio.locks.Event object at 0x7f0de4d19bd0 [unset]> is bound to a different event loop
如果 FastAPI 中定义为异步函数async def trigger():
, 则顺利运行不会报错,但问题是这样就丧失了并行性,FastAPI 一次只能处理一个 /test 请求,显然不符合需求。
想了很多办法都没法解决,有没有懂这方面的帮忙看看?
1
louistayd2 2023-02-26 13:06:09 +08:00
问问 ChatGPT:
这个错误通常发生在应用程序中存在多个事件循环时。根据你的描述,这可能是因为 FastAPI 和 Prefect 分别使用不同的事件循环,导致了该错误。 为了解决这个问题,你可以尝试使用一个事件循环来管理整个应用程序,这样就可以避免出现多个事件循环的情况。以下是一个可能的解决方案: 在 main.py 中创建一个事件循环对象并将其作为参数传递给 test() 函数,确保 FastAPI 和 Prefect 使用相同的事件循环。 ```python import asyncio loop = asyncio.get_event_loop() @app.get("/test") def trigger(): loop.run_until_complete(test(loop)) return "completed" ``` 在 test_workflow.py 中,将任务函数装饰器从 @task 改为 @task(run_on_executor=True),以确保它在一个新的线程中运行,从而不会与 FastAPI 使用的事件循环发生冲突。 ```python from prefect import flow, task import time @task(run_on_executor=True) def sleep1(): print("sleep1") time.sleep(5) return 1 @flow def test(loop): with loop: task_1 = sleep1.submit() x = task_1.result() print(x) ``` 这样,通过将事件循环对象传递给 test() 函数并使用 @task(run_on_executor=True) 装饰器,你可以确保 FastAPI 和 Prefect 使用相同的事件循环,并且在运行任务时不会发生事件循环冲突。 |
2
NCZkevin 2023-02-26 13:19:16 +08:00 2
没用过 Prefect 2(2.8.3),出现这种情况大概率是因为在异步里使用了同步。 可以考虑把 test() 放到 loop.run_in_executor 去异步调用。
``` @app.get("/test") async def trigger(): loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, test) return "completed" ``` |
3
Drahcir OP @louistayd2 虽然 AI 给的细节不对,比如 @task 并不存在 run_on_executor 这个参数。但是我感觉思路是对的,如果能让 Prefect 2 使用 FastAPI 的事件循环或许能解决这个问题。不过我不太了解 Prefect 2 源代码,暂时无解。
|
4
shuimugan 2023-02-26 13:48:45 +08:00 via Android 1
常见错误了,异步循环里别用同步的库,time.sleep 改 asyncio.sleep
|
5
Drahcir OP @NCZkevin 你好,谢谢,用这种解决方案的话,确实 FastAPI 本身不会被阻塞。比如说如果我执行了 GET /test, 在 sleep 的同时,我可以继续访问 /docs 页面。
不过,问题是如果我想触发同一个请求的话,还是会按顺序执行。比如说,如果我同时执行两个 GET /test ,那么还是会一个接一个执行,不能并行。 实际上,我试了试 uvicorn 指定多个 worker ,发现即使是多进程下仍有这个问题。 这次,直接抛开 Prefect 测试: ```Python def sleep(): print("sleep", time.time()) time.sleep(5) @app.get("/test") async def trigger(): loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, sleep) return "completed" ``` |