ZeroMQのJava実装のJeroMQを試す

ZeroMQ
http://zguide.zeromq.org/page:all


JeroMQ
https://github.com/zeromq/jeromq


ZeroMQはシンプルで高速なプロセス間通信を実現する
C/C++のライブラリ。ライブラリのラッパーは対応言語が豊富。
キューは永続化されない。メモリ上で処理されるので高速。
ライセンスはLGPL。


こちらのエントリが詳しい
ØMQ(zeromq)について調査する。


JeroMQはZeroMQがC/C++で書かれているのに対してJavaで書かれている。
libzmq 3.2.2に基づいた0.3.0-SNAPSHOTを今回は使用する。
ZeroMQと違いJava実装なのでJVMとjarがあれば動いてしまう手軽さが良い。
またパフォーマンスも頑張っている。ライセンスはLGPL。



では次のような
Client REQUEST <-> ROUTER Broker DEALER <-> REPLY Worker
という1対Nの構成をJeroMQで実装してみる。


通信はMessagePackで文字列をシリアライズして渡し、
受け取り側でデシリアライズして文字列に戻す。


pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!--
	Maven build descriptor for the JeroMQ sample (client / broker / worker).
	The declared encoding matches project.build.sourceEncoding below.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>jp.teche</groupId>
	<artifactId>jeromq.sample</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>
	<name>JeroMQ-Sample</name>
	<description />

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>

	<repositories>
		<repository>
			<id>msgpack.org</id>
			<name>MessagePack Repository for Maven</name>
			<url>http://msgpack.org/maven2</url>
		</repository>
		<repository>
			<id>typesafe.com</id>
			<name>typesafe.com</name>
			<url>http://repo.typesafe.com/typesafe/repo</url>
		</repository>
	</repositories>

	<dependencies>
		<!-- MessagePack: serializes the strings exchanged between client and worker -->
		<dependency>
			<groupId>org.msgpack</groupId>
			<artifactId>msgpack</artifactId>
			<version>0.6.7</version>
		</dependency>
		<!-- JeroMQ: pure-Java ZeroMQ implementation -->
		<dependency>
			<groupId>org.jeromq</groupId>
			<artifactId>jeromq</artifactId>
			<version>0.3.0-SNAPSHOT</version>
		</dependency>
	</dependencies>

	<!-- Plugin configuration must be nested in <build>; <plugins> is not a valid child of <project>. -->
	<build>
		<plugins>
			<plugin>
				<inherited>true</inherited>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<configuration>
					<source>1.7</source>
					<target>1.7</target>
					<optimize>true</optimize>
					<debug>true</debug>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>
package jp.techie.jeromq.sample.one2many;

import java.io.IOException;

import org.msgpack.MessagePack;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

/**
 * JeroMQ sample client.
 *
 * <p>Connects a REQ socket to {@code tcp://127.0.0.1:9001} and performs 100
 * request/reply exchanges. Every payload is a MessagePack-serialized string.
 *
 * @author bose999
 */
public class ClientMaker {

    /**
     * Creates the client socket and exchanges 100 messages over it.
     *
     * <p>The socket and the context are released in {@code finally} blocks so
     * that they are cleaned up even when a runtime exception aborts the loop.
     *
     * @param args not used
     */
    public static void main(String[] args) {
        Context context = ZMQ.context(1);
        try {
            Socket request = context.socket(ZMQ.REQ);
            try {
                String requestAddress = "tcp://127.0.0.1:9001";
                request.connect(requestAddress);
                System.out.println("client - requestAddress:" + requestAddress);

                // Loop-invariant: one serializer and one request text serve all iterations.
                MessagePack msgpack = new MessagePack();
                String sendString = "Hello ( requestAddress: " + requestAddress + ")";

                for (int i = 0; i < 100; i++) {
                    long startTime = System.nanoTime();
                    try {
                        // Serialize the request with MessagePack and send it.
                        byte[] sendStringBytes = msgpack.write(sendString);
                        request.send(sendStringBytes, 0);
                        // NOTE(review): the timer stops here, so the printed value covers
                        // serialization + send only, not the round trip. Move this after
                        // recv() if round-trip latency is what should be reported.
                        long endTime = System.nanoTime();

                        // Receive the reply and deserialize it back into a string.
                        byte[] replyBytes = request.recv(0);
                        if (replyBytes == null) {
                            // No frame was delivered; stop instead of handing null to MessagePack.
                            System.err.println("client - no reply received, stopping. forCount:" + i);
                            break;
                        }
                        String reply = msgpack.read(replyBytes, String.class);
                        long executeTime = endTime - startTime;
                        System.out.println(
                                "Received reply ( requestAddress:" + requestAddress + ") forCount:" + i
                                + " [" + reply + "] " + executeTime + "ns"
                                );
                    } catch (IOException e) {
                        // A (de)serialization failure affects this exchange only; keep going.
                        e.printStackTrace();
                    }
                }
            } finally {
                request.close();
            }
        } finally {
            context.term();
        }
    }
}
package jp.techie.jeromq.sample.one2many;

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Poller;
import org.zeromq.ZMQ.Socket;

/**
 * JeroMQ sample broker.
 *
 * <p>Binds a ROUTER socket on port 9001 and a DEALER socket on ports
 * 10001-10030, then forwards every message arriving on one socket to the
 * other.
 *
 * @author bose999
 */
public class BrockerMaker {

