diff options
author | 2022-08-21 14:09:50 -0700 | |
---|---|---|
committer | 2022-08-21 14:09:50 -0700 | |
commit | 5f4151e3044dc3f6849a357c398939463f028af6 (patch) | |
tree | 8f8c2610544d8bf1df426e4e41cd6a6ea676618f /rust/scraper/src | |
parent | a75e41e9cce5d2e4ede08ef2def8add124a49c73 (diff) | |
download | touchpad-5f4151e3044dc3f6849a357c398939463f028af6.tar.gz touchpad-5f4151e3044dc3f6849a357c398939463f028af6.tar.zst touchpad-5f4151e3044dc3f6849a357c398939463f028af6.zip |
Adds RabbitMQ messaging and management hub
Diffstat (limited to 'rust/scraper/src')
-rw-r--r-- | rust/scraper/src/database/mod.rs | 24 | ||||
-rw-r--r-- | rust/scraper/src/hub/mod.rs | 18 | ||||
-rw-r--r-- | rust/scraper/src/main.rs | 32 | ||||
-rw-r--r-- | rust/scraper/src/messaging/error.rs | 11 | ||||
-rw-r--r-- | rust/scraper/src/messaging/mod.rs | 151 | ||||
-rw-r--r-- | rust/scraper/src/touchpad/mod.rs | 2 |
6 files changed, 227 insertions, 11 deletions
diff --git a/rust/scraper/src/database/mod.rs b/rust/scraper/src/database/mod.rs index d712259..ee15e3b 100644 --- a/rust/scraper/src/database/mod.rs +++ b/rust/scraper/src/database/mod.rs @@ -1,9 +1,12 @@ pub mod postgres; pub mod error; +use std::error::Error; +use futures::TryFutureExt; pub use error::DatabaseError; use proto::touchpad::common::v1; +use crate::hub::Output; type Result<T> = std::result::Result<T, DatabaseError>; @@ -22,3 +25,24 @@ pub trait DatabaseClient { async fn get_event_by_number(&self, meet_id: u32, number: u32) -> Result<v1::Event>; async fn add_event(&self, event: &v1::Event) -> Result<()>; } + +#[async_trait::async_trait] +impl<T> Output for T + where + T: DatabaseClient + Sync { + async fn add_swimmer(&self, swimmer: &v1::Swimmer) -> std::result::Result<(), Box<dyn Error>> { + self.add_swimmer(swimmer).err_into().await + } + + async fn add_team(&self, team: &v1::Team) -> std::result::Result<(), Box<dyn Error>> { + self.add_team(team).err_into().await + } + + async fn add_meet(&self, meet: &v1::SwimMeet) -> std::result::Result<(), Box<dyn Error>> { + self.add_meet(meet).err_into().await + } + + async fn upsert_event(&self, event: &v1::Event) -> std::result::Result<(), Box<dyn std::error::Error>> { + self.add_event(event).err_into().await + } +}
\ No newline at end of file diff --git a/rust/scraper/src/hub/mod.rs b/rust/scraper/src/hub/mod.rs new file mode 100644 index 0000000..0720f7c --- /dev/null +++ b/rust/scraper/src/hub/mod.rs @@ -0,0 +1,18 @@ +use lapin::message::Delivery; +use tokio::sync::mpsc::Receiver; +use proto::touchpad::common::v1; +use crate::TouchpadLiveClient; + +#[async_trait::async_trait] +pub trait Output { + async fn add_swimmer(&self, swimmer: &v1::Swimmer) -> Result<(), Box<dyn std::error::Error>>; + async fn add_team(&self, team: &v1::Team) -> Result<(), Box<dyn std::error::Error>>; + async fn add_meet(&self, meet: &v1::SwimMeet) -> Result<(), Box<dyn std::error::Error>>; + async fn upsert_event(&self, event: &v1::Event) -> Result<(), Box<dyn std::error::Error>>; +} + +pub struct Hub<'a> { + source: TouchpadLiveClient<'a>, + outputs: Vec<Box<dyn Output>>, + messages: Receiver<Delivery>, +} diff --git a/rust/scraper/src/main.rs b/rust/scraper/src/main.rs index 901a833..1ba8b36 100644 --- a/rust/scraper/src/main.rs +++ b/rust/scraper/src/main.rs @@ -1,23 +1,35 @@ +#![allow(dead_code)] + +use crate::messaging::{RabbitMQClient, RabbitMQClientOptions}; +use lapin::options::BasicAckOptions; +use tracing::Level; use touchpad::TouchpadLiveClient; mod database; +mod hub; +mod messaging; mod touchpad; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { - let client = TouchpadLiveClient::new(); + let subscriber = tracing_subscriber::FmtSubscriber::builder() + .with_max_level(Level::DEBUG) + .finish(); - let meet_info = client.meet_info(18618).await?; - println!("{:?}", meet_info); + tracing::subscriber::set_global_default(subscriber).unwrap(); - let events = client.events(18618).await?; - println!("{:?}", events); + let client = RabbitMQClient::new( + "amqp://127.0.0.1:5672", + "scraper_queue", + RabbitMQClientOptions::default(), + )?; - let swimmers = client.swimmers(18618).await?; - println!("{:?}", swimmers); + let mut rx = client.start().await; - let event = client.individual_event(18618, 1031911).await?; - println!("{:?}", event); + while let Some(delivery) = rx.recv().await { + println!("\"{}\" - {:?}", std::str::from_utf8(&delivery.data)?, delivery); + delivery.ack(BasicAckOptions::default()).await.unwrap(); + } Ok(()) -}
\ No newline at end of file +} diff --git a/rust/scraper/src/messaging/error.rs b/rust/scraper/src/messaging/error.rs new file mode 100644 index 0000000..b68ea20 --- /dev/null +++ b/rust/scraper/src/messaging/error.rs @@ -0,0 +1,11 @@ +use lapin::message::Delivery; +use thiserror::Error; +use tokio::sync::mpsc::error::SendError; + +#[derive(Error, Debug)] +pub enum MessagingError { + #[error(transparent)] + LapinError(#[from] lapin::Error), + #[error(transparent)] + SendError(#[from] SendError<Delivery>), +} diff --git a/rust/scraper/src/messaging/mod.rs b/rust/scraper/src/messaging/mod.rs new file mode 100644 index 0000000..947cab4 --- /dev/null +++ b/rust/scraper/src/messaging/mod.rs @@ -0,0 +1,151 @@ +mod error; + +use crate::messaging::error::MessagingError; +use futures::StreamExt; +use lapin::uri::AMQPUri; +use lapin::{ + message::Delivery, + options::{BasicConsumeOptions, BasicQosOptions, QueueDeclareOptions}, + types::FieldTable, + Connection, ConnectionProperties, +}; +use std::borrow::Cow; +use tokio::{ + sync::mpsc, + sync::mpsc::{Receiver, Sender}, +}; +use tracing::{error, trace, warn}; + +pub struct RabbitMQClient { + uri: AMQPUri, + queue_name: Cow<'static, str>, + options: RabbitMQClientOptions, +} + +impl RabbitMQClient { + pub fn new<S>(uri: &str, queue_name: S, opts: RabbitMQClientOptions) -> Result<RabbitMQClient, String> + where + S: Into<Cow<'static, str>>, + { + Ok(RabbitMQClient { + uri: uri.parse()?, + queue_name: queue_name.into(), + options: opts, + }) + } + + pub async fn start(&self) -> Receiver<Delivery> { + let (tx, rx) = mpsc::channel(self.options.max_queue_size as usize); + + // Spawn worker thread/task to handle AMQP Messages + tokio::spawn(Self::run( + self.uri.clone(), + self.queue_name.clone(), + self.options.clone(), + tx, + )); + + rx + } + + async fn run( + uri: AMQPUri, + queue_name: Cow<'static, str>, + options: RabbitMQClientOptions, + tx: Sender<Delivery>, + ) { + let mut conn = None; + loop { + if let Some(c) = &conn { + match Self::inner_run(&c, &queue_name, &options, &tx).await { + Ok(_) => { + warn!("consumer unexpectedly canceled, remaking AMQP channel"); + } + Err(MessagingError::LapinError(err)) => { + error!(?err, "AMQP Error"); + drop(conn.take()); // Drop connection to reconnect below + } + Err(MessagingError::SendError(err)) => { + // Channel error: rx channel returned from RabbitMQClient::start has + // been dropped. This is unrecoverable. The thread will exit. + error!(?err, "mpsc channel closed"); + return; + } + } + } else { + // Try to connect with AMQP server with Exponential Backoff + let conn_res = + backoff::future::retry(backoff::ExponentialBackoff::default(), || async { + trace!(?uri, "connecting to AMQP server"); + Ok(Connection::connect_uri(uri.clone(), options.conn_opts.clone()).await?) + }) + .await; + if let Ok(c) = conn_res { + trace!(connection = ?c, "connection established"); + conn = Some(c); + } else { + error!("unable to connect to AMQP server"); + return; + } + } + } + } + + async fn inner_run( + conn: &Connection, + queue_name: &str, + opts: &RabbitMQClientOptions, + tx: &Sender<Delivery>, + ) -> Result<(), MessagingError> { + let channel = conn.create_channel().await?; + + let _ = channel + .queue_declare( + queue_name, + QueueDeclareOptions { + durable: true, + ..Default::default() + }, + FieldTable::default(), + ) + .await?; + + // Set prefetch count to not exceed mpsc channel buffer capacity + channel + .basic_qos(opts.max_queue_size, BasicQosOptions::default()) + .await?; + + let mut consumer = channel + .basic_consume( + queue_name, + "", + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await?; + + while let Some(delivery) = consumer.next().await { + trace!(?delivery, "delivery received"); + tx.send(delivery?).await? + } + + // If we get to here, the consumer was somehow canceled. + // The run() function will just restart the channel. + Ok(()) + } +} + +#[derive(Clone)] +pub struct RabbitMQClientOptions { + pub conn_opts: ConnectionProperties, + pub max_queue_size: u16, +} + +impl Default for RabbitMQClientOptions { + fn default() -> Self { + Self { + conn_opts: ConnectionProperties::default(), + max_queue_size: 10, + } + } +} diff --git a/rust/scraper/src/touchpad/mod.rs b/rust/scraper/src/touchpad/mod.rs index 515ba92..2ed3d9e 100644 --- a/rust/scraper/src/touchpad/mod.rs +++ b/rust/scraper/src/touchpad/mod.rs @@ -3,7 +3,7 @@ mod request_response; mod conversion; use std::borrow::Cow; -use futures::{TryStreamExt}; +use futures::{TryFutureExt}; use proto::touchpad::common::v1; use reqwest::Client; use std::collections::HashMap; |