diff options
Diffstat (limited to 'src/config_watcher.rs')
-rw-r--r-- | src/config_watcher.rs | 332 |
1 files changed, 245 insertions, 87 deletions
diff --git a/src/config_watcher.rs b/src/config_watcher.rs index 79e84f4..c7c2ff5 100644 --- a/src/config_watcher.rs +++ b/src/config_watcher.rs @@ -1,5 +1,5 @@ use crate::{ - config::{ClientServiceConfig, ServerServiceConfig}, + config::{ClientConfig, ClientServiceConfig, ServerConfig, ServerServiceConfig}, Config, }; use anyhow::{Context, Result}; @@ -13,22 +13,87 @@ use tracing::{error, info, instrument}; #[cfg(feature = "notify")] use notify::{event::ModifyKind, EventKind, RecursiveMode, Watcher}; -#[derive(Debug)] -pub enum ConfigChangeEvent { +#[derive(Debug, PartialEq)] +pub enum ConfigChange { General(Box<Config>), // Trigger a full restart - ServiceChange(ServiceChangeEvent), + ServiceChange(ServiceChange), } -#[derive(Debug)] -pub enum ServiceChangeEvent { +#[derive(Debug, PartialEq)] +pub enum ServiceChange { ClientAdd(ClientServiceConfig), ClientDelete(String), ServerAdd(ServerServiceConfig), ServerDelete(String), } +impl From<ClientServiceConfig> for ServiceChange { + fn from(c: ClientServiceConfig) -> Self { + ServiceChange::ClientAdd(c) + } +} + +impl From<ServerServiceConfig> for ServiceChange { + fn from(c: ServerServiceConfig) -> Self { + ServiceChange::ServerAdd(c) + } +} + +trait InstanceConfig: Clone { + type ServiceConfig: Into<ServiceChange> + PartialEq + Clone; + fn equal_without_service(&self, rhs: &Self) -> bool; + fn to_service_change_delete(s: String) -> ServiceChange; + fn get_services(&self) -> &HashMap<String, Self::ServiceConfig>; +} + +impl InstanceConfig for ServerConfig { + type ServiceConfig = ServerServiceConfig; + fn equal_without_service(&self, rhs: &Self) -> bool { + let left = ServerConfig { + services: Default::default(), + ..self.clone() + }; + + let right = ServerConfig { + services: Default::default(), + ..rhs.clone() + }; + + left == right + } + fn to_service_change_delete(s: String) -> ServiceChange { + ServiceChange::ServerDelete(s) + } + fn get_services(&self) -> &HashMap<String, Self::ServiceConfig> { + &self.services + } +} + +impl InstanceConfig for ClientConfig { + type ServiceConfig = ClientServiceConfig; + fn equal_without_service(&self, rhs: &Self) -> bool { + let left = ClientConfig { + services: Default::default(), + ..self.clone() + }; + + let right = ClientConfig { + services: Default::default(), + ..rhs.clone() + }; + + left == right + } + fn to_service_change_delete(s: String) -> ServiceChange { + ServiceChange::ClientDelete(s) + } + fn get_services(&self) -> &HashMap<String, Self::ServiceConfig> { + &self.services + } +} + pub struct ConfigWatcherHandle { - pub event_rx: mpsc::Receiver<ConfigChangeEvent>, + pub event_rx: mpsc::Receiver<ConfigChange>, } impl ConfigWatcherHandle { @@ -39,7 +104,7 @@ impl ConfigWatcherHandle { // Initial start event_tx - .send(ConfigChangeEvent::General(Box::new(origin_cfg.clone()))) + .send(ConfigChange::General(Box::new(origin_cfg.clone()))) .await .unwrap(); @@ -59,30 +124,33 @@ impl ConfigWatcherHandle { async fn config_watcher( _path: PathBuf, mut shutdown_rx: broadcast::Receiver<bool>, - _cfg_event_tx: mpsc::Sender<ConfigChangeEvent>, + _event_tx: mpsc::Sender<ConfigChange>, _old: Config, ) -> Result<()> { - // Do nothing except wating for ctrl-c + // Do nothing except waiting for ctrl-c let _ = shutdown_rx.recv().await; Ok(()) } #[cfg(feature = "notify")] -#[instrument(skip(shutdown_rx, cfg_event_tx, old))] +#[instrument(skip(shutdown_rx, event_tx, old))] async fn config_watcher( path: PathBuf, mut shutdown_rx: broadcast::Receiver<bool>, - cfg_event_tx: mpsc::Sender<ConfigChangeEvent>, + event_tx: mpsc::Sender<ConfigChange>, mut old: Config, ) -> Result<()> { let (fevent_tx, mut fevent_rx) = mpsc::channel(16); - let mut watcher = notify::recommended_watcher(move |res| match res { - Ok(event) => { - let _ = fevent_tx.blocking_send(event); - } - Err(e) => error!("watch error: {:?}", e), - })?; + let mut watcher = + notify::recommended_watcher(move |res: Result<notify::Event, _>| match res { + Ok(e) => { + if let EventKind::Modify(ModifyKind::Data(_)) = e.kind { + let _ = fevent_tx.blocking_send(true); + } + } + Err(e) => error!("watch error: {:?}", e), + })?; watcher.watch(&path, RecursiveMode::NonRecursive)?; info!("Start watching the config"); @@ -91,12 +159,7 @@ async fn config_watcher( tokio::select! { e = fevent_rx.recv() => { match e { - Some(e) => { - if let EventKind::Modify(kind) = e.kind { - match kind { - ModifyKind::Data(_) => (), - _ => continue - } + Some(_) => { info!("Rescan the configuration"); let new = match Config::from_file(&path).await.with_context(|| "The changed configuration is invalid. Ignored") { Ok(v) => v, @@ -107,12 +170,11 @@ async fn config_watcher( } }; - for event in calculate_event(&old, &new) { - cfg_event_tx.send(event).await?; + for event in calculate_events(&old, &new) { + event_tx.send(event).await?; } old = new; - } }, None => break } @@ -126,74 +188,170 @@ async fn config_watcher( Ok(()) } -fn calculate_event(old: &Config, new: &Config) -> Vec<ConfigChangeEvent> { - let mut ret = Vec::new(); - - if old != new { - if old.server.is_some() && new.server.is_some() { - let mut e: Vec<ConfigChangeEvent> = calculate_service_delete_event( - &old.server.as_ref().unwrap().services, - &new.server.as_ref().unwrap().services, - ) - .into_iter() - .map(|x| ConfigChangeEvent::ServiceChange(ServiceChangeEvent::ServerDelete(x))) - .collect(); - ret.append(&mut e); - - let mut e: Vec<ConfigChangeEvent> = calculate_service_add_event( - &old.server.as_ref().unwrap().services, - &new.server.as_ref().unwrap().services, - ) - .into_iter() - .map(|x| ConfigChangeEvent::ServiceChange(ServiceChangeEvent::ServerAdd(x))) - .collect(); - - ret.append(&mut e); - } else if old.client.is_some() && new.client.is_some() { - let mut e: Vec<ConfigChangeEvent> = calculate_service_delete_event( - &old.client.as_ref().unwrap().services, - &new.client.as_ref().unwrap().services, - ) - .into_iter() - .map(|x| ConfigChangeEvent::ServiceChange(ServiceChangeEvent::ClientDelete(x))) - .collect(); - ret.append(&mut e); - - let mut e: Vec<ConfigChangeEvent> = calculate_service_add_event( - &old.client.as_ref().unwrap().services, - &new.client.as_ref().unwrap().services, - ) - .into_iter() - .map(|x| ConfigChangeEvent::ServiceChange(ServiceChangeEvent::ClientAdd(x))) - .collect(); - - ret.append(&mut e); +fn calculate_events(old: &Config, new: &Config) -> Vec<ConfigChange> { + if old == new { + vec![] + } else if old.server != new.server { + if old.server.is_some() != new.server.is_some() { + vec![ConfigChange::General(Box::new(new.clone()))] + } else { + match calculate_instance_config_events( + old.server.as_ref().unwrap(), + new.server.as_ref().unwrap(), + ) { + Some(v) => v, + None => vec![ConfigChange::General(Box::new(new.clone()))], + } + } + } else if old.client != new.client { + if old.client.is_some() != new.client.is_some() { + vec![ConfigChange::General(Box::new(new.clone()))] } else { - ret.push(ConfigChangeEvent::General(Box::new(new.clone()))); + match calculate_instance_config_events( + old.client.as_ref().unwrap(), + new.client.as_ref().unwrap(), + ) { + Some(v) => v, + None => vec![ConfigChange::General(Box::new(new.clone()))], + } } + } else { + vec![] + } +} + +// None indicates a General change needed +fn calculate_instance_config_events<T: InstanceConfig>( + old: &T, + new: &T, +) -> Option<Vec<ConfigChange>> { + if !old.equal_without_service(new) { + return None; } - ret + let old = old.get_services(); + let new = new.get_services(); + + let mut v = vec![]; + v.append(&mut calculate_service_delete_events::<T>(old, new)); + v.append(&mut calculate_service_add_events(old, new)); + + Some(v.into_iter().map(ConfigChange::ServiceChange).collect()) } -fn calculate_service_delete_event<T: PartialEq>( - old_services: &HashMap<String, T>, - new_services: &HashMap<String, T>, -) -> Vec<String> { - old_services - .keys() - .filter(|&name| old_services.get(name) != new_services.get(name)) - .map(|x| x.to_owned()) +fn calculate_service_delete_events<T: InstanceConfig>( + old: &HashMap<String, T::ServiceConfig>, + new: &HashMap<String, T::ServiceConfig>, +) -> Vec<ServiceChange> { + old.keys() + .filter(|&name| new.get(name).is_none()) + .map(|x| T::to_service_change_delete(x.to_owned())) .collect() } -fn calculate_service_add_event<T: PartialEq + Clone>( - old_services: &HashMap<String, T>, - new_services: &HashMap<String, T>, -) -> Vec<T> { - new_services - .iter() - .filter(|(name, _)| old_services.get(*name) != new_services.get(*name)) - .map(|(_, c)| c.clone()) +fn calculate_service_add_events<T: PartialEq + Clone + Into<ServiceChange>>( + old: &HashMap<String, T>, + new: &HashMap<String, T>, +) -> Vec<ServiceChange> { + new.iter() + .filter(|(name, c)| old.get(*name) != Some(*c)) + .map(|(_, c)| c.clone().into()) .collect() } + +#[cfg(test)] +mod test { + use crate::config::ServerConfig; + + use super::*; + + // macro to create map or set literal + macro_rules! collection { + // map-like + ($($k:expr => $v:expr),* $(,)?) => {{ + use std::iter::{Iterator, IntoIterator}; + Iterator::collect(IntoIterator::into_iter([$(($k, $v),)*])) + }}; + } + + #[test] + fn test_calculate_events() { + struct Test { + old: Config, + new: Config, + } + + let tests = [ + Test { + old: Config { + server: Some(Default::default()), + client: None, + }, + new: Config { + server: Some(Default::default()), + client: Some(Default::default()), + }, + }, + Test { + old: Config { + server: Some(ServerConfig { + bind_addr: String::from("127.0.0.1:2334"), + ..Default::default() + }), + client: None, + }, + new: Config { + server: Some(ServerConfig { + bind_addr: String::from("127.0.0.1:2333"), + services: collection!(String::from("foo") => Default::default()), + ..Default::default() + }), + client: None, + }, + }, + Test { + old: Config { + server: Some(Default::default()), + client: None, + }, + new: Config { + server: Some(ServerConfig { + services: collection!(String::from("foo") => Default::default()), + ..Default::default() + }), + client: None, + }, + }, + Test { + old: Config { + server: Some(ServerConfig { + services: collection!(String::from("foo") => Default::default()), + ..Default::default() + }), + client: None, + }, + new: Config { + server: Some(Default::default()), + client: None, + }, + }, + ]; + let expected = [ + vec![ConfigChange::General(Box::new(tests[0].new.clone()))], + vec![ConfigChange::General(Box::new(tests[1].new.clone()))], + vec![ConfigChange::ServiceChange(ServiceChange::ServerAdd( + Default::default(), + ))], + vec![ConfigChange::ServiceChange(ServiceChange::ServerDelete( + String::from("foo"), + ))], + ]; + + assert_eq!(tests.len(), expected.len()); + + for i in 0..tests.len() { + let actual = calculate_events(&tests[i].old, &tests[i].new); + assert_eq!(actual, expected[i]); + } + } +} |