diff options
Diffstat (limited to 'rust/scraper/src/messaging.rs')
-rw-r--r-- | rust/scraper/src/messaging.rs | 158 |
1 files changed, 158 insertions, 0 deletions
diff --git a/rust/scraper/src/messaging.rs b/rust/scraper/src/messaging.rs new file mode 100644 index 0000000..eea343d --- /dev/null +++ b/rust/scraper/src/messaging.rs @@ -0,0 +1,158 @@ +use futures::StreamExt; +use lapin::uri::AMQPUri; +use lapin::{ + message::Delivery, + options::{BasicConsumeOptions, BasicQosOptions, QueueDeclareOptions}, + types::FieldTable, + Connection, ConnectionProperties, +}; +use std::borrow::Cow; +use thiserror::Error; +use tokio::{ + sync::mpsc, + sync::mpsc::{Receiver, Sender}, + sync::mpsc::error::SendError +}; +use tracing::{error, trace, warn}; + +pub struct RabbitMQClient { + uri: AMQPUri, + queue_name: Cow<'static, str>, + options: RabbitMQClientOptions, +} + +impl RabbitMQClient { + pub fn new<S>(uri: &str, queue_name: S, opts: RabbitMQClientOptions) -> Result<RabbitMQClient, String> + where + S: Into<Cow<'static, str>>, + { + Ok(RabbitMQClient { + uri: uri.parse()?, + queue_name: queue_name.into(), + options: opts, + }) + } + + pub async fn start(&self) -> Receiver<Delivery> { + let (tx, rx) = mpsc::channel(self.options.max_queue_size as usize); + + // Spawn worker thread/task to handle AMQP Messages + tokio::spawn(Self::run( + self.uri.clone(), + self.queue_name.clone(), + self.options.clone(), + tx, + )); + + rx + } + + async fn run( + uri: AMQPUri, + queue_name: Cow<'static, str>, + options: RabbitMQClientOptions, + tx: Sender<Delivery>, + ) { + let mut conn = None; + loop { + if let Some(c) = &conn { + match Self::inner_run(&c, &queue_name, &options, &tx).await { + Ok(_) => { + warn!("consumer unexpectedly canceled, remaking AMQP channel"); + } + Err(MessagingError::LapinError(err)) => { + error!(?err, "AMQP Error"); + drop(conn.take()); // Drop connection to reconnect below + } + Err(MessagingError::SendError(err)) => { + // Channel error: rx channel returned from RabbitMQClient::start has + // been dropped. This is unrecoverable. The thread will exit. + error!(?err, "mpsc channel closed"); + return; + } + } + } else { + // Try to connect with AMQP server with Exponential Backoff + let conn_res = + backoff::future::retry(backoff::ExponentialBackoff::default(), || async { + trace!(?uri, "connecting to AMQP server"); + Ok(Connection::connect_uri(uri.clone(), options.conn_opts.clone()).await?) + }) + .await; + if let Ok(c) = conn_res { + trace!(connection = ?c, "connection established"); + conn = Some(c); + } else { + error!("unable to connect to AMQP server"); + return; + } + } + } + } + + async fn inner_run( + conn: &Connection, + queue_name: &str, + opts: &RabbitMQClientOptions, + tx: &Sender<Delivery>, + ) -> Result<(), MessagingError> { + let channel = conn.create_channel().await?; + + let _ = channel + .queue_declare( + queue_name, + QueueDeclareOptions { + durable: true, + ..Default::default() + }, + FieldTable::default(), + ) + .await?; + + // Set prefetch count to not exceed mpsc channel buffer capacity + channel + .basic_qos(opts.max_queue_size, BasicQosOptions::default()) + .await?; + + let mut consumer = channel + .basic_consume( + queue_name, + "", + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await?; + + while let Some(delivery) = consumer.next().await { + trace!(?delivery, "delivery received"); + tx.send(delivery?).await? + } + + // If we get to here, the consumer was somehow canceled. + // The run() function will just restart the channel. + Ok(()) + } +} + +#[derive(Clone)] +pub struct RabbitMQClientOptions { + pub conn_opts: ConnectionProperties, + pub max_queue_size: u16, +} + +impl Default for RabbitMQClientOptions { + fn default() -> Self { + Self { + conn_opts: ConnectionProperties::default(), + max_queue_size: 10, + } + } +} + +#[derive(Error, Debug)] +enum MessagingError { + #[error(transparent)] + LapinError(#[from] lapin::Error), + #[error(transparent)] + SendError(#[from] SendError<Delivery>), +} |