Connessione ad Apache Kafka in Dart
In questo articolo vediamo come connetterci, inviare e ricevere dati da Apache Kafka In C#.
Do per scontate che lo abbiate già installato e configurato.
Volendo potete usare anche docker (come in effetti ho fatto io).
Prima di tutto dobbiamo installare una libreria; qui c'è un pò di confusione sulle versioni supportate.
Io alla fine ho scelto kafka_dart:
dart pub add kafka_dart
Poi dobbiamo installare anche un libreria a livello di sistema.
Se siete su Arch e derivate:
# pacman -Sy librdkafka
Qui sotto un pò di codice:
import 'package:kafka_dart/kafka_dart.dart';
Future<void> main() async {
String topic = 'test-topic';
String host = 'localhost:9092';
// PRODUCER
final producer = await KafkaFactory.createAndInitializeProducer(
bootstrapServers: host,
);
try {
await producer.sendMessage(
topic: topic,
payload: 'valore1',
key: 'chiave1',
);
await producer.flush();
print('Messaggio inviato!');
} finally {
await producer.close();
}
await Future.delayed(Duration(seconds: 2));
// CONSUMER
final consumer = await KafkaFactory.createAndInitializeConsumer(
bootstrapServers: host,
groupId: 'my-consumer-group',
);
try {
await consumer.subscribe([topic]);
print('Consumer sottoscritto al topic');
while (true) {
final message = await consumer.pollMessage();
if (message != null) {
print(
'Chiave: ${message.key.hasValue ? message.key.value : 'null'}, Valore: ${message.payload.value}',
);
await consumer.commitAsync();
break;
}
await Future.delayed(Duration(milliseconds: 100));
}
} finally {
await consumer.close();
}
}
Enjoy!
dart kafka
Commentami!