summaryrefslogtreecommitdiff
path: root/rust/scraper/src/messaging
diff options
context:
space:
mode:
Diffstat (limited to 'rust/scraper/src/messaging')
-rw-r--r--rust/scraper/src/messaging/error.rs11
-rw-r--r--rust/scraper/src/messaging/mod.rs151
2 files changed, 0 insertions, 162 deletions
diff --git a/rust/scraper/src/messaging/error.rs b/rust/scraper/src/messaging/error.rs
deleted file mode 100644
index b68ea20..0000000
--- a/rust/scraper/src/messaging/error.rs
+++ /dev/null
@@ -1,11 +0,0 @@
-use lapin::message::Delivery;
-use thiserror::Error;
-use tokio::sync::mpsc::error::SendError;
-
-#[derive(Error, Debug)]
-pub enum MessagingError {
- #[error(transparent)]
- LapinError(#[from] lapin::Error),
- #[error(transparent)]
- SendError(#[from] SendError<Delivery>),
-}
diff --git a/rust/scraper/src/messaging/mod.rs b/rust/scraper/src/messaging/mod.rs
deleted file mode 100644
index 947cab4..0000000
--- a/rust/scraper/src/messaging/mod.rs
+++ /dev/null
@@ -1,151 +0,0 @@
-mod error;
-
-use crate::messaging::error::MessagingError;
-use futures::StreamExt;
-use lapin::uri::AMQPUri;
-use lapin::{
- message::Delivery,
- options::{BasicConsumeOptions, BasicQosOptions, QueueDeclareOptions},
- types::FieldTable,
- Connection, ConnectionProperties,
-};
-use std::borrow::Cow;
-use tokio::{
- sync::mpsc,
- sync::mpsc::{Receiver, Sender},
-};
-use tracing::{error, trace, warn};
-
-pub struct RabbitMQClient {
- uri: AMQPUri,
- queue_name: Cow<'static, str>,
- options: RabbitMQClientOptions,
-}
-
-impl RabbitMQClient {
- pub fn new<S>(uri: &str, queue_name: S, opts: RabbitMQClientOptions) -> Result<RabbitMQClient, String>
- where
- S: Into<Cow<'static, str>>,
- {
- Ok(RabbitMQClient {
- uri: uri.parse()?,
- queue_name: queue_name.into(),
- options: opts,
- })
- }
-
- pub async fn start(&self) -> Receiver<Delivery> {
- let (tx, rx) = mpsc::channel(self.options.max_queue_size as usize);
-
- // Spawn worker thread/task to handle AMQP Messages
- tokio::spawn(Self::run(
- self.uri.clone(),
- self.queue_name.clone(),
- self.options.clone(),
- tx,
- ));
-
- rx
- }
-
- async fn run(
- uri: AMQPUri,
- queue_name: Cow<'static, str>,
- options: RabbitMQClientOptions,
- tx: Sender<Delivery>,
- ) {
- let mut conn = None;
- loop {
- if let Some(c) = &conn {
- match Self::inner_run(&c, &queue_name, &options, &tx).await {
- Ok(_) => {
- warn!("consumer unexpectedly canceled, remaking AMQP channel");
- }
- Err(MessagingError::LapinError(err)) => {
- error!(?err, "AMQP Error");
- drop(conn.take()); // Drop connection to reconnect below
- }
- Err(MessagingError::SendError(err)) => {
- // Channel error: rx channel returned from RabbitMQClient::start has
- // been dropped. This is unrecoverable. The thread will exit.
- error!(?err, "mpsc channel closed");
- return;
- }
- }
- } else {
- // Try to connect with AMQP server with Exponential Backoff
- let conn_res =
- backoff::future::retry(backoff::ExponentialBackoff::default(), || async {
- trace!(?uri, "connecting to AMQP server");
- Ok(Connection::connect_uri(uri.clone(), options.conn_opts.clone()).await?)
- })
- .await;
- if let Ok(c) = conn_res {
- trace!(connection = ?c, "connection established");
- conn = Some(c);
- } else {
- error!("unable to connect to AMQP server");
- return;
- }
- }
- }
- }
-
- async fn inner_run(
- conn: &Connection,
- queue_name: &str,
- opts: &RabbitMQClientOptions,
- tx: &Sender<Delivery>,
- ) -> Result<(), MessagingError> {
- let channel = conn.create_channel().await?;
-
- let _ = channel
- .queue_declare(
- queue_name,
- QueueDeclareOptions {
- durable: true,
- ..Default::default()
- },
- FieldTable::default(),
- )
- .await?;
-
- // Set prefetch count to not exceed mpsc channel buffer capacity
- channel
- .basic_qos(opts.max_queue_size, BasicQosOptions::default())
- .await?;
-
- let mut consumer = channel
- .basic_consume(
- queue_name,
- "",
- BasicConsumeOptions::default(),
- FieldTable::default(),
- )
- .await?;
-
- while let Some(delivery) = consumer.next().await {
- trace!(?delivery, "delivery received");
- tx.send(delivery?).await?
- }
-
- // If we get to here, the consumer was somehow canceled.
- // The run() function will just restart the channel.
- Ok(())
- }
-}
-
-#[derive(Clone)]
-pub struct RabbitMQClientOptions {
- pub conn_opts: ConnectionProperties,
- pub max_queue_size: u16,
-}
-
-impl Default for RabbitMQClientOptions {
- fn default() -> Self {
- Self {
- conn_opts: ConnectionProperties::default(),
- max_queue_size: 10,
- }
- }
-}