summaryrefslogtreecommitdiff
path: root/rust/scraper/src
diff options
context:
space:
mode:
authorGravatar Anshul Gupta <ansg191@yahoo.com> 2022-08-21 14:09:50 -0700
committerGravatar Anshul Gupta <ansg191@yahoo.com> 2022-08-21 14:09:50 -0700
commit5f4151e3044dc3f6849a357c398939463f028af6 (patch)
tree8f8c2610544d8bf1df426e4e41cd6a6ea676618f /rust/scraper/src
parenta75e41e9cce5d2e4ede08ef2def8add124a49c73 (diff)
downloadtouchpad-5f4151e3044dc3f6849a357c398939463f028af6.tar.gz
touchpad-5f4151e3044dc3f6849a357c398939463f028af6.tar.zst
touchpad-5f4151e3044dc3f6849a357c398939463f028af6.zip
Adds RabbitMQ messaging and management hub
Diffstat (limited to 'rust/scraper/src')
-rw-r--r--rust/scraper/src/database/mod.rs24
-rw-r--r--rust/scraper/src/hub/mod.rs18
-rw-r--r--rust/scraper/src/main.rs32
-rw-r--r--rust/scraper/src/messaging/error.rs11
-rw-r--r--rust/scraper/src/messaging/mod.rs151
-rw-r--r--rust/scraper/src/touchpad/mod.rs2
6 files changed, 227 insertions, 11 deletions
diff --git a/rust/scraper/src/database/mod.rs b/rust/scraper/src/database/mod.rs
index d712259..ee15e3b 100644
--- a/rust/scraper/src/database/mod.rs
+++ b/rust/scraper/src/database/mod.rs
@@ -1,9 +1,12 @@
pub mod postgres;
pub mod error;
+use std::error::Error;
+use futures::TryFutureExt;
pub use error::DatabaseError;
use proto::touchpad::common::v1;
+use crate::hub::Output;
type Result<T> = std::result::Result<T, DatabaseError>;
@@ -22,3 +25,24 @@ pub trait DatabaseClient {
async fn get_event_by_number(&self, meet_id: u32, number: u32) -> Result<v1::Event>;
async fn add_event(&self, event: &v1::Event) -> Result<()>;
}
+
+#[async_trait::async_trait]
+impl<T> Output for T
+ where
+ T: DatabaseClient + Sync {
+ async fn add_swimmer(&self, swimmer: &v1::Swimmer) -> std::result::Result<(), Box<dyn Error>> {
+ self.add_swimmer(swimmer).err_into().await
+ }
+
+ async fn add_team(&self, team: &v1::Team) -> std::result::Result<(), Box<dyn Error>> {
+ self.add_team(team).err_into().await
+ }
+
+ async fn add_meet(&self, meet: &v1::SwimMeet) -> std::result::Result<(), Box<dyn Error>> {
+ self.add_meet(meet).err_into().await
+ }
+
+ async fn upsert_event(&self, event: &v1::Event) -> std::result::Result<(), Box<dyn std::error::Error>> {
+ self.add_event(event).err_into().await
+ }
+} \ No newline at end of file
diff --git a/rust/scraper/src/hub/mod.rs b/rust/scraper/src/hub/mod.rs
new file mode 100644
index 0000000..0720f7c
--- /dev/null
+++ b/rust/scraper/src/hub/mod.rs
@@ -0,0 +1,18 @@
+use lapin::message::Delivery;
+use tokio::sync::mpsc::Receiver;
+use proto::touchpad::common::v1;
+use crate::TouchpadLiveClient;
+
+#[async_trait::async_trait]
+pub trait Output {
+ async fn add_swimmer(&self, swimmer: &v1::Swimmer) -> Result<(), Box<dyn std::error::Error>>;
+ async fn add_team(&self, team: &v1::Team) -> Result<(), Box<dyn std::error::Error>>;
+ async fn add_meet(&self, meet: &v1::SwimMeet) -> Result<(), Box<dyn std::error::Error>>;
+ async fn upsert_event(&self, event: &v1::Event) -> Result<(), Box<dyn std::error::Error>>;
+}
+
+pub struct Hub<'a> {
+ source: TouchpadLiveClient<'a>,
+ outputs: Vec<Box<dyn Output>>,
+ messages: Receiver<Delivery>,
+}
diff --git a/rust/scraper/src/main.rs b/rust/scraper/src/main.rs
index 901a833..1ba8b36 100644
--- a/rust/scraper/src/main.rs
+++ b/rust/scraper/src/main.rs
@@ -1,23 +1,35 @@
+#![allow(dead_code)]
+
+use crate::messaging::{RabbitMQClient, RabbitMQClientOptions};
+use lapin::options::BasicAckOptions;
+use tracing::Level;
use touchpad::TouchpadLiveClient;
mod database;
+mod hub;
+mod messaging;
mod touchpad;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
- let client = TouchpadLiveClient::new();
+ let subscriber = tracing_subscriber::FmtSubscriber::builder()
+ .with_max_level(Level::DEBUG)
+ .finish();
- let meet_info = client.meet_info(18618).await?;
- println!("{:?}", meet_info);
+ tracing::subscriber::set_global_default(subscriber).unwrap();
- let events = client.events(18618).await?;
- println!("{:?}", events);
+ let client = RabbitMQClient::new(
+ "amqp://127.0.0.1:5672",
+ "scraper_queue",
+ RabbitMQClientOptions::default(),
+ )?;
- let swimmers = client.swimmers(18618).await?;
- println!("{:?}", swimmers);
+ let mut rx = client.start().await;
- let event = client.individual_event(18618, 1031911).await?;
- println!("{:?}", event);
+ while let Some(delivery) = rx.recv().await {
+ println!("\"{}\" - {:?}", std::str::from_utf8(&delivery.data)?, delivery);
+ delivery.ack(BasicAckOptions::default()).await.unwrap();
+ }
Ok(())
-} \ No newline at end of file
+}
diff --git a/rust/scraper/src/messaging/error.rs b/rust/scraper/src/messaging/error.rs
new file mode 100644
index 0000000..b68ea20
--- /dev/null
+++ b/rust/scraper/src/messaging/error.rs
@@ -0,0 +1,11 @@
+use lapin::message::Delivery;
+use thiserror::Error;
+use tokio::sync::mpsc::error::SendError;
+
+#[derive(Error, Debug)]
+pub enum MessagingError {
+ #[error(transparent)]
+ LapinError(#[from] lapin::Error),
+ #[error(transparent)]
+ SendError(#[from] SendError<Delivery>),
+}
diff --git a/rust/scraper/src/messaging/mod.rs b/rust/scraper/src/messaging/mod.rs
new file mode 100644
index 0000000..947cab4
--- /dev/null
+++ b/rust/scraper/src/messaging/mod.rs
@@ -0,0 +1,151 @@
+mod error;
+
+use crate::messaging::error::MessagingError;
+use futures::StreamExt;
+use lapin::uri::AMQPUri;
+use lapin::{
+ message::Delivery,
+ options::{BasicConsumeOptions, BasicQosOptions, QueueDeclareOptions},
+ types::FieldTable,
+ Connection, ConnectionProperties,
+};
+use std::borrow::Cow;
+use tokio::{
+ sync::mpsc,
+ sync::mpsc::{Receiver, Sender},
+};
+use tracing::{error, trace, warn};
+
+pub struct RabbitMQClient {
+ uri: AMQPUri,
+ queue_name: Cow<'static, str>,
+ options: RabbitMQClientOptions,
+}
+
+impl RabbitMQClient {
+ pub fn new<S>(uri: &str, queue_name: S, opts: RabbitMQClientOptions) -> Result<RabbitMQClient, String>
+ where
+ S: Into<Cow<'static, str>>,
+ {
+ Ok(RabbitMQClient {
+ uri: uri.parse()?,
+ queue_name: queue_name.into(),
+ options: opts,
+ })
+ }
+
+ pub async fn start(&self) -> Receiver<Delivery> {
+ let (tx, rx) = mpsc::channel(self.options.max_queue_size as usize);
+
+ // Spawn worker thread/task to handle AMQP Messages
+ tokio::spawn(Self::run(
+ self.uri.clone(),
+ self.queue_name.clone(),
+ self.options.clone(),
+ tx,
+ ));
+
+ rx
+ }
+
+ async fn run(
+ uri: AMQPUri,
+ queue_name: Cow<'static, str>,
+ options: RabbitMQClientOptions,
+ tx: Sender<Delivery>,
+ ) {
+ let mut conn = None;
+ loop {
+ if let Some(c) = &conn {
+ match Self::inner_run(&c, &queue_name, &options, &tx).await {
+ Ok(_) => {
+ warn!("consumer unexpectedly canceled, remaking AMQP channel");
+ }
+ Err(MessagingError::LapinError(err)) => {
+ error!(?err, "AMQP Error");
+ drop(conn.take()); // Drop connection to reconnect below
+ }
+ Err(MessagingError::SendError(err)) => {
+ // Channel error: rx channel returned from RabbitMQClient::start has
+ // been dropped. This is unrecoverable. The thread will exit.
+ error!(?err, "mpsc channel closed");
+ return;
+ }
+ }
+ } else {
+ // Try to connect with AMQP server with Exponential Backoff
+ let conn_res =
+ backoff::future::retry(backoff::ExponentialBackoff::default(), || async {
+ trace!(?uri, "connecting to AMQP server");
+ Ok(Connection::connect_uri(uri.clone(), options.conn_opts.clone()).await?)
+ })
+ .await;
+ if let Ok(c) = conn_res {
+ trace!(connection = ?c, "connection established");
+ conn = Some(c);
+ } else {
+ error!("unable to connect to AMQP server");
+ return;
+ }
+ }
+ }
+ }
+
+ async fn inner_run(
+ conn: &Connection,
+ queue_name: &str,
+ opts: &RabbitMQClientOptions,
+ tx: &Sender<Delivery>,
+ ) -> Result<(), MessagingError> {
+ let channel = conn.create_channel().await?;
+
+ let _ = channel
+ .queue_declare(
+ queue_name,
+ QueueDeclareOptions {
+ durable: true,
+ ..Default::default()
+ },
+ FieldTable::default(),
+ )
+ .await?;
+
+ // Set prefetch count to not exceed mpsc channel buffer capacity
+ channel
+ .basic_qos(opts.max_queue_size, BasicQosOptions::default())
+ .await?;
+
+ let mut consumer = channel
+ .basic_consume(
+ queue_name,
+ "",
+ BasicConsumeOptions::default(),
+ FieldTable::default(),
+ )
+ .await?;
+
+ while let Some(delivery) = consumer.next().await {
+ trace!(?delivery, "delivery received");
+ tx.send(delivery?).await?
+ }
+
+ // If we get to here, the consumer was somehow canceled.
+ // The run() function will just restart the channel.
+ Ok(())
+ }
+}
+
+#[derive(Clone)]
+pub struct RabbitMQClientOptions {
+ pub conn_opts: ConnectionProperties,
+ pub max_queue_size: u16,
+}
+
+impl Default for RabbitMQClientOptions {
+ fn default() -> Self {
+ Self {
+ conn_opts: ConnectionProperties::default(),
+ max_queue_size: 10,
+ }
+ }
+}
diff --git a/rust/scraper/src/touchpad/mod.rs b/rust/scraper/src/touchpad/mod.rs
index 515ba92..2ed3d9e 100644
--- a/rust/scraper/src/touchpad/mod.rs
+++ b/rust/scraper/src/touchpad/mod.rs
@@ -3,7 +3,7 @@ mod request_response;
mod conversion;
use std::borrow::Cow;
-use futures::{TryStreamExt};
+use futures::{TryFutureExt};
use proto::touchpad::common::v1;
use reqwest::Client;
use std::collections::HashMap;