CentOS release 5.10 (Final)
RabbitMQ 3.0.4
※RabbitMQインストール方法
Java 1.7.0_05
Maven 3.1.0
※MQTT用のJavaのクライアントライブラリを使って接続テストをするために必要です
RabbitMQ 3.0.4
※RabbitMQインストール方法
Java 1.7.0_05
Maven 3.1.0
※MQTT用のJavaのクライアントライブラリを使って接続テストをするために必要です
■MQTTの設定
rabbitmq-plugins enable rabbitmq_mqtt
rabbitmq-plugins list | grep rabbitmq_mqtt
/etc/init.d/rabbitmq-server start
Webの管理コンソールのOverviewを見てmqttプロトコルが有効になっていることを確認します
rabbitmq-plugins list | grep rabbitmq_mqtt
[E] rabbitmq_mqtt 3.0.4
/etc/init.d/rabbitmq-server start
Webの管理コンソールのOverviewを見てmqttプロトコルが有効になっていることを確認します
■MQTTを試してみる
Javaのクライアントライブラリがありましたのでそれを使ってみます
mvn archetype:generate -DarchetypeGroupId=org.apache.maven.archetypes -DgroupId=com.sample.rabbitmq.mqtt -DartifactId=mqtt_test
emacs pom.xml
dependencies タグ内に以下を記載してください
project タグ内に以下を記載してください
emacs src/main/java/com/sample/rabbitmq/mqtt/App.java
mvn clean compile exec:java
mvn archetype:generate -DarchetypeGroupId=org.apache.maven.archetypes -DgroupId=com.sample.rabbitmq.mqtt -DartifactId=mqtt_test
emacs pom.xml
dependencies タグ内に以下を記載してください
<dependency> <groupId>org.fusesource.mqtt-client</groupId> <artifactId>mqtt-client</artifactId> <version>1.10</version> <scope>compile</scope> </dependency>
project タグ内に以下を記載してください
<build> <plugins> <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>exec-maven-plugin</artifactId> <version>1.2.1</version> <configuration> <mainClass>com.sample.rabbitmq.mqtt.App</mainClass> </configuration> </plugin> </plugins> </build>
emacs src/main/java/com/sample/rabbitmq/mqtt/App.java
package com.sample.rabbitmq.mqtt; import java.net.URISyntaxException; import org.fusesource.mqtt.client.*; public class App { public static void main( String[] args ) throws URISyntaxException, Exception { String topic = "test/topic"; MQTT mqtt = new MQTT(); mqtt.setHost("localhost", 1883); BlockingConnection bc = mqtt.blockingConnection(); bc.connect(); // subscribe Topic[] topics = { new Topic(topic, QoS.AT_LEAST_ONCE) }; byte[] qoses = bc.subscribe(topics); for (byte qos: qoses) { System.out.println(qos); } // send message bc.publish(topic, "payloadA".getBytes(), QoS.AT_LEAST_ONCE, false); bc.publish(topic, "payloadB".getBytes(), QoS.AT_LEAST_ONCE, false); // receive message Message message = bc.receive(); System.out.println(message.getTopic()); byte[] payload = message.getPayload(); message.ack(); System.out.println(new String(payload, "UTF-8")); message = bc.receive(); System.out.println(message.getTopic()); payload = message.getPayload(); message.ack(); System.out.println(new String(payload, "UTF-8")); bc.disconnect(); } }
mvn clean compile exec:java
1 test/topic payloadA test/topic payloadB (・・・ここで停止する・・・)
■動作説明
接続前に subscribe することで MQTT で使用する Exchange とそれにバインドする Routing Key を決定します
本ライブラリを使った RabbitMQ での MQTT の場合は Exchange を経由してとメッセージのやりとりをするようです
subscribe 時に指定している Topic 名は Exchange にバインドするRouting Keyとして使用されます(何でもOKです)
ただ、publish するときや receive するときはこのバイティングされた Routing Key を指定する必要があります
publish するときの第一引数には subscribe 時に指定した Routing Key を指定します
receive 時には subscribe で指定した Routing Key を自動で判断してメッセージの受信を行います
MQTT でのメッセージのやり取りがあった場合には RabbitMQ 側に一時的なキューが作成されます
disconnect するとこのキューは削除されなくなります
(connect するたびにキューが作成されるようです)
サンプルでは2回 publish して3回 receive しています
receive 時にキューにメッセージがない場合は次のメッセージが到着するまで待ち続けます
MQTT 用に一時的に作られたキューに対して管理コンソールなどから publish message するとそのメッセージを受信して処理が終了します
サンプルでは同期処理でメッセージのやり取りをやっています
FutureConnection という機能がありこれを使うと非同期でメッセージのやり取りを行うことができます
本ライブラリを使った RabbitMQ での MQTT の場合は Exchange を経由してとメッセージのやりとりをするようです
subscribe 時に指定している Topic 名は Exchange にバインドするRouting Keyとして使用されます(何でもOKです)
ただ、publish するときや receive するときはこのバイティングされた Routing Key を指定する必要があります
publish するときの第一引数には subscribe 時に指定した Routing Key を指定します
receive 時には subscribe で指定した Routing Key を自動で判断してメッセージの受信を行います
MQTT でのメッセージのやり取りがあった場合には RabbitMQ 側に一時的なキューが作成されます
disconnect するとこのキューは削除されなくなります
(connect するたびにキューが作成されるようです)
サンプルでは2回 publish して3回 receive しています
receive 時にキューにメッセージがない場合は次のメッセージが到着するまで待ち続けます
MQTT 用に一時的に作られたキューに対して管理コンソールなどから publish message するとそのメッセージを受信して処理が終了します
サンプルでは同期処理でメッセージのやり取りをやっています
FutureConnection という機能がありこれを使うと非同期でメッセージのやり取りを行うことができます
以上です
とりあえず RabbitMQ を使って MQTT を動作させることができました
実際に HTTP や AMQP との比較は行っていないのでどれだけ軽量なのかわかっていませんが
せっかく動いたので次は比較でもできればなと思います
■参考サイト
0 件のコメント:
コメントを投稿