From 9a5c3c17d7465c68c3c68d00f1004dfc1a0e961c Mon Sep 17 00:00:00 2001 From: Anshul Gupta Date: Sun, 21 Aug 2022 14:12:35 -0700 Subject: Downgrades messaging from module to file --- rust/scraper/src/messaging.rs | 158 ++++++++++++++++++++++++++++++++++++ rust/scraper/src/messaging/error.rs | 11 --- rust/scraper/src/messaging/mod.rs | 151 ---------------------------------- 3 files changed, 158 insertions(+), 162 deletions(-) create mode 100644 rust/scraper/src/messaging.rs delete mode 100644 rust/scraper/src/messaging/error.rs delete mode 100644 rust/scraper/src/messaging/mod.rs (limited to 'rust/scraper') diff --git a/rust/scraper/src/messaging.rs b/rust/scraper/src/messaging.rs new file mode 100644 index 0000000..eea343d --- /dev/null +++ b/rust/scraper/src/messaging.rs @@ -0,0 +1,158 @@ +use futures::StreamExt; +use lapin::uri::AMQPUri; +use lapin::{ + message::Delivery, + options::{BasicConsumeOptions, BasicQosOptions, QueueDeclareOptions}, + types::FieldTable, + Connection, ConnectionProperties, +}; +use std::borrow::Cow; +use thiserror::Error; +use tokio::{ + sync::mpsc, + sync::mpsc::{Receiver, Sender}, + sync::mpsc::error::SendError +}; +use tracing::{error, trace, warn}; + +pub struct RabbitMQClient { + uri: AMQPUri, + queue_name: Cow<'static, str>, + options: RabbitMQClientOptions, +} + +impl RabbitMQClient { + pub fn new(uri: &str, queue_name: S, opts: RabbitMQClientOptions) -> Result + where + S: Into>, + { + Ok(RabbitMQClient { + uri: uri.parse()?, + queue_name: queue_name.into(), + options: opts, + }) + } + + pub async fn start(&self) -> Receiver { + let (tx, rx) = mpsc::channel(self.options.max_queue_size as usize); + + // Spawn worker thread/task to handle AMQP Messages + tokio::spawn(Self::run( + self.uri.clone(), + self.queue_name.clone(), + self.options.clone(), + tx, + )); + + rx + } + + async fn run( + uri: AMQPUri, + queue_name: Cow<'static, str>, + options: RabbitMQClientOptions, + tx: Sender, + ) { + let mut conn = None; + loop { + if let Some(c) = &conn { + match Self::inner_run(&c, &queue_name, &options, &tx).await { + Ok(_) => { + warn!("consumer unexpectedly canceled, remaking AMQP channel"); + } + Err(MessagingError::LapinError(err)) => { + error!(?err, "AMQP Error"); + drop(conn.take()); // Drop connection to reconnect below + } + Err(MessagingError::SendError(err)) => { + // Channel error: rx channel returned from RabbitMQClient::start has + // been dropped. This is unrecoverable. The thread will exit. + error!(?err, "mpsc channel closed"); + return; + } + } + } else { + // Try to connect with AMQP server with Exponential Backoff + let conn_res = + backoff::future::retry(backoff::ExponentialBackoff::default(), || async { + trace!(?uri, "connecting to AMQP server"); + Ok(Connection::connect_uri(uri.clone(), options.conn_opts.clone()).await?) + }) + .await; + if let Ok(c) = conn_res { + trace!(connection = ?c, "connection established"); + conn = Some(c); + } else { + error!("unable to connect to AMQP server"); + return; + } + } + } + } + + async fn inner_run( + conn: &Connection, + queue_name: &str, + opts: &RabbitMQClientOptions, + tx: &Sender, + ) -> Result<(), MessagingError> { + let channel = conn.create_channel().await?; + + let _ = channel + .queue_declare( + queue_name, + QueueDeclareOptions { + durable: true, + ..Default::default() + }, + FieldTable::default(), + ) + .await?; + + // Set prefetch count to not exceed mpsc channel buffer capacity + channel + .basic_qos(opts.max_queue_size, BasicQosOptions::default()) + .await?; + + let mut consumer = channel + .basic_consume( + queue_name, + "", + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await?; + + while let Some(delivery) = consumer.next().await { + trace!(?delivery, "delivery received"); + tx.send(delivery?).await? + } + + // If we get to here, the consumer was somehow canceled. + // The run() function will just restart the channel. + Ok(()) + } +} + +#[derive(Clone)] +pub struct RabbitMQClientOptions { + pub conn_opts: ConnectionProperties, + pub max_queue_size: u16, +} + +impl Default for RabbitMQClientOptions { + fn default() -> Self { + Self { + conn_opts: ConnectionProperties::default(), + max_queue_size: 10, + } + } +} + +#[derive(Error, Debug)] +enum MessagingError { + #[error(transparent)] + LapinError(#[from] lapin::Error), + #[error(transparent)] + SendError(#[from] SendError), +} diff --git a/rust/scraper/src/messaging/error.rs b/rust/scraper/src/messaging/error.rs deleted file mode 100644 index b68ea20..0000000 --- a/rust/scraper/src/messaging/error.rs +++ /dev/null @@ -1,11 +0,0 @@ -use lapin::message::Delivery; -use thiserror::Error; -use tokio::sync::mpsc::error::SendError; - -#[derive(Error, Debug)] -pub enum MessagingError { - #[error(transparent)] - LapinError(#[from] lapin::Error), - #[error(transparent)] - SendError(#[from] SendError), -} diff --git a/rust/scraper/src/messaging/mod.rs b/rust/scraper/src/messaging/mod.rs deleted file mode 100644 index 947cab4..0000000 --- a/rust/scraper/src/messaging/mod.rs +++ /dev/null @@ -1,151 +0,0 @@ -mod error; - -use crate::messaging::error::MessagingError; -use futures::StreamExt; -use lapin::uri::AMQPUri; -use lapin::{ - message::Delivery, - options::{BasicConsumeOptions, BasicQosOptions, QueueDeclareOptions}, - types::FieldTable, - Connection, ConnectionProperties, -}; -use std::borrow::Cow; -use tokio::{ - sync::mpsc, - sync::mpsc::{Receiver, Sender}, -}; -use tracing::{error, trace, warn}; - -pub struct RabbitMQClient { - uri: AMQPUri, - queue_name: Cow<'static, str>, - options: RabbitMQClientOptions, -} - -impl RabbitMQClient { - pub fn new(uri: &str, queue_name: S, opts: RabbitMQClientOptions) -> Result - where - S: Into>, - { - Ok(RabbitMQClient { - uri: uri.parse()?, - queue_name: queue_name.into(), - options: opts, - }) - } - - pub async fn start(&self) -> Receiver { - let (tx, rx) = mpsc::channel(self.options.max_queue_size as usize); - - // Spawn worker thread/task to handle AMQP Messages - tokio::spawn(Self::run( - self.uri.clone(), - self.queue_name.clone(), - self.options.clone(), - tx, - )); - - rx - } - - async fn run( - uri: AMQPUri, - queue_name: Cow<'static, str>, - options: RabbitMQClientOptions, - tx: Sender, - ) { - let mut conn = None; - loop { - if let Some(c) = &conn { - match Self::inner_run(&c, &queue_name, &options, &tx).await { - Ok(_) => { - warn!("consumer unexpectedly canceled, remaking AMQP channel"); - } - Err(MessagingError::LapinError(err)) => { - error!(?err, "AMQP Error"); - drop(conn.take()); // Drop connection to reconnect below - } - Err(MessagingError::SendError(err)) => { - // Channel error: rx channel returned from RabbitMQClient::start has - // been dropped. This is unrecoverable. The thread will exit. - error!(?err, "mpsc channel closed"); - return; - } - } - } else { - // Try to connect with AMQP server with Exponential Backoff - let conn_res = - backoff::future::retry(backoff::ExponentialBackoff::default(), || async { - trace!(?uri, "connecting to AMQP server"); - Ok(Connection::connect_uri(uri.clone(), options.conn_opts.clone()).await?) - }) - .await; - if let Ok(c) = conn_res { - trace!(connection = ?c, "connection established"); - conn = Some(c); - } else { - error!("unable to connect to AMQP server"); - return; - } - } - } - } - - async fn inner_run( - conn: &Connection, - queue_name: &str, - opts: &RabbitMQClientOptions, - tx: &Sender, - ) -> Result<(), MessagingError> { - let channel = conn.create_channel().await?; - - let _ = channel - .queue_declare( - queue_name, - QueueDeclareOptions { - durable: true, - ..Default::default() - }, - FieldTable::default(), - ) - .await?; - - // Set prefetch count to not exceed mpsc channel buffer capacity - channel - .basic_qos(opts.max_queue_size, BasicQosOptions::default()) - .await?; - - let mut consumer = channel - .basic_consume( - queue_name, - "", - BasicConsumeOptions::default(), - FieldTable::default(), - ) - .await?; - - while let Some(delivery) = consumer.next().await { - trace!(?delivery, "delivery received"); - tx.send(delivery?).await? - } - - // If we get to here, the consumer was somehow canceled. - // The run() function will just restart the channel. - Ok(()) - } -} - -#[derive(Clone)] -pub struct RabbitMQClientOptions { - pub conn_opts: ConnectionProperties, - pub max_queue_size: u16, -} - -impl Default for RabbitMQClientOptions { - fn default() -> Self { - Self { - conn_opts: ConnectionProperties::default(), - max_queue_size: 10, - } - } -} -- cgit v1.2.3