1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
|
use std::{sync::Arc, time::Duration};
use futures::StreamExt;
use kube::{
runtime::{controller::Action, watcher::Config, Controller},
Api, Client, Resource, ResourceExt,
};
use restic_crd::ScheduledBackup;
use tracing::{error, info};
use crate::{
context::ContextData,
deploy::{Deployable, Labels},
finalizer::{self, FINALIZER},
Error,
};
mod cronjob;
mod deploy;
pub async fn run_controller(client: Client) {
let crd_api: Api<ScheduledBackup> = Api::all(client.clone());
let context: Arc<ContextData> = Arc::new(ContextData::new(client));
Controller::new(crd_api, Config::default())
.run(reconcile, on_error, context)
.for_each(|reconciliation_result| async move {
match reconciliation_result {
Ok(echo_resource) => {
info!("Reconciliation successful. Resource: {:?}", echo_resource);
}
Err(reconciliation_err) => {
error!(%reconciliation_err, "Reconciliation error")
}
}
})
.await;
}
/// Reconciles a single [`ScheduledBackup`] resource.
///
/// The work to perform is chosen by [`determine_action`]:
///
/// * `Create` - adds the finalizer to the resource, then creates its
///   deployment; the resource is requeued for a later check.
/// * `Delete` - deletes the deployment, then removes the finalizer so the API
///   server can finish deleting the resource. Nothing is requeued afterwards:
///   the controller simply waits for the next watch event.
/// * `Noop` - nothing to do; the resource is requeued for a later check.
///
/// # Errors
///
/// Returns [`Error::MissingNamespace`] when the resource has no namespace, and
/// propagates any error raised while adding/removing the finalizer or while
/// creating/deleting the deployment.
async fn reconcile(
    backup: Arc<ScheduledBackup>,
    context: Arc<ContextData>,
) -> Result<Action, Error> {
    /// Delay before a live resource is looked at again.
    const REQUEUE_INTERVAL: Duration = Duration::from_secs(10);

    let client = context.client.clone();
    let ns = backup.namespace().ok_or(Error::MissingNamespace)?;
    let name = backup.name_any();

    match determine_action(&backup) {
        ScheduledBackupAction::Create => {
            // Add the finalizer to the resource before creating anything, so a
            // deletion arriving in between still goes through the cleanup path.
            // NOTE(review): if `create` below fails, the finalizer is already
            // present and the next pass resolves to `Noop` - confirm that a
            // failed creation is retried somewhere.
            let api = Api::<ScheduledBackup>::namespaced(client.clone(), &ns);
            finalizer::add(&api, &name).await?;

            // Create the deployment
            let deployment = deploy::ScheduledBackupDeployment::new(ns, &backup);
            let labels = Labels::new(name);
            deployment.create(client, &*backup, labels).await?;

            Ok(Action::requeue(REQUEUE_INTERVAL))
        }
        ScheduledBackupAction::Delete => {
            // Build the API handle first so `ns` can be moved into the
            // deployment instead of being cloned.
            let api = Api::<ScheduledBackup>::namespaced(client.clone(), &ns);

            // Delete the deployment
            let deployment = deploy::ScheduledBackupDeployment::new(ns, &backup);
            deployment.delete(client).await?;

            // Remove the finalizer from the resource
            finalizer::remove(&api, &name).await?;

            // Once the finalizer is gone the resource can be deleted, so a
            // timed requeue would have nothing left to reconcile.
            Ok(Action::await_change())
        }
        ScheduledBackupAction::Noop => Ok(Action::requeue(REQUEUE_INTERVAL)),
    }
}
/// Decides what [`reconcile`] should do with `backup`, based on its metadata.
///
/// * A set deletion timestamp always yields `Delete`.
/// * Otherwise, a resource whose finalizer list is absent or does not contain
///   [`FINALIZER`] yields `Create`.
/// * Any other resource yields `Noop`.
fn determine_action(backup: &ScheduledBackup) -> ScheduledBackupAction {
    let meta = backup.meta();

    // Deletion takes precedence over everything else.
    if meta.deletion_timestamp.is_some() {
        return ScheduledBackupAction::Delete;
    }

    let has_finalizer = match meta.finalizers.as_ref() {
        Some(registered) => registered.iter().any(|entry| entry == FINALIZER),
        None => false,
    };

    if has_finalizer {
        ScheduledBackupAction::Noop
    } else {
        ScheduledBackupAction::Create
    }
}
/// Error policy for the controller: logs the failure and schedules a retry.
///
/// * `backup` - the resource whose reconciliation failed (logged with `{:?}`).
/// * `error` - the error returned by the reconciler (logged with `{:?}`).
/// * `_context` - shared controller context; unused here.
///
/// Returns an [`Action`] that requeues the resource after five seconds.
fn on_error(backup: Arc<ScheduledBackup>, error: &Error, _context: Arc<ContextData>) -> Action {
    let retry_after = Duration::from_secs(5);
    error!("Reconciliation error:\n{:?}.\n{:?}", error, backup);
    Action::requeue(retry_after)
}
/// Possible actions to take on a [`ScheduledBackup`] resource.
///
/// The type is a plain tag with no payload, so it is `Copy` and can be
/// compared and debug-printed freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ScheduledBackupAction {
    /// Create the sub-resources for the backup.
    Create,
    /// Delete the sub-resources for the backup.
    Delete,
    /// No changes are needed; the resource is only requeued for a later check.
    Noop,
}
|