aboutsummaryrefslogtreecommitdiff
path: root/src/client.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/client.rs')
-rw-r--r--src/client.rs52
1 files changed, 29 insertions, 23 deletions
diff --git a/src/client.rs b/src/client.rs
index 1149bec..45d3d85 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -1,5 +1,5 @@
use crate::config::{ClientConfig, ClientServiceConfig, Config, ServiceType, TransportType};
-use crate::config_watcher::ServiceChange;
+use crate::config_watcher::{ClientServiceChange, ConfigChange};
use crate::helper::udp_connect;
use crate::protocol::Hello::{self, *};
use crate::protocol::{
@@ -31,7 +31,7 @@ use crate::constants::{run_control_chan_backoff, UDP_BUFFER_SIZE, UDP_SENDQ_SIZE
pub async fn run_client(
config: Config,
shutdown_rx: broadcast::Receiver<bool>,
- service_rx: mpsc::Receiver<ServiceChange>,
+ update_rx: mpsc::Receiver<ConfigChange>,
) -> Result<()> {
let config = config.client.ok_or_else(|| {
anyhow!(
@@ -42,13 +42,13 @@ pub async fn run_client(
match config.transport.transport_type {
TransportType::Tcp => {
let mut client = Client::<TcpTransport>::from(config).await?;
- client.run(shutdown_rx, service_rx).await
+ client.run(shutdown_rx, update_rx).await
}
TransportType::Tls => {
#[cfg(feature = "tls")]
{
let mut client = Client::<TlsTransport>::from(config).await?;
- client.run(shutdown_rx, service_rx).await
+ client.run(shutdown_rx, update_rx).await
}
#[cfg(not(feature = "tls"))]
crate::helper::feature_not_compile("tls")
@@ -57,7 +57,7 @@ pub async fn run_client(
#[cfg(feature = "noise")]
{
let mut client = Client::<NoiseTransport>::from(config).await?;
- client.run(shutdown_rx, service_rx).await
+ client.run(shutdown_rx, update_rx).await
}
#[cfg(not(feature = "noise"))]
crate::helper::feature_not_compile("noise")
@@ -91,7 +91,7 @@ impl<T: 'static + Transport> Client<T> {
async fn run(
&mut self,
mut shutdown_rx: broadcast::Receiver<bool>,
- mut service_rx: mpsc::Receiver<ServiceChange>,
+ mut update_rx: mpsc::Receiver<ConfigChange>,
) -> Result<()> {
for (name, config) in &self.config.services {
// Create a control channel for each service defined
@@ -116,24 +116,9 @@ impl<T: 'static + Transport> Client<T> {
}
break;
},
- e = service_rx.recv() => {
+ e = update_rx.recv() => {
if let Some(e) = e {
- match e {
- ServiceChange::ClientAdd(s)=> {
- let name = s.name.clone();
- let handle = ControlChannelHandle::new(
- s,
- self.config.remote_addr.clone(),
- self.transport.clone(),
- self.config.heartbeat_timeout
- );
- let _ = self.service_handles.insert(name, handle);
- },
- ServiceChange::ClientDelete(s)=> {
- let _ = self.service_handles.remove(&s);
- },
- _ => ()
- }
+ self.handle_hot_reload(e).await;
}
}
}
@@ -146,6 +131,27 @@ impl<T: 'static + Transport> Client<T> {
Ok(())
}
+
+ async fn handle_hot_reload(&mut self, e: ConfigChange) {
+ match e {
+ ConfigChange::ClientChange(client_change) => match client_change {
+ ClientServiceChange::Add(cfg) => {
+ let name = cfg.name.clone();
+ let handle = ControlChannelHandle::new(
+ cfg,
+ self.config.remote_addr.clone(),
+ self.transport.clone(),
+ self.config.heartbeat_timeout,
+ );
+ let _ = self.service_handles.insert(name, handle);
+ }
+ ClientServiceChange::Delete(s) => {
+ let _ = self.service_handles.remove(&s);
+ }
+ },
+ ignored => warn!("Ignored {:?} since running as a client", ignored),
+ }
+ }
}
struct RunDataChannelArgs<T: Transport> {