    /**
     * Creates the broker sockets and runs the polling loop until the thread is
     * interrupted or a receive yields no frame.
     *
     * <p>Both sockets are closed and the context is terminated in a
     * {@code finally} block, so they are released on abnormal exit as well.
     *
     * @param args not used
     */
    public static void main(String[] args) {
        Context context = ZMQ.context(1);
        Socket router = null;
        Socket dealer = null;
        try {
            router = context.socket(ZMQ.ROUTER);
            String routerAddress = "tcp://127.0.0.1:9001";
            System.out.println("broker - routerAddress:" + routerAddress);
            router.bind(routerAddress);

            dealer = context.socket(ZMQ.DEALER);
            String dealerAddressHead = "tcp://127.0.0.1:";
            int dealerPort = 10001;

            // One DEALER socket is bound to 30 consecutive ports.
            for (int i = 0; i < 30; i++) {
                String backEndAddress = dealerAddressHead + dealerPort;
                dealer.bind(backEndAddress);
                System.out.println("broker - backEndAddress:" + backEndAddress);
                dealerPort++;
            }

            Poller items = new Poller(2);
            items.register(router, Poller.POLLIN); // index 0
            items.register(dealer, Poller.POLLIN); // index 1

            while (!Thread.currentThread().isInterrupted()) {
                items.poll();
                if (items.pollin(0) && !forward(router, dealer)) {
                    break;
                }
                if (items.pollin(1) && !forward(dealer, router)) {
                    break;
                }
            }
        } finally {
            if (router != null) {
                router.close();
                System.out.println("broker - close router.");
            }
            if (dealer != null) {
                dealer.close();
                System.out.println("broker - close dealer.");
            }
            // Terminate the context as the client and worker do.
            context.term();
        }
    }

    /**
     * Forwards one complete (possibly multipart) message between two sockets.
     *
     * @param from socket to read the frames from
     * @param to socket to write the frames to
     * @return {@code true} when the whole message was forwarded,
     *         {@code false} when a receive returned no frame
     */
    private static boolean forward(Socket from, Socket to) {
        while (true) {
            byte[] frame = from.recv(0);
            if (frame == null) {
                // Nothing was received; do not pass null on to send().
                return false;
            }
            boolean more = from.hasReceiveMore();
            // Propagate the multipart flag so frame boundaries are preserved.
            to.send(frame, more ? ZMQ.SNDMORE : 0);
            if (!more) {
                return true;
            }
        }
    }
}
package jp.techie.jeromq.sample.one2many;

import java.io.IOException;

import org.msgpack.MessagePack;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

/**
 * JeroMQ sample worker.
 *
 * <p>Starts 30 threads. Each thread connects its own REP socket to one of the
 * ports 10001-10030 and answers every request with the received text plus a
 * suffix.
 *
 * @author bose999
 */
public class WorkerMaker {

    /**
     * Starts 30 worker threads, one per port.
     *
     * @param args not used
     */
    public static void main(String[] args) {
        String replyAddressHead = "tcp://127.0.0.1:";
        int replyPort = 10001;
        for (int i = 0; i < 30; i++) {
            // Each thread gets a unique port and handles requests as a worker.
            WorkerExecuter workerExecuter = new WorkerExecuter();
            String replyAddress = replyAddressHead + replyPort;
            workerExecuter.setReplyAddress(replyAddress);
            new Thread(workerExecuter).start();
            replyPort++;
        }
    }

    /**
     * A single worker, run on its own thread.
     *
     * @author bose999
     */
    private static class WorkerExecuter implements Runnable {

        /**
         * TCP address this worker connects to.
         */
        private String replyAddress = null;

        /**
         * Sets the address to connect to; call before the thread is started.
         *
         * @param replyAddress TCP address of the endpoint to connect to
         */
        public void setReplyAddress(String replyAddress) {
            this.replyAddress = replyAddress;
        }

        /**
         * Receives requests and replies to each one until the thread is
         * interrupted or a receive yields no frame. The socket and the context
         * are released in {@code finally} blocks.
         */
        @Override
        public void run() {
            System.out.println("worker - replyAddress:" + this.replyAddress);
            Context context = ZMQ.context(1);
            try {
                Socket responder = context.socket(ZMQ.REP);
                try {
                    responder.connect(this.replyAddress);
                    System.out.println("worker - replyAddress:" + this.replyAddress);

                    // Loop-invariant: one serializer serves every request of this worker.
                    MessagePack msgpack = new MessagePack();

                    while (!Thread.currentThread().isInterrupted()) {
                        // MessagePack-serialized request received from the client.
                        byte[] recvBytes = responder.recv(0);
                        if (recvBytes == null) {
                            // No frame was delivered; stop instead of handing null to MessagePack.
                            break;
                        }

                        byte[] sendStringBytes;
                        try {
                            // Deserialize the request bytes back into a string.
                            String recvString = msgpack.read(recvBytes, String.class);
                            System.out.println(
                                    "worker - Received request ( replyAddress: " + this.replyAddress + "): " +
                                    "[" + recvString + "]"
                                    );
                            // Build the reply and serialize it with MessagePack.
                            String sendString = recvString + " World ( replyAddress: " + this.replyAddress + ")";
                            sendStringBytes = msgpack.write(sendString);
                        } catch (IOException e) {
                            e.printStackTrace();
                            // A REP socket must answer every request before it may receive
                            // again; send an empty frame so the requester is not left
                            // blocked and this socket stays usable.
                            sendStringBytes = new byte[0];
                        }
                        responder.send(sendStringBytes, 0);
                    }
                } finally {
                    responder.close();
                }
            } finally {
                context.term();
                System.out.println("worker - socket close ( replyAddress: " + this.replyAddress + ")");
            }
        }
    }
}