diff options
author | 2024-12-14 11:25:31 -0800 | |
---|---|---|
committer | 2024-12-14 13:35:02 -0800 | |
commit | ed73a37d2498a51af3cbefe69f46223b103d71f5 (patch) | |
tree | 25123738c474c15d1eddfd67a546e0887cd9f606 /src | |
download | restic-operator-ed73a37d2498a51af3cbefe69f46223b103d71f5.tar.gz restic-operator-ed73a37d2498a51af3cbefe69f46223b103d71f5.tar.zst restic-operator-ed73a37d2498a51af3cbefe69f46223b103d71f5.zip |
Initial Commit
Diffstat (limited to 'src')
-rw-r--r-- | src/backup/deploy.rs | 44 | ||||
-rw-r--r-- | src/backup/job.rs | 87 | ||||
-rw-r--r-- | src/backup/mod.rs | 96 | ||||
-rw-r--r-- | src/context.rs | 18 | ||||
-rw-r--r-- | src/deploy.rs | 43 | ||||
-rw-r--r-- | src/error.rs | 12 | ||||
-rw-r--r-- | src/finalizer.rs | 86 | ||||
-rw-r--r-- | src/jobspec.rs | 284 | ||||
-rw-r--r-- | src/main.rs | 42 | ||||
-rw-r--r-- | src/resticprofile/config.rs | 245 | ||||
-rw-r--r-- | src/resticprofile/mod.rs | 238 | ||||
-rw-r--r-- | src/schedule/cronjob.rs | 131 | ||||
-rw-r--r-- | src/schedule/deploy.rs | 45 | ||||
-rw-r--r-- | src/schedule/mod.rs | 110 |
14 files changed, 1481 insertions, 0 deletions
diff --git a/src/backup/deploy.rs b/src/backup/deploy.rs new file mode 100644 index 0000000..0ab0be8 --- /dev/null +++ b/src/backup/deploy.rs @@ -0,0 +1,44 @@ +use kube::{Client, ResourceExt}; +use restic_crd::Backup; + +use super::job::BackupJob; +use crate::{ + deploy::{Deployable, Labels}, + resticprofile::ResticProfile, + Error, +}; + +#[derive(Debug, Clone)] +pub struct BackupDeployment { + profile: ResticProfile, + job: BackupJob, +} + +impl BackupDeployment { + pub fn new(ns: String, backup: &Backup) -> Self { + let profile = ResticProfile::new(ns.clone(), backup.name_any(), &backup.spec); + let job = BackupJob::new(ns, backup, profile.name()); + Self { profile, job } + } +} + +impl Deployable for BackupDeployment { + type Error = Error; + + async fn create<O>(&self, client: Client, owner: &O, labels: Labels) -> Result<(), Self::Error> + where + O: kube::Resource<DynamicType = ()> + Send + Sync, + { + self.profile + .create(client.clone(), owner, labels.clone()) + .await?; + self.job.create(client, owner, labels).await?; + Ok(()) + } + + async fn delete(&self, client: Client) -> Result<(), Self::Error> { + self.profile.delete(client.clone()).await?; + self.job.delete(client).await?; + Ok(()) + } +} diff --git a/src/backup/job.rs b/src/backup/job.rs new file mode 100644 index 0000000..380c3cd --- /dev/null +++ b/src/backup/job.rs @@ -0,0 +1,87 @@ +use k8s_openapi::{api::batch::v1::Job, apimachinery::pkg::apis::meta::v1::OwnerReference}; +use kube::{api::ObjectMeta, Api, Resource, ResourceExt}; +use restic_crd::Backup; + +use crate::{deploy::Deployable, jobspec::BackupJobSpec, Error}; + +#[derive(Debug, Clone)] +pub struct BackupJob { + name: String, + ns: String, + spec: BackupJobSpec, +} + +impl BackupJob { + pub fn new(ns: impl Into<String>, backup: &Backup, config_name: impl Into<String>) -> Self { + let spec = BackupJobSpec::new(&backup.spec, config_name); + Self { + name: format!("{}-job", backup.name_any()), + ns: ns.into(), + spec, + } + } + + async fn get(&self, client: kube::Client) -> Result<Option<Job>, Error> { + let api: Api<Job> = Api::namespaced(client, &self.ns); + match api.get(&self.name).await { + Ok(c) => Ok(Some(c)), + Err(kube::Error::Api(ae)) => { + if ae.code == 404 { + Ok(None) + } else { + Err(Error::KubeError(kube::Error::Api(ae))) + } + } + Err(e) => Err(Error::KubeError(e)), + } + } +} + +impl Deployable for BackupJob { + type Error = Error; + + async fn create<O>( + &self, + client: kube::Client, + owner: &O, + labels: crate::deploy::Labels, + ) -> Result<(), Self::Error> + where + O: Resource<DynamicType = ()> + Send + Sync, + { + let job = Job { + metadata: ObjectMeta { + name: Some(self.name.clone()), + namespace: Some(self.ns.clone()), + labels: Some(labels.to_labels()), + owner_references: O::meta(owner).uid.clone().map(|uid| { + vec![OwnerReference { + api_version: O::api_version(&()).into_owned(), + block_owner_deletion: Some(true), + controller: Some(true), + kind: O::kind(&()).into_owned(), + name: O::name_any(owner), + uid, + }] + }), + ..Default::default() + }, + spec: Some(self.spec.clone().into()), + ..Default::default() + }; + + let api: Api<Job> = Api::namespaced(client, &self.ns); + api.create(&Default::default(), &job).await?; + + Ok(()) + } + + async fn delete(&self, client: kube::Client) -> Result<(), Self::Error> { + let job = self.get(client.clone()).await?; + if let Some(job) = job { + let api: Api<Job> = Api::namespaced(client, &self.ns); + api.delete(&job.name_any(), &Default::default()).await?; + } + Ok(()) + } +} diff --git a/src/backup/mod.rs b/src/backup/mod.rs new file mode 100644 index 0000000..16d60a3 --- /dev/null +++ b/src/backup/mod.rs @@ -0,0 +1,96 @@ +use std::{sync::Arc, time::Duration}; + +use deploy::BackupDeployment; +use futures::StreamExt; +use kube::{ + runtime::{controller::Action, watcher::Config, Controller}, + Api, Client, Resource, ResourceExt, +}; +use restic_crd::Backup; +use tracing::{error, info}; + +use crate::{ + context::ContextData, + deploy::{Deployable, Labels}, + finalizer::{self, FINALIZER}, + Error, +}; + +mod deploy; +mod job; + +pub async fn run_controller(client: Client) { + let crd_api: Api<Backup> = Api::all(client.clone()); + let context: Arc<ContextData> = Arc::new(ContextData::new(client)); + + Controller::new(crd_api, Config::default()) + .run(reconcile, on_error, context) + .for_each(|reconciliation_result| async move { + match reconciliation_result { + Ok(echo_resource) => { + info!("Reconciliation successful. Resource: {:?}", echo_resource); + } + Err(reconciliation_err) => { + error!(%reconciliation_err, "Reconciliation error") + } + } + }) + .await; +} + +async fn reconcile(backup: Arc<Backup>, context: Arc<ContextData>) -> Result<Action, Error> { + let client = context.client.clone(); + + let ns = backup.namespace().ok_or(Error::MissingNamespace)?; + let name = backup.name_any(); + + match determine_action(&backup) { + BackupAction::Create => { + finalizer::add(&Api::<Backup>::namespaced(client.clone(), &ns), &name).await?; + + let deployment = BackupDeployment::new(ns, &backup); + let labels = Labels::new(name); + deployment.create(client, &*backup, labels).await?; + Ok(Action::requeue(Duration::from_secs(10))) + } + BackupAction::Delete => { + let deployment = BackupDeployment::new(ns.clone(), &backup); + deployment.delete(client.clone()).await?; + + finalizer::remove(&Api::<Backup>::namespaced(client, &ns), &name).await?; + + Ok(Action::await_change()) + } + BackupAction::Noop => Ok(Action::requeue(Duration::from_secs(10))), + } +} + +fn determine_action(backup: &Backup) -> BackupAction { + if backup.meta().deletion_timestamp.is_some() { + BackupAction::Delete + } else if backup + .meta() + .finalizers + .as_ref() + .map_or(true, |f| !f.iter().any(|x| x == FINALIZER)) + { + BackupAction::Create + } else { + BackupAction::Noop + } +} + +fn on_error(backup: Arc<Backup>, error: &Error, _context: Arc<ContextData>) -> Action { + error!("Reconciliation error:\n{:?}.\n{:?}", error, backup); + Action::requeue(Duration::from_secs(5)) +} + +/// Possible actions to take on a [`Backup`] resource +enum BackupAction { + /// Create the sub-resources for the backup + Create, + /// Delete the sub-resources for the backup + Delete, + /// No operation required. Update the status if needed. + Noop, +} diff --git a/src/context.rs b/src/context.rs new file mode 100644 index 0000000..c8be716 --- /dev/null +++ b/src/context.rs @@ -0,0 +1,18 @@ +use kube::Client; + +/// Context injected with each `reconcile` and `on_error` method invocation. +pub struct ContextData { + /// Kubernetes client to make Kubernetes API requests with. + pub client: Client, +} + +impl ContextData { + /// Constructs a new instance of ContextData. + /// + /// # Arguments: + /// - `client`: A Kubernetes client to make Kubernetes REST API requests with. Resources + /// will be created and deleted with this client. + pub fn new(client: Client) -> Self { + ContextData { client } + } +} diff --git a/src/deploy.rs b/src/deploy.rs new file mode 100644 index 0000000..f73399b --- /dev/null +++ b/src/deploy.rs @@ -0,0 +1,43 @@ +use std::{collections::BTreeMap, future::Future}; + +use kube::{Client, Resource}; + +pub trait Deployable { + type Error; + + fn create<O>( + &self, + client: Client, + owner: &O, + labels: Labels, + ) -> impl Future<Output = Result<(), Self::Error>> + Send + where + O: Resource<DynamicType = ()> + Send + Sync; + fn delete(&self, client: Client) -> impl Future<Output = Result<(), Self::Error>> + Send; +} + +#[derive(Debug, Clone)] +pub struct Labels { + app_name: String, +} + +impl Labels { + pub fn new(app_name: impl Into<String>) -> Self { + Self { + app_name: app_name.into(), + } + } + + pub fn to_labels(&self) -> BTreeMap<String, String> { + let mut labels = BTreeMap::new(); + labels.insert( + "app.kubernetes.io/name".to_owned(), + self.app_name.to_owned(), + ); + labels.insert( + "app.kubernetes.io/managed-by".to_owned(), + "restic-operator".to_owned(), + ); + labels + } +} diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..fa28b1f --- /dev/null +++ b/src/error.rs @@ -0,0 +1,12 @@ +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Any error originating from the `kube-rs` crate + #[error("Kubernetes reported error: {0}")] + KubeError(#[from] kube::Error), + /// Error in serializing the resticprofile config to TOML + #[error("Error creating resticprofile config: {0}")] + TomlSerializeError(#[from] toml::ser::Error), + /// Missing Namespace + #[error("Namespace not found")] + MissingNamespace, +} diff --git a/src/finalizer.rs b/src/finalizer.rs new file mode 100644 index 0000000..5e7581a --- /dev/null +++ b/src/finalizer.rs @@ -0,0 +1,86 @@ +use kube::{ + api::{Patch, PatchParams}, + Api, Error, +}; +use serde::de::DeserializeOwned; +use serde_json::{json, Value}; + +pub const FINALIZER: &str = "restic.anshulg.com/finalizer"; + +/// Adds a finalizer to the given resource. +pub async fn add<K>(api: &Api<K>, name: &str) -> Result<K, Error> +where + K: Clone + DeserializeOwned + std::fmt::Debug, +{ + let finalizer: Value = json!({ + "metadata": { + "finalizers": [FINALIZER] + } + }); + + let patch = Patch::Merge(&finalizer); + api.patch(name, &PatchParams::default(), &patch).await +} + +/// Removes all finalizers from the given resource. +pub async fn remove<K>(api: &Api<K>, name: &str) -> Result<K, Error> +where + K: Clone + DeserializeOwned + std::fmt::Debug, +{ + let finalizer: Value = json!({ + "metadata": { + "finalizers": null + } + }); + + let patch = Patch::Merge(&finalizer); + api.patch(name, &PatchParams::default(), &patch).await +} + +#[cfg(test)] +mod tests { + use k8s_openapi::api::core::v1::ConfigMap; + use kube::{api::ObjectMeta, Client, ResourceExt}; + + use super::*; + + #[tokio::test] + #[cfg_attr( + not(feature = "integration-tests"), + ignore = "uses k8s current-context" + )] + async fn test_finalizer() { + let client = Client::try_default().await.unwrap(); + let api: Api<ConfigMap> = Api::namespaced(client, "default"); + + // Create a test ConfigMap + let cm = ConfigMap { + metadata: ObjectMeta { + name: Some("test-finalizer-add".to_string()), + ..Default::default() + }, + ..Default::default() + }; + let cm = api.create(&Default::default(), &cm).await.unwrap(); + + // Check for no finalizers + assert!(cm.metadata.finalizers.is_none()); + + // Add the finalizer + let cm = add(&api, &cm.name_any()).await.unwrap(); + + // Check for the finalizer + assert_eq!(cm.metadata.finalizers, Some(vec![FINALIZER.to_string()])); + + // Remove the finalizer + let cm = remove(&api, &cm.name_any()).await.unwrap(); + + // Check for no finalizers + assert!(cm.metadata.finalizers.is_none()); + + // Clean up + api.delete(&cm.name_any(), &Default::default()) + .await + .unwrap(); + } +} diff --git a/src/jobspec.rs b/src/jobspec.rs new file mode 100644 index 0000000..212172c --- /dev/null +++ b/src/jobspec.rs @@ -0,0 +1,284 @@ +use k8s_openapi::api::{ + batch::v1::JobSpec, + core::v1::{ + Affinity, ConfigMapVolumeSource, Container, EnvFromSource, EnvVar, EnvVarSource, PodSpec, + PodTemplateSpec, ResourceRequirements, SecretVolumeSource, SecurityContext, Volume, + VolumeMount, + }, +}; +use restic_crd::{BackupSpec, ResticProfileConfig}; + +const DEFAULT_RESTIC_IMAGE: &str = "creativeprojects/resticprofile"; + +#[derive(Debug, Clone)] +pub struct BackupJobSpec { + image: String, + image_pull_policy: Option<String>, + args: Option<Vec<String>>, + command: Option<Vec<String>>, + env: Vec<EnvVar>, + env_from: Vec<EnvFromSource>, + resources: Option<ResourceRequirements>, + security_context: Option<SecurityContext>, + affinity: Option<Affinity>, + node_selector: Option<std::collections::BTreeMap<String, String>>, + service_account_name: Option<String>, + volume_mounts: Vec<VolumeMount>, + volumes: Vec<Volume>, +} + +impl BackupJobSpec { + pub fn new(backup: &BackupSpec, config_name: impl Into<String>) -> Self { + let mut rpcfg = backup.restic_profile.clone().unwrap_or_default(); + let image = get_image(&mut rpcfg); + let env = fill_env(backup, &mut rpcfg); + let (volume_mounts, volumes) = fill_volume_mounts(backup, config_name); + + Self { + image, + image_pull_policy: rpcfg.image_pull_policy.take(), + args: rpcfg.args.take(), + command: rpcfg.command.take(), + env, + env_from: rpcfg.env_from.take().unwrap_or_default(), + resources: rpcfg.resources.take(), + security_context: rpcfg.security_context.take(), + affinity: rpcfg.affinity.take(), + node_selector: rpcfg.node_selector.take(), + service_account_name: rpcfg.service_account_name.take(), + volume_mounts, + volumes, + } + } +} + +impl From<BackupJobSpec> for JobSpec { + fn from(value: BackupJobSpec) -> Self { + Self { + suspend: Some(false), + template: PodTemplateSpec { + spec: Some(PodSpec { + affinity: value.affinity, + containers: vec![Container { + name: "restic-backup".to_owned(), + args: value.args, + command: value.command, + env: Some(value.env), + env_from: Some(value.env_from), + image: Some(value.image), + image_pull_policy: value.image_pull_policy, + resources: value.resources, + security_context: value.security_context, + volume_mounts: Some(value.volume_mounts), + ..Default::default() + }], + restart_policy: Some("OnFailure".to_string()), + node_selector: value.node_selector, + service_account_name: value.service_account_name, + volumes: Some(value.volumes), + ..Default::default() + }), + ..Default::default() + }, + ..Default::default() + } + } +} + +fn get_image(cfg: &mut ResticProfileConfig) -> String { + cfg.image.take().unwrap_or_else(|| { + format!( + "{}:{}", + DEFAULT_RESTIC_IMAGE, + cfg.version.as_deref().unwrap_or("latest") + ) + }) +} + +fn fill_env(backup: &BackupSpec, rpcfg: &mut ResticProfileConfig) -> Vec<EnvVar> { + let mut env = rpcfg.env.take().unwrap_or_default(); + + if let Some(rest_creds) = &backup.restic.repository.rest_credentials { + env.push(EnvVar { + name: "RESTIC_REST_USERNAME".to_string(), + value_from: Some(EnvVarSource { + secret_key_ref: Some(rest_creds.username.clone()), + ..Default::default() + }), + ..Default::default() + }); + env.push(EnvVar { + name: "RESTIC_REST_PASSWORD".to_string(), + value_from: Some(EnvVarSource { + secret_key_ref: Some(rest_creds.password.clone()), + ..Default::default() + }), + ..Default::default() + }); + } + + env +} + +fn fill_volume_mounts( + backup: &BackupSpec, + config_name: impl Into<String>, +) -> (Vec<VolumeMount>, Vec<Volume>) { + let mut mounts = Vec::new(); + let mut volumes = Vec::new(); + + // Add volume mount for resticprofile config + mounts.push(VolumeMount { + mount_path: "/resticprofile/profiles.toml".to_owned(), + name: "profile".to_owned(), + sub_path: Some("profiles.toml".to_owned()), + ..Default::default() + }); + volumes.push(Volume { + name: "profile".to_owned(), + config_map: Some(ConfigMapVolumeSource { + name: config_name.into(), + ..Default::default() + }), + ..Default::default() + }); + + // Add volume mount for restic repository password + mounts.push(VolumeMount { + mount_path: "/resticprofile/password.txt".to_owned(), + name: "restic-password".to_owned(), + sub_path: Some(backup.restic.repository.password.key.clone()), + ..Default::default() + }); + volumes.push(Volume { + name: "restic-password".to_owned(), + secret: Some(SecretVolumeSource { + secret_name: Some(backup.restic.repository.password.name.clone()), + ..Default::default() + }), + ..Default::default() + }); + + // Add other volume mounts + if let Some(vol_backup) = &backup.volume { + mounts.extend_from_slice(&vol_backup.mounts); + volumes.extend_from_slice(&vol_backup.volumes); + }; + + (mounts, volumes) +} + +#[cfg(test)] +mod tests { + use k8s_openapi::api::core::v1::SecretKeySelector; + use restic_crd::{Repository, RepositoryType, RestCredentials, ResticConfig, VolumeBackup}; + + use super::*; + + const CONFIG_NAME: &str = "test-config"; + + fn create_backup() -> BackupSpec { + BackupSpec { + restic: ResticConfig::builder() + .repository(Repository { + r#type: RepositoryType::Rest, + uri: "https://example.com".to_string(), + rest_credentials: Some(RestCredentials { + username: SecretKeySelector { + name: "restic-secret".to_string(), + key: "username".to_string(), + ..Default::default() + }, + password: SecretKeySelector { + name: "restic-secret".to_string(), + key: "password".to_string(), + ..Default::default() + }, + }), + password: SecretKeySelector { + name: "restic-password".to_string(), + key: "password.txt".to_string(), + ..Default::default() + }, + }) + .build(), + volume: None, + restic_profile: Some(ResticProfileConfig { + image: Some("custom/restic:latest".to_string()), + version: Some("v1.0.0".to_string()), + ..Default::default() + }), + } + } + + #[test] + fn test_jobspec_new() { + let backup = create_backup(); + let job_spec = BackupJobSpec::new(&backup, CONFIG_NAME); + + assert_eq!(job_spec.image, "custom/restic:latest"); + assert_eq!(job_spec.env.len(), 2); + assert_eq!(job_spec.volume_mounts.len(), 2); + assert_eq!(job_spec.volumes.len(), 2); + } + + #[test] + fn test_default_image() { + let mut backup = create_backup(); + backup.restic_profile.as_mut().unwrap().image = None; + let job = BackupJobSpec::new(&backup, CONFIG_NAME); + + assert_eq!(job.image, "creativeprojects/resticprofile:v1.0.0"); + } + + #[test] + fn test_default_image_tag() { + let mut backup = create_backup(); + backup.restic_profile.as_mut().unwrap().image = None; + backup.restic_profile.as_mut().unwrap().version = None; + let job = BackupJobSpec::new(&backup, CONFIG_NAME); + + assert_eq!(job.image, "creativeprojects/resticprofile:latest"); + } + + #[test] + fn test_fill_env_with_no_credentials() { + let mut backup = create_backup(); + backup.restic.repository.rest_credentials = None; + let mut rpcfg = backup.restic_profile.clone().unwrap_or_default(); + let env = fill_env(&backup, &mut rpcfg); + + assert!(env.is_empty()); + } + + #[test] + fn test_fill_volume_mounts_with_no_volume() { + let backup = create_backup(); + let config_name = "test-config"; + let (volume_mounts, volumes) = fill_volume_mounts(&backup, config_name); + + assert_eq!(volume_mounts.len(), 2); + assert_eq!(volumes.len(), 2); + } + + #[test] + fn test_fill_volume_mounts_with_volume() { + let mut backup = create_backup(); + backup.volume = Some(VolumeBackup { + mounts: vec![VolumeMount { + mount_path: "/data".to_string(), + name: "data-volume".to_string(), + ..Default::default() + }], + volumes: vec![Volume { + name: "data-volume".to_string(), + ..Default::default() + }], + }); + let config_name = "test-config"; + let (volume_mounts, volumes) = fill_volume_mounts(&backup, config_name); + + assert_eq!(volume_mounts.len(), 3); + assert_eq!(volumes.len(), 3); + } +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..dfa2edf --- /dev/null +++ b/src/main.rs @@ -0,0 +1,42 @@ +use kube::Client; +use tracing::{info, level_filters::LevelFilter}; +use tracing_subscriber::EnvFilter; + +mod backup; +mod context; +mod deploy; +mod error; +mod finalizer; +mod jobspec; +mod resticprofile; +mod schedule; + +pub use error::Error; + +#[tokio::main] +async fn main() { + tracing_subscriber::fmt() + .with_env_filter( + EnvFilter::builder() + .with_default_directive(LevelFilter::INFO.into()) + .from_env_lossy(), + ) + .init(); + std::panic::set_hook(Box::new(tracing_panic::panic_hook)); + + let k8s_client = Client::try_default() + .await + .expect("Expected a valid KUBECONFIG environment variable."); + + let signal = tokio::signal::ctrl_c(); + let backup_fut = tokio::spawn(backup::run_controller(k8s_client.clone())); + let schedule_fut = tokio::spawn(schedule::run_controller(k8s_client.clone())); + + tokio::select! { + _ = signal => {} + _ = backup_fut => {} + _ = schedule_fut => {} + } + + info!("Successfully shut down.") +} diff --git a/src/resticprofile/config.rs b/src/resticprofile/config.rs new file mode 100644 index 0000000..b19ef2e --- /dev/null +++ b/src/resticprofile/config.rs @@ -0,0 +1,245 @@ +use std::collections::HashMap; + +use bon::Builder; +use serde::{Deserialize, Serialize}; + +pub const DEFAULT_PROFILE: &str = "default"; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Builder, Default)] +#[non_exhaustive] +pub struct ResticProfileConfig { + #[builder(default)] + pub version: ResticProfileVersion, + pub global: Option<ResticProfileGlobal>, + #[serde(flatten)] + #[builder(default)] + pub profiles: HashMap<String, ResticProfileProfile>, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] +#[non_exhaustive] +pub enum ResticProfileVersion { + #[default] + #[serde(rename = "1")] + V1, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Builder, Default)] +#[non_exhaustive] +#[serde(rename_all = "kebab-case")] +pub struct ResticProfileGlobal { + /// Minimum available memory (in MB) required to run any commands. + pub min_memory: Option<u64>, + /// Time to wait before trying to get a lock on a restic repository. + pub restic_lock_retry_after: Option<String>, + /// The age an unused lock on a restic repository must have at least before resticprofile attempts to unlock. + pub restic_stale_lock_age: Option<String>, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Builder, Default)] +#[non_exhaustive] +#[serde(rename_all = "kebab-case")] +pub struct ResticProfileProfile { + /// File to load root certificates from (default: use system certificates or $RESTIC_CACERT). + pub cacert: Option<String>, + /// Set the cache directory. (default: use system default cache directory). + pub cache_dir: Option<String>, + /// Compression mode (only available for repository format version 2), one of (auto/off/max) (default: $RESTIC_COMPRESSION). + pub compression: Option<String>, + /// Set a http user agent for outgoing http requests. + pub http_user_agent: Option<String>, + /// Skip TLS certificate verification when connecting to the repository (insecure). + #[serde(skip_serializing_if = "std::ops::Not::not")] + #[builder(default)] + pub insecure_tls: bool, + /// Limits downloads to a maximum rate in KiB/s. (default: unlimited). + pub limit_download: Option<u64>, + /// Limits uploads to a maximum rate in KiB/s. (default: unlimited). + pub limit_upload: Option<u64>, + /// Do not use a local cache + #[serde(skip_serializing_if = "std::ops::Not::not")] + #[builder(default)] + pub no_cache: bool, + /// Skip additional verification of data before upload (see documentation) + #[serde(skip_serializing_if = "std::ops::Not::not")] + #[builder(default)] + pub no_extra_verify: bool, + /// set target pack size in MiB, created pack files may be larger (default: $RESTIC_PACK_SIZE). + pub pack_size: Option<u64>, + /// File to read the repository password from. + pub password_file: Option<String>, + /// Repository to backup to or restore from (default: $RESTIC_REPOSITORY). + pub repository: Option<String>, + /// Path to a file containing PEM encoded TLS client certificate and private key (default: $RESTIC_TLS_CLIENT_CERT). + pub tls_client_cert: Option<String>, + /// Be verbose + pub verbose: Option<u8>, + + /// This section configures restic command `backup`. + pub backup: Option<ResticProfileProfileBackup>, + /// This section configures restic command `forget`. + pub retention: Option<ResticProfileProfileRetention>, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Builder, Default)] +#[non_exhaustive] +#[serde(rename_all = "kebab-case")] +pub struct ResticProfileProfileBackup { + /// Check the repository after the backup command succeeded. + #[serde(skip_serializing_if = "std::ops::Not::not")] + #[builder(default)] + pub check_after: bool, + /// Check the repository before starting the backup command. + #[serde(skip_serializing_if = "std::ops::Not::not")] + #[builder(default)] + pub check_before: bool, + /// Do not fail the backup when some files could not be read. + #[serde(skip_serializing_if = "std::ops::Not::not")] + #[builder(default)] + pub no_error_on_warning: bool, + /// The paths to backup. Examples: /opt/, /home/user/, C:\Users\User\Documents. + #[serde(skip_serializing_if = "Vec::is_empty")] + #[builder(default)] + pub source: Vec<String>, + /// Exclude a pattern. + #[serde(skip_serializing_if = "Vec::is_empty")] + #[builder(default)] + pub exclude: Vec<String>, + /// Excludes cache directories that are marked with a CACHEDIR.TAG file. + #[serde(skip_serializing_if = "std::ops::Not::not")] + #[builder(default)] + pub exclude_caches: bool, + /// Takes filename[:header], exclude contents of directories containing filename (except filename itself) if header of that file is as provided. + #[serde(skip_serializing_if = "Vec::is_empty")] + #[builder(default)] + pub exclude_if_present: Vec<String>, + /// Max size of the files to be backed up (allowed suffixes: k/K, m/M, g/G, t/T). + pub exclude_larger_than: Option<String>, + /// Same as –exclude pattern but ignores the casing of filenames. + #[serde(skip_serializing_if = "Vec::is_empty")] + #[builder(default)] + pub iexclude: Vec<String>, + /// Add tags for the new snapshot in the format tag[,tag,…]. Boolean true is unsupported in section “backup”. Examples: false, "tag". + #[serde(skip_serializing_if = "Vec::is_empty")] + #[builder(default)] + pub tag: Vec<String>, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Builder, Default)] +#[non_exhaustive] +#[serde(rename_all = "kebab-case")] +pub struct ResticProfileProfileRetention { + /// Apply retention after the backup command succeeded. + #[serde(skip_serializing_if = "std::ops::Not::not")] + #[builder(default)] + pub after_backup: bool, + /// Apply retention before starting the backup command + #[serde(skip_serializing_if = "std::ops::Not::not")] + #[builder(default)] + pub before_backup: bool, + /// Apply retention before starting the backup command + #[builder(default)] + pub host: bool, + + pub keep_last: Option<u32>, + pub keep_hourly: Option<u32>, + pub keep_daily: Option<u32>, + pub keep_weekly: Option<u32>, + pub keep_monthly: Option<u32>, + pub keep_yearly: Option<u32>, + + /// Automatically run the ‘prune’ command if snapshots have been removed. + #[serde(skip_serializing_if = "std::ops::Not::not")] + #[builder(default)] + pub prune: bool, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_default() { + let config = ResticProfileConfig::default(); + + let output = toml::to_string(&config).unwrap(); + assert_eq!( + output, + r#"version = "1" +"# + ); + } + + #[test] + fn test_global() { + let config = ResticProfileConfig::builder() + .global( + ResticProfileGlobal::builder() + .min_memory(100) + .restic_lock_retry_after("1m".to_owned()) + .restic_stale_lock_age("1h".to_owned()) + .build(), + ) + .build(); + + let output = toml::to_string(&config).unwrap(); + assert_eq!( + output, + r#"version = "1" + +[global] +min-memory = 100 +restic-lock-retry-after = "1m" +restic-stale-lock-age = "1h" +"# + ); + } + + #[test] + fn test_profile() { + let config = ResticProfileConfig::builder() + .profiles(HashMap::from([( + "default".to_owned(), + ResticProfileProfile::builder() + .cacert("/etc/ssl/ca.crt".to_owned()) + .cache_dir("/var/cache/restic".to_owned()) + .compression("auto".to_owned()) + .http_user_agent("resticprofile/0.1".to_owned()) + .insecure_tls(false) + .backup( + ResticProfileProfileBackup::builder() + .source(vec!["/opt".to_owned()]) + .build(), + ) + .retention( + ResticProfileProfileRetention::builder() + .after_backup(true) + .keep_last(10) + .build(), + ) + .build(), + )])) + .build(); + + let output = toml::to_string(&config).unwrap(); + assert_eq!( + output, + r#"version = "1" + +[default] +cacert = "/etc/ssl/ca.crt" +cache-dir = "/var/cache/restic" +compression = "auto" +http-user-agent = "resticprofile/0.1" + +[default.backup] +source = ["/opt"] + +[default.retention] +after-backup = true +host = false +keep-last = 10 +"# + ); + } +} diff --git a/src/resticprofile/mod.rs b/src/resticprofile/mod.rs new file mode 100644 index 0000000..0de6a11 --- /dev/null +++ b/src/resticprofile/mod.rs @@ -0,0 +1,238 @@ +use std::collections::{BTreeMap, HashMap}; + +use config::{ + ResticProfileConfig, ResticProfileProfile, ResticProfileProfileBackup, + ResticProfileProfileRetention, DEFAULT_PROFILE, +}; +use k8s_openapi::{api::core::v1::ConfigMap, apimachinery::pkg::apis::meta::v1::OwnerReference}; +use kube::{api::ObjectMeta, Api, Client, ResourceExt}; +use restic_crd::BackupSpec; + +use crate::{ + deploy::{Deployable, Labels}, + Error, +}; + +pub mod config; + +const PASSWORD_FILE_PATH: &str = "/resticprofile/password"; + +#[derive(Debug, Clone)] +pub struct ResticProfile { + name: String, + ns: String, + config: ResticProfileConfig, +} + +impl ResticProfile { + pub fn new(ns: String, name: impl AsRef<str>, backup: &BackupSpec) -> Self { + let config = create_config(backup); + let name = format!("{}-profile", name.as_ref()); + ResticProfile { name, ns, config } + } + + pub fn name(&self) -> &str { + &self.name + } + + async fn get(&self, client: Client) -> Result<Option<ConfigMap>, Error> { + let api: Api<ConfigMap> = Api::namespaced(client, &self.ns); + match api.get(&self.name).await { + Ok(c) => Ok(Some(c)), + Err(kube::Error::Api(ae)) => { + if ae.code == 404 { + Ok(None) + } else { + Err(Error::KubeError(kube::Error::Api(ae))) + } + } + Err(e) => Err(Error::KubeError(e)), + } + } +} + +impl Deployable for ResticProfile { + type Error = Error; + + async fn create<O>(&self, client: Client, owner: &O, labels: Labels) -> Result<(), Self::Error> + where + O: kube::Resource<DynamicType = ()> + Send + Sync, + { + let config = toml::to_string(&self.config)?; + let data = BTreeMap::from([("profiles.toml".to_owned(), config)]); + + let config_map = ConfigMap { + metadata: ObjectMeta { + name: Some(self.name.clone()), + namespace: Some(self.ns.clone()), + labels: Some(labels.to_labels()), + owner_references: O::meta(owner).uid.clone().map(|uid| { + vec![OwnerReference { + api_version: O::api_version(&()).into_owned(), + block_owner_deletion: Some(true), + controller: Some(true), + kind: O::kind(&()).into_owned(), + name: O::name_any(owner), + uid, + }] + }), + ..ObjectMeta::default() + }, + data: Some(data), + ..ConfigMap::default() + }; + + let api: Api<ConfigMap> = Api::namespaced(client, &self.ns); + api.create(&Default::default(), &config_map).await?; + + Ok(()) + } + + async fn delete(&self, client: Client) -> Result<(), Self::Error> { + let config_map = self.get(client.clone()).await?; + if let Some(config_map) = config_map { + let api: Api<ConfigMap> = Api::namespaced(client, &self.ns); + api.delete(&config_map.name_any(), &Default::default()) + .await?; + } + Ok(()) + } +} + +fn create_config(backup: &BackupSpec) -> ResticProfileConfig { + let paths = extract_paths(backup); + + let backup_conf = backup.restic.backup.as_ref().map(|b| { + ResticProfileProfileBackup::builder() + .source(paths) + .maybe_exclude(b.exclude.clone()) + .exclude_caches(b.exclude_caches) + .maybe_exclude_if_present(b.exclude_if_present.clone()) + .maybe_exclude_larger_than(b.exclude_larger_than.clone()) + .maybe_iexclude(b.iexclude.clone()) + .maybe_tag(b.tag.clone()) + .build() + }); + + let retention = backup.restic.retention.as_ref().map(|r| { + ResticProfileProfileRetention::builder() + .after_backup(r.after_backup) + .before_backup(r.before_backup) + .maybe_keep_last(r.keep_last) + .maybe_keep_hourly(r.keep_hourly) + .maybe_keep_daily(r.keep_daily) + .maybe_keep_weekly(r.keep_weekly) + .maybe_keep_monthly(r.keep_monthly) + .maybe_keep_yearly(r.keep_yearly) + .prune(r.prune) + .build() + }); + + let profile = ResticProfileProfile::builder() + .compression(backup.restic.compression.as_str().to_owned()) + .repository(backup.restic.repository.full_uri()) + .password_file(PASSWORD_FILE_PATH.to_owned()) + .maybe_backup(backup_conf) + .maybe_retention(retention) + .build(); + + ResticProfileConfig::builder() + .profiles(HashMap::from([(DEFAULT_PROFILE.to_owned(), profile)])) + .build() +} + +fn extract_paths(backup: &BackupSpec) -> Vec<String> { + if let Some(vol_backup) = &backup.volume { + vol_backup + .mounts + .iter() + .map(|m| m.mount_path.clone()) + .collect() + } else { + Vec::new() + } +} + +#[cfg(test)] +mod tests { + use k8s_openapi::{ + api::core::v1::VolumeMount, apimachinery::pkg::apis::meta::v1::OwnerReference, + }; + use restic_crd::{Backup, BackupSpec, Repository, ResticConfig, VolumeBackup}; + + use super::*; + + #[test] + fn test_extract_paths() { + let spec = BackupSpec::builder() + .restic( + ResticConfig::builder() + .repository( + Repository::builder() + .r#type(restic_crd::RepositoryType::Rest) + .uri("https://example.com".to_owned()) + .password(Default::default()) + .build(), + ) + .build(), + ) + .volume( + VolumeBackup::builder() + .mounts(vec![VolumeMount { + mount_path: "/mnt/data".to_owned(), + ..Default::default() + }]) + .volumes(Vec::new()) + .build(), + ) + .build(); + let backup = Backup::new("test", spec); + + let paths = extract_paths(&backup.spec); + assert_eq!(paths, vec!["/mnt/data".to_owned()]); + } + + #[tokio::test] + // #[cfg_attr( + // not(feature = "integration-tests"), + // ignore = "uses k8s current-context" + // )] + #[ignore = "TODO FIX: incorrect naming"] + async fn test_resticprofile() { + const NAME: &str = "test-resticprofile"; + const NS: &str = "default"; + let client = Client::try_default().await.unwrap(); + + let spec = BackupSpec::builder() + .restic( + ResticConfig::builder() + .repository( + Repository::builder() + .r#type(restic_crd::RepositoryType::Rest) + .uri("https://example.com".to_owned()) + .password(Default::default()) + .build(), + ) + .build(), + ) + .build(); + let _backup = Backup::new(NAME, spec); + + // let _profile = ResticProfile::new(NS.to_owned(), &backup); + // profile + // .create(client.clone(), Labels::new("test")) + // .await + // .unwrap(); + + let api: Api<ConfigMap> = Api::namespaced(client, NS); + let cm = api.get(NAME).await.unwrap(); + assert_eq!(cm.name_any(), NAME); + assert_eq!( + cm.owner_references(), + vec![OwnerReference { + name: "test".to_owned(), + ..Default::default() + }] + ); + } +} diff --git a/src/schedule/cronjob.rs b/src/schedule/cronjob.rs new file mode 100644 index 0000000..77a8ec7 --- /dev/null +++ b/src/schedule/cronjob.rs @@ -0,0 +1,131 @@ +use k8s_openapi::{ + api::batch::v1::{CronJob, CronJobSpec, JobTemplateSpec}, + apimachinery::pkg::apis::meta::v1::OwnerReference, +}; +use kube::{api::ObjectMeta, Api, Resource, ResourceExt}; +use restic_crd::ScheduledBackup; + +use super::Error; +use crate::{deploy::Deployable, jobspec::BackupJobSpec}; + +#[derive(Debug, Clone)] +pub struct BackupCronJob { + name: String, + ns: String, + spec: BackupJobSpec, + + /// The schedule in Cron format, see https://en.wikipedia.org/wiki/Cron. + pub schedule: String, + /// Specifies how to treat concurrent executions of a Job. Valid values are: + /// + /// - "Allow" (default): allows CronJobs to run concurrently; - "Forbid": forbids concurrent runs, skipping next run if previous run hasn't finished yet; - "Replace": cancels currently running job and replaces it with a new one + pub concurrency_policy: Option<String>, + /// The number of failed finished jobs to retain. Value must be non-negative integer. Defaults to 1. + pub failed_jobs_history_limit: Option<i32>, + /// Optional deadline in seconds for starting the job if it misses scheduled time for any reason. Missed jobs executions will be counted as failed ones. + pub starting_deadline_seconds: Option<i64>, + /// The number of successful finished jobs to retain. Value must be non-negative integer. Defaults to 3. + pub successful_jobs_history_limit: Option<i32>, + /// This flag tells the controller to suspend subsequent executions, it does not apply to already started executions. Defaults to false. + pub suspend: Option<bool>, + /// The time zone name for the given schedule, see https://en.wikipedia.org/wiki/List_of_tz_database_time_zones. If not specified, this will default to the time zone of the kube-controller-manager process. The set of valid time zone names and the time zone offset is loaded from the system-wide time zone database by the API server during CronJob validation and the controller manager during execution. If no system-wide time zone database can be found a bundled version of the database is used instead. If the time zone name becomes invalid during the lifetime of a CronJob or due to a change in host configuration, the controller will stop creating new new Jobs and will create a system event with the reason UnknownTimeZone. More information can be found in https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/#time-zones + pub time_zone: Option<String>, +} + +impl BackupCronJob { + pub fn new( + ns: impl Into<String>, + backup: &ScheduledBackup, + config_name: impl Into<String>, + ) -> Self { + let spec = BackupJobSpec::new(&backup.spec.backup, config_name); + Self { + name: format!("{}-cronjob", backup.name_any()), + ns: ns.into(), + spec, + schedule: backup.spec.schedule.clone(), + concurrency_policy: backup.spec.concurrency_policy.clone(), + failed_jobs_history_limit: backup.spec.failed_jobs_history_limit, + starting_deadline_seconds: backup.spec.starting_deadline_seconds, + successful_jobs_history_limit: backup.spec.successful_jobs_history_limit, + suspend: backup.spec.suspend, + time_zone: backup.spec.time_zone.clone(), + } + } + + async fn get(&self, client: kube::Client) -> Result<Option<CronJob>, Error> { + let api: Api<CronJob> = Api::namespaced(client, &self.ns); + match api.get(&self.name).await { + Ok(c) => Ok(Some(c)), + Err(kube::Error::Api(ae)) => { + if ae.code == 404 { + Ok(None) + } else { + Err(Error::KubeError(kube::Error::Api(ae))) + } + } + Err(e) => Err(Error::KubeError(e)), + } + } +} + +impl Deployable for BackupCronJob { + type Error = Error; + + async fn create<O>( + &self, + client: kube::Client, + owner: &O, + labels: crate::deploy::Labels, + ) -> Result<(), Self::Error> + where + O: Resource<DynamicType = ()> + Send + Sync, + { + let job = CronJob { + metadata: ObjectMeta { + name: Some(self.name.clone()), + namespace: Some(self.ns.clone()), + labels: Some(labels.to_labels()), + owner_references: O::meta(owner).uid.clone().map(|uid| { + vec![OwnerReference { + api_version: O::api_version(&()).into_owned(), + block_owner_deletion: Some(true), + controller: Some(true), + kind: O::kind(&()).into_owned(), + name: O::name_any(owner), + uid, + }] + }), + ..Default::default() + }, + spec: Some(CronJobSpec { + concurrency_policy: self.concurrency_policy.clone(), + failed_jobs_history_limit: self.failed_jobs_history_limit, + job_template: JobTemplateSpec { + metadata: None, + spec: Some(self.spec.clone().into()), + }, + schedule: self.schedule.clone(), + starting_deadline_seconds: self.starting_deadline_seconds, + successful_jobs_history_limit: self.successful_jobs_history_limit, + suspend: self.suspend, + time_zone: self.time_zone.clone(), + }), + ..Default::default() + }; + + let api: Api<CronJob> = Api::namespaced(client, &self.ns); + api.create(&Default::default(), &job).await?; + + Ok(()) + } + + async fn delete(&self, client: kube::Client) -> Result<(), Self::Error> { + let job = self.get(client.clone()).await?; + if let Some(job) = job { + let api: Api<CronJob> = Api::namespaced(client, &self.ns); + api.delete(&job.name_any(), &Default::default()).await?; + } + Ok(()) + } +} diff --git a/src/schedule/deploy.rs b/src/schedule/deploy.rs new file mode 100644 index 0000000..3897b3c --- /dev/null +++ b/src/schedule/deploy.rs @@ -0,0 +1,45 @@ +use kube::ResourceExt; +use restic_crd::ScheduledBackup; + +use super::{cronjob::BackupCronJob, Error}; +use crate::{deploy::Deployable, resticprofile::ResticProfile}; + +#[derive(Debug, Clone)] +pub struct ScheduledBackupDeployment { + profile: ResticProfile, + job: BackupCronJob, +} + +impl ScheduledBackupDeployment { + pub fn new(ns: String, backup: &ScheduledBackup) -> Self { + let profile = ResticProfile::new(ns.clone(), backup.name_any(), &backup.spec.backup); + let job = BackupCronJob::new(ns, backup, profile.name()); + Self { profile, job } + } +} + +impl Deployable for ScheduledBackupDeployment { + type Error = Error; + + async fn create<O>( + &self, + client: kube::Client, + owner: &O, + labels: crate::deploy::Labels, + ) -> Result<(), Self::Error> + where + O: kube::Resource<DynamicType = ()> + Send + Sync, + { + self.profile + .create(client.clone(), owner, labels.clone()) + .await?; + self.job.create(client, owner, labels).await?; + Ok(()) + } + + async fn delete(&self, client: kube::Client) -> Result<(), Self::Error> { + self.profile.delete(client.clone()).await?; + self.job.delete(client).await?; + Ok(()) + } +} diff --git a/src/schedule/mod.rs b/src/schedule/mod.rs new file mode 100644 index 0000000..fe303fb --- /dev/null +++ b/src/schedule/mod.rs @@ -0,0 +1,110 @@ +use std::{sync::Arc, time::Duration}; + +use futures::StreamExt; +use kube::{ + runtime::{controller::Action, watcher::Config, Controller}, + Api, Client, Resource, ResourceExt, +}; +use restic_crd::ScheduledBackup; +use tracing::{error, info}; + +use crate::{ + context::ContextData, + deploy::{Deployable, Labels}, + finalizer::{self, FINALIZER}, + Error, +}; + +mod cronjob; +mod deploy; + +pub async fn run_controller(client: Client) { + let crd_api: Api<ScheduledBackup> = Api::all(client.clone()); + let context: Arc<ContextData> = Arc::new(ContextData::new(client)); + + Controller::new(crd_api, Config::default()) + .run(reconcile, on_error, context) + .for_each(|reconciliation_result| async move { + match reconciliation_result { + Ok(echo_resource) => { + info!("Reconciliation successful. Resource: {:?}", echo_resource); + } + Err(reconciliation_err) => { + error!(%reconciliation_err, "Reconciliation error") + } + } + }) + .await; +} + +async fn reconcile( + backup: Arc<ScheduledBackup>, + context: Arc<ContextData>, +) -> Result<Action, Error> { + let client = context.client.clone(); + + let ns = backup.namespace().ok_or(Error::MissingNamespace)?; + let name = backup.name_any(); + + match determine_action(&backup) { + ScheduledBackupAction::Create => { + // Add the finalizer to the resource + finalizer::add( + &Api::<ScheduledBackup>::namespaced(client.clone(), &ns), + &name, + ) + .await?; + + // Create the deployment + let deployment = deploy::ScheduledBackupDeployment::new(ns, &backup); + let labels = Labels::new(name); + deployment.create(client, &*backup, labels).await?; + + Ok(Action::requeue(Duration::from_secs(10))) + } + ScheduledBackupAction::Delete => { + // Delete the deployment + let deployment = deploy::ScheduledBackupDeployment::new(ns.clone(), &backup); + deployment.delete(client.clone()).await?; + + // Remove the finalizer from the resource + finalizer::remove( + &Api::<ScheduledBackup>::namespaced(client.clone(), &ns), + &name, + ) + .await?; + Ok(Action::requeue(Duration::from_secs(10))) + } + ScheduledBackupAction::Noop => Ok(Action::requeue(Duration::from_secs(10))), + } +} + +fn determine_action(backup: &ScheduledBackup) -> ScheduledBackupAction { + if backup.meta().deletion_timestamp.is_some() { + ScheduledBackupAction::Delete + } else if backup + .meta() + .finalizers + .as_ref() + .map_or(true, |f| !f.iter().any(|x| x == FINALIZER)) + { + ScheduledBackupAction::Create + } else { + ScheduledBackupAction::Noop + } +} + +fn on_error(backup: Arc<ScheduledBackup>, error: &Error, _context: Arc<ContextData>) -> Action { + error!("Reconciliation error:\n{:?}.\n{:?}", error, backup); + Action::requeue(Duration::from_secs(5)) +} + +/// Possible actions to take on a [`ScheduledBackup`] resource +enum ScheduledBackupAction { + /// Create the sub-resources for the backup + Create, + /// Delete the sub-resources for the backup + Delete, + /// No operation required. Update the status if needed. + Noop, +} |