Connessione ad Apache Kafka in Go
In questo articolo vediamo come connetterci, inviare e ricevere dati da Apache Kafka In Go.
Do per scontate che lo abbiate già installato e configurato.
Volendo potete usare anche docker (come in effetti ho fatto io).
Prima di tutto dobbiamo installare una libreria:
go get github.com/IBM/sarama
Qui sotto un pò di codice, in cui prima inviamo dei dati e poi li leggiamo:
package main
import (
"fmt"
"log"
"time"
"github.com/IBM/sarama"
)
func main() {
topic := "test-topic"
host := []string{"localhost:9092"}
config := sarama.NewConfig()
config.Producer.Return.Successes = true
producer, err := sarama.NewSyncProducer(host, config)
if err != nil {
log.Fatalf("Errore creazione producer: %v", err)
}
message := &sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder("chiave1"),
Value: sarama.StringEncoder("valore1"),
}
_, _, err = producer.SendMessage(message)
if err != nil {
log.Fatalf("Errore invio messaggio: %v", err)
}
fmt.Println("Messaggio inviato!")
producer.Close()
time.Sleep(2 * time.Second)
// CONSUMER
consumerConfig := sarama.NewConfig()
consumerConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
consumer, err := sarama.NewConsumer(host, consumerConfig)
if err != nil {
log.Fatalf("Errore creazione consumer: %v", err)
}
defer consumer.Close()
partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetOldest)
if err != nil {
log.Fatalf("Errore sottoscrizione: %v", err)
}
defer partitionConsumer.Close()
msg := <-partitionConsumer.Messages()
fmt.Printf("Chiave: %s, Valore: %sn", string(msg.Key), string(msg.Value))
}
Enjoy!
go kafka
Commentami!