author    Anshul Gupta <ansg191@anshulg.com> 2024-12-14 11:25:31 -0800
committer Anshul Gupta <ansg191@anshulg.com> 2024-12-14 13:35:02 -0800
commit    ed73a37d2498a51af3cbefe69f46223b103d71f5 (patch)
tree      25123738c474c15d1eddfd67a546e0887cd9f606 /src
Initial Commit
Diffstat (limited to 'src')
-rw-r--r--  src/backup/deploy.rs         |  44
-rw-r--r--  src/backup/job.rs            |  87
-rw-r--r--  src/backup/mod.rs            |  96
-rw-r--r--  src/context.rs               |  18
-rw-r--r--  src/deploy.rs                |  43
-rw-r--r--  src/error.rs                 |  12
-rw-r--r--  src/finalizer.rs             |  86
-rw-r--r--  src/jobspec.rs               | 284
-rw-r--r--  src/main.rs                  |  42
-rw-r--r--  src/resticprofile/config.rs  | 245
-rw-r--r--  src/resticprofile/mod.rs     | 238
-rw-r--r--  src/schedule/cronjob.rs      | 131
-rw-r--r--  src/schedule/deploy.rs       |  45
-rw-r--r--  src/schedule/mod.rs          | 110
14 files changed, 1481 insertions(+), 0 deletions(-)
diff --git a/src/backup/deploy.rs b/src/backup/deploy.rs
new file mode 100644
index 0000000..0ab0be8
--- /dev/null
+++ b/src/backup/deploy.rs
@@ -0,0 +1,44 @@
+use kube::{Client, ResourceExt};
+use restic_crd::Backup;
+
+use super::job::BackupJob;
+use crate::{
+ deploy::{Deployable, Labels},
+ resticprofile::ResticProfile,
+ Error,
+};
+
+#[derive(Debug, Clone)]
+pub struct BackupDeployment {
+ profile: ResticProfile,
+ job: BackupJob,
+}
+
+impl BackupDeployment {
+ pub fn new(ns: String, backup: &Backup) -> Self {
+ let profile = ResticProfile::new(ns.clone(), backup.name_any(), &backup.spec);
+ let job = BackupJob::new(ns, backup, profile.name());
+ Self { profile, job }
+ }
+}
+
+impl Deployable for BackupDeployment {
+ type Error = Error;
+
+ async fn create<O>(&self, client: Client, owner: &O, labels: Labels) -> Result<(), Self::Error>
+ where
+ O: kube::Resource<DynamicType = ()> + Send + Sync,
+ {
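+        // Create the profile ConfigMap first: the backup Job mounts it by name.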
+ self.profile
+ .create(client.clone(), owner, labels.clone())
+ .await?;
+ self.job.create(client, owner, labels).await?;
+ Ok(())
+ }
+
+ async fn delete(&self, client: Client) -> Result<(), Self::Error> {
+ self.profile.delete(client.clone()).await?;
+ self.job.delete(client).await?;
+ Ok(())
+ }
+}
diff --git a/src/backup/job.rs b/src/backup/job.rs
new file mode 100644
index 0000000..380c3cd
--- /dev/null
+++ b/src/backup/job.rs
@@ -0,0 +1,87 @@
+use k8s_openapi::{api::batch::v1::Job, apimachinery::pkg::apis::meta::v1::OwnerReference};
+use kube::{api::ObjectMeta, Api, Resource, ResourceExt};
+use restic_crd::Backup;
+
+use crate::{deploy::Deployable, jobspec::BackupJobSpec, Error};
+
+#[derive(Debug, Clone)]
+pub struct BackupJob {
+ name: String,
+ ns: String,
+ spec: BackupJobSpec,
+}
+
+impl BackupJob {
+ pub fn new(ns: impl Into<String>, backup: &Backup, config_name: impl Into<String>) -> Self {
+ let spec = BackupJobSpec::new(&backup.spec, config_name);
+ Self {
+ name: format!("{}-job", backup.name_any()),
+ ns: ns.into(),
+ spec,
+ }
+ }
+
+ async fn get(&self, client: kube::Client) -> Result<Option<Job>, Error> {
+ let api: Api<Job> = Api::namespaced(client, &self.ns);
+ match api.get(&self.name).await {
+ Ok(c) => Ok(Some(c)),
+ Err(kube::Error::Api(ae)) => {
+ if ae.code == 404 {
+ Ok(None)
+ } else {
+ Err(Error::KubeError(kube::Error::Api(ae)))
+ }
+ }
+ Err(e) => Err(Error::KubeError(e)),
+ }
+ }
+}
+
+impl Deployable for BackupJob {
+ type Error = Error;
+
+ async fn create<O>(
+ &self,
+ client: kube::Client,
+ owner: &O,
+ labels: crate::deploy::Labels,
+ ) -> Result<(), Self::Error>
+ where
+ O: Resource<DynamicType = ()> + Send + Sync,
+ {
+ let job = Job {
+ metadata: ObjectMeta {
+ name: Some(self.name.clone()),
+ namespace: Some(self.ns.clone()),
+ labels: Some(labels.to_labels()),
+ owner_references: O::meta(owner).uid.clone().map(|uid| {
+ vec![OwnerReference {
+ api_version: O::api_version(&()).into_owned(),
+ block_owner_deletion: Some(true),
+ controller: Some(true),
+ kind: O::kind(&()).into_owned(),
+ name: O::name_any(owner),
+ uid,
+ }]
+ }),
+ ..Default::default()
+ },
+ spec: Some(self.spec.clone().into()),
+ ..Default::default()
+ };
+
+ let api: Api<Job> = Api::namespaced(client, &self.ns);
+ api.create(&Default::default(), &job).await?;
+
+ Ok(())
+ }
+
+ async fn delete(&self, client: kube::Client) -> Result<(), Self::Error> {
+ let job = self.get(client.clone()).await?;
+ if let Some(job) = job {
+ let api: Api<Job> = Api::namespaced(client, &self.ns);
+ api.delete(&job.name_any(), &Default::default()).await?;
+ }
+ Ok(())
+ }
+}
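
Note: the owner_references construction above is duplicated verbatim in src/resticprofile/mod.rs and src/schedule/cronjob.rs below. A sketch of a shared helper it could be hoisted into (hypothetical, not part of this commit; recent kube releases also expose Resource::controller_owner_ref, which covers the same need):

use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
use kube::{Resource, ResourceExt};

/// Builds the owner-reference list for `owner`, if the API server has
/// assigned it a UID (it has none before the resource is persisted).
fn owner_refs<O>(owner: &O) -> Option<Vec<OwnerReference>>
where
    O: Resource<DynamicType = ()>,
{
    owner.meta().uid.clone().map(|uid| {
        vec![OwnerReference {
            api_version: O::api_version(&()).into_owned(),
            block_owner_deletion: Some(true),
            controller: Some(true),
            kind: O::kind(&()).into_owned(),
            name: owner.name_any(),
            uid,
        }]
    })
}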
diff --git a/src/backup/mod.rs b/src/backup/mod.rs
new file mode 100644
index 0000000..16d60a3
--- /dev/null
+++ b/src/backup/mod.rs
@@ -0,0 +1,96 @@
+use std::{sync::Arc, time::Duration};
+
+use deploy::BackupDeployment;
+use futures::StreamExt;
+use kube::{
+ runtime::{controller::Action, watcher::Config, Controller},
+ Api, Client, Resource, ResourceExt,
+};
+use restic_crd::Backup;
+use tracing::{error, info};
+
+use crate::{
+ context::ContextData,
+ deploy::{Deployable, Labels},
+ finalizer::{self, FINALIZER},
+ Error,
+};
+
+mod deploy;
+mod job;
+
+pub async fn run_controller(client: Client) {
+ let crd_api: Api<Backup> = Api::all(client.clone());
+ let context: Arc<ContextData> = Arc::new(ContextData::new(client));
+
+ Controller::new(crd_api, Config::default())
+ .run(reconcile, on_error, context)
+ .for_each(|reconciliation_result| async move {
+ match reconciliation_result {
+                Ok(resource) => {
+                    info!("Reconciliation successful. Resource: {:?}", resource);
+ }
+ Err(reconciliation_err) => {
+ error!(%reconciliation_err, "Reconciliation error")
+ }
+ }
+ })
+ .await;
+}
+
+async fn reconcile(backup: Arc<Backup>, context: Arc<ContextData>) -> Result<Action, Error> {
+ let client = context.client.clone();
+
+ let ns = backup.namespace().ok_or(Error::MissingNamespace)?;
+ let name = backup.name_any();
+
+ match determine_action(&backup) {
+ BackupAction::Create => {
+ finalizer::add(&Api::<Backup>::namespaced(client.clone(), &ns), &name).await?;
+
+ let deployment = BackupDeployment::new(ns, &backup);
+ let labels = Labels::new(name);
+ deployment.create(client, &*backup, labels).await?;
+ Ok(Action::requeue(Duration::from_secs(10)))
+ }
+ BackupAction::Delete => {
+ let deployment = BackupDeployment::new(ns.clone(), &backup);
+ deployment.delete(client.clone()).await?;
+
+ finalizer::remove(&Api::<Backup>::namespaced(client, &ns), &name).await?;
+
+ Ok(Action::await_change())
+ }
+ BackupAction::Noop => Ok(Action::requeue(Duration::from_secs(10))),
+ }
+}
+
+fn determine_action(backup: &Backup) -> BackupAction {
+ if backup.meta().deletion_timestamp.is_some() {
+ BackupAction::Delete
+ } else if backup
+ .meta()
+ .finalizers
+ .as_ref()
+ .map_or(true, |f| !f.iter().any(|x| x == FINALIZER))
+ {
+ BackupAction::Create
+ } else {
+ BackupAction::Noop
+ }
+}
+
+fn on_error(backup: Arc<Backup>, error: &Error, _context: Arc<ContextData>) -> Action {
+ error!("Reconciliation error:\n{:?}.\n{:?}", error, backup);
+ Action::requeue(Duration::from_secs(5))
+}
+
+/// Possible actions to take on a [`Backup`] resource
+enum BackupAction {
+ /// Create the sub-resources for the backup
+ Create,
+ /// Delete the sub-resources for the backup
+ Delete,
+ /// No operation required. Update the status if needed.
+ Noop,
+}
diff --git a/src/context.rs b/src/context.rs
new file mode 100644
index 0000000..c8be716
--- /dev/null
+++ b/src/context.rs
@@ -0,0 +1,18 @@
+use kube::Client;
+
+/// Context injected with each `reconcile` and `on_error` method invocation.
+pub struct ContextData {
+ /// Kubernetes client to make Kubernetes API requests with.
+ pub client: Client,
+}
+
+impl ContextData {
+ /// Constructs a new instance of ContextData.
+ ///
+ /// # Arguments:
+ /// - `client`: A Kubernetes client to make Kubernetes REST API requests with. Resources
+ /// will be created and deleted with this client.
+ pub fn new(client: Client) -> Self {
+ ContextData { client }
+ }
+}
diff --git a/src/deploy.rs b/src/deploy.rs
new file mode 100644
index 0000000..f73399b
--- /dev/null
+++ b/src/deploy.rs
@@ -0,0 +1,43 @@
+use std::{collections::BTreeMap, future::Future};
+
+use kube::{Client, Resource};
+
+pub trait Deployable {
+ type Error;
+
+ fn create<O>(
+ &self,
+ client: Client,
+ owner: &O,
+ labels: Labels,
+ ) -> impl Future<Output = Result<(), Self::Error>> + Send
+ where
+ O: Resource<DynamicType = ()> + Send + Sync;
+ fn delete(&self, client: Client) -> impl Future<Output = Result<(), Self::Error>> + Send;
+}
+
+#[derive(Debug, Clone)]
+pub struct Labels {
+ app_name: String,
+}
+
+impl Labels {
+ pub fn new(app_name: impl Into<String>) -> Self {
+ Self {
+ app_name: app_name.into(),
+ }
+ }
+
+ pub fn to_labels(&self) -> BTreeMap<String, String> {
+ let mut labels = BTreeMap::new();
+ labels.insert(
+ "app.kubernetes.io/name".to_owned(),
+ self.app_name.to_owned(),
+ );
+ labels.insert(
+ "app.kubernetes.io/managed-by".to_owned(),
+ "restic-operator".to_owned(),
+ );
+ labels
+ }
+}
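
Note: Deployable returns impl Future from trait methods (return-position impl Trait in traits), which requires Rust 1.75 or newer; on older toolchains the async-trait crate is the usual fallback. Labels emits the two recommended app.kubernetes.io labels; a minimal sketch of a check:

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn to_labels_sets_recommended_keys() {
        let map = Labels::new("my-backup").to_labels();
        assert_eq!(map["app.kubernetes.io/name"], "my-backup");
        assert_eq!(map["app.kubernetes.io/managed-by"], "restic-operator");
    }
}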
diff --git a/src/error.rs b/src/error.rs
new file mode 100644
index 0000000..fa28b1f
--- /dev/null
+++ b/src/error.rs
@@ -0,0 +1,12 @@
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+ /// Any error originating from the `kube-rs` crate
+ #[error("Kubernetes reported error: {0}")]
+ KubeError(#[from] kube::Error),
+ /// Error in serializing the resticprofile config to TOML
+ #[error("Error creating resticprofile config: {0}")]
+ TomlSerializeError(#[from] toml::ser::Error),
+ /// Missing Namespace
+ #[error("Namespace not found")]
+ MissingNamespace,
+}
diff --git a/src/finalizer.rs b/src/finalizer.rs
new file mode 100644
index 0000000..5e7581a
--- /dev/null
+++ b/src/finalizer.rs
@@ -0,0 +1,86 @@
+use kube::{
+ api::{Patch, PatchParams},
+ Api, Error,
+};
+use serde::de::DeserializeOwned;
+use serde_json::{json, Value};
+
+pub const FINALIZER: &str = "restic.anshulg.com/finalizer";
+
+/// Adds a finalizer to the given resource.
+pub async fn add<K>(api: &Api<K>, name: &str) -> Result<K, Error>
+where
+ K: Clone + DeserializeOwned + std::fmt::Debug,
+{
+ let finalizer: Value = json!({
+ "metadata": {
+ "finalizers": [FINALIZER]
+ }
+ });
+
+ let patch = Patch::Merge(&finalizer);
+ api.patch(name, &PatchParams::default(), &patch).await
+}
+
+/// Removes all finalizers from the given resource.
+pub async fn remove<K>(api: &Api<K>, name: &str) -> Result<K, Error>
+where
+ K: Clone + DeserializeOwned + std::fmt::Debug,
+{
+ let finalizer: Value = json!({
+ "metadata": {
+ "finalizers": null
+ }
+ });
+
+ let patch = Patch::Merge(&finalizer);
+ api.patch(name, &PatchParams::default(), &patch).await
+}
+
+#[cfg(test)]
+mod tests {
+ use k8s_openapi::api::core::v1::ConfigMap;
+ use kube::{api::ObjectMeta, Client, ResourceExt};
+
+ use super::*;
+
+ #[tokio::test]
+ #[cfg_attr(
+ not(feature = "integration-tests"),
+ ignore = "uses k8s current-context"
+ )]
+ async fn test_finalizer() {
+ let client = Client::try_default().await.unwrap();
+ let api: Api<ConfigMap> = Api::namespaced(client, "default");
+
+ // Create a test ConfigMap
+ let cm = ConfigMap {
+ metadata: ObjectMeta {
+ name: Some("test-finalizer-add".to_string()),
+ ..Default::default()
+ },
+ ..Default::default()
+ };
+ let cm = api.create(&Default::default(), &cm).await.unwrap();
+
+ // Check for no finalizers
+ assert!(cm.metadata.finalizers.is_none());
+
+ // Add the finalizer
+ let cm = add(&api, &cm.name_any()).await.unwrap();
+
+ // Check for the finalizer
+ assert_eq!(cm.metadata.finalizers, Some(vec![FINALIZER.to_string()]));
+
+ // Remove the finalizer
+ let cm = remove(&api, &cm.name_any()).await.unwrap();
+
+ // Check for no finalizers
+ assert!(cm.metadata.finalizers.is_none());
+
+ // Clean up
+ api.delete(&cm.name_any(), &Default::default())
+ .await
+ .unwrap();
+ }
+}
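
Note: Patch::Merge sends an RFC 7386 JSON merge patch, in which setting a field to null deletes it, which is why remove clears the whole finalizers list, including finalizers owned by other controllers. A narrower variant that preserves foreign finalizers could look like the following sketch (hypothetical, not part of this commit; a production version would also guard against concurrent updates with resourceVersion):

use kube::{
    api::{Patch, PatchParams},
    Api, Error, Resource, ResourceExt,
};
use serde::de::DeserializeOwned;
use serde_json::json;

/// Removes only FINALIZER, keeping finalizers added by other controllers.
pub async fn remove_ours<K>(api: &Api<K>, name: &str) -> Result<K, Error>
where
    K: Resource + Clone + DeserializeOwned + std::fmt::Debug,
{
    let obj = api.get(name).await?;
    let remaining: Vec<String> = obj
        .finalizers()
        .iter()
        .filter(|f| f.as_str() != FINALIZER)
        .cloned()
        .collect();
    let patch = Patch::Merge(json!({ "metadata": { "finalizers": remaining } }));
    api.patch(name, &PatchParams::default(), &patch).await
}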
diff --git a/src/jobspec.rs b/src/jobspec.rs
new file mode 100644
index 0000000..212172c
--- /dev/null
+++ b/src/jobspec.rs
@@ -0,0 +1,284 @@
+use k8s_openapi::api::{
+ batch::v1::JobSpec,
+ core::v1::{
+ Affinity, ConfigMapVolumeSource, Container, EnvFromSource, EnvVar, EnvVarSource, PodSpec,
+ PodTemplateSpec, ResourceRequirements, SecretVolumeSource, SecurityContext, Volume,
+ VolumeMount,
+ },
+};
+use restic_crd::{BackupSpec, ResticProfileConfig};
+
+const DEFAULT_RESTIC_IMAGE: &str = "creativeprojects/resticprofile";
+
+#[derive(Debug, Clone)]
+pub struct BackupJobSpec {
+ image: String,
+ image_pull_policy: Option<String>,
+ args: Option<Vec<String>>,
+ command: Option<Vec<String>>,
+ env: Vec<EnvVar>,
+ env_from: Vec<EnvFromSource>,
+ resources: Option<ResourceRequirements>,
+ security_context: Option<SecurityContext>,
+ affinity: Option<Affinity>,
+ node_selector: Option<std::collections::BTreeMap<String, String>>,
+ service_account_name: Option<String>,
+ volume_mounts: Vec<VolumeMount>,
+ volumes: Vec<Volume>,
+}
+
+impl BackupJobSpec {
+ pub fn new(backup: &BackupSpec, config_name: impl Into<String>) -> Self {
+ let mut rpcfg = backup.restic_profile.clone().unwrap_or_default();
+ let image = get_image(&mut rpcfg);
+ let env = fill_env(backup, &mut rpcfg);
+ let (volume_mounts, volumes) = fill_volume_mounts(backup, config_name);
+
+ Self {
+ image,
+ image_pull_policy: rpcfg.image_pull_policy.take(),
+ args: rpcfg.args.take(),
+ command: rpcfg.command.take(),
+ env,
+ env_from: rpcfg.env_from.take().unwrap_or_default(),
+ resources: rpcfg.resources.take(),
+ security_context: rpcfg.security_context.take(),
+ affinity: rpcfg.affinity.take(),
+ node_selector: rpcfg.node_selector.take(),
+ service_account_name: rpcfg.service_account_name.take(),
+ volume_mounts,
+ volumes,
+ }
+ }
+}
+
+impl From<BackupJobSpec> for JobSpec {
+ fn from(value: BackupJobSpec) -> Self {
+ Self {
+ suspend: Some(false),
+ template: PodTemplateSpec {
+ spec: Some(PodSpec {
+ affinity: value.affinity,
+ containers: vec![Container {
+ name: "restic-backup".to_owned(),
+ args: value.args,
+ command: value.command,
+ env: Some(value.env),
+ env_from: Some(value.env_from),
+ image: Some(value.image),
+ image_pull_policy: value.image_pull_policy,
+ resources: value.resources,
+ security_context: value.security_context,
+ volume_mounts: Some(value.volume_mounts),
+ ..Default::default()
+ }],
+ restart_policy: Some("OnFailure".to_string()),
+ node_selector: value.node_selector,
+ service_account_name: value.service_account_name,
+ volumes: Some(value.volumes),
+ ..Default::default()
+ }),
+ ..Default::default()
+ },
+ ..Default::default()
+ }
+ }
+}
+
+fn get_image(cfg: &mut ResticProfileConfig) -> String {
+ cfg.image.take().unwrap_or_else(|| {
+ format!(
+ "{}:{}",
+ DEFAULT_RESTIC_IMAGE,
+ cfg.version.as_deref().unwrap_or("latest")
+ )
+ })
+}
+
+fn fill_env(backup: &BackupSpec, rpcfg: &mut ResticProfileConfig) -> Vec<EnvVar> {
+ let mut env = rpcfg.env.take().unwrap_or_default();
+
+ if let Some(rest_creds) = &backup.restic.repository.rest_credentials {
+ env.push(EnvVar {
+ name: "RESTIC_REST_USERNAME".to_string(),
+ value_from: Some(EnvVarSource {
+ secret_key_ref: Some(rest_creds.username.clone()),
+ ..Default::default()
+ }),
+ ..Default::default()
+ });
+ env.push(EnvVar {
+ name: "RESTIC_REST_PASSWORD".to_string(),
+ value_from: Some(EnvVarSource {
+ secret_key_ref: Some(rest_creds.password.clone()),
+ ..Default::default()
+ }),
+ ..Default::default()
+ });
+ }
+
+ env
+}
+
+fn fill_volume_mounts(
+ backup: &BackupSpec,
+ config_name: impl Into<String>,
+) -> (Vec<VolumeMount>, Vec<Volume>) {
+ let mut mounts = Vec::new();
+ let mut volumes = Vec::new();
+
+ // Add volume mount for resticprofile config
+ mounts.push(VolumeMount {
+ mount_path: "/resticprofile/profiles.toml".to_owned(),
+ name: "profile".to_owned(),
+ sub_path: Some("profiles.toml".to_owned()),
+ ..Default::default()
+ });
+ volumes.push(Volume {
+ name: "profile".to_owned(),
+ config_map: Some(ConfigMapVolumeSource {
+ name: config_name.into(),
+ ..Default::default()
+ }),
+ ..Default::default()
+ });
+
+ // Add volume mount for restic repository password
+    mounts.push(VolumeMount {
+        // Must match PASSWORD_FILE_PATH in resticprofile/mod.rs, which the
+        // generated profile's password-file setting points to.
+        mount_path: "/resticprofile/password".to_owned(),
+ name: "restic-password".to_owned(),
+ sub_path: Some(backup.restic.repository.password.key.clone()),
+ ..Default::default()
+ });
+ volumes.push(Volume {
+ name: "restic-password".to_owned(),
+ secret: Some(SecretVolumeSource {
+ secret_name: Some(backup.restic.repository.password.name.clone()),
+ ..Default::default()
+ }),
+ ..Default::default()
+ });
+
+ // Add other volume mounts
+ if let Some(vol_backup) = &backup.volume {
+ mounts.extend_from_slice(&vol_backup.mounts);
+ volumes.extend_from_slice(&vol_backup.volumes);
+ };
+
+ (mounts, volumes)
+}
+
+#[cfg(test)]
+mod tests {
+ use k8s_openapi::api::core::v1::SecretKeySelector;
+ use restic_crd::{Repository, RepositoryType, RestCredentials, ResticConfig, VolumeBackup};
+
+ use super::*;
+
+ const CONFIG_NAME: &str = "test-config";
+
+ fn create_backup() -> BackupSpec {
+ BackupSpec {
+ restic: ResticConfig::builder()
+ .repository(Repository {
+ r#type: RepositoryType::Rest,
+ uri: "https://example.com".to_string(),
+ rest_credentials: Some(RestCredentials {
+ username: SecretKeySelector {
+ name: "restic-secret".to_string(),
+ key: "username".to_string(),
+ ..Default::default()
+ },
+ password: SecretKeySelector {
+ name: "restic-secret".to_string(),
+ key: "password".to_string(),
+ ..Default::default()
+ },
+ }),
+ password: SecretKeySelector {
+ name: "restic-password".to_string(),
+ key: "password.txt".to_string(),
+ ..Default::default()
+ },
+ })
+ .build(),
+ volume: None,
+ restic_profile: Some(ResticProfileConfig {
+ image: Some("custom/restic:latest".to_string()),
+ version: Some("v1.0.0".to_string()),
+ ..Default::default()
+ }),
+ }
+ }
+
+ #[test]
+ fn test_jobspec_new() {
+ let backup = create_backup();
+ let job_spec = BackupJobSpec::new(&backup, CONFIG_NAME);
+
+ assert_eq!(job_spec.image, "custom/restic:latest");
+ assert_eq!(job_spec.env.len(), 2);
+ assert_eq!(job_spec.volume_mounts.len(), 2);
+ assert_eq!(job_spec.volumes.len(), 2);
+ }
+
+ #[test]
+ fn test_default_image() {
+ let mut backup = create_backup();
+ backup.restic_profile.as_mut().unwrap().image = None;
+ let job = BackupJobSpec::new(&backup, CONFIG_NAME);
+
+ assert_eq!(job.image, "creativeprojects/resticprofile:v1.0.0");
+ }
+
+ #[test]
+ fn test_default_image_tag() {
+ let mut backup = create_backup();
+ backup.restic_profile.as_mut().unwrap().image = None;
+ backup.restic_profile.as_mut().unwrap().version = None;
+ let job = BackupJobSpec::new(&backup, CONFIG_NAME);
+
+ assert_eq!(job.image, "creativeprojects/resticprofile:latest");
+ }
+
+ #[test]
+ fn test_fill_env_with_no_credentials() {
+ let mut backup = create_backup();
+ backup.restic.repository.rest_credentials = None;
+ let mut rpcfg = backup.restic_profile.clone().unwrap_or_default();
+ let env = fill_env(&backup, &mut rpcfg);
+
+ assert!(env.is_empty());
+ }
+
+ #[test]
+ fn test_fill_volume_mounts_with_no_volume() {
+ let backup = create_backup();
+ let config_name = "test-config";
+ let (volume_mounts, volumes) = fill_volume_mounts(&backup, config_name);
+
+ assert_eq!(volume_mounts.len(), 2);
+ assert_eq!(volumes.len(), 2);
+ }
+
+ #[test]
+ fn test_fill_volume_mounts_with_volume() {
+ let mut backup = create_backup();
+ backup.volume = Some(VolumeBackup {
+ mounts: vec![VolumeMount {
+ mount_path: "/data".to_string(),
+ name: "data-volume".to_string(),
+ ..Default::default()
+ }],
+ volumes: vec![Volume {
+ name: "data-volume".to_string(),
+ ..Default::default()
+ }],
+ });
+ let config_name = "test-config";
+ let (volume_mounts, volumes) = fill_volume_mounts(&backup, config_name);
+
+ assert_eq!(volume_mounts.len(), 3);
+ assert_eq!(volumes.len(), 3);
+ }
+}
diff --git a/src/main.rs b/src/main.rs
new file mode 100644
index 0000000..dfa2edf
--- /dev/null
+++ b/src/main.rs
@@ -0,0 +1,42 @@
+use kube::Client;
+use tracing::{info, level_filters::LevelFilter};
+use tracing_subscriber::EnvFilter;
+
+mod backup;
+mod context;
+mod deploy;
+mod error;
+mod finalizer;
+mod jobspec;
+mod resticprofile;
+mod schedule;
+
+pub use error::Error;
+
+#[tokio::main]
+async fn main() {
+ tracing_subscriber::fmt()
+ .with_env_filter(
+ EnvFilter::builder()
+ .with_default_directive(LevelFilter::INFO.into())
+ .from_env_lossy(),
+ )
+ .init();
+ std::panic::set_hook(Box::new(tracing_panic::panic_hook));
+
+ let k8s_client = Client::try_default()
+ .await
+        .expect("failed to create Kubernetes client; check KUBECONFIG or in-cluster config");
+
+ let signal = tokio::signal::ctrl_c();
+ let backup_fut = tokio::spawn(backup::run_controller(k8s_client.clone()));
+ let schedule_fut = tokio::spawn(schedule::run_controller(k8s_client.clone()));
+
+ tokio::select! {
+ _ = signal => {}
+ _ = backup_fut => {}
+ _ = schedule_fut => {}
+ }
+
+ info!("Successfully shut down.")
+}
diff --git a/src/resticprofile/config.rs b/src/resticprofile/config.rs
new file mode 100644
index 0000000..b19ef2e
--- /dev/null
+++ b/src/resticprofile/config.rs
@@ -0,0 +1,245 @@
+use std::collections::HashMap;
+
+use bon::Builder;
+use serde::{Deserialize, Serialize};
+
+pub const DEFAULT_PROFILE: &str = "default";
+
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Builder, Default)]
+#[non_exhaustive]
+pub struct ResticProfileConfig {
+ #[builder(default)]
+ pub version: ResticProfileVersion,
+ pub global: Option<ResticProfileGlobal>,
+ #[serde(flatten)]
+ #[builder(default)]
+ pub profiles: HashMap<String, ResticProfileProfile>,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
+#[non_exhaustive]
+pub enum ResticProfileVersion {
+ #[default]
+ #[serde(rename = "1")]
+ V1,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Builder, Default)]
+#[non_exhaustive]
+#[serde(rename_all = "kebab-case")]
+pub struct ResticProfileGlobal {
+ /// Minimum available memory (in MB) required to run any commands.
+ pub min_memory: Option<u64>,
+ /// Time to wait before trying to get a lock on a restic repository.
+ pub restic_lock_retry_after: Option<String>,
+ /// The age an unused lock on a restic repository must have at least before resticprofile attempts to unlock.
+ pub restic_stale_lock_age: Option<String>,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Builder, Default)]
+#[non_exhaustive]
+#[serde(rename_all = "kebab-case")]
+pub struct ResticProfileProfile {
+ /// File to load root certificates from (default: use system certificates or $RESTIC_CACERT).
+ pub cacert: Option<String>,
+ /// Set the cache directory. (default: use system default cache directory).
+ pub cache_dir: Option<String>,
+ /// Compression mode (only available for repository format version 2), one of (auto/off/max) (default: $RESTIC_COMPRESSION).
+ pub compression: Option<String>,
+ /// Set a http user agent for outgoing http requests.
+ pub http_user_agent: Option<String>,
+ /// Skip TLS certificate verification when connecting to the repository (insecure).
+ #[serde(skip_serializing_if = "std::ops::Not::not")]
+ #[builder(default)]
+ pub insecure_tls: bool,
+ /// Limits downloads to a maximum rate in KiB/s. (default: unlimited).
+ pub limit_download: Option<u64>,
+ /// Limits uploads to a maximum rate in KiB/s. (default: unlimited).
+ pub limit_upload: Option<u64>,
+ /// Do not use a local cache
+ #[serde(skip_serializing_if = "std::ops::Not::not")]
+ #[builder(default)]
+ pub no_cache: bool,
+ /// Skip additional verification of data before upload (see documentation)
+ #[serde(skip_serializing_if = "std::ops::Not::not")]
+ #[builder(default)]
+ pub no_extra_verify: bool,
+    /// Set target pack size in MiB, created pack files may be larger (default: $RESTIC_PACK_SIZE).
+ pub pack_size: Option<u64>,
+ /// File to read the repository password from.
+ pub password_file: Option<String>,
+ /// Repository to backup to or restore from (default: $RESTIC_REPOSITORY).
+ pub repository: Option<String>,
+ /// Path to a file containing PEM encoded TLS client certificate and private key (default: $RESTIC_TLS_CLIENT_CERT).
+ pub tls_client_cert: Option<String>,
+ /// Be verbose
+ pub verbose: Option<u8>,
+
+ /// This section configures restic command `backup`.
+ pub backup: Option<ResticProfileProfileBackup>,
+ /// This section configures restic command `forget`.
+ pub retention: Option<ResticProfileProfileRetention>,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Builder, Default)]
+#[non_exhaustive]
+#[serde(rename_all = "kebab-case")]
+pub struct ResticProfileProfileBackup {
+ /// Check the repository after the backup command succeeded.
+ #[serde(skip_serializing_if = "std::ops::Not::not")]
+ #[builder(default)]
+ pub check_after: bool,
+ /// Check the repository before starting the backup command.
+ #[serde(skip_serializing_if = "std::ops::Not::not")]
+ #[builder(default)]
+ pub check_before: bool,
+ /// Do not fail the backup when some files could not be read.
+ #[serde(skip_serializing_if = "std::ops::Not::not")]
+ #[builder(default)]
+ pub no_error_on_warning: bool,
+ /// The paths to backup. Examples: /opt/, /home/user/, C:\Users\User\Documents.
+ #[serde(skip_serializing_if = "Vec::is_empty")]
+ #[builder(default)]
+ pub source: Vec<String>,
+ /// Exclude a pattern.
+ #[serde(skip_serializing_if = "Vec::is_empty")]
+ #[builder(default)]
+ pub exclude: Vec<String>,
+ /// Excludes cache directories that are marked with a CACHEDIR.TAG file.
+ #[serde(skip_serializing_if = "std::ops::Not::not")]
+ #[builder(default)]
+ pub exclude_caches: bool,
+ /// Takes filename[:header], exclude contents of directories containing filename (except filename itself) if header of that file is as provided.
+ #[serde(skip_serializing_if = "Vec::is_empty")]
+ #[builder(default)]
+ pub exclude_if_present: Vec<String>,
+ /// Max size of the files to be backed up (allowed suffixes: k/K, m/M, g/G, t/T).
+ pub exclude_larger_than: Option<String>,
+    /// Same as the --exclude pattern but ignores the casing of filenames.
+ #[serde(skip_serializing_if = "Vec::is_empty")]
+ #[builder(default)]
+ pub iexclude: Vec<String>,
+ /// Add tags for the new snapshot in the format tag[,tag,…]. Boolean true is unsupported in section “backup”. Examples: false, "tag".
+ #[serde(skip_serializing_if = "Vec::is_empty")]
+ #[builder(default)]
+ pub tag: Vec<String>,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Builder, Default)]
+#[non_exhaustive]
+#[serde(rename_all = "kebab-case")]
+pub struct ResticProfileProfileRetention {
+ /// Apply retention after the backup command succeeded.
+ #[serde(skip_serializing_if = "std::ops::Not::not")]
+ #[builder(default)]
+ pub after_backup: bool,
+ /// Apply retention before starting the backup command
+ #[serde(skip_serializing_if = "std::ops::Not::not")]
+ #[builder(default)]
+ pub before_backup: bool,
+    /// Only apply the retention policy to snapshots taken by the current host.
+ #[builder(default)]
+ pub host: bool,
+
+ pub keep_last: Option<u32>,
+ pub keep_hourly: Option<u32>,
+ pub keep_daily: Option<u32>,
+ pub keep_weekly: Option<u32>,
+ pub keep_monthly: Option<u32>,
+ pub keep_yearly: Option<u32>,
+
+ /// Automatically run the ‘prune’ command if snapshots have been removed.
+ #[serde(skip_serializing_if = "std::ops::Not::not")]
+ #[builder(default)]
+ pub prune: bool,
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_default() {
+ let config = ResticProfileConfig::default();
+
+ let output = toml::to_string(&config).unwrap();
+ assert_eq!(
+ output,
+ r#"version = "1"
+"#
+ );
+ }
+
+ #[test]
+ fn test_global() {
+ let config = ResticProfileConfig::builder()
+ .global(
+ ResticProfileGlobal::builder()
+ .min_memory(100)
+ .restic_lock_retry_after("1m".to_owned())
+ .restic_stale_lock_age("1h".to_owned())
+ .build(),
+ )
+ .build();
+
+ let output = toml::to_string(&config).unwrap();
+ assert_eq!(
+ output,
+ r#"version = "1"
+
+[global]
+min-memory = 100
+restic-lock-retry-after = "1m"
+restic-stale-lock-age = "1h"
+"#
+ );
+ }
+
+ #[test]
+ fn test_profile() {
+ let config = ResticProfileConfig::builder()
+ .profiles(HashMap::from([(
+ "default".to_owned(),
+ ResticProfileProfile::builder()
+ .cacert("/etc/ssl/ca.crt".to_owned())
+ .cache_dir("/var/cache/restic".to_owned())
+ .compression("auto".to_owned())
+ .http_user_agent("resticprofile/0.1".to_owned())
+ .insecure_tls(false)
+ .backup(
+ ResticProfileProfileBackup::builder()
+ .source(vec!["/opt".to_owned()])
+ .build(),
+ )
+ .retention(
+ ResticProfileProfileRetention::builder()
+ .after_backup(true)
+ .keep_last(10)
+ .build(),
+ )
+ .build(),
+ )]))
+ .build();
+
+ let output = toml::to_string(&config).unwrap();
+ assert_eq!(
+ output,
+ r#"version = "1"
+
+[default]
+cacert = "/etc/ssl/ca.crt"
+cache-dir = "/var/cache/restic"
+compression = "auto"
+http-user-agent = "resticprofile/0.1"
+
+[default.backup]
+source = ["/opt"]
+
+[default.retention]
+after-backup = true
+host = false
+keep-last = 10
+"#
+ );
+ }
+}
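
Note: because profiles is #[serde(flatten)]ed into the top level, each map key becomes its own top-level TOML table ("default" above, but any name works). A sketch:

#[cfg(test)]
mod flatten_example {
    use super::*;

    #[test]
    fn profile_key_becomes_table_name() {
        let config = ResticProfileConfig::builder()
            .profiles(HashMap::from([(
                "nightly".to_owned(),
                ResticProfileProfile::builder()
                    .cache_dir("/var/cache/restic".to_owned())
                    .build(),
            )]))
            .build();
        let output = toml::to_string(&config).unwrap();
        assert!(output.contains("[nightly]"));
        assert!(output.contains("cache-dir = \"/var/cache/restic\""));
    }
}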
diff --git a/src/resticprofile/mod.rs b/src/resticprofile/mod.rs
new file mode 100644
index 0000000..0de6a11
--- /dev/null
+++ b/src/resticprofile/mod.rs
@@ -0,0 +1,238 @@
+use std::collections::{BTreeMap, HashMap};
+
+use config::{
+ ResticProfileConfig, ResticProfileProfile, ResticProfileProfileBackup,
+ ResticProfileProfileRetention, DEFAULT_PROFILE,
+};
+use k8s_openapi::{api::core::v1::ConfigMap, apimachinery::pkg::apis::meta::v1::OwnerReference};
+use kube::{api::ObjectMeta, Api, Client, ResourceExt};
+use restic_crd::BackupSpec;
+
+use crate::{
+ deploy::{Deployable, Labels},
+ Error,
+};
+
+pub mod config;
+
+const PASSWORD_FILE_PATH: &str = "/resticprofile/password";
+
+#[derive(Debug, Clone)]
+pub struct ResticProfile {
+ name: String,
+ ns: String,
+ config: ResticProfileConfig,
+}
+
+impl ResticProfile {
+ pub fn new(ns: String, name: impl AsRef<str>, backup: &BackupSpec) -> Self {
+ let config = create_config(backup);
+ let name = format!("{}-profile", name.as_ref());
+ ResticProfile { name, ns, config }
+ }
+
+ pub fn name(&self) -> &str {
+ &self.name
+ }
+
+ async fn get(&self, client: Client) -> Result<Option<ConfigMap>, Error> {
+ let api: Api<ConfigMap> = Api::namespaced(client, &self.ns);
+ match api.get(&self.name).await {
+ Ok(c) => Ok(Some(c)),
+ Err(kube::Error::Api(ae)) => {
+ if ae.code == 404 {
+ Ok(None)
+ } else {
+ Err(Error::KubeError(kube::Error::Api(ae)))
+ }
+ }
+ Err(e) => Err(Error::KubeError(e)),
+ }
+ }
+}
+
+impl Deployable for ResticProfile {
+ type Error = Error;
+
+ async fn create<O>(&self, client: Client, owner: &O, labels: Labels) -> Result<(), Self::Error>
+ where
+ O: kube::Resource<DynamicType = ()> + Send + Sync,
+ {
+ let config = toml::to_string(&self.config)?;
+ let data = BTreeMap::from([("profiles.toml".to_owned(), config)]);
+
+ let config_map = ConfigMap {
+ metadata: ObjectMeta {
+ name: Some(self.name.clone()),
+ namespace: Some(self.ns.clone()),
+ labels: Some(labels.to_labels()),
+ owner_references: O::meta(owner).uid.clone().map(|uid| {
+ vec![OwnerReference {
+ api_version: O::api_version(&()).into_owned(),
+ block_owner_deletion: Some(true),
+ controller: Some(true),
+ kind: O::kind(&()).into_owned(),
+ name: O::name_any(owner),
+ uid,
+ }]
+ }),
+ ..ObjectMeta::default()
+ },
+ data: Some(data),
+ ..ConfigMap::default()
+ };
+
+ let api: Api<ConfigMap> = Api::namespaced(client, &self.ns);
+ api.create(&Default::default(), &config_map).await?;
+
+ Ok(())
+ }
+
+ async fn delete(&self, client: Client) -> Result<(), Self::Error> {
+ let config_map = self.get(client.clone()).await?;
+ if let Some(config_map) = config_map {
+ let api: Api<ConfigMap> = Api::namespaced(client, &self.ns);
+ api.delete(&config_map.name_any(), &Default::default())
+ .await?;
+ }
+ Ok(())
+ }
+}
+
+fn create_config(backup: &BackupSpec) -> ResticProfileConfig {
+ let paths = extract_paths(backup);
+
+ let backup_conf = backup.restic.backup.as_ref().map(|b| {
+ ResticProfileProfileBackup::builder()
+ .source(paths)
+ .maybe_exclude(b.exclude.clone())
+ .exclude_caches(b.exclude_caches)
+ .maybe_exclude_if_present(b.exclude_if_present.clone())
+ .maybe_exclude_larger_than(b.exclude_larger_than.clone())
+ .maybe_iexclude(b.iexclude.clone())
+ .maybe_tag(b.tag.clone())
+ .build()
+ });
+
+ let retention = backup.restic.retention.as_ref().map(|r| {
+ ResticProfileProfileRetention::builder()
+ .after_backup(r.after_backup)
+ .before_backup(r.before_backup)
+ .maybe_keep_last(r.keep_last)
+ .maybe_keep_hourly(r.keep_hourly)
+ .maybe_keep_daily(r.keep_daily)
+ .maybe_keep_weekly(r.keep_weekly)
+ .maybe_keep_monthly(r.keep_monthly)
+ .maybe_keep_yearly(r.keep_yearly)
+ .prune(r.prune)
+ .build()
+ });
+
+ let profile = ResticProfileProfile::builder()
+ .compression(backup.restic.compression.as_str().to_owned())
+ .repository(backup.restic.repository.full_uri())
+ .password_file(PASSWORD_FILE_PATH.to_owned())
+ .maybe_backup(backup_conf)
+ .maybe_retention(retention)
+ .build();
+
+ ResticProfileConfig::builder()
+ .profiles(HashMap::from([(DEFAULT_PROFILE.to_owned(), profile)]))
+ .build()
+}
+
+fn extract_paths(backup: &BackupSpec) -> Vec<String> {
+ if let Some(vol_backup) = &backup.volume {
+ vol_backup
+ .mounts
+ .iter()
+ .map(|m| m.mount_path.clone())
+ .collect()
+ } else {
+ Vec::new()
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use k8s_openapi::{
+ api::core::v1::VolumeMount, apimachinery::pkg::apis::meta::v1::OwnerReference,
+ };
+ use restic_crd::{Backup, BackupSpec, Repository, ResticConfig, VolumeBackup};
+
+ use super::*;
+
+ #[test]
+ fn test_extract_paths() {
+ let spec = BackupSpec::builder()
+ .restic(
+ ResticConfig::builder()
+ .repository(
+ Repository::builder()
+ .r#type(restic_crd::RepositoryType::Rest)
+ .uri("https://example.com".to_owned())
+ .password(Default::default())
+ .build(),
+ )
+ .build(),
+ )
+ .volume(
+ VolumeBackup::builder()
+ .mounts(vec![VolumeMount {
+ mount_path: "/mnt/data".to_owned(),
+ ..Default::default()
+ }])
+ .volumes(Vec::new())
+ .build(),
+ )
+ .build();
+ let backup = Backup::new("test", spec);
+
+ let paths = extract_paths(&backup.spec);
+ assert_eq!(paths, vec!["/mnt/data".to_owned()]);
+ }
+
+ #[tokio::test]
+ // #[cfg_attr(
+ // not(feature = "integration-tests"),
+ // ignore = "uses k8s current-context"
+ // )]
+ #[ignore = "TODO FIX: incorrect naming"]
+ async fn test_resticprofile() {
+ const NAME: &str = "test-resticprofile";
+ const NS: &str = "default";
+ let client = Client::try_default().await.unwrap();
+
+ let spec = BackupSpec::builder()
+ .restic(
+ ResticConfig::builder()
+ .repository(
+ Repository::builder()
+ .r#type(restic_crd::RepositoryType::Rest)
+ .uri("https://example.com".to_owned())
+ .password(Default::default())
+ .build(),
+ )
+ .build(),
+ )
+ .build();
+ let _backup = Backup::new(NAME, spec);
+
+ // let _profile = ResticProfile::new(NS.to_owned(), &backup);
+ // profile
+ // .create(client.clone(), Labels::new("test"))
+ // .await
+ // .unwrap();
+
+ let api: Api<ConfigMap> = Api::namespaced(client, NS);
+ let cm = api.get(NAME).await.unwrap();
+ assert_eq!(cm.name_any(), NAME);
+ assert_eq!(
+ cm.owner_references(),
+ vec![OwnerReference {
+ name: "test".to_owned(),
+ ..Default::default()
+ }]
+ );
+ }
+}
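
Note: for orientation, the rough shape of the profiles.toml payload that create_config produces for a spec with one volume mounted at /data. This is a hand-written sketch, not captured output; the exact repository string depends on Repository::full_uri in restic_crd, and the compression value on the CRD's default:

// Hand-written sketch of the generated ConfigMap payload; the repository
// and compression values are assumptions, not captured output.
const EXAMPLE_PROFILES_TOML: &str = r#"version = "1"

[default]
compression = "auto"
password-file = "/resticprofile/password"
repository = "rest:https://example.com"

[default.backup]
source = ["/data"]
"#;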
diff --git a/src/schedule/cronjob.rs b/src/schedule/cronjob.rs
new file mode 100644
index 0000000..77a8ec7
--- /dev/null
+++ b/src/schedule/cronjob.rs
@@ -0,0 +1,131 @@
+use k8s_openapi::{
+ api::batch::v1::{CronJob, CronJobSpec, JobTemplateSpec},
+ apimachinery::pkg::apis::meta::v1::OwnerReference,
+};
+use kube::{api::ObjectMeta, Api, Resource, ResourceExt};
+use restic_crd::ScheduledBackup;
+
+use super::Error;
+use crate::{deploy::Deployable, jobspec::BackupJobSpec};
+
+#[derive(Debug, Clone)]
+pub struct BackupCronJob {
+ name: String,
+ ns: String,
+ spec: BackupJobSpec,
+
+ /// The schedule in Cron format, see https://en.wikipedia.org/wiki/Cron.
+ pub schedule: String,
+ /// Specifies how to treat concurrent executions of a Job. Valid values are:
+ ///
+    /// - "Allow" (default): allows CronJobs to run concurrently;
+    /// - "Forbid": forbids concurrent runs, skipping next run if previous run hasn't finished yet;
+    /// - "Replace": cancels currently running job and replaces it with a new one
+ pub concurrency_policy: Option<String>,
+ /// The number of failed finished jobs to retain. Value must be non-negative integer. Defaults to 1.
+ pub failed_jobs_history_limit: Option<i32>,
+ /// Optional deadline in seconds for starting the job if it misses scheduled time for any reason. Missed jobs executions will be counted as failed ones.
+ pub starting_deadline_seconds: Option<i64>,
+ /// The number of successful finished jobs to retain. Value must be non-negative integer. Defaults to 3.
+ pub successful_jobs_history_limit: Option<i32>,
+ /// This flag tells the controller to suspend subsequent executions, it does not apply to already started executions. Defaults to false.
+ pub suspend: Option<bool>,
+    /// The time zone name for the given schedule, see https://en.wikipedia.org/wiki/List_of_tz_database_time_zones.
+    /// If not specified, this will default to the time zone of the kube-controller-manager process. The set of valid
+    /// time zone names and the time zone offset is loaded from the system-wide time zone database by the API server
+    /// during CronJob validation and the controller manager during execution. If no system-wide time zone database
+    /// can be found a bundled version of the database is used instead. If the time zone name becomes invalid during
+    /// the lifetime of a CronJob or due to a change in host configuration, the controller will stop creating new
+    /// Jobs and will create a system event with the reason UnknownTimeZone. More information can be found in
+    /// https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/#time-zones
+ pub time_zone: Option<String>,
+}
+
+impl BackupCronJob {
+ pub fn new(
+ ns: impl Into<String>,
+ backup: &ScheduledBackup,
+ config_name: impl Into<String>,
+ ) -> Self {
+ let spec = BackupJobSpec::new(&backup.spec.backup, config_name);
+ Self {
+ name: format!("{}-cronjob", backup.name_any()),
+ ns: ns.into(),
+ spec,
+ schedule: backup.spec.schedule.clone(),
+ concurrency_policy: backup.spec.concurrency_policy.clone(),
+ failed_jobs_history_limit: backup.spec.failed_jobs_history_limit,
+ starting_deadline_seconds: backup.spec.starting_deadline_seconds,
+ successful_jobs_history_limit: backup.spec.successful_jobs_history_limit,
+ suspend: backup.spec.suspend,
+ time_zone: backup.spec.time_zone.clone(),
+ }
+ }
+
+ async fn get(&self, client: kube::Client) -> Result<Option<CronJob>, Error> {
+ let api: Api<CronJob> = Api::namespaced(client, &self.ns);
+ match api.get(&self.name).await {
+ Ok(c) => Ok(Some(c)),
+ Err(kube::Error::Api(ae)) => {
+ if ae.code == 404 {
+ Ok(None)
+ } else {
+ Err(Error::KubeError(kube::Error::Api(ae)))
+ }
+ }
+ Err(e) => Err(Error::KubeError(e)),
+ }
+ }
+}
+
+impl Deployable for BackupCronJob {
+ type Error = Error;
+
+ async fn create<O>(
+ &self,
+ client: kube::Client,
+ owner: &O,
+ labels: crate::deploy::Labels,
+ ) -> Result<(), Self::Error>
+ where
+ O: Resource<DynamicType = ()> + Send + Sync,
+ {
+ let job = CronJob {
+ metadata: ObjectMeta {
+ name: Some(self.name.clone()),
+ namespace: Some(self.ns.clone()),
+ labels: Some(labels.to_labels()),
+ owner_references: O::meta(owner).uid.clone().map(|uid| {
+ vec![OwnerReference {
+ api_version: O::api_version(&()).into_owned(),
+ block_owner_deletion: Some(true),
+ controller: Some(true),
+ kind: O::kind(&()).into_owned(),
+ name: O::name_any(owner),
+ uid,
+ }]
+ }),
+ ..Default::default()
+ },
+ spec: Some(CronJobSpec {
+ concurrency_policy: self.concurrency_policy.clone(),
+ failed_jobs_history_limit: self.failed_jobs_history_limit,
+ job_template: JobTemplateSpec {
+ metadata: None,
+ spec: Some(self.spec.clone().into()),
+ },
+ schedule: self.schedule.clone(),
+ starting_deadline_seconds: self.starting_deadline_seconds,
+ successful_jobs_history_limit: self.successful_jobs_history_limit,
+ suspend: self.suspend,
+ time_zone: self.time_zone.clone(),
+ }),
+ ..Default::default()
+ };
+
+ let api: Api<CronJob> = Api::namespaced(client, &self.ns);
+ api.create(&Default::default(), &job).await?;
+
+ Ok(())
+ }
+
+ async fn delete(&self, client: kube::Client) -> Result<(), Self::Error> {
+ let job = self.get(client.clone()).await?;
+ if let Some(job) = job {
+ let api: Api<CronJob> = Api::namespaced(client, &self.ns);
+ api.delete(&job.name_any(), &Default::default()).await?;
+ }
+ Ok(())
+ }
+}
diff --git a/src/schedule/deploy.rs b/src/schedule/deploy.rs
new file mode 100644
index 0000000..3897b3c
--- /dev/null
+++ b/src/schedule/deploy.rs
@@ -0,0 +1,45 @@
+use kube::ResourceExt;
+use restic_crd::ScheduledBackup;
+
+use super::{cronjob::BackupCronJob, Error};
+use crate::{deploy::Deployable, resticprofile::ResticProfile};
+
+#[derive(Debug, Clone)]
+pub struct ScheduledBackupDeployment {
+ profile: ResticProfile,
+ job: BackupCronJob,
+}
+
+impl ScheduledBackupDeployment {
+ pub fn new(ns: String, backup: &ScheduledBackup) -> Self {
+ let profile = ResticProfile::new(ns.clone(), backup.name_any(), &backup.spec.backup);
+ let job = BackupCronJob::new(ns, backup, profile.name());
+ Self { profile, job }
+ }
+}
+
+impl Deployable for ScheduledBackupDeployment {
+ type Error = Error;
+
+ async fn create<O>(
+ &self,
+ client: kube::Client,
+ owner: &O,
+ labels: crate::deploy::Labels,
+ ) -> Result<(), Self::Error>
+ where
+ O: kube::Resource<DynamicType = ()> + Send + Sync,
+ {
+ self.profile
+ .create(client.clone(), owner, labels.clone())
+ .await?;
+ self.job.create(client, owner, labels).await?;
+ Ok(())
+ }
+
+ async fn delete(&self, client: kube::Client) -> Result<(), Self::Error> {
+ self.profile.delete(client.clone()).await?;
+ self.job.delete(client).await?;
+ Ok(())
+ }
+}
diff --git a/src/schedule/mod.rs b/src/schedule/mod.rs
new file mode 100644
index 0000000..fe303fb
--- /dev/null
+++ b/src/schedule/mod.rs
@@ -0,0 +1,110 @@
+use std::{sync::Arc, time::Duration};
+
+use futures::StreamExt;
+use kube::{
+ runtime::{controller::Action, watcher::Config, Controller},
+ Api, Client, Resource, ResourceExt,
+};
+use restic_crd::ScheduledBackup;
+use tracing::{error, info};
+
+use crate::{
+ context::ContextData,
+ deploy::{Deployable, Labels},
+ finalizer::{self, FINALIZER},
+ Error,
+};
+
+mod cronjob;
+mod deploy;
+
+pub async fn run_controller(client: Client) {
+ let crd_api: Api<ScheduledBackup> = Api::all(client.clone());
+ let context: Arc<ContextData> = Arc::new(ContextData::new(client));
+
+ Controller::new(crd_api, Config::default())
+ .run(reconcile, on_error, context)
+ .for_each(|reconciliation_result| async move {
+ match reconciliation_result {
+                Ok(resource) => {
+                    info!("Reconciliation successful. Resource: {:?}", resource);
+ }
+ Err(reconciliation_err) => {
+ error!(%reconciliation_err, "Reconciliation error")
+ }
+ }
+ })
+ .await;
+}
+
+async fn reconcile(
+ backup: Arc<ScheduledBackup>,
+ context: Arc<ContextData>,
+) -> Result<Action, Error> {
+ let client = context.client.clone();
+
+ let ns = backup.namespace().ok_or(Error::MissingNamespace)?;
+ let name = backup.name_any();
+
+ match determine_action(&backup) {
+ ScheduledBackupAction::Create => {
+ // Add the finalizer to the resource
+ finalizer::add(
+ &Api::<ScheduledBackup>::namespaced(client.clone(), &ns),
+ &name,
+ )
+ .await?;
+
+ // Create the deployment
+ let deployment = deploy::ScheduledBackupDeployment::new(ns, &backup);
+ let labels = Labels::new(name);
+ deployment.create(client, &*backup, labels).await?;
+
+ Ok(Action::requeue(Duration::from_secs(10)))
+ }
+ ScheduledBackupAction::Delete => {
+ // Delete the deployment
+ let deployment = deploy::ScheduledBackupDeployment::new(ns.clone(), &backup);
+ deployment.delete(client.clone()).await?;
+
+ // Remove the finalizer from the resource
+ finalizer::remove(
+ &Api::<ScheduledBackup>::namespaced(client.clone(), &ns),
+ &name,
+ )
+ .await?;
+            Ok(Action::await_change())
+ }
+ ScheduledBackupAction::Noop => Ok(Action::requeue(Duration::from_secs(10))),
+ }
+}
+
+fn determine_action(backup: &ScheduledBackup) -> ScheduledBackupAction {
+ if backup.meta().deletion_timestamp.is_some() {
+ ScheduledBackupAction::Delete
+ } else if backup
+ .meta()
+ .finalizers
+ .as_ref()
+ .map_or(true, |f| !f.iter().any(|x| x == FINALIZER))
+ {
+ ScheduledBackupAction::Create
+ } else {
+ ScheduledBackupAction::Noop
+ }
+}
+
+fn on_error(backup: Arc<ScheduledBackup>, error: &Error, _context: Arc<ContextData>) -> Action {
+ error!("Reconciliation error:\n{:?}.\n{:?}", error, backup);
+ Action::requeue(Duration::from_secs(5))
+}
+
+/// Possible actions to take on a [`ScheduledBackup`] resource
+enum ScheduledBackupAction {
+ /// Create the sub-resources for the backup
+ Create,
+ /// Delete the sub-resources for the backup
+ Delete,
+ /// No operation required. Update the status if needed.
+ Noop,
+}