V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
CamD
V2EX  ›  Python

Python 异步编程 asyncio lock 如何让多个协程共享一把锁

  •  
  •   CamD · 2020-06-20 16:41:08 +08:00 · 2677 次点击
    这是一个创建于 1601 天前的主题,其中的信息可能已经有所发展或是发生改变。

    先上代码:

    import asyncio
    import aioredis
    
    # redis 池
    redis_pool = {
    
    }
    
    # redis 池读写锁
    _lock = asyncio.Lock()
    
    async def get_db(index:str="0"):
        global redis_pool
        await _lock.acquire() # 请求锁
        db = redis_pool.get(index)
        print(db)
        if db is None:
            specify_db = await aioredis.create_redis_pool("redis://localhost",db=int(index))
            redis_pool.update({index:specify_db})
            _lock.release() # 更新 redis 池完毕,释放锁
            return specify_db
        
        _lock.release() # 不需要生成 redis 连接,立即释放锁
        return db
    
    
    async def put_json(key:str,value:str,db_index:str="0",expire=-1):
        db = await get_db(db_index)
        await db.set(key,value,expire=expire)
        
    async def test():
        t1 = asyncio.create_task(get_db("0"))
        t2 = asyncio.create_task(get_db("0"))
        t3 = asyncio.create_task(get_db("0"))
        t4 = asyncio.create_task(get_db("0"))
        t5 = asyncio.create_task(get_db("0"))
        task_list = [t1, t2, t3, t4, t5]
        await asyncio.wait(task_list)
        
    
    if __name__ == '__main__':
        asyncio.get_event_loop().run_until_complete(test())
    
    

    这是我构建了一个简易的 aioredis 连接池,如果单纯执行这个 test 的时候 lock 可以正确使用,但一旦其他异步代码通过 from import 调用put_json()那么get_db()就会抛出... got Future <Future pending> attached to a different loop,说lock.acquire()和 其他代码不再同一个事务循环中

    而且查阅源码看到 asyncio.Lock()若不指定 loop 循环就会通过asyncio.get_event_loop()得到一个新循环,网上对于 asyncio.Lock 对象资料太少了。只有少量的 demo,demo 中是在 create_task 之前就创建好锁,然后参数传递给异步方法,这样做对 demo 有效,但对层次太多的代码很不友好

    我现在的问题是我不想从 create_task()前就创建一把锁然后一直用参数传递到 get_db()中,因为这把锁只针对 redis_pool 的读写操作,其他代码根本不需要这把锁,还有有什么其他的办法可以调度协程对某个对象的读写吗

    5 条回复    2020-07-07 15:03:37 +08:00
    yzk66880
        1
    yzk66880  
       2020-06-23 17:20:16 +08:00
    - 你觉得 create_task() 后切换时间循环了吗?
    - 为什么 aioredis 的读写需要锁?你认为他不是线程安全的吗?
    - aioredis 好像有实现连接池....
    CamD
        2
    CamD  
    OP
       2020-06-27 17:45:20 +08:00
    @yzk66880
    - 谢谢你的提醒,我认真看了文档,应该是 create_redis_pool()后用 select(db) 来切换 db 吧。
    - 我对 aioredis 并没有加锁,而是对 创建 aioredis 对象 这一步加了锁,因为如果有 200 个协程运行到 get_db 时,没有锁 200 个都读到的是 None,进而会创建 200 个 aioredis 对象,我对 aioredis 的 api 使用还不熟悉,还请多多指教
    yzk66880
        3
    yzk66880  
       2020-06-27 20:31:43 +08:00
    @CamD 把`await aioredis.create_redis_pool("redis://localhost",db=int(index))` 这个放在方面,设置一个全局的 redis 连接池就 ok,他在协成并发访问时是安全的不用加锁。
    dirtyDan
        4
    dirtyDan  
       2020-07-06 15:26:02 +08:00
    多个协程本来就是在同一个线程里,不存在变量共享的问题,所以搞个全局变量的锁,所有协程都能用。另,asyncio.lock 支持 with 语法,比手动获取,释放写起来舒服太多太多,安利一下。最后,aioredis 这个库小毛病挺多的,可以试试 aredis
    abersheeran
        5
    abersheeran  
       2020-07-07 15:03:37 +08:00
    https://github.com/abersheeran/a2wsgi/blob/master/a2wsgi/asgi.py#L97

    关于 Lock 的初始化,你可以看我的做法。用 `loop.call_soon_threadsafe` 给它丢到指定 loop 里初始化就完事了。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   3657 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 29ms · UTC 10:32 · PVG 18:32 · LAX 02:32 · JFK 05:32
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.