Usare RabbitMQ in Rust con AMPQ

Mattepuffo's logo
Usare RabbitMQ in Rust con AMPQ

Usare RabbitMQ in Rust con AMPQ

In un precedente articolo abbiamo già visto come usare RabbitMQ in Rust.

Ma li abbiamo usato gli stream, mentre in questo articolo useremo AMPQ classico.

Che differenze ci sono?

RabbitMQ Classic (AMQP)

Con le queue classiche:

  • Quando un messaggio viene consumato (ACK), sparisce dalla coda
  • Se il consumer si riconnette, quei messaggi già consumati non sono più disponibili

RabbitMQ Stream (rabbitmq-stream)

  • I dati NON vengono cancellati quando sono consumati.
  • Lo stream mantiene TUTTI i messaggi per la retention configurata (es: 5GB o X giorni).
  • I consumer leggono i messaggi a partire da un certo "offset" (come Kafka).
  • Più consumer possono leggere lo stesso stream, anche con offset diversi.
  • Un consumer può leggere più volte lo stream, partendo ad esempio dall’inizio (“First”) o dalla fine (“Last”).
  • La dashboard di RabbitMQ mostrerà che lo stream contiene tutti i messaggi, e i consumer non “svuotano” lo stream.

Queste le dipendenze che ci servono:

[dependencies]
lapin = "2.5.3"
tokio = { version = "1.45.0", features = ["full"] }
futures-util = "0.3.31"

Qui sotto il codice che ha entrambe le funzioni, producer e consumer:

use lapin::{
  options::{BasicAckOptions, BasicConsumeOptions, QueueDeclareOptions},
  types::FieldTable,
  BasicProperties, Connection, ConnectionProperties,
};
use futures_util::stream::StreamExt;
use lapin::options::BasicPublishOptions;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
  let nome_stream = "test_ampq";
  producer(nome_stream, 10).await?;
  consumer(nome_stream).await?;
  Ok(())
}

pub async fn producer(queue: &str, num_messages: usize) -> Result<(), Box<dyn std::error::Error>> {
  let addr = "amqp://guest:guest@127.0.0.1:5672/%2f";
  let conn = Connection::connect(addr, ConnectionProperties::default()).await?;

  let channel = conn.create_channel().await?;

  channel
      .queue_declare(
        queue,
        QueueDeclareOptions::default(),
        FieldTable::default(),
      )
      .await?;

  for i in 1..=num_messages {
    let msg = format!("Messaggio numero {}", i);
    channel
        .basic_publish(
          "",
          queue,
          BasicPublishOptions::default(),
          msg.as_bytes(),
          BasicProperties::default(),
        )
        .await?
        .await?;

    println!("Messaggio {} inviato alla coda: {}", i, queue);
  }

  Ok(())
}

pub async fn consumer(queue: &str) -> Result<(), Box<dyn std::error::Error>> {
  let addr = "amqp://guest:guest@127.0.0.1:5672/%2f";
  let conn = Connection::connect(addr, ConnectionProperties::default()).await?;

  let channel = conn.create_channel().await?;

  channel
      .queue_declare(
        queue,
        QueueDeclareOptions::default(),
        FieldTable::default(),
      )
      .await?;

  let mut consumer = channel
      .basic_consume(
        queue,
        "my_consumer",
        BasicConsumeOptions::default(),
        FieldTable::default(),
      )
      .await?;

  println!("Aspettando messaggi su '{}'", queue);

  let mut count = 0;
  while let Some(delivery) = consumer.next().await {
    let delivery = delivery?;
    let msg = String::from_utf8_lossy(&delivery.data);
    println!("Ricevuto messaggio: {}", msg);
    delivery.ack(BasicAckOptions::default()).await?;
    count += 1;
  }

  println!("Consumer terminato. Messaggi ricevuti: {}", count);

  Ok(())
}

Enjoy!


Condividi

Commentami!