Connessione ad Apache Kafka in Java

Mattepuffo's logo
Connessione ad Apache Kafka in Java

Connessione ad Apache Kafka in Java

In questo articolo vediamo come connetterci ad Apache Kafka in Java, e come inviare e ricevere dati.

Do per scontato che lo abbiate già installato e configurato.

Volendo potete usare anche Docker (come in effetti ho fatto io).

Prima di tutto dobbiamo installare una libreria; se usate Maven:

<!-- Kafka client library: provides the org.apache.kafka.clients producer/consumer classes used in the code below. -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>3.8.0</version>
</dependency>

Qui sotto un po' di codice, in cui prima inviamo dei dati e poi li leggiamo:

package org.example;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Collections;
import java.util.Properties;
import java.util.Random;

/**
 * Minimal Apache Kafka round trip.
 *
 * <p>First publishes a single String key/value record to {@code test-topic} on the broker at
 * {@code localhost:9092}, then subscribes to the same topic and prints every record it receives.
 */
public class Main {
  /**
   * Sends one record and then polls the topic indefinitely, printing each record read.
   *
   * @param args command-line arguments (not used)
   */
  public static void main(String[] args) {
    String topic = "test-topic";
    String host = "localhost:9092";

    // PRODUCER PROPERTIES
    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", host);
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
      ProducerRecord<String, String> record = new ProducerRecord<>(topic, "chiave1", "valore1");
      // send() is asynchronous: report success only once the send has actually completed,
      // and surface a failure instead of silently dropping it. Leaving the try block closes
      // the producer, which waits for this pending send (and therefore for the callback).
      producer.send(record, (metadata, exception) -> {
        if (exception == null) {
          System.out.println("Messaggio inviato!");
        } else {
          System.err.println("Invio del messaggio fallito: " + exception.getMessage());
        }
      });
    }

    // CONSUMER PROPERTIES
    Properties consumerProps = new Properties();
    consumerProps.put("bootstrap.servers", host);
    consumerProps.put("group.id", "test-group");
    consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    // Start from the beginning of the topic when the group has no committed offset yet.
    consumerProps.put("auto.offset.reset", "earliest");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
      consumer.subscribe(Collections.singletonList(topic));
      // This loop never ends normally: the program runs until the process is stopped
      // or poll() throws, which is the only way the consumer gets closed.
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(java.time.Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
          System.out.printf("Chiave: %s, Valore: %s%n", record.key(), record.value());
        }
      }
    }
  }
}

Enjoy!


Condividi

Commentami!