ZeroMQのJava実装のJeroMQを試す
ZeroMQ
http://zguide.zeromq.org/page:all
JeroMQ
https://github.com/zeromq/jeromq
ZeroMQはシンプルで高速なプロセス間通信を実現する
C/C++のライブラリ。ライブラリのラッパーは対応言語が豊富。
キューは永続化されない。メモリ上で処理されるので高速。
ライセンスはLGPL。
こちらのエントリが詳しい
ØMQ(zeromq)について調査する。
JeroMQは、C/C++で書かれているZeroMQに対して、Javaで書かれた実装である。
libzmq 3.2.2に基づいた0.3.0-SNAPSHOTを今回は使用する。
ZeroMQと違いJava実装なのでJVMとjarがあれば動いてしまう手軽さが良い。
またパフォーマンスも頑張っている。ライセンスはLGPL。
では次のような
Client REQUEST <-> ROUTER Broker DEALER <-> REPLY Worker
という1対Nの構成をJeroMQで実装してみる。
通信はMessagePackで文字列をシリアライズして渡し、
受け取り側でデシリアライズして文字列に戻す。
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build descriptor for the JeroMQ sample (client / broker / worker).
  Dependencies: MessagePack for payload serialization, JeroMQ for messaging.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>jp.teche</groupId>
  <artifactId>jeromq.sample</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>JeroMQ-Sample</name>
  <description />

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <repositories>
    <repository>
      <id>msgpack.org</id>
      <name>MessagePack Repository for Maven</name>
      <url>http://msgpack.org/maven2</url>
    </repository>
    <repository>
      <id>typesafe.com</id>
      <name>typesafe.com</name>
      <url>http://repo.typesafe.com/typesafe/repo</url>
    </repository>
  </repositories>

  <dependencies>
    <dependency>
      <groupId>org.msgpack</groupId>
      <artifactId>msgpack</artifactId>
      <version>0.6.7</version>
    </dependency>
    <dependency>
      <groupId>org.jeromq</groupId>
      <artifactId>jeromq</artifactId>
      <version>0.3.0-SNAPSHOT</version>
    </dependency>
  </dependencies>

  <!-- Plugins must be declared under <build>; <plugins> is not a valid direct child of <project>. -->
  <build>
    <plugins>
      <plugin>
        <inherited>true</inherited>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.7</source>
          <target>1.7</target>
          <optimize>true</optimize>
          <debug>true</debug>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
package jp.techie.jeromq.sample.one2many;

import java.io.IOException;

import org.msgpack.MessagePack;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

/**
 * JeroMQ sample client.
 *
 * <p>Connects a REQ socket to the broker front end and performs 100
 * request/reply exchanges, printing each reply together with the measured
 * round-trip time.
 *
 * @author bose999
 */
public class ClientMaker {

