diff options
Diffstat (limited to 'src/client.rs')
-rw-r--r-- | src/client.rs | 52 |
1 files changed, 29 insertions, 23 deletions
diff --git a/src/client.rs b/src/client.rs index 1149bec..45d3d85 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,5 +1,5 @@ use crate::config::{ClientConfig, ClientServiceConfig, Config, ServiceType, TransportType}; -use crate::config_watcher::ServiceChange; +use crate::config_watcher::{ClientServiceChange, ConfigChange}; use crate::helper::udp_connect; use crate::protocol::Hello::{self, *}; use crate::protocol::{ @@ -31,7 +31,7 @@ use crate::constants::{run_control_chan_backoff, UDP_BUFFER_SIZE, UDP_SENDQ_SIZE pub async fn run_client( config: Config, shutdown_rx: broadcast::Receiver<bool>, - service_rx: mpsc::Receiver<ServiceChange>, + update_rx: mpsc::Receiver<ConfigChange>, ) -> Result<()> { let config = config.client.ok_or_else(|| { anyhow!( @@ -42,13 +42,13 @@ pub async fn run_client( match config.transport.transport_type { TransportType::Tcp => { let mut client = Client::<TcpTransport>::from(config).await?; - client.run(shutdown_rx, service_rx).await + client.run(shutdown_rx, update_rx).await } TransportType::Tls => { #[cfg(feature = "tls")] { let mut client = Client::<TlsTransport>::from(config).await?; - client.run(shutdown_rx, service_rx).await + client.run(shutdown_rx, update_rx).await } #[cfg(not(feature = "tls"))] crate::helper::feature_not_compile("tls") @@ -57,7 +57,7 @@ pub async fn run_client( #[cfg(feature = "noise")] { let mut client = Client::<NoiseTransport>::from(config).await?; - client.run(shutdown_rx, service_rx).await + client.run(shutdown_rx, update_rx).await } #[cfg(not(feature = "noise"))] crate::helper::feature_not_compile("noise") @@ -91,7 +91,7 @@ impl<T: 'static + Transport> Client<T> { async fn run( &mut self, mut shutdown_rx: broadcast::Receiver<bool>, - mut service_rx: mpsc::Receiver<ServiceChange>, + mut update_rx: mpsc::Receiver<ConfigChange>, ) -> Result<()> { for (name, config) in &self.config.services { // Create a control channel for each service defined @@ -116,24 +116,9 @@ impl<T: 'static + Transport> Client<T> { } break; }, - e = service_rx.recv() => { + e = update_rx.recv() => { if let Some(e) = e { - match e { - ServiceChange::ClientAdd(s)=> { - let name = s.name.clone(); - let handle = ControlChannelHandle::new( - s, - self.config.remote_addr.clone(), - self.transport.clone(), - self.config.heartbeat_timeout - ); - let _ = self.service_handles.insert(name, handle); - }, - ServiceChange::ClientDelete(s)=> { - let _ = self.service_handles.remove(&s); - }, - _ => () - } + self.handle_hot_reload(e).await; } } } @@ -146,6 +131,27 @@ impl<T: 'static + Transport> Client<T> { Ok(()) } + + async fn handle_hot_reload(&mut self, e: ConfigChange) { + match e { + ConfigChange::ClientChange(client_change) => match client_change { + ClientServiceChange::Add(cfg) => { + let name = cfg.name.clone(); + let handle = ControlChannelHandle::new( + cfg, + self.config.remote_addr.clone(), + self.transport.clone(), + self.config.heartbeat_timeout, + ); + let _ = self.service_handles.insert(name, handle); + } + ClientServiceChange::Delete(s) => { + let _ = self.service_handles.remove(&s); + } + }, + ignored => warn!("Ignored {:?} since running as a client", ignored), + } + } } struct RunDataChannelArgs<T: Transport> { |