Connessione ad Apache Kafka in Go

Mattepuffo's logo
Connessione ad Apache Kafka in Go

Connessione ad Apache Kafka in Go

In questo articolo vediamo come connetterci, inviare e ricevere dati da Apache Kafka In Go.

Do per scontate che lo abbiate già installato e configurato.

Volendo potete usare anche docker (come in effetti ho fatto io).

Prima di tutto dobbiamo installare una libreria:

go get github.com/IBM/sarama

Qui sotto un pò di codice, in cui prima inviamo dei dati e poi li leggiamo:

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/IBM/sarama"
)

func main() {
	topic := "test-topic"
	host := []string{"localhost:9092"}

	config := sarama.NewConfig()
	config.Producer.Return.Successes = true

	producer, err := sarama.NewSyncProducer(host, config)

	if err != nil {
		log.Fatalf("Errore creazione producer: %v", err)
	}

	message := &sarama.ProducerMessage{
		Topic: topic,
		Key:   sarama.StringEncoder("chiave1"),
		Value: sarama.StringEncoder("valore1"),
	}

	_, _, err = producer.SendMessage(message)

	if err != nil {
		log.Fatalf("Errore invio messaggio: %v", err)
	}

	fmt.Println("Messaggio inviato!")
	producer.Close()

	time.Sleep(2 * time.Second)

	// CONSUMER
	consumerConfig := sarama.NewConfig()
	consumerConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

	consumer, err := sarama.NewConsumer(host, consumerConfig)

	if err != nil {
		log.Fatalf("Errore creazione consumer: %v", err)
	}

	defer consumer.Close()

	partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetOldest)

	if err != nil {
		log.Fatalf("Errore sottoscrizione: %v", err)
	}

	defer partitionConsumer.Close()

	msg := <-partitionConsumer.Messages()
	fmt.Printf("Chiave: %s, Valore: %sn", string(msg.Key), string(msg.Value))
}

Enjoy!


Condividi

Commentami!