Connessione ad Apache Kafka in Scala
In questo articolo vediamo come connetterci, inviare e ricevere dati da Apache Kafka In Scala.
Specifico che ho usato la versione 3 di Scala.
Do per scontate che lo abbiate già installato e configurato.
Volendo potete usare anche docker (come in effetti ho fatto io).
Prima di tutto dobbiamo installare una libreria; se usate sbt:
libraryDependencies ++= Seq(
"org.apache.kafka" % "kafka-clients" % "3.6.0"
)
Qui sotto un pò di codice, in cui prima inviamo dei dati e poi li leggiamo:
package org.example
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.clients.consumer.{KafkaConsumer, ConsumerConfig}
import org.apache.kafka.common.serialization.{StringSerializer, StringDeserializer}
import java.util.Properties
import scala.jdk.CollectionConverters.*
import java.time.Duration
@main
def main(): Unit = {
val topic = "test-topic"
val host = "localhost:9092"
// PRODUCER
val producerProps = Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, host)
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
val producer = KafkaProducer[String, String](producerProps)
val record = ProducerRecord(topic, "chiave1", "valore1")
producer.send(record)
producer.flush()
println("Messaggio inviato!")
producer.close()
Thread.sleep(2000)
// CONSUMER
val consumerProps = Properties()
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host)
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val consumer = KafkaConsumer[String, String](consumerProps)
consumer.subscribe(List(topic).asJava)
val records = consumer.poll(Duration.ofMillis(5000))
records.asScala.foreach { record =>
println(s"Chiave: ${record.key()}, Valore: ${record.value()}")
}
consumer.close()
}
Enjoy!
scala sbt kafka
Commentami!