2014年6月28日土曜日

rabbiqmqでMQTTプロトコルを試してみた

■環境
CentOS release 5.10 (Final)
RabbitMQ 3.0.4
※RabbitMQインストール方法
Java 1.7.0_05
Maven 3.1.0
※MQTT用のJavaのクライアントライブラリを使って接続テストをするために必要です

■MQTTの設定
rabbitmq-plugins enable rabbitmq_mqtt
rabbitmq-plugins list | grep rabbitmq_mqtt
[E] rabbitmq_mqtt                     3.0.4

/etc/init.d/rabbitmq-server start

Webの管理コンソールのOverviewを見てmqttプロトコルが有効になっていることを確認します

■MQTTを試してみる
Javaのクライアントライブラリがありましたのでそれを使ってみます

mvn archetype:generate -DarchetypeGroupId=org.apache.maven.archetypes -DgroupId=com.sample.rabbitmq.mqtt -DartifactId=mqtt_test

emacs pom.xml

dependencies タグ内に以下を記載してください
<dependency>
  <groupId>org.fusesource.mqtt-client</groupId>
  <artifactId>mqtt-client</artifactId>
  <version>1.10</version>
  <scope>compile</scope>
</dependency>

project タグ内に以下を記載してください
<build>
  <plugins>
    <plugin>
      <groupId>org.codehaus.mojo</groupId>
      <artifactId>exec-maven-plugin</artifactId>
      <version>1.2.1</version>
      <configuration>
        <mainClass>com.sample.rabbitmq.mqtt.App</mainClass>
      </configuration>
    </plugin>
  </plugins>
</build>

emacs src/main/java/com/sample/rabbitmq/mqtt/App.java
package com.sample.rabbitmq.mqtt;

import java.net.URISyntaxException;

import org.fusesource.mqtt.client.*;

public class App {

public static void main( String[] args ) throws URISyntaxException, Exception {
        String topic = "test/topic";

        MQTT mqtt = new MQTT();
        mqtt.setHost("localhost", 1883);
        BlockingConnection bc = mqtt.blockingConnection();
        bc.connect();
        // subscribe
        Topic[] topics = {
            new Topic(topic, QoS.AT_LEAST_ONCE)
        };
        byte[] qoses = bc.subscribe(topics);
        for (byte qos: qoses) {
            System.out.println(qos);
        }

        // send message
        bc.publish(topic, "payloadA".getBytes(), QoS.AT_LEAST_ONCE, false);
        bc.publish(topic, "payloadB".getBytes(), QoS.AT_LEAST_ONCE, false);

        // receive message
        Message message = bc.receive();
        System.out.println(message.getTopic());
        byte[] payload = message.getPayload();
        message.ack();
        System.out.println(new String(payload, "UTF-8"));

        message = bc.receive();
        System.out.println(message.getTopic());
        payload = message.getPayload();
        message.ack();
        System.out.println(new String(payload, "UTF-8"));

        bc.disconnect();
    }

}

mvn clean compile exec:java
1
test/topic
payloadA
test/topic
payloadB
(・・・ここで停止する・・・)

■動作説明
接続前に subscribe することで MQTT で使用する Exchange とそれにバインドする Routing Key を決定します


本ライブラリを使った RabbitMQ での MQTT の場合は Exchange を経由してとメッセージのやりとりをするようです
subscribe 時に指定している Topic 名は Exchange にバインドするRouting Keyとして使用されます(何でもOKです)
ただ、publish するときや receive するときはこのバイティングされた Routing Key を指定する必要があります

publish するときの第一引数には subscribe 時に指定した Routing Key を指定します
receive 時には subscribe で指定した Routing Key を自動で判断してメッセージの受信を行います

MQTT でのメッセージのやり取りがあった場合には RabbitMQ 側に一時的なキューが作成されます
disconnect するとこのキューは削除されなくなります
(connect するたびにキューが作成されるようです)


サンプルでは2回 publish して3回 receive しています
receive 時にキューにメッセージがない場合は次のメッセージが到着するまで待ち続けます
MQTT 用に一時的に作られたキューに対して管理コンソールなどから publish message するとそのメッセージを受信して処理が終了します

サンプルでは同期処理でメッセージのやり取りをやっています
FutureConnection という機能がありこれを使うと非同期でメッセージのやり取りを行うことができます

以上です
とりあえず RabbitMQ を使って MQTT を動作させることができました
実際に HTTP や AMQP との比較は行っていないのでどれだけ軽量なのかわかっていませんが
せっかく動いたので次は比較でもできればなと思います

■参考サイト

0 件のコメント:

コメントを投稿