monitord/
machines.rs

1use std::collections::HashMap;
2use std::collections::HashSet;
3use std::sync::Arc;
4
5use thiserror::Error;
6use tokio::sync::{Mutex, RwLock};
7use tracing::{debug, error, warn};
8
9use crate::MachineStats;
10use crate::MonitordStats;
11
/// Cached D-Bus connections to containers, keyed by machine name.
///
/// The `u32` is the container's leader PID at the time the connection was established.
/// A connection is only reused if the current leader PID matches; an entry whose PID no
/// longer matches is treated as stale and replaced with a freshly built connection.
pub type MachineConnections = HashMap<String, (u32, zbus::Connection)>;
16
/// What action to take for a machine's cached connection.
///
/// Produced by `decide_cache_action` from the cached leader PID (if any) and the machine's
/// current leader PID. Fieldless, so it is cheap to copy and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CacheAction {
    /// Cached connection exists with matching leader PID — reuse it.
    Reuse,
    /// Cached connection exists but leader PID changed — drop old and create new.
    Replace,
    /// No cached connection — create new.
    Create,
}
27
/// Errors produced while querying machines over D-Bus (see [`get_machines`]).
#[derive(Error, Debug)]
pub enum MonitordMachinesError {
    /// A D-Bus call made while querying machines failed; wraps the underlying
    /// [`zbus::Error`] (converted automatically by `?` via `#[from]`).
    #[error("Machines D-Bus error: {0}")]
    ZbusError(#[from] zbus::Error),
}
33
34pub fn filter_machines(
35    machines: Vec<crate::dbus::zbus_machines::ListedMachine>,
36    allowlist: &HashSet<String>,
37    blocklist: &HashSet<String>,
38) -> Vec<crate::dbus::zbus_machines::ListedMachine> {
39    machines
40        .into_iter()
41        .filter(|c| c.class == "container")
42        .filter(|c| !blocklist.contains(&c.name))
43        .filter(|c| allowlist.is_empty() || allowlist.contains(&c.name))
44        .collect()
45}
46
47pub async fn get_machines(
48    connection: &zbus::Connection,
49    config: &crate::config::Config,
50) -> Result<HashMap<String, u32>, MonitordMachinesError> {
51    let c = crate::dbus::zbus_machines::ManagerProxy::builder(connection)
52        .cache_properties(zbus::proxy::CacheProperties::No)
53        .build()
54        .await?;
55    let mut results = HashMap::<String, u32>::new();
56
57    let machines = c.list_machines().await?;
58
59    for machine in filter_machines(
60        machines,
61        &config.machines.allowlist,
62        &config.machines.blocklist,
63    ) {
64        let m = c.get_machine(&machine.name).await?;
65        let leader_pid = m.leader().await?;
66        results.insert(machine.name, leader_pid);
67    }
68
69    Ok(results)
70}
71
72/// Determine the cache action for a machine based on its cached and current leader PID.
73fn decide_cache_action(cached_pid: Option<u32>, leader_pid: u32) -> CacheAction {
74    match cached_pid {
75        Some(pid) if pid == leader_pid => CacheAction::Reuse,
76        Some(_) => CacheAction::Replace,
77        None => CacheAction::Create,
78    }
79}
80
81/// Remove cached connections for machines that no longer exist.
82async fn evict_stale_connections(
83    cached_connections: &Mutex<MachineConnections>,
84    current_machines: &HashMap<String, u32>,
85) {
86    let mut cache = cached_connections.lock().await;
87    cache.retain(|name, _| current_machines.contains_key(name));
88}
89
90/// Evict a cached connection for a machine that experienced errors.
91async fn evict_failed_connection(cached_connections: &Mutex<MachineConnections>, machine: &str) {
92    debug!(
93        "Evicting cached D-Bus connection for {} due to errors",
94        machine
95    );
96    let mut cache = cached_connections.lock().await;
97    cache.remove(machine);
98}
99
100/// Return a cached D-Bus connection if one exists for the same leader PID,
101/// otherwise create a new connection to the container's system bus.
102async fn get_or_create_connection(
103    config: &crate::config::Config,
104    cached_connections: &Mutex<MachineConnections>,
105    machine: &str,
106    leader_pid: u32,
107) -> anyhow::Result<zbus::Connection> {
108    // Check cache and return if hit; drop the lock before any async work
109    {
110        let mut cache = cached_connections.lock().await;
111        match decide_cache_action(cache.get(machine).map(|(pid, _)| *pid), leader_pid) {
112            CacheAction::Reuse => {
113                debug!("Reusing cached D-Bus connection for {}", machine);
114                let (_, conn) = cache.get(machine).unwrap();
115                return Ok(conn.clone());
116            }
117            CacheAction::Replace => {
118                debug!(
119                    "Leader PID changed for {}, dropping stale connection",
120                    machine
121                );
122                cache.remove(machine);
123            }
124            CacheAction::Create => {}
125        }
126    }
127
128    // Build connection without holding the lock
129    debug!("Creating new D-Bus connection for {}", machine);
130    let container_address = format!(
131        "unix:path=/proc/{}/root/run/dbus/system_bus_socket",
132        leader_pid
133    );
134    let conn = zbus::connection::Builder::address(container_address.as_str())?
135        .method_timeout(std::time::Duration::from_secs(config.monitord.dbus_timeout))
136        .build()
137        .await?;
138
139    // Re-lock to insert
140    {
141        let mut cache = cached_connections.lock().await;
142        cache.insert(machine.to_string(), (leader_pid, conn.clone()));
143    }
144
145    Ok(conn)
146}
147
148pub async fn update_machines_stats(
149    config: Arc<crate::config::Config>,
150    connection: zbus::Connection,
151    locked_monitord_stats: Arc<RwLock<MonitordStats>>,
152    cached_connections: Arc<Mutex<MachineConnections>>,
153) -> anyhow::Result<()> {
154    let locked_machine_stats: Arc<RwLock<MachineStats>> =
155        Arc::new(RwLock::new(MachineStats::default()));
156
157    let current_machines = get_machines(&connection, &config).await?;
158
159    evict_stale_connections(&cached_connections, &current_machines).await;
160
161    for (machine, leader_pid) in current_machines.into_iter() {
162        debug!(
163            "Collecting container: machine: {} leader_pid: {}",
164            machine, leader_pid
165        );
166
167        let sdc = match get_or_create_connection(&config, &cached_connections, &machine, leader_pid)
168            .await
169        {
170            Ok(conn) => conn,
171            Err(e) => {
172                error!("Failed to connect to container {}: {:?}", machine, e);
173                continue;
174            }
175        };
176
177        let mut join_set = tokio::task::JoinSet::new();
178
179        if config.pid1.enabled {
180            join_set.spawn(crate::pid1::update_pid1_stats(
181                leader_pid as i32,
182                locked_machine_stats.clone(),
183            ));
184        }
185
186        if config.networkd.enabled {
187            join_set.spawn(crate::networkd::update_networkd_stats(
188                config.networkd.link_state_dir.clone(),
189                None,
190                sdc.clone(),
191                locked_machine_stats.clone(),
192            ));
193        }
194
195        if config.system_state.enabled {
196            join_set.spawn(crate::system::update_system_stats(
197                sdc.clone(),
198                locked_machine_stats.clone(),
199            ));
200        }
201
202        join_set.spawn(crate::system::update_version(
203            sdc.clone(),
204            locked_machine_stats.clone(),
205        ));
206
207        if config.units.enabled {
208            if config.varlink.enabled {
209                let config_clone = Arc::clone(&config);
210                let sdc_clone = sdc.clone();
211                let stats_clone = locked_machine_stats.clone();
212                let container_socket_path = format!(
213                    "/proc/{}/root{}",
214                    leader_pid,
215                    crate::varlink_units::METRICS_SOCKET_PATH
216                );
217                join_set.spawn(async move {
218                    match crate::varlink_units::update_unit_stats(
219                        Arc::clone(&config_clone),
220                        stats_clone.clone(),
221                        container_socket_path,
222                    )
223                    .await
224                    {
225                        Ok(()) => Ok(()),
226                        Err(err) => {
227                            warn!(
228                                "Varlink units stats failed, falling back to D-Bus: {:?}",
229                                err
230                            );
231                            crate::units::update_unit_stats(config_clone, sdc_clone, stats_clone)
232                                .await
233                        }
234                    }
235                });
236            } else {
237                join_set.spawn(crate::units::update_unit_stats(
238                    Arc::clone(&config),
239                    sdc.clone(),
240                    locked_machine_stats.clone(),
241                ));
242            }
243        }
244
245        if config.dbus_stats.enabled {
246            join_set.spawn(crate::dbus_stats::update_dbus_stats(
247                Arc::clone(&config),
248                sdc.clone(),
249                locked_machine_stats.clone(),
250            ));
251        }
252
253        let mut had_error = false;
254        while let Some(res) = join_set.join_next().await {
255            match res {
256                Ok(r) => match r {
257                    Ok(_) => (),
258                    Err(e) => {
259                        had_error = true;
260                        error!(
261                            "Collection specific failure (container {}): {:?}",
262                            machine, e
263                        );
264                    }
265                },
266                Err(e) => {
267                    had_error = true;
268                    error!("Join error (container {}): {:?}", machine, e);
269                }
270            }
271        }
272
273        if had_error {
274            evict_failed_connection(&cached_connections, &machine).await;
275        }
276
277        {
278            let mut monitord_stats = locked_monitord_stats.write().await;
279            let machine_stats = locked_machine_stats.read().await;
280            monitord_stats
281                .machines
282                .insert(machine, machine_stats.clone());
283        }
284    }
285
286    Ok(())
287}
288
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use zbus::zvariant::OwnedObjectPath;

