Apache Kafka
オブジェクト指向カンファレンスでメッセージソーシングに触れトライしてみたいと思い、まずは、その実装要素である メッセージブローカー について検証してみました。
今回は代表的なメッセージブローカである Apache Kafka についてです。
Kafkaは、大量のデータをリアルタイムで処理できる分散ストリーミングプラットフォームであり、高速でスケーラブルなメッセージングシステムを提供します。
Amazon MSK のような、Kafka のフルマネージド型サービスも提供されています。
今回は、docker-compose で kafka を local pc に起動し、 Java の SDK を利用して Produce と Consume の様子を見ていきます。
環境準備
以下の docker-compose.yml を用意して起動しました。kafka 自体は UI を持たないため、初心者の私は kafka-ui も入れています。
version: '3.7'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: 'false'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_LISTENERS: PLAINTEXT://:29092,PLAINTEXT_HOST://:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
kafka-ui:
image: provectuslabs/kafka-ui
ports:
- "8080:8080"
restart: always
environment:
- KAFKA_CLUSTERS_0_NAME=local
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:29092
Apache Kafka の構成要素
Kafka には「トピック」「ブローカー」などのワードが登場します。それらを端的に絵にすると以下のようになります。
データはトピックと呼ばれるカテゴリーに整理され、トピックはさらにパーティションに分割されてデータのスケーラビリティと並行性を向上させます。ブローカーはKafkaサーバーのインスタンスであり、クラスタ内のブローカー群がメッセージの保存と配信を担います。
Producer と Consumer と Partition
プロデューサーはデータをメッセージとしてKafkaトピックに送信します。このとき、プロデューサーはどのトピックにメッセージを送るかを指定し、必要に応じて特定のパーティションを指定することもできます。パーティションはトピック内でデータを分割し、保存する単位であり、これによりデータの並行処理とスケーラビリティが向上します。
コンシューマーはトピックをサブスクライブし、プロデューサーが送信したメッセージをリアルタイムに処理します。コンシューマーは、コンシューマーグループを形成し、同じグループ内のコンシューマーでトピックのパーティションを共有します。
実際にやってみた
なんとも言葉だけだと理解が難しかったので、実際に簡単なサンプルコードで動きを確認してみました。
トピックの作成
kafka 上にトピックを作成します。kafka のコンテナに入り下記コマンドを実行します。
kafka-topics --create --topic demo_kafka --partitions 3 --replication-factor 1 --bootstrap-server localhost:29092
demo_kafka
という名前のトピックを、パーティション 3 つで指定しました。
Producer
Producer を作成し、 Kafka のトピックにメッセージが送信されることを確認します。
public class Producer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("key.serializer", StringSerializer.class.getName());
properties.setProperty("value.serializer", StringSerializer.class.getName());
try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
ProducerRecord<String, String> record = new ProducerRecord<>("demo_kafka", "key_1", "hello world");
producer.send(record);
}
}
}
上記を実行し、kafka-ui を確認すると、メッセージが送信されていました!
みづらくて恐縮ですが、、パーティションは指定していないですが、 partitin:1 に保存されたみたいです。
続いて、この Producer 下記のように修正し、10 のメッセージを送信するようにしました。
この時、キーは key_i(カウンタ) としています。
public class Producer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("key.serializer", StringSerializer.class.getName());
properties.setProperty("value.serializer", StringSerializer.class.getName());
try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("demo_kafka", "key_" + i, "hello world");
producer.send(record);
}
}
}
}
kafka-ui をみてみるとメッセージが保存されていました。
ここで注目ポイントは、 key_1
というメッセージが 2 つありますが、どちらも partition:1 に保存されています。
つまり、同じキーを持つメッセージは、同じパーティションに保存されるようになっているのです!
Consumer
続いて、 Consumer を以下のように作成します。簡易コードです。
public class Consumer {
private static final Logger log = LoggerFactory.getLogger(Consumer.class);
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "demo-group-1");
properties.put("key.deserializer", StringDeserializer.class.getName());
properties.put("value.deserializer", StringDeserializer.class.getName());
properties.put("auto.offset.reset", "earliest");
while (true) {
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
consumer.subscribe(List.of("demo_kafka"));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, String> record : records) {
log.info("partition: " + record.partition() + " - " +
"key: " + record.key() + " - " +
"value: " + record.value());
}
}
}
}
}
そして Consumer 実行中に、もう一度 Producer を実行すると、以下のようなログが出力されました。
[main] INFO org.example.Consumer - partition: 2 - key: key_0 - value: hello world
[main] INFO org.example.Consumer - partition: 2 - key: key_6 - value: hello world
[main] INFO org.example.Consumer - partition: 2 - key: key_7 - value: hello world
[main] INFO org.example.Consumer - partition: 2 - key: key_8 - value: hello world
[main] INFO org.example.Consumer - partition: 2 - key: key_9 - value: hello world
[main] INFO org.example.Consumer - partition: 0 - key: key_2 - value: hello world
[main] INFO org.example.Consumer - partition: 0 - key: key_4 - value: hello world
[main] INFO org.example.Consumer - partition: 0 - key: key_5 - value: hello world
[main] INFO org.example.Consumer - partition: 1 - key: key_1 - value: hello world
[main] INFO org.example.Consumer - partition: 1 - key: key_3 - value: hello world
3 つすべてのパーティションからメッセージを取得していることがわかります。
複数の Consumer
Consumer を 2 プロセス同時に起動し、Producer を実行しました。
すると以下のようなログになりました。
- Consumer1
[main] INFO org.example.Consumer - partition: 0 - key: key_2 - value: hello world [main] INFO org.example.Consumer - partition: 0 - key: key_4 - value: hello world [main] INFO org.example.Consumer - partition: 0 - key: key_5 - value: hello world [main] INFO org.example.Consumer - partition: 1 - key: key_1 - value: hello world [main] INFO org.example.Consumer - partition: 1 - key: key_3 - value: hello world
- Consumer2
[main] INFO org.example.Consumer - partition: 2 - key: key_0 - value: hello world [main] INFO org.example.Consumer - partition: 2 - key: key_6 - value: hello world [main] INFO org.example.Consumer - partition: 2 - key: key_7 - value: hello world [main] INFO org.example.Consumer - partition: 2 - key: key_8 - value: hello world [main] INFO org.example.Consumer - partition: 2 - key: key_9 - value: hello world
Consumer ごとにパーティションが割り振られていることがわかります。
つまり、同一のパーティションは同一の Consumer がサブスクライブするようになっているのです。(すごい!)
まとめ
Kafka の検証は簡単なものであれば、 PC 上で簡単にできることがわかりました。
そして分かりづらかったトピックやパーティションの振る舞いですが、以下の要素を覚えることとしました。
- 同じキーを持つメッセージは同じパーティションに割り当てられる
- 1つのパーティションは1つの Consumer に割り当てられる
つまり、例えば key をアグリゲートIDにすることで、イベントの順序を担保した Consumer の処理ができることになると思います。
Kafka には Kafka Connect などの様々な機能があるようなので、次回はそちらにチャレンジしたいと思います。