Skip to main content

monitord/
machines.rs

1use std::collections::HashMap;
2use std::collections::HashSet;
3use std::sync::Arc;
4
5use thiserror::Error;
6use tokio::sync::{Mutex, RwLock};
7use tracing::{debug, error, warn};
8
9use crate::MachineStats;
10use crate::MonitordStats;
11
12/// Cached D-Bus connections to containers, keyed by machine name.
13/// The u32 is the leader PID at the time the connection was established.
14/// A connection is only reused if the current leader PID matches.
15pub type MachineConnections = HashMap<String, (u32, zbus::Connection)>;
16
/// What to do with a machine's cached D-Bus connection, given the leader PID
/// stored alongside it (if any) and the leader PID observed now.
///
/// This is a plain fieldless tag, so it is `Copy` and `Eq` and can be passed
/// around and compared by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CacheAction {
    /// A cached connection exists and its leader PID matches — reuse it.
    Reuse,
    /// A cached connection exists but the leader PID changed — drop it and connect again.
    Replace,
    /// Nothing is cached for this machine — open a new connection.
    Create,
}
27
28#[derive(Error, Debug)]
29pub enum MonitordMachinesError {
30    #[error("Machines D-Bus error: {0}")]
31    ZbusError(#[from] zbus::Error),
32}
33
34pub fn filter_machines(
35    machines: Vec<crate::dbus::zbus_machines::ListedMachine>,
36    allowlist: &HashSet<String>,
37    blocklist: &HashSet<String>,
38) -> Vec<crate::dbus::zbus_machines::ListedMachine> {
39    machines
40        .into_iter()
41        .filter(|c| c.class == "container")
42        .filter(|c| !blocklist.contains(&c.name))
43        .filter(|c| allowlist.is_empty() || allowlist.contains(&c.name))
44        .collect()
45}
46
47pub async fn get_machines(
48    connection: &zbus::Connection,
49    config: &crate::config::Config,
50) -> Result<HashMap<String, u32>, MonitordMachinesError> {
51    let c = crate::dbus::zbus_machines::ManagerProxy::builder(connection)
52        .cache_properties(zbus::proxy::CacheProperties::No)
53        .build()
54        .await?;
55    let mut results = HashMap::<String, u32>::new();
56
57    let machines = c.list_machines().await?;
58
59    for machine in filter_machines(
60        machines,
61        &config.machines.allowlist,
62        &config.machines.blocklist,
63    ) {
64        let m = c.get_machine(&machine.name).await?;
65        let leader_pid = m.leader().await?;
66        results.insert(machine.name, leader_pid);
67    }
68
69    Ok(results)
70}
71
72/// Determine the cache action for a machine based on its cached and current leader PID.
73fn decide_cache_action(cached_pid: Option<u32>, leader_pid: u32) -> CacheAction {
74    match cached_pid {
75        Some(pid) if pid == leader_pid => CacheAction::Reuse,
76        Some(_) => CacheAction::Replace,
77        None => CacheAction::Create,
78    }
79}
80
81/// Remove cached connections for machines that no longer exist.
82async fn evict_stale_connections(
83    cached_connections: &Mutex<MachineConnections>,
84    current_machines: &HashMap<String, u32>,
85) {
86    let mut cache = cached_connections.lock().await;
87    cache.retain(|name, _| current_machines.contains_key(name));
88}
89
90/// Evict a cached connection for a machine that experienced errors.
91async fn evict_failed_connection(cached_connections: &Mutex<MachineConnections>, machine: &str) {
92    debug!(
93        "Evicting cached D-Bus connection for {} due to errors",
94        machine
95    );
96    let mut cache = cached_connections.lock().await;
97    cache.remove(machine);
98}
99
100/// Return a cached D-Bus connection if one exists for the same leader PID,
101/// otherwise create a new connection to the container's system bus.
102async fn get_or_create_connection(
103    config: &crate::config::Config,
104    cached_connections: &Mutex<MachineConnections>,
105    machine: &str,
106    leader_pid: u32,
107) -> anyhow::Result<zbus::Connection> {
108    // Check cache and return if hit; drop the lock before any async work
109    {
110        let mut cache = cached_connections.lock().await;
111        match decide_cache_action(cache.get(machine).map(|(pid, _)| *pid), leader_pid) {
112            CacheAction::Reuse => {
113                debug!("Reusing cached D-Bus connection for {}", machine);
114                let (_, conn) = cache.get(machine).unwrap();
115                return Ok(conn.clone());
116            }
117            CacheAction::Replace => {
118                debug!(
119                    "Leader PID changed for {}, dropping stale connection",
120                    machine
121                );
122                cache.remove(machine);
123            }
124            CacheAction::Create => {}
125        }
126    }
127
128    // Build connection without holding the lock
129    debug!("Creating new D-Bus connection for {}", machine);
130    let container_address = format!(
131        "unix:path=/proc/{}/root/run/dbus/system_bus_socket",
132        leader_pid
133    );
134    let conn = zbus::connection::Builder::address(container_address.as_str())?
135        .method_timeout(std::time::Duration::from_secs(config.monitord.dbus_timeout))
136        .build()
137        .await?;
138
139    // Re-lock to insert
140    {
141        let mut cache = cached_connections.lock().await;
142        cache.insert(machine.to_string(), (leader_pid, conn.clone()));
143    }
144
145    Ok(conn)
146}
147
148pub async fn update_machines_stats(
149    config: Arc<crate::config::Config>,
150    connection: zbus::Connection,
151    locked_monitord_stats: Arc<RwLock<MonitordStats>>,
152    cached_connections: Arc<Mutex<MachineConnections>>,
153) -> anyhow::Result<()> {
154    let locked_machine_stats: Arc<RwLock<MachineStats>> =
155        Arc::new(RwLock::new(MachineStats::default()));
156
157    let current_machines = get_machines(&connection, &config).await?;
158
159    evict_stale_connections(&cached_connections, &current_machines).await;
160
161    for (machine, leader_pid) in current_machines.into_iter() {
162        debug!(
163            "Collecting container: machine: {} leader_pid: {}",
164            machine, leader_pid
165        );
166
167        let sdc = match get_or_create_connection(&config, &cached_connections, &machine, leader_pid)
168            .await
169        {
170            Ok(conn) => conn,
171            Err(e) => {
172                error!("Failed to connect to container {}: {:?}", machine, e);
173                continue;
174            }
175        };
176
177        let mut join_set = tokio::task::JoinSet::new();
178
179        if config.pid1.enabled {
180            join_set.spawn(crate::pid1::update_pid1_stats(
181                leader_pid as i32,
182                locked_machine_stats.clone(),
183            ));
184        }
185
186        if config.networkd.enabled {
187            let config_clone = Arc::clone(&config);
188            let sdc_clone = sdc.clone();
189            let stats_clone = locked_machine_stats.clone();
190            let machine_name = machine.clone();
191            join_set.spawn(async move {
192                if config_clone.varlink.enabled {
193                    let socket_path = format!(
194                        "/proc/{}/root{}",
195                        leader_pid,
196                        crate::varlink_networkd::NETWORK_SOCKET_PATH
197                    );
198                    match crate::varlink_networkd::get_networkd_state(&socket_path).await {
199                        Ok(networkd_stats) => {
200                            let mut machine_stats = stats_clone.write().await;
201                            machine_stats.networkd = networkd_stats;
202                            return Ok(());
203                        }
204                        Err(err) => {
205                            warn!(
206                                "Varlink networkd stats failed for container {}, falling back to file-based: {:?}",
207                                machine_name, err
208                            );
209                        }
210                    }
211                }
212                crate::networkd::update_networkd_stats(
213                    config_clone.networkd.link_state_dir.clone(),
214                    None,
215                    sdc_clone,
216                    stats_clone,
217                )
218                .await
219            });
220        }
221
222        if config.system_state.enabled {
223            join_set.spawn(crate::system::update_system_stats(
224                sdc.clone(),
225                locked_machine_stats.clone(),
226            ));
227        }
228
229        join_set.spawn(crate::system::update_version(
230            sdc.clone(),
231            locked_machine_stats.clone(),
232        ));
233
234        if config.units.enabled {
235            if config.varlink.enabled {
236                let config_clone = Arc::clone(&config);
237                let sdc_clone = sdc.clone();
238                let stats_clone = locked_machine_stats.clone();
239                let container_socket_path = format!(
240                    "/proc/{}/root{}",
241                    leader_pid,
242                    crate::varlink_units::METRICS_SOCKET_PATH
243                );
244                join_set.spawn(async move {
245                    match crate::varlink_units::update_unit_stats(
246                        Arc::clone(&config_clone),
247                        stats_clone.clone(),
248                        container_socket_path,
249                    )
250                    .await
251                    {
252                        Ok(()) => Ok(()),
253                        Err(err) => {
254                            warn!(
255                                "Varlink units stats failed, falling back to D-Bus: {:?}",
256                                err
257                            );
258                            crate::units::update_unit_stats(config_clone, sdc_clone, stats_clone)
259                                .await
260                        }
261                    }
262                });
263            } else {
264                join_set.spawn(crate::units::update_unit_stats(
265                    Arc::clone(&config),
266                    sdc.clone(),
267                    locked_machine_stats.clone(),
268                ));
269            }
270        }
271
272        if config.dbus_stats.enabled {
273            join_set.spawn(crate::dbus_stats::update_dbus_stats(
274                Arc::clone(&config),
275                sdc.clone(),
276                locked_machine_stats.clone(),
277            ));
278        }
279
280        let mut had_error = false;
281        while let Some(res) = join_set.join_next().await {
282            match res {
283                Ok(r) => match r {
284                    Ok(_) => (),
285                    Err(e) => {
286                        had_error = true;
287                        error!(
288                            "Collection specific failure (container {}): {:?}",
289                            machine, e
290                        );
291                    }
292                },
293                Err(e) => {
294                    had_error = true;
295                    error!("Join error (container {}): {:?}", machine, e);
296                }
297            }
298        }
299
300        if had_error {
301            evict_failed_connection(&cached_connections, &machine).await;
302        }
303
304        {
305            let mut monitord_stats = locked_monitord_stats.write().await;
306            let machine_stats = locked_machine_stats.read().await;
307            monitord_stats
308                .machines
309                .insert(machine, machine_stats.clone());
310        }
311    }
312
313    Ok(())
314}
315
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use zbus::zvariant::OwnedObjectPath;

    use super::{decide_cache_action, CacheAction};

    /// Build a `ListedMachine` with the given name and class and placeholder service/path.
    fn listed(name: &str, class: &str) -> crate::dbus::zbus_machines::ListedMachine {
        crate::dbus::zbus_machines::ListedMachine {
            name: name.to_string(),
            class: class.to_string(),
            service: "".to_string(),
            path: OwnedObjectPath::try_from("/sample/object").unwrap(),
        }
    }

    #[test]
    fn test_filter_machines() {
        let machines = vec![
            listed("foo", "container"),
            listed("bar", "container"),
            listed("baz", "container"),
        ];
        let allowlist = HashSet::from(["foo".to_string(), "baz".to_string()]);
        let blocklist = HashSet::from(["bar".to_string()]);

        let filtered = super::filter_machines(machines, &allowlist, &blocklist);

        assert_eq!(filtered.len(), 2);
        assert_eq!(filtered[0].name, "foo");
        assert_eq!(filtered[1].name, "baz");
    }

    #[test]
    fn test_filter_machines_drops_non_containers() {
        let machines = vec![listed("vm0", "vm"), listed("ct0", "container")];

        let filtered = super::filter_machines(machines, &HashSet::new(), &HashSet::new());

        let names: Vec<&str> = filtered.iter().map(|m| m.name.as_str()).collect();
        assert_eq!(names, ["ct0"]);
    }

    #[test]
    fn test_filter_machines_empty_allowlist_keeps_all_unblocked() {
        let machines = vec![
            listed("foo", "container"),
            listed("bar", "container"),
            listed("baz", "container"),
        ];
        let blocklist = HashSet::from(["bar".to_string()]);

        let filtered = super::filter_machines(machines, &HashSet::new(), &blocklist);

        let names: Vec<&str> = filtered.iter().map(|m| m.name.as_str()).collect();
        assert_eq!(names, ["foo", "baz"]);
    }

    #[test]
    fn test_filter_machines_blocklist_overrides_allowlist() {
        let machines = vec![listed("foo", "container")];
        let both = HashSet::from(["foo".to_string()]);

        let filtered = super::filter_machines(machines, &both, &both);

        assert!(filtered.is_empty());
    }

    #[test]
    fn test_decide_cache_action_reuse_on_same_pid() {
        assert_eq!(decide_cache_action(Some(42), 42), CacheAction::Reuse);
    }

    #[test]
    fn test_decide_cache_action_replace_on_pid_change() {
        assert_eq!(decide_cache_action(Some(42), 99), CacheAction::Replace);
    }

    #[test]
    fn test_decide_cache_action_create_on_miss() {
        assert_eq!(decide_cache_action(None, 42), CacheAction::Create);
    }
}