use futures::StreamExt; use lapin::uri::AMQPUri; use lapin::{ message::Delivery, options::{BasicConsumeOptions, BasicQosOptions, QueueDeclareOptions}, types::FieldTable, Connection, ConnectionProperties, }; use std::borrow::Cow; use thiserror::Error; use tokio::{ sync::mpsc, sync::mpsc::{Receiver, Sender}, sync::mpsc::error::SendError }; use tracing::{error, trace, warn}; pub struct RabbitMQClient { uri: AMQPUri, queue_name: Cow<'static, str>, options: RabbitMQClientOptions, } impl RabbitMQClient { pub fn new(uri: &str, queue_name: S, opts: RabbitMQClientOptions) -> Result where S: Into>, { Ok(RabbitMQClient { uri: uri.parse()?, queue_name: queue_name.into(), options: opts, }) } pub async fn start(&self) -> Receiver { let (tx, rx) = mpsc::channel(self.options.max_queue_size as usize); // Spawn worker thread/task to handle AMQP Messages tokio::spawn(Self::run( self.uri.clone(), self.queue_name.clone(), self.options.clone(), tx, )); rx } async fn run( uri: AMQPUri, queue_name: Cow<'static, str>, options: RabbitMQClientOptions, tx: Sender, ) { let mut conn = None; loop { if let Some(c) = &conn { match Self::inner_run(&c, &queue_name, &options, &tx).await { Ok(_) => { warn!("consumer unexpectedly canceled, remaking AMQP channel"); } Err(MessagingError::LapinError(err)) => { error!(?err, "AMQP Error"); drop(conn.take()); // Drop connection to reconnect below } Err(MessagingError::SendError(err)) => { // Channel error: rx channel returned from RabbitMQClient::start has // been dropped. This is unrecoverable. The thread will exit. error!(?err, "mpsc channel closed"); return; } } } else { // Try to connect with AMQP server with Exponential Backoff let conn_res = backoff::future::retry(backoff::ExponentialBackoff::default(), || async { trace!(?uri, "connecting to AMQP server"); Ok(Connection::connect_uri(uri.clone(), options.conn_opts.clone()).await?) }) .await; if let Ok(c) = conn_res { trace!(connection = ?c, "connection established"); conn = Some(c); } else { error!("unable to connect to AMQP server"); return; } } } } async fn inner_run( conn: &Connection, queue_name: &str, opts: &RabbitMQClientOptions, tx: &Sender, ) -> Result<(), MessagingError> { let channel = conn.create_channel().await?; let _ = channel .queue_declare( queue_name, QueueDeclareOptions { durable: true, ..Default::default() }, FieldTable::default(), ) .await?; // Set prefetch count to not exceed mpsc channel buffer capacity channel .basic_qos(opts.max_queue_size, BasicQosOptions::default()) .await?; let mut consumer = channel .basic_consume( queue_name, "", BasicConsumeOptions::default(), FieldTable::default(), ) .await?; while let Some(delivery) = consumer.next().await { trace!(?delivery, "delivery received"); tx.send(delivery?).await? } // If we get to here, the consumer was somehow canceled. // The run() function will just restart the channel. Ok(()) } } #[derive(Clone)] pub struct RabbitMQClientOptions { pub conn_opts: ConnectionProperties, pub max_queue_size: u16, } impl Default for RabbitMQClientOptions { fn default() -> Self { Self { conn_opts: ConnectionProperties::default(), max_queue_size: 10, } } } #[derive(Error, Debug)] enum MessagingError { #[error(transparent)] LapinError(#[from] lapin::Error), #[error(transparent)] SendError(#[from] SendError), }