Connessione ad Apache Kafka in Kotlin
In questo articolo vediamo come connetterci, inviare e ricevere dati da Apache Kafka In Kotlin.
Do per scontate che lo abbiate già installato e configurato.
Volendo potete usare anche docker (come in effetti ho fatto io).
Prima di tutto dobbiamo installare una libreria; se usate Maven:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.8.0</version>
</dependency>
Qui sotto un pò di codice, in cui prima inviamo dei dati e poi li leggiamo:
package org.example
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import java.time.Duration
import java.util.Properties
fun main() {
val topic = "test-topic-kotlin"
val host = "localhost:9092"
// PROPERTIES PER IL PRODUCER
val producerProps = Properties()
producerProps.put("bootstrap.servers", host)
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
KafkaProducer<String?, String?>(producerProps).use { producer ->
val record = ProducerRecord(topic, "chiave2", "valore2")
producer.send(record)
println("Messaggio inviato!")
}
// PROPERTIES PER IL CONSUMER
val consumerProps = Properties()
consumerProps.put("bootstrap.servers", host)
consumerProps.put("group.id", "test-group")
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProps.put("auto.offset.reset", "earliest")
KafkaConsumer<String?, String?>(consumerProps).use { consumer ->
consumer.subscribe(mutableListOf<String?>(topic))
while (true) {
val records = consumer.poll(Duration.ofMillis(100))
for (record in records) {
System.out.printf("Chiave: %s, Valore: %s%n", record.key(), record.value())
}
}
}
}
Enjoy!
kotlin maven kafka
Commentami!