Connessione ad Apache Kafka in Dart

Mattepuffo's logo
Connessione ad Apache Kafka in Dart

Connessione ad Apache Kafka in Dart

In questo articolo vediamo come connetterci, inviare e ricevere dati da Apache Kafka In C#.

Do per scontate che lo abbiate già installato e configurato.

Volendo potete usare anche docker (come in effetti ho fatto io).

Prima di tutto dobbiamo installare una libreria; qui c'è un pò di confusione sulle versioni supportate.

Io alla fine ho scelto kafka_dart:

dart pub add kafka_dart

Poi dobbiamo installare anche un libreria a livello di sistema.

Se siete su Arch e derivate:

# pacman -Sy librdkafka

Qui sotto un pò di codice:

import 'package:kafka_dart/kafka_dart.dart';

Future<void> main() async {
  String topic = 'test-topic';
  String host = 'localhost:9092';

  // PRODUCER
  final producer = await KafkaFactory.createAndInitializeProducer(
    bootstrapServers: host,
  );

  try {
    await producer.sendMessage(
      topic: topic,
      payload: 'valore1',
      key: 'chiave1',
    );

    await producer.flush();
    print('Messaggio inviato!');
  } finally {
    await producer.close();
  }

  await Future.delayed(Duration(seconds: 2));

  // CONSUMER
  final consumer = await KafkaFactory.createAndInitializeConsumer(
    bootstrapServers: host,
    groupId: 'my-consumer-group',
  );

  try {
    await consumer.subscribe([topic]);
    print('Consumer sottoscritto al topic');

    while (true) {
      final message = await consumer.pollMessage();
      if (message != null) {
        print(
          'Chiave: ${message.key.hasValue ? message.key.value : 'null'}, Valore: ${message.payload.value}',
        );
        await consumer.commitAsync();
        break;
      }

      await Future.delayed(Duration(milliseconds: 100));
    }
  } finally {
    await consumer.close();
  }
}

Enjoy!


Condividi

Commentami!