Connessione ad Apache Kafka in Java
In questo articolo vediamo come connetterci ad Apache Kafka in Java e come inviare e ricevere dati.
Do per scontato che lo abbiate già installato e configurato.
Volendo potete usare anche Docker (come in effetti ho fatto io).
Prima di tutto dobbiamo installare una libreria; se usate Maven:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.8.0</version>
</dependency>
Qui sotto un po' di codice, in cui prima inviamo dei dati e poi li leggiamo:
package org.example;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Collections;
import java.util.Properties;
import java.util.Random;
/**
 * Minimal Kafka round trip: publishes one record to a topic, then consumes that topic and prints
 * every record it receives.
 */
public class Main {

    /**
     * Sends a single key/value record to {@code test-topic} on the broker at
     * {@code localhost:9092}, then subscribes to the same topic with consumer group
     * {@code test-group} and prints each record's key and value.
     *
     * <p>The consumer loop has no exit condition: the program keeps polling until the process is
     * stopped from outside.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        String topic = "test-topic";
        String host = "localhost:9092";

        // Producer configuration: keys and values are both sent as plain strings.
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", host);
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "chiave1", "valore1");
            // send() is asynchronous: report the real outcome from the completion callback
            // instead of claiming success right after enqueueing the record.
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    System.err.println("Invio fallito: " + exception.getMessage());
                } else {
                    System.out.println("Messaggio inviato!");
                }
            });
            // Block until the pending record has been acknowledged (or has failed), so the
            // callback above has run before we move on to the consumer.
            producer.flush();
        }

        // Consumer configuration.
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", host);
        consumerProps.put("group.id", "test-group");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // With no committed offset for this group, start from the beginning of the topic.
        consumerProps.put("auto.offset.reset", "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singletonList(topic));
            // Poll forever; each poll waits at most 100 ms for new records.
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(java.time.Duration.ofMillis(100));
                for (ConsumerRecord<String, String> received : records) {
                    System.out.printf("Chiave: %s, Valore: %s%n", received.key(), received.value());
                }
            }
        }
    }
}
Enjoy!
java maven kafka
Commentami!