概要
KafkaはScalaで書かれたpub/subモデルの分散メッセージングプラットフォームです
有名なサービスだとLinkedInが導入しているようです
同じようなミドルウェアにKestrel(ケストレル)やRabbitMQがあります
そもそも、KafkaさんがどんなものなのかよくわかったなかったのでとりあえずQuickStartをやってみたので紹介します
環境
- Mac OS X 10.8.5
- kafka 0.8.2.0 (Scala 2.11)
インストール
公式で配布されているバイナリをダウンロードしてインストールしてみたいと思います
cd ~/Downloads
wget http://supergsego.com/apache/kafka/0.8.2.0/kafka_2.11-0.8.2.0.tgz
tar zvxf kafka_2.11-0.8.2.0.tgz
mv kafka_2.11-0.8.2.0 /usr/local/
cd /usr/local
ln -s kafka_2.11-0.8.2.0/ kafka
MacであればHomebrewでもインストールできそうです
インストールは上記で完了です
インストールが完了したら早速試してみましょう
試してみる
kafkaの起動
zookeeperの起動
cd /usr/local/kafka
sh bin/zookeeper-server-start.sh config/zookeeper.properties
kafkaサーバの起動
cd /usr/local/kafka
sh bin/kafka-server-start.sh config/server.properties
先にzookeeperのプロセスを起動してください
それぞれ別ターミナルで実行してください
シェルを叩いているだけですが、内部的にはjarファイルが実行されてJavaのプロセスが立ち上がっています
トピックの作成
メッセージを送信するためのトピックを作成します
cd /usr/local/kafka
sh bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
testというトピックを作成しました
bin/kafka-topics.sh --zookeeper localhost:2181
の部分はテンプレ的な感じです
その他パラメータの説明は以下の通りです
- create・・・トピックを作成するためのオプションです、このオプションを変更することでトピックの一覧を表示したり削除したりすることができます
- replication-factor・・・kafkaクラスタ内に存在するkafkaブローカーの何台にトピックを作成するか指定できます、なのでkafkaブローカーが1つしかない場合は1以上は指定できません
- partitions・・・トピックをいくつのパーティションに分割するか指定します、この辺はちょっと自身がないですが1つのバーティション上にトピックを作成するとアクセス効率が悪いので複数のバーティションに分割することでアクセスを分散するためだと思います(The number of partitions for the topic being created or altered (WARNING: If partitions are increased for a topic that has
a key, the partition logic or ordering of the messages will be affected) - topic・・・作成するトピック名を指定します
といった感じです
この辺のオプション周りは詳しくは調べていないので詳細は公式ドキュメントを見ていただければと思います
作成したトピックを確認するにはlist
を使います
cd /usr/local/kafka
sh bin/kafka-topics.sh --list --zookeeper localhost:2181
作成したトピックの一覧が表示されると思います
更に各トピックごとの詳細を知りたい場合は「–describe」を利用します
cd /usr/local/kafka
sh bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
以下のような感じでパーティションの情報やレプリケーションの情報を確認することができます
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
トピックの削除
ここで注意が必要ですが今回使っているkafka0.8.2.0ではトピックを削除するのに「–delete」というオプションを使えばいいのですが、release版の0.8.1.1では「–delete」オプションがありません
そのため作成したトピックは削除することができないようです
0.8.2.0では以下のようにしてトピックを削除します
cd /usr/local/kafka
echo 'delete.topic.enable=true' >> config/server.properties
まず「–delete」オプションを有効にする設定を追記します
そして再度kafkaサーバを起動し以下のコマンドを実行しま
cd /usr/local/kafka
bin/kafka-topics.sh --delete --topic test --zookeeper localhost:2181
これで作成したトピックの削除を実施することができます
メッセージを送信
作成したtestトピックにメッセージを送信してみましょう
cd /usr/local/kafka
sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
メッセージを送信する際のポートは9092になります
上記を実行すると入力待ち状態になります
メッセージを入力してEnterを押すとメッセージを送信することができます
連続で何個もメッセージを送信することができます
終了する場合はCtrl+cで抜けます
メッセージを受信
では送信したメッセージを受信してみましょう
以下のコマンドで受信することができます
cd /usr/local/kafka
sh bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
先ほど送信したメッセージの一覧が表示されると思います
メッセージの受信も送信時と同じように待ち状態になります
この状態でメッセージの送信を別のターミナルで実行してみるとリアルタイムにメッセージを受信できると思います
「–from-beginning」を指定するとトピックに送信されたメッセージを最初からすべて取得することができます
接続した時点からのメッセージのみを取得したい場合は「–from-beginning」オプションを削除して受信してみてください
最後に
Kafkaはあくまでもメッセージを管理するための基盤で、スケーラビリティとか分散に特化したミドルウェアです
Kafka自体にはMQTTやWebSocket、HTTPなどのインタフェースを持っていなくて本当にメッセージを管理するだけの基盤になります
いろいろ調べるとkafkaと連携してMQTTインタフェースを持たせるブリッジ的なものを作成している方もいたので、インタフェース周りはエコシステム的に作られていくのかなと思いました
今回は基本的な部分しか触れていませんがQuickStartはこの後Kafkaクラスタを構築する手順になっています
本気でやろうとするとKafkaクラスタを構築するのはおそらく必須条件になると思います
0 件のコメント:
コメントを投稿