V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
fghjghf
V2EX  ›  Python

急急急, Python 多进程,如何传递 epoll?

  •  
  •   fghjghf · 2019-06-06 17:14:29 +08:00 · 3005 次点击
    这是一个创建于 2022 天前的主题,其中的信息可能已经有所发展或是发生改变。

    前提:半同步半异步模式,主进程创建的 epollfd,要传递的其他进程。

    无论用消息队列 q = Manager().Queue(),还是 m=Manager()。都不行

    error:TypeError: can't pickle select.epoll objects

    各位 dalao 有没有遇到过?怎么解决的?

    11 条回复    2019-06-17 12:13:38 +08:00
    di94sh
        1
    di94sh  
       2019-06-06 21:39:07 +08:00 via Android
    把 socket 传递给子进程,epoll 是用来判断是什么事件的,判断完事件后把对应的处理过程和 socket 扔给子进程处理
    fghjghf
        2
    fghjghf  
    OP
       2019-06-07 15:21:40 +08:00
    @di94sh 因为要 modify,所以 fd 和 epoll 也是需要的
    NoAnyLove
        3
    NoAnyLove  
       2019-06-09 13:14:01 +08:00
    利用继承,在创建其他进程前先创建 epoll,创建其他进程 fork 的时候就自动传递到其他进程中了。
    fghjghf
        4
    fghjghf  
    OP
       2019-06-10 23:46:52 +08:00
    @NoAnyLove 我是用 apply_async 进程池的。apply_async(work,(pre1,per2...)) ,work 的父类有 epoll_fd。可是新进程 work 就没 epoll_fd 了,ipc 也传不了,通过函数参数也不行,继承也不管用。哭,还有别的办法吗
    1800x
        5
    1800x  
       2019-06-13 07:11:04 +08:00 via Android
    还真没遇到类似的需求
    文件描述符属于内核对象,用户空间持有的只是一个引用
    python 创建子进程时,子进程会自动继承引用
    或者创建进程时,可以通过向进程参数的文件描述符列表添加要指定继承的文件描述对象
    python 具体是怎么处理的,没去仔细研究
    NoAnyLove
        6
    NoAnyLove  
       2019-06-13 12:56:24 +08:00
    IPC 传递和向 worker 传递参数都需要将数据 pickle 成二进制之后再发送。对于 epoll 对象需要自己写函数来支持,还涉及到复制 fd,非常麻烦,又不是 Windows 下传递句柄,没必要这样折腾。

    继承怎么不行了?

    ```
    >>> import os
    >>> import select
    >>> import multiprocessing.pool
    >>> ep = select.epoll()
    >>> def init(epoll):
    ... print(f'PID: {os.getpid()}, epoll: {epoll}, fd: {epoll.fileno()}')
    ...
    >>> pool=multiprocessing.pool.Pool(5, init, [ep])
    PID: 28331, epoll: <select.epoll object at 0x7fc305f41588>, fd: 3
    PID: 28329, epoll: <select.epoll object at 0x7fc305f41588>, fd: 3
    PID: 28333, epoll: <select.epoll object at 0x7fc305f41588>, fd: 3
    PID: 28330, epoll: <select.epoll object at 0x7fc305f41588>, fd: 3
    PID: 28332, epoll: <select.epoll object at 0x7fc305f41588>, fd: 3
    ```
    fghjghf
        7
    fghjghf  
    OP
       2019-06-13 16:34:44 +08:00
    @NoAnyLove 这个我试过,是同一个内存地址。然后你再试一试,用你传过去的 epoll. modify.发现并不管用。
    其实我只是想做个:半同步半异步服务器。gil 锁所以用进程。主进程负责循环监听 socket,遇到可读可写,就放入进程池执行读写逻辑,非堵塞执行,遇到 eagain,需要 epoll.modify(out)然后结束。那么主进程也能检测到 fd 状态。如此循环...请问这个逻辑到底那里错了,或者需要怎么做,能大致点一下嘛
    fghjghf
        8
    fghjghf  
    OP
       2019-06-13 16:51:28 +08:00
    @NoAnyLove 直接 callback 这个方法可行嘛,读写进程处理完就回调,让同一个进程去对 epoll 进程操作
    NoAnyLove
        9
    NoAnyLove  
       2019-06-15 12:23:53 +08:00
    因为是 fork 啊,当然是同一个地址。你是怎么传递 socket 的? socket 是在创建 Pool 之后才创建的? socket 直接当做参数传递了?

    fork 之后创建的 socket 需要跨进程复制句柄的。

    话说,你这种“然后你再试一试,用你传过去的 epoll. modify.发现并不管用。”这种交流方式让人很难继续交流下去啊,没有示例代码和具体错误你想让我试啥?麻烦尽量简单准确的描述问题,用简短可以执行的代码和具体错误信息来描述问题。

    github.com/ryanhanwu/How-To-Ask-Questions-The-Smart-Way/blob/master/README-zh_CN.md
    fghjghf
        10
    fghjghf  
    OP
       2019-06-16 15:39:41 +08:00
    @NoAnyLove 好的好的,我查了下,说 IPC 或向 worker 参数传递的东西必须要能 pickle,不然就报错,那就是 select.epoll 是不支持的。error:TypeError: can't pickle select.epoll objects。
    代码大致如下:
    def listenProcess(serverSocket):
    process_num = 2*cpu_count()

    # 设置进程池、消息队列
    po = Pool(process_num)
    q = Manager().Queue() #ipc 方法 1
    m=Manager() # ipc 方法 2

    epoll_fd = select.epoll()
    epoll_fd.register(serverSocket.fileno(), select.EPOLLIN)


    # 保存中间数据
    connections = {}
    addresses = {}

    while True:
    epoll_list = epoll_fd.poll()
    for fd, events in epoll_list:
    print("fd is:%s events is%s"%(fd,events))
    if fd == serverSocket.fileno():

    conn, addr = serverSocket.accept()
    conn.setblocking(False)
    epoll_fd.register(conn.fileno(), select.EPOLLIN | select.EPOLLET )

    connections[conn.fileno()] = conn
    addresses[conn.fileno()] = addr

    elif events & select.EPOLLIN:
    # q.put(connections[fd])
    po.apply_async(workProcess4In,args=(connections[fd],epoll_fd,))

    elif events & select.EPOLLOUT:
    # q.put(fd)
    # d=m.dict({"epollfd":epoll_fd,"fd":fd})
    po.apply_async(workProcess4Out,(connections[fd,epoll_fd],))

    po.close()
    po.join()

    def workProcess4In(conn,epollfd):
    #print("epoll is:%s"%epollfd)
    #执行读操作,遇到 eagain 就 modify 状态 out
    epollfd.modify(conn.fileno(), select.EPOLLET | select.EPOLLOUT)

    执行结果:不执行 workProcess4In,如果把 epoll_fd 换成其他,则没问题

    请教 dalao,主进程需要如何才能把 epoll_fd 传过去。
    NoAnyLove
        11
    NoAnyLove  
       2019-06-17 12:13:38 +08:00   ❤️ 1
    通常 pickle 只是将对象转成二进制数据。`multiprocessing.reduction`中提供了对 socket 对象的 pickle 支持,实际上在二进制数据传输的背后还设计到了跨进程传递 fd,但是并未提供对 epoll 的支持。

    要实现传递 epoll,一种方法就是按照我前面的示例代码,在创建 Pool 前先创建 epoll,可以使用全局变量保存 epoll,这样 worker 进程中可以直接使用;或者使用局部变量,然后通过 Pool 函数的 initializer 和 initargs 传递给 worker 保存起来之后用。直接通过 apply 或者 apply_sync 之类的函数传递会要求实现 pickle。

    另外一种方法,通过`multiprocessing.reduction.send_handle`来发送句柄,参见 gist.github.com/bdarnell/1073945,一样的做法,只不过是把传递的内容换成 epoll.fileno,接受的时候使用 epoll.fromfd。

    此外,也可以参照`multiprocessing.reduction`中 reduce_socket/rebuild_socket 的做法实现 epoll 的支持。

    需要注意的是,跨进传递句柄会使得 fd 改变。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1005 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 23ms · UTC 22:18 · PVG 06:18 · LAX 14:18 · JFK 17:18
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.