monitord/
machines.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use tokio::sync::RwLock;
5use tracing::{debug, error};
6
7use crate::MachineStats;
8use crate::MonitordStats;
9
10pub fn filter_machines(
11    machines: Vec<crate::dbus::zbus_machines::ListedMachine>,
12    allowlist: Vec<String>,
13    blocklist: Vec<String>,
14) -> Vec<crate::dbus::zbus_machines::ListedMachine> {
15    machines
16        .into_iter()
17        .filter(|c| c.class == "container")
18        .filter(|c| !blocklist.contains(&c.name))
19        .filter(|c| allowlist.is_empty() || allowlist.contains(&c.name))
20        .collect()
21}
22
23pub async fn get_machines(
24    connection: &zbus::Connection,
25    config: &crate::config::Config,
26) -> Result<HashMap<String, u32>, zbus::Error> {
27    let c = crate::dbus::zbus_machines::ManagerProxy::new(connection).await?;
28    let mut results = HashMap::<String, u32>::new();
29
30    let machines = c.list_machines().await?;
31
32    for machine in filter_machines(
33        machines,
34        config.machines.allowlist.clone(),
35        config.machines.blocklist.clone(),
36    ) {
37        let m = c.get_machine(&machine.name).await?;
38        let leader_pid = m.leader().await?;
39        results.insert(machine.name.to_string(), leader_pid);
40    }
41
42    Ok(results)
43}
44
45pub async fn update_machines_stats(
46    config: crate::config::Config,
47    connection: zbus::Connection,
48    locked_monitord_stats: Arc<RwLock<MonitordStats>>,
49) -> anyhow::Result<()> {
50    let locked_machine_stats: Arc<RwLock<MachineStats>> =
51        Arc::new(RwLock::new(MachineStats::default()));
52
53    for (machine, leader_pid) in get_machines(&connection, &config).await?.into_iter() {
54        debug!(
55            "Collecting container: machine: {} leader_pid: {}",
56            machine, leader_pid
57        );
58        let container_address = format!(
59            "unix:path=/proc/{}/root/run/dbus/system_bus_socket",
60            leader_pid
61        );
62        let sdc = zbus::connection::Builder::address(container_address.as_str())?
63            .method_timeout(std::time::Duration::from_secs(config.monitord.dbus_timeout))
64            .build()
65            .await?;
66        let mut join_set = tokio::task::JoinSet::new();
67
68        if config.pid1.enabled {
69            join_set.spawn(crate::pid1::update_pid1_stats(
70                leader_pid as i32,
71                locked_machine_stats.clone(),
72            ));
73        }
74
75        if config.networkd.enabled {
76            join_set.spawn(crate::networkd::update_networkd_stats(
77                config.networkd.link_state_dir.clone(),
78                None,
79                sdc.clone(),
80                locked_machine_stats.clone(),
81            ));
82        }
83
84        if config.system_state.enabled {
85            join_set.spawn(crate::system::update_system_stats(
86                sdc.clone(),
87                locked_machine_stats.clone(),
88            ));
89        }
90
91        join_set.spawn(crate::system::update_version(
92            sdc.clone(),
93            locked_machine_stats.clone(),
94        ));
95
96        if config.units.enabled {
97            join_set.spawn(crate::units::update_unit_stats(
98                config.clone(),
99                sdc.clone(),
100                locked_machine_stats.clone(),
101            ));
102        }
103
104        if config.dbus_stats.enabled {
105            join_set.spawn(crate::dbus_stats::update_dbus_stats(
106                sdc.clone(),
107                locked_machine_stats.clone(),
108            ));
109        }
110
111        while let Some(res) = join_set.join_next().await {
112            match res {
113                Ok(r) => match r {
114                    Ok(_) => (),
115                    Err(e) => {
116                        error!(
117                            "Collection specific failure (container {}): {:?}",
118                            machine, e
119                        );
120                    }
121                },
122                Err(e) => {
123                    error!("Join error (container {}): {:?}", machine, e);
124                }
125            }
126        }
127
128        {
129            let mut monitord_stats = locked_monitord_stats.write().await;
130            let machine_stats = locked_machine_stats.read().await;
131            monitord_stats
132                .machines
133                .insert(machine.clone(), machine_stats.clone());
134        }
135    }
136
137    Ok(())
138}
139
#[cfg(test)]
mod tests {
    use zbus::zvariant::OwnedObjectPath;

    use crate::dbus::zbus_machines::ListedMachine;

    /// Builds a `"container"`-class machine entry with the given name and a dummy path.
    fn container(name: &str) -> ListedMachine {
        ListedMachine {
            name: name.to_string(),
            class: "container".to_string(),
            service: String::new(),
            path: OwnedObjectPath::try_from("/sample/object").unwrap(),
        }
    }

    /// `bar` is blocklisted, `foo` and `baz` are allowlisted: only the latter two
    /// survive, in their original order.
    #[test]
    fn test_filter_machines() {
        let machines = vec![container("foo"), container("bar"), container("baz")];
        let allowlist = vec!["foo".to_string(), "baz".to_string()];
        let blocklist = vec!["bar".to_string()];

        let filtered = super::filter_machines(machines, allowlist, blocklist);

        let names: Vec<&str> = filtered.iter().map(|m| m.name.as_str()).collect();
        assert_eq!(names, vec!["foo", "baz"]);
    }
}