    /**
     * Creates the client socket and exchanges 100 messages with the broker.
     *
     * @param args none
     */
    public static void main(String[] args) {
        Context context = ZMQ.context(1);
        Socket request = context.socket(ZMQ.REQ);
        try {
            String requestAddress = "tcp://127.0.0.1:9001";
            request.connect(requestAddress);
            System.out.println("client - requestAddress:" + requestAddress);

            // One serializer is enough for the whole run; building it per
            // iteration only adds overhead to the measured interval.
            MessagePack msgpack = new MessagePack();

            for (int i = 0; i < 100; i++) {
                long startTime = System.nanoTime();
                String sendString = "Hello ( requestAddress: " + requestAddress + ")";
                try {
                    // Serialize the request with MessagePack and send it.
                    byte[] sendStringBytes = msgpack.write(sendString);
                    request.send(sendStringBytes, 0);

                    // Receive the reply and deserialize it back into a string.
                    byte[] replyBytes = request.recv(0);
                    String reply = msgpack.read(replyBytes, String.class);

                    // Stop the clock only after the reply has arrived so the
                    // reported time covers the full request/reply round trip.
                    long endTime = System.nanoTime();
                    long executeTime = endTime - startTime;

                    System.out.println(
                        "Received reply ( requestAddress:" + requestAddress + ") forCount:" + i
                            + " [" + reply + "] " + executeTime + "ns");
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        } finally {
            // Release the socket and the context even if an exchange fails.
            request.close();
            context.term();
        }
    }
}
package jp.techie.jeromq.sample.one2many;

import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Poller;
import org.zeromq.ZMQ.Socket;

/**
 * JeroMQ sample broker.
 *
 * <p>Binds a ROUTER socket for clients on port 9001 and a DEALER socket for
 * workers on ports 10001-10030, then shuttles messages between the two.
 *
 * @author bose999
 */
public class BrockerMaker {

    /**
     * Creates the broker sockets and runs the polling loop.
     *
     * @param args none
     */
    public static void main(String[] args) {
        Context context = ZMQ.context(1);
        Socket router = context.socket(ZMQ.ROUTER);
        Socket dealer = context.socket(ZMQ.DEALER);
        try {
            String routerAddress = "tcp://127.0.0.1:9001";
            System.out.println("broker - routerAddress:" + routerAddress);
            router.bind(routerAddress);

            // Bind the single DEALER socket to 30 consecutive ports, one per worker.
            String dealerAddressHead = "tcp://127.0.0.1:";
            int dealerPort = 10001;
            for (int i = 0; i < 30; i++) {
                String backEndAddress = dealerAddressHead + dealerPort;
                dealer.bind(backEndAddress);
                System.out.println("broker - backEndAddress:" + backEndAddress);
                dealerPort++;
            }

            Poller items = new Poller(2);
            items.register(router, Poller.POLLIN); // index 0: client side
            items.register(dealer, Poller.POLLIN); // index 1: worker side

            while (!Thread.currentThread().isInterrupted()) {
                items.poll();
                if (items.pollin(0)) {
                    forward(router, dealer);
                }
                if (items.pollin(1)) {
                    forward(dealer, router);
                }
            }
        } finally {
            // Release sockets and terminate the context on every exit path.
            router.close();
            System.out.println("broker - close router.");
            dealer.close();
            System.out.println("broker - close dealer.");
            context.term();
        }
    }

    /**
     * Relays one complete (possibly multi-part) message from one socket to the other.
     *
     * @param from socket to read the message frames from
     * @param to socket to write the message frames to
     */
    private static void forward(Socket from, Socket to) {
        boolean more;
        do {
            byte[] frame = from.recv(0);
            more = from.hasReceiveMore();
            // Propagate the "more frames follow" flag so message boundaries are preserved.
            to.send(frame, more ? ZMQ.SNDMORE : 0);
        } while (more);
    }
}
package jp.techie.jeromq.sample.one2many;

import java.io.IOException;

import org.msgpack.MessagePack;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Context;
import org.zeromq.ZMQ.Socket;

/**
 * JeroMQ sample worker.
 *
 * <p>Starts 30 worker threads, each connecting a REP socket to one of the
 * broker back-end ports 10001-10030.
 *
 * @author bose999
 */
public class WorkerMaker {

    /**
     * Creates and starts the worker threads.
     *
     * @param args none
     */
    public static void main(String[] args) {
        String replyAddressHead = "tcp://127.0.0.1:";
        int replyPort = 10001;
        for (int i = 0; i < 30; i++) {
            // Start 30 threads, each serving as a worker on its own port.
            WorkerExecuter workerExecuter = new WorkerExecuter();
            String replyAddress = replyAddressHead + replyPort;
            workerExecuter.setReplyAddress(replyAddress);
            new Thread(workerExecuter).start();
            replyPort++;
        }
    }

    /**
     * Worker body executed on each worker thread.
     *
     * @author bose999
     */
    private static class WorkerExecuter implements Runnable {

        /** Reply sent when a request cannot be processed, to keep REQ/REP in lockstep. */
        private static final byte[] EMPTY_REPLY = new byte[0];

        /** TCP address of the broker back end this worker connects to. */
        private String replyAddress = null;

        /**
         * Sets the broker back-end address for this worker.
         *
         * @param replyAddress TCP address to connect to
         */
        public void setReplyAddress(String replyAddress) {
            this.replyAddress = replyAddress;
        }

        /**
         * Receives MessagePack-encoded strings, appends a suffix and replies
         * until the thread is interrupted.
         *
         * @see java.lang.Runnable#run()
         */
        @Override
        public void run() {
            System.out.println("worker - replyAddress:" + this.replyAddress);
            Context context = ZMQ.context(1);
            Socket responder = context.socket(ZMQ.REP);
            try {
                responder.connect(this.replyAddress);
                System.out.println("worker - replyAddress:" + this.replyAddress);

                // One serializer per worker thread is sufficient.
                MessagePack msgpack = new MessagePack();

                while (!Thread.currentThread().isInterrupted()) {
                    // MessagePack-serialized request received from the client.
                    byte[] recvBytes = responder.recv(0);
                    byte[] replyBytes;
                    try {
                        // Deserialize the request bytes back into a string.
                        String recvString = msgpack.read(recvBytes, String.class);
                        System.out.println(
                            "worker - Received request ( replyAddress: " + this.replyAddress + "): "
                                + "[" + recvString + "]");

                        // Build the reply string and serialize it with MessagePack.
                        String sendString =
                            recvString + " World ( replyAddress: " + this.replyAddress + ")";
                        replyBytes = msgpack.write(sendString);
                    } catch (IOException e) {
                        e.printStackTrace();
                        // A REP socket must answer every request before it can
                        // receive the next one; reply with an empty frame so the
                        // worker stays usable and the client is not left blocked.
                        replyBytes = EMPTY_REPLY;
                    }
                    responder.send(replyBytes, 0);
                }
            } finally {
                // Release the socket and the context on every exit path.
                responder.close();
                context.term();
                System.out.println(
                    "worker - socket close ( replyAddress: " + this.replyAddress + ")");
            }
        }
    }
}