    use super::{decide_cache_action, CacheAction};
    use crate::dbus::zbus_machines::ListedMachine;

    /// Build a `ListedMachine` with the given name and class; other fields are placeholders.
    fn listed(name: &str, class: &str) -> ListedMachine {
        ListedMachine {
            name: name.to_string(),
            class: class.to_string(),
            service: "".to_string(),
            path: OwnedObjectPath::try_from("/sample/object").unwrap(),
        }
    }

    /// Extract just the names, in order, for compact assertions.
    fn names(machines: Vec<ListedMachine>) -> Vec<String> {
        machines.into_iter().map(|m| m.name).collect()
    }

    #[test]
    fn test_filter_machines() {
        let machines = vec![
            listed("foo", "container"),
            listed("bar", "container"),
            listed("baz", "container"),
        ];
        let allowlist = HashSet::from(["foo".to_string(), "baz".to_string()]);
        let blocklist = HashSet::from(["bar".to_string()]);

        let filtered = super::filter_machines(machines, &allowlist, &blocklist);

        assert_eq!(filtered.len(), 2);
        assert_eq!(filtered[0].name, "foo");
        assert_eq!(filtered[1].name, "baz");
    }

    #[test]
    fn test_filter_machines_drops_non_containers() {
        let machines = vec![listed("foo", "container"), listed("vm0", "vm")];

        let filtered = super::filter_machines(machines, &HashSet::new(), &HashSet::new());

        assert_eq!(names(filtered), ["foo"]);
    }

