1use std::collections::HashMap;
2use std::collections::HashSet;
3use std::sync::Arc;
4
5use thiserror::Error;
6use tokio::sync::{Mutex, RwLock};
7use tracing::{debug, error, warn};
8
9use crate::MachineStats;
10use crate::MonitordStats;
11
12pub type MachineConnections = HashMap<String, (u32, zbus::Connection)>;
16
/// What to do with a machine's cached D-Bus connection, as decided by
/// comparing the cached leader PID with the current one.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CacheAction {
    /// The cached connection was opened through the current leader PID; use it.
    Reuse,
    /// A connection is cached but for a different leader PID; drop it and reconnect.
    Replace,
    /// Nothing is cached for this machine; open a new connection.
    Create,
}
27
28#[derive(Error, Debug)]
29pub enum MonitordMachinesError {
30 #[error("Machines D-Bus error: {0}")]
31 ZbusError(#[from] zbus::Error),
32}
33
34pub fn filter_machines(
35 machines: Vec<crate::dbus::zbus_machines::ListedMachine>,
36 allowlist: &HashSet<String>,
37 blocklist: &HashSet<String>,
38) -> Vec<crate::dbus::zbus_machines::ListedMachine> {
39 machines
40 .into_iter()
41 .filter(|c| c.class == "container")
42 .filter(|c| !blocklist.contains(&c.name))
43 .filter(|c| allowlist.is_empty() || allowlist.contains(&c.name))
44 .collect()
45}
46
47pub async fn get_machines(
48 connection: &zbus::Connection,
49 config: &crate::config::Config,
50) -> Result<HashMap<String, u32>, MonitordMachinesError> {
51 let c = crate::dbus::zbus_machines::ManagerProxy::builder(connection)
52 .cache_properties(zbus::proxy::CacheProperties::No)
53 .build()
54 .await?;
55 let mut results = HashMap::<String, u32>::new();
56
57 let machines = c.list_machines().await?;
58
59 for machine in filter_machines(
60 machines,
61 &config.machines.allowlist,
62 &config.machines.blocklist,
63 ) {
64 let m = c.get_machine(&machine.name).await?;
65 let leader_pid = m.leader().await?;
66 results.insert(machine.name, leader_pid);
67 }
68
69 Ok(results)
70}
71
72fn decide_cache_action(cached_pid: Option<u32>, leader_pid: u32) -> CacheAction {
74 match cached_pid {
75 Some(pid) if pid == leader_pid => CacheAction::Reuse,
76 Some(_) => CacheAction::Replace,
77 None => CacheAction::Create,
78 }
79}
80
81async fn evict_stale_connections(
83 cached_connections: &Mutex<MachineConnections>,
84 current_machines: &HashMap<String, u32>,
85) {
86 let mut cache = cached_connections.lock().await;
87 cache.retain(|name, _| current_machines.contains_key(name));
88}
89
90async fn evict_failed_connection(cached_connections: &Mutex<MachineConnections>, machine: &str) {
92 debug!(
93 "Evicting cached D-Bus connection for {} due to errors",
94 machine
95 );
96 let mut cache = cached_connections.lock().await;
97 cache.remove(machine);
98}
99
100async fn get_or_create_connection(
103 config: &crate::config::Config,
104 cached_connections: &Mutex<MachineConnections>,
105 machine: &str,
106 leader_pid: u32,
107) -> anyhow::Result<zbus::Connection> {
108 {
110 let mut cache = cached_connections.lock().await;
111 match decide_cache_action(cache.get(machine).map(|(pid, _)| *pid), leader_pid) {
112 CacheAction::Reuse => {
113 debug!("Reusing cached D-Bus connection for {}", machine);
114 let (_, conn) = cache.get(machine).unwrap();
115 return Ok(conn.clone());
116 }
117 CacheAction::Replace => {
118 debug!(
119 "Leader PID changed for {}, dropping stale connection",
120 machine
121 );
122 cache.remove(machine);
123 }
124 CacheAction::Create => {}
125 }
126 }
127
128 debug!("Creating new D-Bus connection for {}", machine);
130 let container_address = format!(
131 "unix:path=/proc/{}/root/run/dbus/system_bus_socket",
132 leader_pid
133 );
134 let conn = zbus::connection::Builder::address(container_address.as_str())?
135 .method_timeout(std::time::Duration::from_secs(config.monitord.dbus_timeout))
136 .build()
137 .await?;
138
139 {
141 let mut cache = cached_connections.lock().await;
142 cache.insert(machine.to_string(), (leader_pid, conn.clone()));
143 }
144
145 Ok(conn)
146}
147
148pub async fn update_machines_stats(
149 config: Arc<crate::config::Config>,
150 connection: zbus::Connection,
151 locked_monitord_stats: Arc<RwLock<MonitordStats>>,
152 cached_connections: Arc<Mutex<MachineConnections>>,
153) -> anyhow::Result<()> {
154 let locked_machine_stats: Arc<RwLock<MachineStats>> =
155 Arc::new(RwLock::new(MachineStats::default()));
156
157 let current_machines = get_machines(&connection, &config).await?;
158
159 evict_stale_connections(&cached_connections, ¤t_machines).await;
160
161 for (machine, leader_pid) in current_machines.into_iter() {
162 debug!(
163 "Collecting container: machine: {} leader_pid: {}",
164 machine, leader_pid
165 );
166
167 let sdc = match get_or_create_connection(&config, &cached_connections, &machine, leader_pid)
168 .await
169 {
170 Ok(conn) => conn,
171 Err(e) => {
172 error!("Failed to connect to container {}: {:?}", machine, e);
173 continue;
174 }
175 };
176
177 let mut join_set = tokio::task::JoinSet::new();
178
179 if config.pid1.enabled {
180 join_set.spawn(crate::pid1::update_pid1_stats(
181 leader_pid as i32,
182 locked_machine_stats.clone(),
183 ));
184 }
185
186 if config.networkd.enabled {
187 join_set.spawn(crate::networkd::update_networkd_stats(
188 config.networkd.link_state_dir.clone(),
189 None,
190 sdc.clone(),
191 locked_machine_stats.clone(),
192 ));
193 }
194
195 if config.system_state.enabled {
196 join_set.spawn(crate::system::update_system_stats(
197 sdc.clone(),
198 locked_machine_stats.clone(),
199 ));
200 }
201
202 join_set.spawn(crate::system::update_version(
203 sdc.clone(),
204 locked_machine_stats.clone(),
205 ));
206
207 if config.units.enabled {
208 if config.varlink.enabled {
209 let config_clone = Arc::clone(&config);
210 let sdc_clone = sdc.clone();
211 let stats_clone = locked_machine_stats.clone();
212 let container_socket_path = format!(
213 "/proc/{}/root{}",
214 leader_pid,
215 crate::varlink_units::METRICS_SOCKET_PATH
216 );
217 join_set.spawn(async move {
218 match crate::varlink_units::update_unit_stats(
219 Arc::clone(&config_clone),
220 stats_clone.clone(),
221 container_socket_path,
222 )
223 .await
224 {
225 Ok(()) => Ok(()),
226 Err(err) => {
227 warn!(
228 "Varlink units stats failed, falling back to D-Bus: {:?}",
229 err
230 );
231 crate::units::update_unit_stats(config_clone, sdc_clone, stats_clone)
232 .await
233 }
234 }
235 });
236 } else {
237 join_set.spawn(crate::units::update_unit_stats(
238 Arc::clone(&config),
239 sdc.clone(),
240 locked_machine_stats.clone(),
241 ));
242 }
243 }
244
245 if config.dbus_stats.enabled {
246 join_set.spawn(crate::dbus_stats::update_dbus_stats(
247 Arc::clone(&config),
248 sdc.clone(),
249 locked_machine_stats.clone(),
250 ));
251 }
252
253 let mut had_error = false;
254 while let Some(res) = join_set.join_next().await {
255 match res {
256 Ok(r) => match r {
257 Ok(_) => (),
258 Err(e) => {
259 had_error = true;
260 error!(
261 "Collection specific failure (container {}): {:?}",
262 machine, e
263 );
264 }
265 },
266 Err(e) => {
267 had_error = true;
268 error!("Join error (container {}): {:?}", machine, e);
269 }
270 }
271 }
272
273 if had_error {
274 evict_failed_connection(&cached_connections, &machine).await;
275 }
276
277 {
278 let mut monitord_stats = locked_monitord_stats.write().await;
279 let machine_stats = locked_machine_stats.read().await;
280 monitord_stats
281 .machines
282 .insert(machine, machine_stats.clone());
283 }
284 }
285
286 Ok(())
287}
288
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use zbus::zvariant::OwnedObjectPath;

    use super::{decide_cache_action, CacheAction};
    use crate::dbus::zbus_machines::ListedMachine;

    /// Builds a `ListedMachine` with the given name and class; the remaining
    /// fields are fixed placeholders that `filter_machines` does not inspect.
    fn listed(name: &str, class: &str) -> ListedMachine {
        ListedMachine {
            name: name.to_string(),
            class: class.to_string(),
            service: String::new(),
            path: OwnedObjectPath::try_from("/sample/object").unwrap(),
        }
    }

    #[test]
    fn test_filter_machines() {
        let machines = vec![
            listed("foo", "container"),
            listed("bar", "container"),
            listed("baz", "container"),
        ];
        let allowlist = HashSet::from(["foo".to_string(), "baz".to_string()]);
        let blocklist = HashSet::from(["bar".to_string()]);

        let filtered = super::filter_machines(machines, &allowlist, &blocklist);

        assert_eq!(filtered.len(), 2);
        assert_eq!(filtered[0].name, "foo");
        assert_eq!(filtered[1].name, "baz");
    }

    #[test]
    fn test_filter_machines_empty_allowlist_keeps_only_containers() {
        let machines = vec![
            listed("foo", "container"),
            listed("vm0", "vm"),
            listed("bar", "container"),
        ];

        let filtered = super::filter_machines(machines, &HashSet::new(), &HashSet::new());

        assert_eq!(filtered.len(), 2);
        assert_eq!(filtered[0].name, "foo");
        assert_eq!(filtered[1].name, "bar");
    }

    #[test]
    fn test_filter_machines_blocklist_wins_over_allowlist() {
        let machines = vec![listed("foo", "container"), listed("bar", "container")];
        let allowlist = HashSet::from(["foo".to_string(), "bar".to_string()]);
        let blocklist = HashSet::from(["foo".to_string()]);

        let filtered = super::filter_machines(machines, &allowlist, &blocklist);

        assert_eq!(filtered.len(), 1);
        assert_eq!(filtered[0].name, "bar");
    }

    #[test]
    fn test_decide_cache_action_reuse_on_same_pid() {
        assert_eq!(decide_cache_action(Some(42), 42), CacheAction::Reuse);
    }

    #[test]
    fn test_decide_cache_action_replace_on_pid_change() {
        assert_eq!(decide_cache_action(Some(42), 99), CacheAction::Replace);
    }

    #[test]
    fn test_decide_cache_action_create_on_miss() {
        assert_eq!(decide_cache_action(None, 42), CacheAction::Create);
    }
}