Article updated on

Java NIO Server Example (Asynchronous Bidirectional)

I needed to create a server for up to 1000 concurrent users. Using one thread per socket connection was taking too much CPU and memory. NIO is said to increase latency, but the increase was small in my case.

1 - Server NIO

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Minimal non-blocking (NIO) TCP server.
 *
 * <p>The thread that runs {@link #main(String[])} owns a single {@link Selector}
 * and uses it to accept connections and to read text sent by clients. A second
 * thread, started by {@link #sendMsgsToRandomClients()}, periodically writes a
 * numbered message to one randomly chosen connected client.
 *
 * <p>A client is disconnected when it closes its side of the connection, when
 * reading from it fails, or when the text it sent contains "bye" (any case).
 */
public class Server {
    /** TCP port the server listens on. */
    private static final int PORT = 8000;
    /** Delay between two server-to-client messages. */
    private static final long PAUSE_BETWEEEN_MSGS = 10; // millisecs
    /** Charset used both for decoding client input and encoding server messages. */
    private static final java.nio.charset.Charset UTF8 =
            java.nio.charset.Charset.forName("UTF-8");
    /** Scratch buffer for reads; only ever touched by the selector thread. */
    private static final ByteBuffer echoBuffer = ByteBuffer.allocate(1024);
    /** Connected clients, keyed by the channel's hash code; shared with the sender thread. */
    private static final ConcurrentHashMap<Integer, SocketChannel> chm
                        = new ConcurrentHashMap<Integer, SocketChannel>();
    /** Running number of the last server-to-client message; only the sender thread writes it. */
    private static int msg = 0;

    /**
     * Starts the listener and the sender thread, then runs the selector loop forever.
     *
     * @param args ignored
     * @throws Exception if the listening socket cannot be set up, or if selecting
     *                   or accepting fails
     */
    public static void main(String args[]) throws Exception {
        // Create a new selector
        Selector selector = Selector.open();
        // Open a non-blocking listener on PORT
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ServerSocket ss = ssc.socket();
        ss.bind(new InetSocketAddress(PORT));
        // registers ACCEPT
        ssc.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("Going to listen on " + PORT);
        sendMsgsToRandomClients();
        while (true) {
            selector.select();
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> it = selectedKeys.iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                // Always take the key out of the selected set, whatever we do with it;
                // the selector never removes keys from that set by itself.
                it.remove();
                if (!key.isValid()) {
                    continue;
                }
                if (key.isAcceptable()) {
                    acceptClient(selector, key);
                } else if (key.isReadable()) {
                    readFromClient(key);
                }
            }
        }
    }

    /**
     * Accepts the pending connection, registers it for reads and records it in {@code chm}.
     */
    private static void acceptClient(Selector selector, SelectionKey key) throws IOException {
        ServerSocketChannel listener = (ServerSocketChannel) key.channel();
        SocketChannel sc = listener.accept();
        if (sc == null) {
            // A non-blocking accept() returns null when no connection is pending.
            return;
        }
        sc.configureBlocking(false);
        // Add the new connection to the selector
        sc.register(selector, SelectionKey.OP_READ);
        // Make the channel visible to the sender thread
        chm.put(sc.hashCode(), sc);
    }

    /**
     * Reads everything currently available from one client, prints it, and closes
     * the client on end-of-stream, on a read error, or when it said "bye".
     */
    private static void readFromClient(SelectionKey key) {
        SocketChannel sc = (SocketChannel) key.channel();
        // Per-client accumulator, so text from different clients is never mixed.
        StringBuilder received = new StringBuilder();
        int code = 0;
        try {
            // Start from an empty buffer regardless of how the previous read ended.
            echoBuffer.clear();
            while ((code = sc.read(echoBuffer)) > 0) {
                echoBuffer.flip();
                byte b[] = new byte[echoBuffer.remaining()];
                echoBuffer.get(b);
                received.append(new String(b, UTF8));
                // Make room for the next chunk; without this the next read() returns 0.
                echoBuffer.clear();
            }
        } catch (IOException e) {
            // e.g. connection reset: drop this client but keep the server running.
            e.printStackTrace();
            code = -1;
        }
        String text = stripLineEnding(received.toString());

        if (code == -1 || text.toUpperCase().indexOf("BYE") > -1) {
            closeClient(sc);
        }
        System.out.println("msg: " + text + " from: " + sc + "code:  " + code);
    }

    /**
     * Returns {@code text} without trailing CR/LF characters (handles "\r\n", "\n" and none).
     */
    private static String stripLineEnding(String text) {
        int end = text.length();
        while (end > 0 && (text.charAt(end - 1) == '\n' || text.charAt(end - 1) == '\r')) {
            end--;
        }
        return text.substring(0, end);
    }

    /**
     * Forgets the client and closes its channel; a failure to close is only reported.
     */
    private static void closeClient(SocketChannel sc) {
        // Two-argument remove: only drop the entry if it still maps to this channel.
        chm.remove(Integer.valueOf(sc.hashCode()), sc);
        try {
            sc.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Starts a background thread that, every {@code PAUSE_BETWEEEN_MSGS} milliseconds,
     * sends a numbered message to one randomly chosen connected client.
     */
    private static void sendMsgsToRandomClients() {
        new Thread("Send-to-Clients") {
            @Override
            public void run() {
                Random generator = new Random();
                try {
                    while (true) {
                        // Work on a snapshot: the selector thread adds and removes
                        // clients concurrently, so size and index must come from the
                        // same copy.
                        ArrayList<Integer> keys = new ArrayList<Integer>(chm.keySet());
                        if (!keys.isEmpty()) {
                            Integer randomKey = keys.get(generator.nextInt(keys.size()));
                            SocketChannel sc = chm.get(randomKey);
                            // The client may have disconnected since the snapshot was taken.
                            if (sc != null) {
                                try {
                                    msg++;
                                    ByteBuffer buf = ByteBuffer.wrap(("From server to Client msg nÂș - "+ msg + "\n").getBytes(UTF8));
                                    // NOTE: the channel is non-blocking, so a slow client
                                    // may receive only part of the message.
                                    sc.write(buf);
                                } catch (IOException e) {
                                    e.printStackTrace();
                                    chm.remove(randomKey, sc);
                                }
                            }
                        }
                        Thread.sleep(PAUSE_BETWEEEN_MSGS);
                    }
                } catch (InterruptedException e) {
                    // Asked to stop: restore the flag and let the thread end.
                    Thread.currentThread().interrupt();
                }
            }
        }.start();
    }
}

 

1.1 Run the Server

Compile and run the example above, e.g. javac Server.java to compile and java Server to execute. If it's working properly you should see this.

1.2 Test the Server

Use telnet or a similar tool to verify that the server is working. Telnet to the server, e.g. telnet localhost 8000

Type "bye" and press ENTER to close the session with the server. You can use as many telnet clients from as many different computers as you like. The more clients are connected to the server, the fewer messages each client should receive (the messages are shared among all clients).

Notes

  • If you get a "Too many open files" error on Linux, raise the limit with ulimit -n 4096.
  • Works OK with 1000 concurrent users. If you have any ideas or suggestions please let me know