
Java Sockets Performance Quick Test Example (Asynchronous Bidirectional)

I was asked to develop a socket service that could send and receive requests asynchronously. The service had to cope with up to 300 concurrent users and send/receive up to 50/100 messages per second.
As I wasn't sure whether Java could do this within reasonable CPU and memory limits, I wrote the following test, which consists of two parts (Server and Client). I'm using OpenJDK 1.6 on a Pentium 4 Dual Core at 2.8 GHz.

1 - Server

Features:

  • All clients connect to one server.
  • The server sends asynchronous messages to random clients connected to the server.
  • Each client connection is assigned to one exclusive thread.
  • A ConcurrentHashMap is used to store all client connection objects.
  • The server will never stop by itself. Once all the clients are disconnected, some statistics will be shown.
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
public class Server {
    private static final int PORT = 9001;
    private static final long PAUSE_BETWEEEN_MSGS = 10; // millisecs
    // a map to store all the ThreadedCliHandlers
    private static ConcurrentHashMap<String, ThreadCliHandler> chm
        = new ConcurrentHashMap<String, ThreadCliHandler>();
    // for statistics
    private static int msg = 0;
    private static Date date = new Date();
    private static int threads_created = 0;
    public static void main(String[] args) throws Exception {
        System.out.println("Server OK port on " + PORT);
        ServerSocket socketServer = new ServerSocket(PORT);
        sendMsgsToRandomClients();
        try {
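            while (true) {
                // accept() blocks until a client connects, so count the thread only afterwards
                Socket client = socketServer.accept();
                threads_created++;
                new ThreadCliHandler(client).start();
            }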
        } finally {
            socketServer.close();
        }
    }
    /**
     * This method sends messages to a random outPutStream
     */
    private static void sendMsgsToRandomClients() {
        new Thread("Send-to-Clients") {            
            public void run() {
                try {
                    boolean showInfo = true;
                    while (true) {
                        Random generator = new Random();
                        for (; chm.size() > 0; msg++) {
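                            // take ONE snapshot of the keys: clients can connect or
                            // disconnect at any time, so the map may change under us
                            ArrayList<String> keys = new ArrayList<String>(chm.keySet());
                            if (keys.isEmpty()) {
                                break;
                            }
                            String randomKey = keys.get(generator.nextInt(keys.size()));
                            ThreadCliHandler cc = chm.get(randomKey);
                            // sends the message (cc is null if the client has just left)
                            if (cc != null && !cc.socket.isClosed()) {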
                                cc.out.println("From server to client "
                                        + randomKey + " MSG sent: " + msg + "\r");
                                cc.out.flush();
                            } else {
                                chm.remove(randomKey);
                            }
                            Thread.sleep(PAUSE_BETWEEEN_MSGS);
                            showInfo = true;
                        }
                        Thread.sleep(PAUSE_BETWEEEN_MSGS);
                        if (showInfo) {
                            System.out.println(
                                    "Array size:"     + chm.keySet().size()
                                    + " msgs sent:" + msg
                                    + " threads-created:" + threads_created
                                    + " time server since:"+ date);
                            showInfo = false;
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }.start();
    }
    /**
     * All the requests received by the server are handled by a ThreadCliHandler
     */
    private static class ThreadCliHandler extends Thread {
        private Socket socket;
        private BufferedReader in;
        private PrintWriter out;
        private String nameSocket = "";
        public ThreadCliHandler(Socket socket) {
            this.socket = socket;
        }
        public void run() {
            try {
                // Create streams for the socket.
                in = new BufferedReader(new InputStreamReader(
                        socket.getInputStream()));
                out = new PrintWriter(socket.getOutputStream(), true);
                out.print("Insert Client Name:");
                out.flush();
                // The first input line is the client name in this example
                nameSocket = in.readLine();
                if (nameSocket == null) {
                    // the client disconnected before sending a name
                    // (ConcurrentHashMap rejects null keys)
                    return;
                }
                // stores the handler in the map
                chm.put(nameSocket, this);
                System.out.println("SRV-REC new client: " + nameSocket);
                while (true) {
                    String inLine = in.readLine();
                    System.out.println("SRV-REC " + nameSocket + ": " + inLine);
                    // readLine() returns null once the client closes the connection
                    if (inLine == null || inLine.toLowerCase().indexOf("bye") != -1) {
                        break;
                    }
                }
            } catch (SocketException es) {
                // happens if the client disconnects unexpectedly
                System.out.println(es);
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                // remove only this handler's own entry
                if (nameSocket != null) {
                    chm.remove(nameSocket, this);
                }
                try {
                    socket.close();
                } catch (IOException e) {
                    // nothing useful to do if close() fails
                }
            }
        }
    }
}
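
Picking a random client from a map that other threads modify is easy to get subtly wrong: the key set can shrink between copying it into a list and asking the map for its size, which leads to an out-of-range index or to Random.nextInt(0) throwing. The following is a minimal sketch of a helper that works from a single snapshot. The class and method names (RandomPick, pickRandom) are made up for this illustration and are not part of the server above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;

class RandomPick {
    /**
     * Returns a random value from the map, or null if the map is empty or
     * the chosen entry was removed after the snapshot was taken.
     */
    static <V> V pickRandom(ConcurrentHashMap<String, V> map, Random rnd) {
        // one snapshot: the random index is always valid for THIS list
        List<String> keys = new ArrayList<String>(map.keySet());
        if (keys.isEmpty()) {
            return null; // Random.nextInt(0) would throw IllegalArgumentException
        }
        return map.get(keys.get(rnd.nextInt(keys.size())));
    }
}
```

The caller still has to handle a null result, because a client may disconnect right after the snapshot is taken.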

 

1.1 Run the Server

Compile and run the example above, e.g. javac Server.java to compile and java Server to run. If it's working properly you should see this.

img/0/61/1.png

1.2 Test the Server

Use telnet or a similar tool to verify that the server is working, e.g. telnet localhost 9001

img/0/61/2.png

Then insert a name for that client. This may help you track down how many messages are being sent/received.

img/0/61/telnet-reciviendo.png

Type "bye" and press Enter to close the session with the server. You can use as many telnet clients from as many different computers as you like. The more clients are connected to the server, the fewer messages each client receives, because the messages are shared among all connected clients.

2 - Client

Features:

  • Creates a number of connections to the server. Randomly chooses one of these connections and sends a message to the server.
  • Each Client can receive messages from the server.
  • Blocks the main thread with wait() and notifyAll() until the first client has connected and registered itself.
  • Uses join() to make the main thread wait till all the threads finish.
  • Test duration can be fixed.
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
class Client {    
    private static final String HOST = "localhost";
    private static final int PORT = 9001;
    private static final int NUM_CLIENTS = 100;
    private static final long PAUSE_BETWEEEN_MSGS = 10;
    private static final long TEST_DURATION = 1000*60; //(millis)    
    private static int msgsSent = 0;
    private static int msgsRec = 0;
    //Map to store all clients
    private static ConcurrentHashMap<String, ThreadCliHandler> chm
                = new ConcurrentHashMap<String, ThreadCliHandler>();
    public static void main(String args[]) {
        long l = System.currentTimeMillis();
        //creates the socket connections to the server
        for (int i = 0; i < NUM_CLIENTS; i++) {
            new ThreadCliHandler("Client " + i).start();
        }
        Lock.waitThread();        
        //this code sends msgs to the server through a randomly chosen client PrintWriter
        Random generator = new Random();
        while(System.currentTimeMillis()<(l+TEST_DURATION)){                        
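            // take ONE snapshot of the keys so the list and its size cannot disagree
            ArrayList<String> keys = new ArrayList<String>(chm.keySet());
            String randomKey = keys.isEmpty() ? null : keys.get(generator.nextInt(keys.size()));
            ThreadCliHandler cc = (randomKey == null) ? null : chm.get(randomKey);
            if(cc != null && !cc.socket.isClosed()){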
                cc.out.println("from client "+ randomKey + " to Server MSG "+ msgsSent++);
                cc.out.flush();
            }
            try {
                Thread.sleep(PAUSE_BETWEEEN_MSGS);
            } catch (InterruptedException e) {e.printStackTrace();}
        }    
        //tells the server that it's going to disconnect
        for (ConcurrentHashMap.Entry<String, ThreadCliHandler> entry : chm.entrySet()) {
            ThreadCliHandler cc = entry.getValue();
            if(!cc.socket.isClosed()){
                cc.out.println("BYE");
                cc.out.flush();
                try {
                    //waits for all ThreadCliHandlers to die
                    cc.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }                
            }
        }    
        // measure the elapsed time once so every figure uses the same value
        long elapsed = Math.max(1, System.currentTimeMillis() - l);
        System.out.println("Threads: "+ NUM_CLIENTS
                + " msgs-sent:"+ msgsSent  
                + " msgs-received:"+ msgsRec
                + " average msgs-sent/sec:" + (msgsSent*1000L/elapsed)
                + " average msgs-received/sec:" + (msgsRec*1000L/elapsed)
                + " time: "+ elapsed);
    }
    //Handles every socket client incoming messages in one single thread
    private static class ThreadCliHandler extends Thread {
        private Socket socket;
        private BufferedReader in;
        private PrintWriter out; // no auto-flush: callers flush explicitly
        private String clientName;
        public ThreadCliHandler(String clientName) {
            this.clientName = clientName;
        }
        public void run() {
            try {
                socket = new Socket(HOST, PORT);
                in = new BufferedReader(new InputStreamReader(
                        socket.getInputStream()));
                out = new PrintWriter(new OutputStreamWriter(
                        socket.getOutputStream()));
                // sends the clientName to the server
                out.println(clientName);
                out.flush();
                chm.put(clientName, this);
                // to notify the main thread of changes in clientsMap
                Lock.unlock();
                String inLine = null;
                while (true) {
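                    // readLine() returns null once the server closes the connection
                    inLine = in.readLine();
                    if (inLine == null || inLine.indexOf("BYE") > -1) {
                        break;
                    } else {
                        System.out.println("CLI-REC: " + inLine);
                        // msgsRec is shared by all handler threads and ++ is not atomic
                        synchronized (Client.class) {
                            msgsRec++;
                        }
                    }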
                }
            } catch (IOException e) {
                e.printStackTrace();
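            } finally {
                // the map key is the client name, not the thread name
                chm.remove(clientName);
                try {
                    // socket is still null if the connection attempt failed
                    if (socket != null) {
                        socket.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }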
            System.out.println(clientName + " ends..");
        }
    }
}
class Lock {
    private static boolean isRunning = false;    
    public static synchronized void unlock() {
        if(isRunning)
            return;    
        isRunning = true;
        Lock.class.notifyAll();
    }
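    public static synchronized void waitThread(){
        // the loop guards against spurious wakeups; only unlock() sets the flag
        while(!isRunning){
            try {
                Lock.class.wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                return;
            }
        }
    }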
}
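
The Lock class is a hand-written one-shot gate built on wait() and notifyAll(). The java.util.concurrent package (available since Java 5) provides CountDownLatch for this kind of start signal. Below is a minimal sketch of the same idea, not a drop-in replacement for the client above. The class name LatchStartDemo, the method waitForFirstWorker and the timeout parameter are assumptions made for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchStartDemo {
    /**
     * Starts the given number of worker threads and blocks until the first
     * one reports that it is ready. Returns false if none did so in time.
     */
    static boolean waitForFirstWorker(int workers, long timeoutMillis) {
        final CountDownLatch firstReady = new CountDownLatch(1);
        for (int i = 0; i < workers; i++) {
            new Thread("worker-" + i) {
                public void run() {
                    // a real client would open its socket here
                    firstReady.countDown(); // further calls after zero are no-ops
                }
            }.start();
        }
        try {
            return firstReady.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }
}
```

The timeout is a design choice of this sketch: it keeps the main thread from waiting forever if no worker manages to connect.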

 

2.1 Run the Client

Compile and run the client (the server must already be running).

img/0/61/client-reciviendo.png

Adjust the client settings accordingly. When the client finishes, the last line shows the test results. By default the test uses 100 clients and a pause of 10 milliseconds between messages.

img/0/61/resultado.png
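
The client settings are compile-time constants, so every change means recompiling. One possible way to avoid that is to read them from JVM system properties, falling back to the defaults used in the client. This is only a sketch: the class name ClientSettings and the property names clients, pauseMillis and durationMillis are invented for this example.

```java
class ClientSettings {
    // Integer.getInteger / Long.getLong read a system property and return the
    // default when the property is missing or is not a valid number.
    static int numClients() {
        return Integer.getInteger("clients", 100);
    }

    static long pauseMillis() {
        return Long.getLong("pauseMillis", 10L);
    }

    static long durationMillis() {
        return Long.getLong("durationMillis", 60000L);
    }
}
```

With something like this in place, the values could be passed on the command line with -D flags, e.g. -Dclients=250 -DpauseMillis=5.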

Results

I have tested this example with two different machines and I consistently get an average of 95 msgs/sec sent and received with up to 500 concurrent users on one server. CPU usage doesn't increase dramatically at this rate. My limit is about 800 messages/second with 1000 concurrent users (after removing the thread sleep in both client and server).

img/0/61/r1.png

img/0/61/r2.png
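
A note on reading these numbers: with a 10 millisecond pause between messages, a single sending loop cannot exceed 1000 / 10 = 100 messages per second, so an average of about 95 is close to the ceiling set by the sleep itself rather than by the machine. The sketch below shows the arithmetic. The class and method names are made up, and the figure of 5700 messages in 60 seconds is a hypothetical input chosen only because it corresponds to 95 msgs/sec.

```java
class ThroughputMath {
    // Upper bound for ONE loop that sleeps pauseMillis after every message.
    static double maxRatePerSecond(long pauseMillis) {
        return 1000.0 / pauseMillis;
    }

    // Average rate; floating point avoids the truncation of integer division.
    static double averagePerSecond(long messages, long elapsedMillis) {
        return messages * 1000.0 / elapsedMillis;
    }

    public static void main(String[] args) {
        System.out.println(maxRatePerSecond(10));          // prints 100.0
        System.out.println(averagePerSecond(5700, 60000)); // prints 95.0
    }
}
```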

With an exclusive-thread-per-connection model, memory and CPU consumption grow with the number of clients. If you need a large number of concurrent users, try the java.nio package. It's supposed to increase latency (not dramatically in my case), but it considerably reduces the amount of CPU and memory used. See a server example here.
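
To give a rough idea of what the java.nio approach looks like, here is a minimal sketch in which one thread serves every connection through a Selector. It is not the server from this article: it assumes a plain echo protocol to stay short, and the names NioEchoSketch, start, serve and roundTrip are invented for this example.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

class NioEchoSketch {
    /** Starts a selector loop on a free port and returns that port. */
    static int start() throws IOException {
        final Selector selector = Selector.open();
        final ServerSocketChannel server = ServerSocketChannel.open();
        server.configureBlocking(false);
        server.socket().bind(new InetSocketAddress(0)); // 0 = any free port
        server.register(selector, SelectionKey.OP_ACCEPT);
        Thread loop = new Thread("nio-loop") {
            public void run() {
                try {
                    serve(selector, server);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        };
        loop.setDaemon(true); // do not keep the JVM alive for this demo
        loop.start();
        return server.socket().getLocalPort();
    }

    /** One thread handles every connection: accept, read, echo back. */
    private static void serve(Selector selector, ServerSocketChannel server)
            throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        while (true) {
            selector.select(); // blocks until at least one channel is ready
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {
                    SocketChannel client = server.accept();
                    if (client != null) {
                        client.configureBlocking(false);
                        client.register(selector, SelectionKey.OP_READ);
                    }
                } else if (key.isReadable()) {
                    SocketChannel client = (SocketChannel) key.channel();
                    buf.clear();
                    int n;
                    try {
                        n = client.read(buf);
                    } catch (IOException e) {
                        n = -1; // treat a reset connection like end of stream
                    }
                    if (n < 0) {
                        client.close(); // closing the channel also cancels its key
                        continue;
                    }
                    buf.flip();
                    while (buf.hasRemaining()) {
                        client.write(buf); // simplification: spins if the peer is slow
                    }
                }
            }
        }
    }

    /** Sends one line through the server and returns the echoed line. */
    static String roundTrip(String line) {
        try {
            Socket s = new Socket("127.0.0.1", start());
            try {
                PrintWriter out = new PrintWriter(s.getOutputStream(), true);
                BufferedReader in = new BufferedReader(
                        new InputStreamReader(s.getInputStream()));
                out.println(line);
                return in.readLine();
            } finally {
                s.close();
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```

Calling NioEchoSketch.roundTrip("ping") should return the same line. A production server would also register for write readiness instead of looping on write().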

Conclusions and Notes

Java is fine for developing a socket server service in terms of performance and resource consumption.

I know several things could be fixed to improve accuracy, but the source code was getting too long. If you have any ideas or suggestions, please let me know.