V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
Aruforce
V2EX  ›  程序员

NIO 如下代码怎么绕过死锁?还是说我写的不对?如下代码

  •  1
     
  •   Aruforce · 2020-06-04 18:19:41 +08:00 · 2954 次点击
    这是一个创建于 1633 天前的主题,其中的信息可能已经有所发展或是发生改变。
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.*;
    import java.util.Iterator;
    import java.util.Set;
    
    public class NIO2 {
        private static volatile  boolean keepRunning = true;
        public static void main(String[] args) throws Exception {
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.bind(new InetSocketAddress(8090), 128);
            Selector selector = Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            Selector socketChannelSelector = Selector.open();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        while (keepRunning) {
                            int select = selector.select();
                            if (select > 0) {
                                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                                Iterator<SelectionKey> iterator = selectedKeys.iterator();
                                while (iterator.hasNext()) {
                                    SelectionKey next = iterator.next();
                                    if (next.isValid()){
                                        if(next.isAcceptable()) {
                                            SocketChannel accept = ((ServerSocketChannel) next.channel()).accept();
                                            accept.configureBlocking(false);
                                            // thread block 怎么绕过 还是说我用的不对
                                            accept.register(socketChannelSelector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                                        }
                                    }else {
                                        socketChannelSelector.keys().remove(next);
                                    }
                                    iterator.remove();
                                }
                            }
                        }
                    }catch (IOException e){
                        e.printStackTrace();
                    }
                }
            }).start();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        while (keepRunning) {
                            int select = socketChannelSelector.select();
                            if (select > 0) {
                                Set<SelectionKey> selectedKeys = socketChannelSelector.selectedKeys();
                                Iterator<SelectionKey> iterator = selectedKeys.iterator();
                                while (iterator.hasNext()) {
                                    SelectionKey next = iterator.next();
                                    if (next.isReadable()) {
    
                                    } else if (next.isWritable()) {
                                        ByteBuffer byteBuffe = ByteBuffer.allocate(4);
                                        int l = (int)(System.currentTimeMillis() / 1000L + 2208988800L);
                                        byteBuffe.put(fromInt(l));
                                        byteBuffe.flip();
                                        ((SocketChannel) next.channel()).write(byteBuffe);
                                        ((SocketChannel) next.channel()).close();
                                    }
                                    iterator.remove();
                                }
                            }
                        }
                    }catch (IOException e){
                        e.printStackTrace();
                    }
                }
            }).start();
    
        }
    
        public static byte[] fromInt(int i) {
            byte[] result = new byte[4];
            result[3] = (byte) (i);
            result[2] = (byte) (i >> 8);
            result[1] = (byte) (i >> 16);
            result[0] = (byte) (i >> 24);
            return result;
        }
    
        public static int fromByteArray(byte[] bytes) {
            if (bytes.length != 4) {
                throw new IllegalArgumentException("bytes array length = " + bytes.length);
            }
            //
            int a = bytes[3] & 0x000000ff;
            a |= (bytes[2] << 8) & 0x0000ffff;
            a |= (bytes[1] << 16) & 0x00ffffff;
            a |= (bytes[0] << 24) & 0xFFffffff;
            return a;
        }
    }
    
    11 条回复    2020-06-08 11:38:00 +08:00
    Aruforce
        1
    Aruforce  
    OP
       2020-06-04 18:50:20 +08:00
    select 和 registe 都要获取 Selector publicKeys 的锁....
    是不是不该这么用啊...
    badteeth
        2
    badteeth  
       2020-06-04 19:53:09 +08:00
    在 selector 迭代时是不能注册新的 channel 的;你可以把 accept 的所有注册操作合并到一次 selector 的迭代循环之后;
    Aruforce
        3
    Aruforce  
    OP
       2020-06-04 21:17:48 +08:00 via Android
    @badteeth ……我是想一个 selector 负责 serversocketchannel 建连接…一个 selector 负责 socketchannel 读写状态…但是同一个 selector select 和 registe 是不能并行的 都要滚去 selector publishkey 的锁……会死锁……我绕不过去了……
    Aruforce
        4
    Aruforce  
    OP
       2020-06-04 21:22:11 +08:00 via Android
    @badteeth 正常情况应该是一个端口一个 selector…单线程负责当前端口的全部的建连接及 socketchannel 的 registe……还有个线程池负责真正的读写数据和处理业务逻辑… 我想问的是怎么绕过去…selector.wakeup 不好使…
    arloor
        5
    arloor  
       2020-06-04 21:56:56 +08:00
    问题确实是 select 和 register 都要获取同样的锁。

    你现在是启动两个线程,一个 select-accept-register,另一个 select-read/write 。除了锁的问题哈,还有你会丢失 accept 和读写事件,线程 1 会忽略读写,线程 2 会忽略 accept 。

    楼主知道 reactor 模式吗?一个线程负责接收事件( accept 、read 、write ),后面一个线程池负责处理这些事件。

    应该长这样:一个线程 select-accept/read/write->由线程池来处理不同事件( accept 可以直接在原线程进行 register )

    PS:实践到这里差不多了,可以看 netty 了。先用用 netty,然后看下 netty 源码,相信就能清楚。
    MoHen9
        6
    MoHen9  
       2020-06-04 22:48:02 +08:00
    Selector 用两个,分为主从,主 Selector 专门负责 ServerSocketChannel 的 accept 事件,因为建立连接只有一次,从 Selector 负责 SocketChannel 的读写事件,主 ServerSocketChannel 是没有读写事件的; Netty 里面是每个 SocketChannel 线程都有一个 Selector,ServerSocketChannel 的 Selector 只用了一个。
    Aruforce
        7
    Aruforce  
    OP
       2020-06-05 13:07:35 +08:00
    @arloor socketchannelSelector 不需要处理 accept 啊... serverSocketChannel 不需要处理 read write 。。。

    你说的 reator 模式 ( 1 个线程循环处理 selector.selectedKeys 处理好 accept 及 registe 并且将 read/write dispatch 给线程池来处理)

    我是想把 所有的 socketchannel 的 read/write 交给另一个 selector... 但是碰到了死锁。。。想写个 Wrapper 但是没写出来
    Aruforce
        8
    Aruforce  
    OP
       2020-06-05 13:13:40 +08:00
    @MoHen9 主 ServerSocketChannel accept 的 socketChannel 向 从 selector registe 的时候会碰到 锁竞争。。。 我不知道怎么绕过去 从 selector.wakeup() 并不好用 能注册上是运气好 CPU 先执行了主 Selector 的线程...还有像一部份没办法注册上的。。。
    从 selector.select(1) 这样 其实也不对...如果主 selector accept 的 keyset 很大的话 就走不下去了
    arloor
        9
    arloor  
       2020-06-05 14:39:44 +08:00
    @Aruforce 看错了,没看到你用了两个 selector 。

    你现在问题是线程 1 register 、线程 2 select,死锁了。解决很简单,用个生产者消费者模型,把要 register 的 channel 放到队列中,线程二在每次 select 前先 register 队列中的 channel 。这时候你可能又要问了,万一线程 2 一直阻塞在 select 时,怎么办? 答案:用 selectNow()或 select ( timeout )。


    刚刚看了 netty 源码中怎么进行 register 的,他也是生产者消费者模型,由进行 select 的线程来 register 。代码如下:

    AbstractChannel.java (Netty 4.1):

    @Override
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(eventLoop, "eventLoop");
    if (isRegistered()) {
    promise.setFailure(new IllegalStateException("registered to an event loop already"));
    return;
    }
    if (!isCompatible(eventLoop)) {
    promise.setFailure(
    new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
    return;
    }

    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
    register0(promise); // 这里不会走到
    } else {
    try {
    // eventLoop 就是 netty 中线程,提交一个 register 任务,后面会被执行
    eventLoop.execute(new Runnable() {
    @Override
    public void run() {
    register0(promise);
    }
    });
    } catch (Throwable t) {
    logger.warn(
    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
    AbstractChannel.this, t);
    closeForcibly();
    closeFuture.setClosed();
    safeSetFailure(promise, t);
    }
    }
    }
    Aruforce
        10
    Aruforce  
    OP
       2020-06-08 09:43:35 +08:00
    @arloor
    0. Wrapper 这条路 实在没走通;
    1. 遵从你的建议...serverSocketChannelSelector 不要直接给了放缓存对队列...socketChannelSelector 自己来拿吧...
    3. 但是 socketChannelSelector 有空转的问题而且如果 blockingDeque 缓存的比较多的话...感觉有问题

    ```
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.*;
    import java.util.Iterator;
    import java.util.Set;
    import java.util.concurrent.LinkedBlockingDeque;

    public class NIO2 {
    private static volatile boolean keepRunning = true;
    private static LinkedBlockingDeque<SocketChannel> blockingDeque = new LinkedBlockingDeque<SocketChannel>();
    public static void main(String[] args) throws Exception {
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false);
    serverSocketChannel.bind(new InetSocketAddress(8090), 128);
    Selector selector = Selector.open();
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    Selector socketChannelSelector = Selector.open();
    new Thread(new Runnable() {
    @Override
    public void run() {
    try {
    while (keepRunning) {
    int select = selector.select();
    if (select > 0) {
    Set<SelectionKey> selectedKeys = selector.selectedKeys();
    Iterator<SelectionKey> iterator = selectedKeys.iterator();
    while (iterator.hasNext()) {
    SelectionKey next = iterator.next();
    if (next.isValid()) {
    if (next.isAcceptable()) {
    SocketChannel accept = ((ServerSocketChannel) next.channel()).accept();
    accept.configureBlocking(false);
    blockingDeque.add(accept);
    }
    } else {
    socketChannelSelector.keys().remove(next);
    }
    iterator.remove();
    }
    }
    }
    } catch (IOException e) {
    e.printStackTrace();
    }
    }
    }).start();
    new Thread(new Runnable() {
    @Override
    public void run() {
    try {
    while (keepRunning) {
    if (!blockingDeque.isEmpty()) {
    blockingDeque.removeIf(value->{
    try {
    value.register(socketChannelSelector,SelectionKey.OP_READ|SelectionKey.OP_WRITE);
    } catch (ClosedChannelException e) {
    e.printStackTrace();
    return false;
    }
    return true;
    });
    }
    // 又是在空转。。。
    int select = socketChannelSelector.select(1);
    if (select > 0) {
    Set<SelectionKey> selectedKeys = socketChannelSelector.selectedKeys();
    Iterator<SelectionKey> iterator = selectedKeys.iterator();
    while (iterator.hasNext()) {
    SelectionKey next = iterator.next();
    if (next.isReadable()) {

    } else if (next.isWritable()) {
    ByteBuffer byteBuffe = ByteBuffer.allocate(4);
    int l = (int) (System.currentTimeMillis() / 1000L + 2208988800L);
    byteBuffe.put(fromInt(l));
    byteBuffe.flip();
    ((SocketChannel) next.channel()).write(byteBuffe);
    ((SocketChannel) next.channel()).close();
    }
    iterator.remove();
    }
    }
    }
    } catch (IOException e) {
    e.printStackTrace();
    }
    }
    }).start();

    }

    public static byte[] fromInt(int i) {
    byte[] result = new byte[4];
    result[3] = (byte) (i);
    result[2] = (byte) (i >> 8);
    result[1] = (byte) (i >> 16);
    result[0] = (byte) (i >> 24);
    return result;
    }

    public static int fromByteArray(byte[] bytes) {
    if (bytes.length != 4) {
    throw new IllegalArgumentException("bytes array length = " + bytes.length);
    }
    //
    int a = bytes[3] & 0x000000ff;
    a |= (bytes[2] << 8) & 0x0000ffff;
    a |= (bytes[1] << 16) & 0x00ffffff;
    a |= (bytes[0] << 24) & 0xFFffffff;
    return a;
    }
    }
    ```
    arloor
        11
    arloor  
       2020-06-08 11:38:00 +08:00
    @Aruforce 这代码我运行了下,用 nc localhost 8090 测试了下,没有问题啊

    你说的空转是什么意思?

    不要总是感觉有问题,永远没有最优解,都是权衡和妥协。最怕你觉得这样有问题,然后就什么都不做。

    优化是后面的事。

    另外,说真的,就是这样了,不要觉得有啥问题。真可以增加的就是线程池了
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2814 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 07:55 · PVG 15:55 · LAX 23:55 · JFK 02:55
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.