    #[test]
    fn test_filter_machines_empty_allowlist_admits_unblocked() {
        let machines = vec![
            listed("foo", "container"),
            listed("bar", "container"),
            listed("baz", "container"),
        ];
        let blocklist = HashSet::from(["bar".to_string()]);

        let filtered = super::filter_machines(machines, &HashSet::new(), &blocklist);

        assert_eq!(names(filtered), ["foo", "baz"]);
    }

    #[test]
    fn test_filter_machines_blocklist_wins_over_allowlist() {
        let machines = vec![listed("foo", "container"), listed("baz", "container")];
        let allowlist = HashSet::from(["foo".to_string(), "baz".to_string()]);
        let blocklist = HashSet::from(["foo".to_string()]);

        let filtered = super::filter_machines(machines, &allowlist, &blocklist);

        assert_eq!(names(filtered), ["baz"]);
    }

    #[test]
    fn test_decide_cache_action_reuse_on_same_pid() {
        assert_eq!(decide_cache_action(Some(42), 42), CacheAction::Reuse);
    }

    #[test]
    fn test_decide_cache_action_replace_on_pid_change() {
        assert_eq!(decide_cache_action(Some(42), 99), CacheAction::Replace);
    }

    #[test]
    fn test_decide_cache_action_create_on_miss() {
        assert_eq!(decide_cache_action(None, 42), CacheAction::Create);
    }
}