Connessione ad Apache Kafka in Scala

Mattepuffo's logo
Connessione ad Apache Kafka in Scala

Connessione ad Apache Kafka in Scala

In questo articolo vediamo come connetterci, inviare e ricevere dati da Apache Kafka In Scala.

Specifico che ho usato la versione 3 di Scala.

Do per scontate che lo abbiate già installato e configurato.

Volendo potete usare anche docker (come in effetti ho fatto io).

Prima di tutto dobbiamo installare una libreria; se usate sbt:

libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-clients" % "3.6.0"
)

Qui sotto un pò di codice, in cui prima inviamo dei dati e poi li leggiamo:

package org.example

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.clients.consumer.{KafkaConsumer, ConsumerConfig}
import org.apache.kafka.common.serialization.{StringSerializer, StringDeserializer}
import java.util.Properties
import scala.jdk.CollectionConverters.*
import java.time.Duration

@main
def main(): Unit = {
  val topic = "test-topic"
  val host = "localhost:9092"

  // PRODUCER
  val producerProps = Properties()
  producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, host)
  producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
  producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

  val producer = KafkaProducer[String, String](producerProps)
  val record = ProducerRecord(topic, "chiave1", "valore1")

  producer.send(record)
  producer.flush()
  println("Messaggio inviato!")
  producer.close()

  Thread.sleep(2000)

  // CONSUMER
  val consumerProps = Properties()
  consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host)
  consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group")
  consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
  consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
  consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val consumer = KafkaConsumer[String, String](consumerProps)
  consumer.subscribe(List(topic).asJava)

  val records = consumer.poll(Duration.ofMillis(5000))
  records.asScala.foreach { record =>
    println(s"Chiave: ${record.key()}, Valore: ${record.value()}")
  }

  consumer.close()
}

Enjoy!


Condividi

Commentami!