Usare RabbitMQ in Rust
In questo articolo vediamo come usare RabbitMQ in Rust.
Do per scontato che abbiate già RabbitMQ funzionante.
In caso contrario, questo è il comando che ho usato per Docker:
docker run -dit --name rabbitmq-server -p 5552:5552 -p 15672:15672 -p 5672:5672 -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' rabbitmq:4-management bash -c "rabbitmq-plugins enable rabbitmq_stream && rabbitmq-server"
Queste sono le dipendenze che sto usando, da aggiungere al Cargo.toml:
rabbitmq-stream-client = "0.4.4"
tokio = { version = "1.43.1", features = ["full"] }
futures = "0.3.31"
Qui sotto c'è il codice completo, in cui lanciamo prima un producer e poi un consumer:
use rabbitmq_stream_client::error::StreamCreateError;
use rabbitmq_stream_client::types::{ByteCapacity, Message, OffsetSpecification, ResponseCode};
use rabbitmq_stream_client::Environment;
use tokio::task;
use std::io::stdin;
use futures::{StreamExt};
/// Entry point: publishes ten messages to the `test_stream` stream and then
/// starts a consumer on the same stream.
///
/// Any error returned by the producer or the consumer is propagated to the
/// caller, which makes the process exit with a failure.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let stream_name = "test_stream";
    let message_count = 10;

    // The producer runs to completion first, so the consumer starts only
    // after all the messages have been sent.
    producer(stream_name, message_count).await?;
    consumer(stream_name).await
}
/// Publishes `num_messages` numbered messages to `stream`.
///
/// The stream is created first (capped at 5 GB) on a best-effort basis: an
/// "already exists" response is ignored, and any other creation failure is
/// printed without aborting. Each message is sent with `send_with_confirm`
/// and awaited before the next one is sent.
///
/// # Errors
///
/// Returns an error if the environment or the producer cannot be built, if
/// sending a message fails, or if closing the producer fails.
async fn producer(stream: &str, num_messages: usize) -> Result<(), Box<dyn std::error::Error>> {
    let environment = Environment::builder().build().await?;

    match environment
        .stream_creator()
        .max_length(ByteCapacity::GB(5))
        .create(stream)
        .await
    {
        Ok(_) => {}
        // The stream being there already is the expected case on every run
        // after the first one.
        Err(StreamCreateError::Create {
            status: ResponseCode::StreamAlreadyExists,
            ..
        }) => {}
        Err(StreamCreateError::Create { stream, status }) => {
            println!("Error creating stream: {:?} {:?}", stream, status);
        }
        // Any other failure (not a broker response code) used to be dropped
        // silently; report it so a later failure is easier to diagnose.
        Err(other) => {
            println!("Error creating stream: {:?}", other);
        }
    }

    let producer = environment.producer().build(stream).await?;

    for i in 1..=num_messages {
        let body = format!("Messaggio numero {}", i);
        producer
            .send_with_confirm(Message::builder().body(body.as_bytes()).build())
            .await?;
        println!("Sent message {} to stream: {}", i, stream);
    }

    producer.close().await?;
    Ok(())
}
/// Consumes `stream` from its first offset and prints every message received
/// until the user presses Enter, then closes the consumer.
///
/// The stream is created first (capped at 5 GB) on a best-effort basis: an
/// "already exists" response is ignored, and any other creation failure is
/// printed without aborting. Messages are read in a background task; payloads
/// are decoded as UTF-8, with invalid sequences replaced rather than causing
/// a panic.
///
/// # Errors
///
/// Returns an error if the environment or the consumer cannot be built, or
/// if closing the consumer fails.
async fn consumer(stream: &str) -> Result<(), Box<dyn std::error::Error>> {
    let environment = Environment::builder().build().await?;

    match environment
        .stream_creator()
        .max_length(ByteCapacity::GB(5))
        .create(stream)
        .await
    {
        Ok(_) => {}
        // The stream being there already is the expected case here, since
        // the producer normally creates it first.
        Err(StreamCreateError::Create {
            status: ResponseCode::StreamAlreadyExists,
            ..
        }) => {}
        Err(StreamCreateError::Create { stream, status }) => {
            println!("Error creating stream: {:?} {:?}", stream, status);
        }
        // Any other failure (not a broker response code) used to be dropped
        // silently; report it so a later failure is easier to diagnose.
        Err(other) => {
            println!("Error creating stream: {:?}", other);
        }
    }

    // Propagate a creation failure instead of panicking: this function
    // already returns a `Result`.
    let mut consumer = environment
        .consumer()
        .offset(OffsetSpecification::First)
        .build(stream)
        .await?;
    // The handle stays here so the consumer can be closed after it has been
    // moved into the background task.
    let handle = consumer.handle();

    task::spawn(async move {
        while let Some(delivery) = consumer.next().await {
            let d = match delivery {
                Ok(d) => d,
                Err(e) => {
                    // One failed delivery should not kill the reading task.
                    eprintln!("Error receiving message: {:?}", e);
                    continue;
                }
            };
            println!(
                "Got message: {:#?} with offset: {}",
                // Lossy decoding: a non-UTF-8 payload is still printed
                // instead of panicking.
                d.message().data().map(String::from_utf8_lossy),
                d.offset(),
            );
        }
    });

    println!("Premi invio per chiudere il consumer");
    // Reading stdin blocks, so do it on the blocking thread pool rather than
    // on an async worker thread. The outcome of the read is irrelevant: the
    // consumer is closed either way.
    let _ = task::spawn_blocking(|| stdin().read_line(&mut String::new())).await;

    handle.close().await?;
    println!("consumer closed successfully");
    Ok(())
}
Enjoy!
rust rabbitmq docker
Commentami!