aboutsummaryrefslogtreecommitdiff
path: root/src/server.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/server.rs')
-rw-r--r--src/server.rs48
1 files changed, 25 insertions, 23 deletions
diff --git a/src/server.rs b/src/server.rs
index 4c08640..6ad91ee 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -1,5 +1,5 @@
use crate::config::{Config, ServerConfig, ServerServiceConfig, ServiceType, TransportType};
-use crate::config_watcher::ServiceChange;
+use crate::config_watcher::{ConfigChange, ServerServiceChange};
use crate::constants::{listen_backoff, UDP_BUFFER_SIZE};
use crate::helper::retry_notify_with_deadline;
use crate::multi_map::MultiMap;
@@ -40,7 +40,7 @@ const HANDSHAKE_TIMEOUT: u64 = 5; // Timeout for transport handshake
pub async fn run_server(
config: Config,
shutdown_rx: broadcast::Receiver<bool>,
- service_rx: mpsc::Receiver<ServiceChange>,
+ update_rx: mpsc::Receiver<ConfigChange>,
) -> Result<()> {
let config = match config.server {
Some(config) => config,
@@ -52,13 +52,13 @@ pub async fn run_server(
match config.transport.transport_type {
TransportType::Tcp => {
let mut server = Server::<TcpTransport>::from(config).await?;
- server.run(shutdown_rx, service_rx).await?;
+ server.run(shutdown_rx, update_rx).await?;
}
TransportType::Tls => {
#[cfg(feature = "tls")]
{
let mut server = Server::<TlsTransport>::from(config).await?;
- server.run(shutdown_rx, service_rx).await?;
+ server.run(shutdown_rx, update_rx).await?;
}
#[cfg(not(feature = "tls"))]
crate::helper::feature_not_compile("tls")
@@ -67,7 +67,7 @@ pub async fn run_server(
#[cfg(feature = "noise")]
{
let mut server = Server::<NoiseTransport>::from(config).await?;
- server.run(shutdown_rx, service_rx).await?;
+ server.run(shutdown_rx, update_rx).await?;
}
#[cfg(not(feature = "noise"))]
crate::helper::feature_not_compile("noise")
@@ -124,7 +124,7 @@ impl<T: 'static + Transport> Server<T> {
pub async fn run(
&mut self,
mut shutdown_rx: broadcast::Receiver<bool>,
- mut service_rx: mpsc::Receiver<ServiceChange>,
+ mut update_rx: mpsc::Receiver<ConfigChange>,
) -> Result<()> {
// Listen at `server.bind_addr`
let l = self
@@ -198,7 +198,7 @@ impl<T: 'static + Transport> Server<T> {
info!("Shuting down gracefully...");
break;
},
- e = service_rx.recv() => {
+ e = update_rx.recv() => {
if let Some(e) = e {
self.handle_hot_reload(e).await;
}
@@ -211,24 +211,26 @@ impl<T: 'static + Transport> Server<T> {
Ok(())
}
- async fn handle_hot_reload(&mut self, e: ServiceChange) {
+ async fn handle_hot_reload(&mut self, e: ConfigChange) {
match e {
- ServiceChange::ServerAdd(s) => {
- let hash = protocol::digest(s.name.as_bytes());
- let mut wg = self.services.write().await;
- let _ = wg.insert(hash, s);
-
- let mut wg = self.control_channels.write().await;
- let _ = wg.remove1(&hash);
- }
- ServiceChange::ServerDelete(s) => {
- let hash = protocol::digest(s.as_bytes());
- let _ = self.services.write().await.remove(&hash);
+ ConfigChange::ServerChange(server_change) => match server_change {
+ ServerServiceChange::Add(cfg) => {
+ let hash = protocol::digest(cfg.name.as_bytes());
+ let mut wg = self.services.write().await;
+ let _ = wg.insert(hash, cfg);
+
+ let mut wg = self.control_channels.write().await;
+ let _ = wg.remove1(&hash);
+ }
+ ServerServiceChange::Delete(s) => {
+ let hash = protocol::digest(s.as_bytes());
+ let _ = self.services.write().await.remove(&hash);
- let mut wg = self.control_channels.write().await;
- let _ = wg.remove1(&hash);
- }
- _ => (),
+ let mut wg = self.control_channels.write().await;
+ let _ = wg.remove1(&hash);
+ }
+ },
+ ignored => warn!("Ignored {:?} since running as a server", ignored),
}
}
}