1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
use crate::{
    api::{ListParams, Meta, ObjectList, Resource, WatchEvent},
    client::APIClient,
    Error, Result,
};
use futures::{lock::Mutex, StreamExt, TryStreamExt};
use futures_timer::Delay;
use serde::de::DeserializeOwned;

use std::{collections::BTreeMap, sync::Arc, time::Duration};

/// Internal representation for Reflector
///
/// An ordered map from an object's identity (`ObjectId`: name plus optional
/// namespace) to the cached copy of that object.
type Cache<K> = BTreeMap<ObjectId, K>;

/// Internal shared state of Reflector
///
/// Holds the cached objects together with the resourceVersion that the next
/// watch request will start from.
struct State<K> {
    /// Cached objects, keyed by `ObjectId`
    data: Cache<K>,
    /// resourceVersion string handed to the next watch request
    version: String,
}

impl<K> Default for State<K> {
    fn default() -> Self {
        State {
            data: Default::default(),
            version: 0.to_string(),
        }
    }
}

/// A reflection of `Resource` state in kubernetes
///
/// This watches and caches a `Resource<K>` by:
/// - seeding the cache from a large initial list call
/// - keeping track of initial, and subsequent resourceVersions
/// - recovering when resourceVersions get desynced
///
/// It exposes its internal state readably through a getter.
///
/// The state lives behind an `Arc`, so clones of a `Reflector` share one cache.
#[derive(Clone)]
pub struct Reflector<K>
where
    K: Clone + DeserializeOwned + Send + Meta,
{
    /// Cache and last seen resourceVersion, behind an async mutex
    state: Arc<Mutex<State<K>>>,
    /// Client used for the list and watch requests
    client: APIClient,
    /// The resource (kind and optional namespace) being reflected
    resource: Resource,
    /// Parameters passed to every list and watch request
    params: ListParams,
}

