diff options
Diffstat (limited to 'src/server.rs')
-rw-r--r-- | src/server.rs | 48 |
1 files changed, 25 insertions, 23 deletions
diff --git a/src/server.rs b/src/server.rs index 4c08640..6ad91ee 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,5 +1,5 @@ use crate::config::{Config, ServerConfig, ServerServiceConfig, ServiceType, TransportType}; -use crate::config_watcher::ServiceChange; +use crate::config_watcher::{ConfigChange, ServerServiceChange}; use crate::constants::{listen_backoff, UDP_BUFFER_SIZE}; use crate::helper::retry_notify_with_deadline; use crate::multi_map::MultiMap; @@ -40,7 +40,7 @@ const HANDSHAKE_TIMEOUT: u64 = 5; // Timeout for transport handshake pub async fn run_server( config: Config, shutdown_rx: broadcast::Receiver<bool>, - service_rx: mpsc::Receiver<ServiceChange>, + update_rx: mpsc::Receiver<ConfigChange>, ) -> Result<()> { let config = match config.server { Some(config) => config, @@ -52,13 +52,13 @@ pub async fn run_server( match config.transport.transport_type { TransportType::Tcp => { let mut server = Server::<TcpTransport>::from(config).await?; - server.run(shutdown_rx, service_rx).await?; + server.run(shutdown_rx, update_rx).await?; } TransportType::Tls => { #[cfg(feature = "tls")] { let mut server = Server::<TlsTransport>::from(config).await?; - server.run(shutdown_rx, service_rx).await?; + server.run(shutdown_rx, update_rx).await?; } #[cfg(not(feature = "tls"))] crate::helper::feature_not_compile("tls") @@ -67,7 +67,7 @@ pub async fn run_server( #[cfg(feature = "noise")] { let mut server = Server::<NoiseTransport>::from(config).await?; - server.run(shutdown_rx, service_rx).await?; + server.run(shutdown_rx, update_rx).await?; } #[cfg(not(feature = "noise"))] crate::helper::feature_not_compile("noise") @@ -124,7 +124,7 @@ impl<T: 'static + Transport> Server<T> { pub async fn run( &mut self, mut shutdown_rx: broadcast::Receiver<bool>, - mut service_rx: mpsc::Receiver<ServiceChange>, + mut update_rx: mpsc::Receiver<ConfigChange>, ) -> Result<()> { // Listen at `server.bind_addr` let l = self @@ -198,7 +198,7 @@ impl<T: 'static + Transport> Server<T> { info!("Shuting down gracefully..."); break; }, - e = service_rx.recv() => { + e = update_rx.recv() => { if let Some(e) = e { self.handle_hot_reload(e).await; } @@ -211,24 +211,26 @@ impl<T: 'static + Transport> Server<T> { Ok(()) } - async fn handle_hot_reload(&mut self, e: ServiceChange) { + async fn handle_hot_reload(&mut self, e: ConfigChange) { match e { - ServiceChange::ServerAdd(s) => { - let hash = protocol::digest(s.name.as_bytes()); - let mut wg = self.services.write().await; - let _ = wg.insert(hash, s); - - let mut wg = self.control_channels.write().await; - let _ = wg.remove1(&hash); - } - ServiceChange::ServerDelete(s) => { - let hash = protocol::digest(s.as_bytes()); - let _ = self.services.write().await.remove(&hash); + ConfigChange::ServerChange(server_change) => match server_change { + ServerServiceChange::Add(cfg) => { + let hash = protocol::digest(cfg.name.as_bytes()); + let mut wg = self.services.write().await; + let _ = wg.insert(hash, cfg); + + let mut wg = self.control_channels.write().await; + let _ = wg.remove1(&hash); + } + ServerServiceChange::Delete(s) => { + let hash = protocol::digest(s.as_bytes()); + let _ = self.services.write().await.remove(&hash); - let mut wg = self.control_channels.write().await; - let _ = wg.remove1(&hash); - } - _ => (), + let mut wg = self.control_channels.write().await; + let _ = wg.remove1(&hash); + } + }, + ignored => warn!("Ignored {:?} since running as a server", ignored), } } } |