summaryrefslogtreecommitdiff
path: root/rust/scraper/src/messaging.rs
diff options
context:
space:
mode:
Diffstat (limited to 'rust/scraper/src/messaging.rs')
-rw-r--r--rust/scraper/src/messaging.rs158
1 files changed, 158 insertions, 0 deletions
diff --git a/rust/scraper/src/messaging.rs b/rust/scraper/src/messaging.rs
new file mode 100644
index 0000000..eea343d
--- /dev/null
+++ b/rust/scraper/src/messaging.rs
@@ -0,0 +1,158 @@
+use futures::StreamExt;
+use lapin::uri::AMQPUri;
+use lapin::{
+ message::Delivery,
+ options::{BasicConsumeOptions, BasicQosOptions, QueueDeclareOptions},
+ types::FieldTable,
+ Connection, ConnectionProperties,
+};
+use std::borrow::Cow;
+use thiserror::Error;
+use tokio::{
+ sync::mpsc,
+ sync::mpsc::{Receiver, Sender},
+ sync::mpsc::error::SendError
+};
+use tracing::{error, trace, warn};
+
+pub struct RabbitMQClient {
+ uri: AMQPUri,
+ queue_name: Cow<'static, str>,
+ options: RabbitMQClientOptions,
+}
+
+impl RabbitMQClient {
+ pub fn new<S>(uri: &str, queue_name: S, opts: RabbitMQClientOptions) -> Result<RabbitMQClient, String>
+ where
+ S: Into<Cow<'static, str>>,
+ {
+ Ok(RabbitMQClient {
+ uri: uri.parse()?,
+ queue_name: queue_name.into(),
+ options: opts,
+ })
+ }
+
+ pub async fn start(&self) -> Receiver<Delivery> {
+ let (tx, rx) = mpsc::channel(self.options.max_queue_size as usize);
+
+ // Spawn worker thread/task to handle AMQP Messages
+ tokio::spawn(Self::run(
+ self.uri.clone(),
+ self.queue_name.clone(),
+ self.options.clone(),
+ tx,
+ ));
+
+ rx
+ }
+
+ async fn run(
+ uri: AMQPUri,
+ queue_name: Cow<'static, str>,
+ options: RabbitMQClientOptions,
+ tx: Sender<Delivery>,
+ ) {
+ let mut conn = None;
+ loop {
+ if let Some(c) = &conn {
+ match Self::inner_run(&c, &queue_name, &options, &tx).await {
+ Ok(_) => {
+ warn!("consumer unexpectedly canceled, remaking AMQP channel");
+ }
+ Err(MessagingError::LapinError(err)) => {
+ error!(?err, "AMQP Error");
+ drop(conn.take()); // Drop connection to reconnect below
+ }
+ Err(MessagingError::SendError(err)) => {
+ // Channel error: rx channel returned from RabbitMQClient::start has
+ // been dropped. This is unrecoverable. The thread will exit.
+ error!(?err, "mpsc channel closed");
+ return;
+ }
+ }
+ } else {
+ // Try to connect with AMQP server with Exponential Backoff
+ let conn_res =
+ backoff::future::retry(backoff::ExponentialBackoff::default(), || async {
+ trace!(?uri, "connecting to AMQP server");
+ Ok(Connection::connect_uri(uri.clone(), options.conn_opts.clone()).await?)
+ })
+ .await;
+ if let Ok(c) = conn_res {
+ trace!(connection = ?c, "connection established");
+ conn = Some(c);
+ } else {
+ error!("unable to connect to AMQP server");
+ return;
+ }
+ }
+ }
+ }
+
+ async fn inner_run(
+ conn: &Connection,
+ queue_name: &str,
+ opts: &RabbitMQClientOptions,
+ tx: &Sender<Delivery>,
+ ) -> Result<(), MessagingError> {
+ let channel = conn.create_channel().await?;
+
+ let _ = channel
+ .queue_declare(
+ queue_name,
+ QueueDeclareOptions {
+ durable: true,
+ ..Default::default()
+ },
+ FieldTable::default(),
+ )
+ .await?;
+
+ // Set prefetch count to not exceed mpsc channel buffer capacity
+ channel
+ .basic_qos(opts.max_queue_size, BasicQosOptions::default())
+ .await?;
+
+ let mut consumer = channel
+ .basic_consume(
+ queue_name,
+ "",
+ BasicConsumeOptions::default(),
+ FieldTable::default(),
+ )
+ .await?;
+
+ while let Some(delivery) = consumer.next().await {
+ trace!(?delivery, "delivery received");
+ tx.send(delivery?).await?
+ }
+
+ // If we get to here, the consumer was somehow canceled.
+ // The run() function will just restart the channel.
+ Ok(())
+ }
+}
+
+#[derive(Clone)]
+pub struct RabbitMQClientOptions {
+ pub conn_opts: ConnectionProperties,
+ pub max_queue_size: u16,
+}
+
+impl Default for RabbitMQClientOptions {
+ fn default() -> Self {
+ Self {
+ conn_opts: ConnectionProperties::default(),
+ max_queue_size: 10,
+ }
+ }
+}
+
+#[derive(Error, Debug)]
+enum MessagingError {
+ #[error(transparent)]
+ LapinError(#[from] lapin::Error),
+ #[error(transparent)]
+ SendError(#[from] SendError<Delivery>),
+}