impl<K> Reflector<K>
where
    K: Clone + DeserializeOwned + Meta + Send,
{
    /// Create a reflector with a kube client on a resource
    pub fn new(client: APIClient, lp: ListParams, r: Resource) -> Self {
        Reflector {
            client,
            resource: r,
            params: lp,
            state: Default::default(),
        }
    }

    /// Seed the cache with a full LIST call and hand the reflector back
    ///
    /// Consumes `self`; if the initial list fails, the error is returned instead
    /// of the reflector.
    pub async fn init(self) -> Result<Self> {
        info!("Starting Reflector for {}", self.resource.kind);
        let seeded = self.reset().await;
        seeded.map(|()| self)
    }

    /// Run a single watch poll
    ///
    /// If this returns an error, it tries a full refresh.
    /// This is meant to be run continually in a thread/task. Spawn one.
    pub async fn poll(&self) -> Result<()> {
        trace!("Watching {}", self.resource.kind);
        if let Err(e) = self.single_watch().await {
            warn!("Poll error on {}: {}: {:?}", self.resource.kind, e, e);
            // If desynched due to mismatching resourceVersion, retry in a bit
            let dur = Duration::from_secs(10);
            Delay::new(dur).await;
            self.reset().await?; // propagate error if this failed..
        }

        Ok(())
    }

    /// Snapshot of every cached object
    ///
    /// Waits for the state lock, then returns clones of all cached values in key order.
    pub async fn state(&self) -> Result<Vec<K>> {
        let guard = self.state.lock().await;
        let snapshot: Vec<K> = guard.data.values().cloned().collect();
        Ok(snapshot)
    }

    /// Look up a single cached entry by name
    ///
    /// The lookup key combines `name` with the namespace configured on the reflector's
    /// resource. If the resource has no namespace configured, only objects that
    /// themselves carry no namespace can match; for a reflector that polls across
    /// namespaces, use `Reflector::get_within` instead.
    ///
    /// Blocks the calling thread until the state lock is acquired.
    /// NOTE(review): calling a blocking executor from inside an async task may stall
    /// or panic the surrounding executor — confirm callers are synchronous.
    pub fn get(&self, name: &str) -> Result<Option<K>> {
        let namespace = self.resource.namespace.clone();
        let key = ObjectId {
            name: name.into(),
            namespace,
        };

        let found = futures::executor::block_on(async {
            let guard = self.state.lock().await;
            guard.data.get(&key).cloned()
        });
        Ok(found)
    }

    /// Look up a single cached entry by name inside namespace `ns`
    ///
    /// This is a more specific version of `Reflector::get`.
    /// It is only useful if your reflector is configured to poll across namespaces.
    ///
    /// Blocks the calling thread until the state lock is acquired.
    /// NOTE(review): calling a blocking executor from inside an async task may stall
    /// or panic the surrounding executor — confirm callers are synchronous.
    pub fn get_within(&self, name: &str, ns: &str) -> Result<Option<K>> {
        let key = ObjectId {
            name: name.into(),
            namespace: Some(ns.into()),
        };

        let found = futures::executor::block_on(async {
            let guard = self.state.lock().await;
            guard.data.get(&key).cloned()
        });
        Ok(found)
    }

    /// Replace the cached state with the result of a fresh full LIST call
    ///
    /// This is the same refresh that `init` performs. The list request completes
    /// before the state lock is taken, so on error the existing state is untouched.
    pub async fn reset(&self) -> Result<()> {
        trace!("Refreshing {}", self.resource.kind);
        let fresh = self
            .get_full_resource_entries()
            .await
            .map(|(data, version)| State { data, version })?;
        let mut guard = self.state.lock().await;
        *guard = fresh;
        Ok(())
    }

    /// List every object of the resource and index the result by `ObjectId`
    ///
    /// Returns the indexed objects together with the list's resourceVersion, or an
    /// empty string when the list metadata carries none.
    async fn get_full_resource_entries(&self) -> Result<(Cache<K>, String)> {
        let request = self.resource.list(&self.params)?;
        // NB: Object isn't general enough here
        let listing = self.client.request::<ObjectList<K>>(request).await?;
        let version = listing.metadata.resource_version.unwrap_or_default();

        trace!(
            "Got {} {} at resourceVersion={:?}",
            listing.items.len(),
            self.resource.kind,
            version
        );

        // Insert one by one so a later item with the same key replaces an earlier one
        let data: Cache<K> = listing.items.into_iter().fold(BTreeMap::new(), |mut acc, obj| {
            acc.insert(ObjectId::key_for(&obj), obj);
            acc
        });

        let names: Vec<String> = data.keys().map(ObjectId::to_string).collect();
        let keys = names.join(", ");
        debug!("Initialized with: [{}]", keys);
        Ok((data, version))
    }

    /// Run one watch request to completion, applying each event to the shared state
    ///
    /// The watch starts from the resourceVersion currently stored in the state. For
    /// every event the state lock is taken, the cache is updated, and the stored
    /// resourceVersion is advanced to that of the event's object (when it has one).
    ///
    /// - Added and Modified events store the received object under its key, replacing
    ///   any older copy. Watch events arrive in order, so the received object is never
    ///   older than what is cached.
    /// - Deleted events remove the entry.
    ///
    /// # Errors
    ///
    /// Returns an error if building or sending the watch request fails, if the event
    /// stream yields an error, or — as `Error::Api` — if the server sends an error event.
    async fn single_watch(&self) -> Result<()> {
        let rg = &self.resource;
        // Copy the version out; the lock guard is dropped at the end of this statement,
        // before the watch request is awaited.
        let oldver = self.state.lock().await.version.clone();
        let req = rg.watch(&self.params, &oldver)?;
        let mut events = self.client.request_events::<WatchEvent<K>>(req).await?.boxed();

        // Follow docs conventions and store the last resourceVersion
        // https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
        while let Some(ev) = events.try_next().await? {
            // Lock per event: the guard is dropped at the end of each iteration, so it
            // is not held while waiting for the next event.
            let mut state = self.state.lock().await;
            let seen_version = match ev {
                WatchEvent::Added(o) => {
                    let name = Meta::name(&o);
                    debug!("Adding {} to {}", name, rg.kind);
                    // Read the version before the object is moved into the cache
                    let ver = Meta::resource_ver(&o).map(|v| v.to_string());
                    state.data.insert(ObjectId::key_for(&o), o);
                    ver
                }
                WatchEvent::Modified(o) => {
                    let name = Meta::name(&o);
                    debug!("Modifying {} in {}", name, rg.kind);
                    let ver = Meta::resource_ver(&o).map(|v| v.to_string());
                    state.data.insert(ObjectId::key_for(&o), o);
                    ver
                }
                WatchEvent::Deleted(o) => {
                    let name = Meta::name(&o);
                    debug!("Removing {} from {}", name, rg.kind);
                    state.data.remove(&ObjectId::key_for(&o));
                    Meta::resource_ver(&o).map(|v| v.to_string())
                }
                WatchEvent::Error(e) => {
                    warn!("Failed to watch {}: {:?}", rg.kind, e);
                    return Err(Error::Api(e));
                }
            };
            if let Some(ver) = seen_version {
                state.version = ver;
            }
        }
        Ok(())
    }
}

/// ObjectId represents an object by name and namespace (if any)
///
/// The derived ordering compares `name` first and `namespace` second.
#[derive(Debug, Ord, PartialOrd, Hash, Eq, PartialEq, Clone)]
struct ObjectId {
    /// Object name
    name: String,
    /// Namespace of the object, `None` when it has none
    namespace: Option<String>,
}

/// Renders as `name [namespace]`, or just `name` when there is no namespace.
// `Display` is implemented instead of `ToString`: `to_string()` stays available
// through the standard blanket impl, and the type also works in `format!`-style macros.
impl std::fmt::Display for ObjectId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match &self.namespace {
            Some(ns) => write!(f, "{} [{}]", self.name, ns),
            None => f.write_str(&self.name),
        }
    }
}

impl ObjectId {
    fn key_for<K: Meta>(o: &K) -> Self {
        ObjectId {
            name: Meta::name(o),
            namespace: Meta::namespace(o),
        }
    }
}