Connessione ad Apache Kafka in Kotlin

Mattepuffo's logo
Connessione ad Apache Kafka in Kotlin

Connessione ad Apache Kafka in Kotlin

In questo articolo vediamo come connetterci, inviare e ricevere dati da Apache Kafka In Kotlin.

Do per scontate che lo abbiate già installato e configurato.

Volendo potete usare anche docker (come in effetti ho fatto io).

Prima di tutto dobbiamo installare una libreria; se usate Maven:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>3.8.0</version>
</dependency>

Qui sotto un pò di codice, in cui prima inviamo dei dati e poi li leggiamo:

package org.example

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import java.time.Duration
import java.util.Properties

fun main() {
  val topic = "test-topic-kotlin"
  val host = "localhost:9092"

  // PROPERTIES PER IL PRODUCER
  val producerProps = Properties()
  producerProps.put("bootstrap.servers", host)
  producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  KafkaProducer<String?, String?>(producerProps).use { producer ->
    val record = ProducerRecord(topic, "chiave2", "valore2")
    producer.send(record)
    println("Messaggio inviato!")
  }

  // PROPERTIES PER IL CONSUMER
  val consumerProps = Properties()
  consumerProps.put("bootstrap.servers", host)
  consumerProps.put("group.id", "test-group")
  consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  consumerProps.put("auto.offset.reset", "earliest")

  KafkaConsumer<String?, String?>(consumerProps).use { consumer ->
    consumer.subscribe(mutableListOf<String?>(topic))
    while (true) {
      val records = consumer.poll(Duration.ofMillis(100))
      for (record in records) {
        System.out.printf("Chiave: %s, Valore: %s%n", record.key(), record.value())
      }
    }
  }
}

Enjoy!


Condividi

Commentami!