summaryrefslogtreecommitdiff
path: root/rust/scraper/src/messaging.rs
blob: eea343d0331cfea1cc50bdc62d2b5a67e0efadfa (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
use futures::StreamExt;
use lapin::uri::AMQPUri;
use lapin::{
    message::Delivery,
    options::{BasicConsumeOptions, BasicQosOptions, QueueDeclareOptions},
    types::FieldTable,
    Connection, ConnectionProperties,
};
use std::borrow::Cow;
use thiserror::Error;
use tokio::{
    sync::mpsc,
    sync::mpsc::{Receiver, Sender},
    sync::mpsc::error::SendError
};
use tracing::{error, trace, warn};

/// Consumer for a single RabbitMQ queue.
///
/// Built with [`RabbitMQClient::new`]; [`RabbitMQClient::start`] spawns the
/// background task that forwards deliveries through an mpsc channel.
pub struct RabbitMQClient {
    /// Name of the queue that is declared and consumed from.
    queue_name: Cow<'static, str>,
    /// Parsed address of the AMQP server.
    uri: AMQPUri,
    /// Settings handed (cloned) to the background task.
    options: RabbitMQClientOptions,
}

impl RabbitMQClient {
    pub fn new<S>(uri: &str, queue_name: S, opts: RabbitMQClientOptions) -> Result<RabbitMQClient, String>
        where
            S: Into<Cow<'static, str>>,
    {
        Ok(RabbitMQClient {
            uri: uri.parse()?,
            queue_name: queue_name.into(),
            options: opts,
        })
    }

    pub async fn start(&self) -> Receiver<Delivery> {
        let (tx, rx) = mpsc::channel(self.options.max_queue_size as usize);

        // Spawn worker thread/task to handle AMQP Messages
        tokio::spawn(Self::run(
            self.uri.clone(),
            self.queue_name.clone(),
            self.options.clone(),
            tx,
        ));

        rx
    }

    async fn run(
        uri: AMQPUri,
        queue_name: Cow<'static, str>,
        options: RabbitMQClientOptions,
        tx: Sender<Delivery>,
    ) {
        let mut conn = None;
        loop {
            if let Some(c) = &conn {
                match Self::inner_run(&c, &queue_name, &options, &tx).await {
                    Ok(_) => {
                        warn!("consumer unexpectedly canceled, remaking AMQP channel");
                    }
                    Err(MessagingError::LapinError(err)) => {
                        error!(?err, "AMQP Error");
                        drop(conn.take()); // Drop connection to reconnect below
                    }
                    Err(MessagingError::SendError(err)) => {
                        // Channel error: rx channel returned from RabbitMQClient::start has
                        // been dropped. This is unrecoverable. The thread will exit.
                        error!(?err, "mpsc channel closed");
                        return;
                    }
                }
            } else {
                // Try to connect with AMQP server with Exponential Backoff
                let conn_res =
                    backoff::future::retry(backoff::ExponentialBackoff::default(), || async {
                        trace!(?uri, "connecting to AMQP server");
                        Ok(Connection::connect_uri(uri.clone(), options.conn_opts.clone()).await?)
                    })
                        .await;
                if let Ok(c) = conn_res {
                    trace!(connection = ?c, "connection established");
                    conn = Some(c);
                } else {
                    error!("unable to connect to AMQP server");
                    return;
                }
            }
        }
    }

    async fn inner_run(
        conn: &Connection,
        queue_name: &str,
        opts: &RabbitMQClientOptions,
        tx: &Sender<Delivery>,
    ) -> Result<(), MessagingError> {
        let channel = conn.create_channel().await?;

        let _ = channel
            .queue_declare(
                queue_name,
                QueueDeclareOptions {
                    durable: true,
                    ..Default::default()
                },
                FieldTable::default(),
            )
            .await?;

        // Set prefetch count to not exceed mpsc channel buffer capacity
        channel
            .basic_qos(opts.max_queue_size, BasicQosOptions::default())
            .await?;

        let mut consumer = channel
            .basic_consume(
                queue_name,
                "",
                BasicConsumeOptions::default(),
                FieldTable::default(),
            )
            .await?;

        while let Some(delivery) = consumer.next().await {
            trace!(?delivery, "delivery received");
            tx.send(delivery?).await?
        }

        // If we get to here, the consumer was somehow canceled.
        // The run() function will just restart the channel.
        Ok(())
    }
}

/// Settings for a [`RabbitMQClient`].
#[derive(Clone)]
pub struct RabbitMQClientOptions {
    /// Size used for both the delivery buffer and the AMQP prefetch count.
    pub max_queue_size: u16,
    /// Properties forwarded to `lapin` when opening the connection.
    pub conn_opts: ConnectionProperties,
}

impl Default for RabbitMQClientOptions {
    /// Default `lapin` connection properties with a queue size of 10.
    fn default() -> Self {
        const DEFAULT_MAX_QUEUE_SIZE: u16 = 10;

        let conn_opts = ConnectionProperties::default();
        Self {
            max_queue_size: DEFAULT_MAX_QUEUE_SIZE,
            conn_opts,
        }
    }
}

#[derive(Error, Debug)]
enum MessagingError {
    #[error(transparent)]
    LapinError(#[from] lapin::Error),
    #[error(transparent)]
    SendError(#[from] SendError<Delivery>),
}