diff options
Diffstat (limited to 'rust/scraper/src/messaging')
-rw-r--r-- | rust/scraper/src/messaging/error.rs | 11 | ||||
-rw-r--r-- | rust/scraper/src/messaging/mod.rs | 151 |
2 files changed, 0 insertions, 162 deletions
diff --git a/rust/scraper/src/messaging/error.rs b/rust/scraper/src/messaging/error.rs deleted file mode 100644 index b68ea20..0000000 --- a/rust/scraper/src/messaging/error.rs +++ /dev/null @@ -1,11 +0,0 @@ -use lapin::message::Delivery; -use thiserror::Error; -use tokio::sync::mpsc::error::SendError; - -#[derive(Error, Debug)] -pub enum MessagingError { - #[error(transparent)] - LapinError(#[from] lapin::Error), - #[error(transparent)] - SendError(#[from] SendError<Delivery>), -} diff --git a/rust/scraper/src/messaging/mod.rs b/rust/scraper/src/messaging/mod.rs deleted file mode 100644 index 947cab4..0000000 --- a/rust/scraper/src/messaging/mod.rs +++ /dev/null @@ -1,151 +0,0 @@ -mod error; - -use crate::messaging::error::MessagingError; -use futures::StreamExt; -use lapin::uri::AMQPUri; -use lapin::{ - message::Delivery, - options::{BasicConsumeOptions, BasicQosOptions, QueueDeclareOptions}, - types::FieldTable, - Connection, ConnectionProperties, -}; -use std::borrow::Cow; -use tokio::{ - sync::mpsc, - sync::mpsc::{Receiver, Sender}, -}; -use tracing::{error, trace, warn}; - -pub struct RabbitMQClient { - uri: AMQPUri, - queue_name: Cow<'static, str>, - options: RabbitMQClientOptions, -} - -impl RabbitMQClient { - pub fn new<S>(uri: &str, queue_name: S, opts: RabbitMQClientOptions) -> Result<RabbitMQClient, String> - where - S: Into<Cow<'static, str>>, - { - Ok(RabbitMQClient { - uri: uri.parse()?, - queue_name: queue_name.into(), - options: opts, - }) - } - - pub async fn start(&self) -> Receiver<Delivery> { - let (tx, rx) = mpsc::channel(self.options.max_queue_size as usize); - - // Spawn worker thread/task to handle AMQP Messages - tokio::spawn(Self::run( - self.uri.clone(), - self.queue_name.clone(), - self.options.clone(), - tx, - )); - - rx - } - - async fn run( - uri: AMQPUri, - queue_name: Cow<'static, str>, - options: RabbitMQClientOptions, - tx: Sender<Delivery>, - ) { - let mut conn = None; - loop { - if let Some(c) = &conn { - match Self::inner_run(&c, &queue_name, &options, &tx).await { - Ok(_) => { - warn!("consumer unexpectedly canceled, remaking AMQP channel"); - } - Err(MessagingError::LapinError(err)) => { - error!(?err, "AMQP Error"); - drop(conn.take()); // Drop connection to reconnect below - } - Err(MessagingError::SendError(err)) => { - // Channel error: rx channel returned from RabbitMQClient::start has - // been dropped. This is unrecoverable. The thread will exit. - error!(?err, "mpsc channel closed"); - return; - } - } - } else { - // Try to connect with AMQP server with Exponential Backoff - let conn_res = - backoff::future::retry(backoff::ExponentialBackoff::default(), || async { - trace!(?uri, "connecting to AMQP server"); - Ok(Connection::connect_uri(uri.clone(), options.conn_opts.clone()).await?) - }) - .await; - if let Ok(c) = conn_res { - trace!(connection = ?c, "connection established"); - conn = Some(c); - } else { - error!("unable to connect to AMQP server"); - return; - } - } - } - } - - async fn inner_run( - conn: &Connection, - queue_name: &str, - opts: &RabbitMQClientOptions, - tx: &Sender<Delivery>, - ) -> Result<(), MessagingError> { - let channel = conn.create_channel().await?; - - let _ = channel - .queue_declare( - queue_name, - QueueDeclareOptions { - durable: true, - ..Default::default() - }, - FieldTable::default(), - ) - .await?; - - // Set prefetch count to not exceed mpsc channel buffer capacity - channel - .basic_qos(opts.max_queue_size, BasicQosOptions::default()) - .await?; - - let mut consumer = channel - .basic_consume( - queue_name, - "", - BasicConsumeOptions::default(), - FieldTable::default(), - ) - .await?; - - while let Some(delivery) = consumer.next().await { - trace!(?delivery, "delivery received"); - tx.send(delivery?).await? - } - - // If we get to here, the consumer was somehow canceled. - // The run() function will just restart the channel. - Ok(()) - } -} - -#[derive(Clone)] -pub struct RabbitMQClientOptions { - pub conn_opts: ConnectionProperties, - pub max_queue_size: u16, -} - -impl Default for RabbitMQClientOptions { - fn default() -> Self { - Self { - conn_opts: ConnectionProperties::default(), - max_queue_size: 10, - } - } -} |