1use std::collections::HashMap;
2use std::collections::HashSet;
3use std::sync::Arc;
4
5use thiserror::Error;
6use tokio::sync::{Mutex, RwLock};
7use tracing::{debug, error, warn};
8
9use crate::MachineStats;
10use crate::MonitordStats;
11
/// Cache of D-Bus connections to machines, keyed by machine name.
///
/// Each value pairs the leader PID the connection was opened for with the
/// connection itself, so a later change of leader PID can be detected by
/// comparing the stored PID against the current one.
pub type MachineConnections = HashMap<String, (u32, zbus::Connection)>;
16
/// What to do with the connection cache entry for a machine, based on the
/// leader PID stored in the cache versus the leader PID currently reported.
///
/// The enum carries no data, so it is cheap to copy and has total equality.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CacheAction {
    /// A cached entry exists for the same leader PID: use it as-is.
    Reuse,
    /// A cached entry exists for a different leader PID: drop it and reconnect.
    Replace,
    /// No cached entry exists: open a new connection.
    Create,
}
27
/// Errors returned while querying machines over D-Bus.
#[derive(Error, Debug)]
pub enum MonitordMachinesError {
    /// A zbus call failed. `#[from]` lets `?` convert a [`zbus::Error`] into
    /// this variant automatically.
    #[error("Machines D-Bus error: {0}")]
    ZbusError(#[from] zbus::Error),
}
33
34pub fn filter_machines(
35 machines: Vec<crate::dbus::zbus_machines::ListedMachine>,
36 allowlist: &HashSet<String>,
37 blocklist: &HashSet<String>,
38) -> Vec<crate::dbus::zbus_machines::ListedMachine> {
39 machines
40 .into_iter()
41 .filter(|c| c.class == "container")
42 .filter(|c| !blocklist.contains(&c.name))
43 .filter(|c| allowlist.is_empty() || allowlist.contains(&c.name))
44 .collect()
45}
46
47pub async fn get_machines(
48 connection: &zbus::Connection,
49 config: &crate::config::Config,
50) -> Result<HashMap<String, u32>, MonitordMachinesError> {
51 let c = crate::dbus::zbus_machines::ManagerProxy::builder(connection)
52 .cache_properties(zbus::proxy::CacheProperties::No)
53 .build()
54 .await?;
55 let mut results = HashMap::<String, u32>::new();
56
57 let machines = c.list_machines().await?;
58
59 for machine in filter_machines(
60 machines,
61 &config.machines.allowlist,
62 &config.machines.blocklist,
63 ) {
64 let m = c.get_machine(&machine.name).await?;
65 let leader_pid = m.leader().await?;
66 results.insert(machine.name, leader_pid);
67 }
68
69 Ok(results)
70}
71
72fn decide_cache_action(cached_pid: Option<u32>, leader_pid: u32) -> CacheAction {
74 match cached_pid {
75 Some(pid) if pid == leader_pid => CacheAction::Reuse,
76 Some(_) => CacheAction::Replace,
77 None => CacheAction::Create,
78 }
79}
80
81async fn evict_stale_connections(
83 cached_connections: &Mutex<MachineConnections>,
84 current_machines: &HashMap<String, u32>,
85) {
86 let mut cache = cached_connections.lock().await;
87 cache.retain(|name, _| current_machines.contains_key(name));
88}
89
90async fn evict_failed_connection(cached_connections: &Mutex<MachineConnections>, machine: &str) {
92 debug!(
93 "Evicting cached D-Bus connection for {} due to errors",
94 machine
95 );
96 let mut cache = cached_connections.lock().await;
97 cache.remove(machine);
98}
99
100async fn get_or_create_connection(
103 config: &crate::config::Config,
104 cached_connections: &Mutex<MachineConnections>,
105 machine: &str,
106 leader_pid: u32,
107) -> anyhow::Result<zbus::Connection> {
108 {
110 let mut cache = cached_connections.lock().await;
111 match decide_cache_action(cache.get(machine).map(|(pid, _)| *pid), leader_pid) {
112 CacheAction::Reuse => {
113 debug!("Reusing cached D-Bus connection for {}", machine);
114 let (_, conn) = cache.get(machine).unwrap();
115 return Ok(conn.clone());
116 }
117 CacheAction::Replace => {
118 debug!(
119 "Leader PID changed for {}, dropping stale connection",
120 machine
121 );
122 cache.remove(machine);
123 }
124 CacheAction::Create => {}
125 }
126 }
127
128 debug!("Creating new D-Bus connection for {}", machine);
130 let container_address = format!(
131 "unix:path=/proc/{}/root/run/dbus/system_bus_socket",
132 leader_pid
133 );
134 let conn = zbus::connection::Builder::address(container_address.as_str())?
135 .method_timeout(std::time::Duration::from_secs(config.monitord.dbus_timeout))
136 .build()
137 .await?;
138
139 {
141 let mut cache = cached_connections.lock().await;
142 cache.insert(machine.to_string(), (leader_pid, conn.clone()));
143 }
144
145 Ok(conn)
146}
147
148pub async fn update_machines_stats(
149 config: Arc<crate::config::Config>,
150 connection: zbus::Connection,
151 locked_monitord_stats: Arc<RwLock<MonitordStats>>,
152 cached_connections: Arc<Mutex<MachineConnections>>,
153) -> anyhow::Result<()> {
154 let locked_machine_stats: Arc<RwLock<MachineStats>> =
155 Arc::new(RwLock::new(MachineStats::default()));
156
157 let current_machines = get_machines(&connection, &config).await?;
158
159 evict_stale_connections(&cached_connections, ¤t_machines).await;
160
161 for (machine, leader_pid) in current_machines.into_iter() {
162 debug!(
163 "Collecting container: machine: {} leader_pid: {}",
164 machine, leader_pid
165 );
166
167 let sdc = match get_or_create_connection(&config, &cached_connections, &machine, leader_pid)
168 .await
169 {
170 Ok(conn) => conn,
171 Err(e) => {
172 error!("Failed to connect to container {}: {:?}", machine, e);
173 continue;
174 }
175 };
176
177 let mut join_set = tokio::task::JoinSet::new();
178
179 if config.pid1.enabled {
180 join_set.spawn(crate::pid1::update_pid1_stats(
181 leader_pid as i32,
182 locked_machine_stats.clone(),
183 ));
184 }
185
186 if config.networkd.enabled {
187 let config_clone = Arc::clone(&config);
188 let sdc_clone = sdc.clone();
189 let stats_clone = locked_machine_stats.clone();
190 let machine_name = machine.clone();
191 join_set.spawn(async move {
192 if config_clone.varlink.enabled {
193 let socket_path = format!(
194 "/proc/{}/root{}",
195 leader_pid,
196 crate::varlink_networkd::NETWORK_SOCKET_PATH
197 );
198 match crate::varlink_networkd::get_networkd_state(&socket_path).await {
199 Ok(networkd_stats) => {
200 let mut machine_stats = stats_clone.write().await;
201 machine_stats.networkd = networkd_stats;
202 return Ok(());
203 }
204 Err(err) => {
205 warn!(
206 "Varlink networkd stats failed for container {}, falling back to file-based: {:?}",
207 machine_name, err
208 );
209 }
210 }
211 }
212 crate::networkd::update_networkd_stats(
213 config_clone.networkd.link_state_dir.clone(),
214 None,
215 sdc_clone,
216 stats_clone,
217 )
218 .await
219 });
220 }
221
222 if config.system_state.enabled {
223 join_set.spawn(crate::system::update_system_stats(
224 sdc.clone(),
225 locked_machine_stats.clone(),
226 ));
227 }
228
229 join_set.spawn(crate::system::update_version(
230 sdc.clone(),
231 locked_machine_stats.clone(),
232 ));
233
234 if config.units.enabled {
235 if config.varlink.enabled {
236 let config_clone = Arc::clone(&config);
237 let sdc_clone = sdc.clone();
238 let stats_clone = locked_machine_stats.clone();
239 let container_socket_path = format!(
240 "/proc/{}/root{}",
241 leader_pid,
242 crate::varlink_units::METRICS_SOCKET_PATH
243 );
244 join_set.spawn(async move {
245 match crate::varlink_units::update_unit_stats(
246 Arc::clone(&config_clone),
247 stats_clone.clone(),
248 container_socket_path,
249 )
250 .await
251 {
252 Ok(()) => Ok(()),
253 Err(err) => {
254 warn!(
255 "Varlink units stats failed, falling back to D-Bus: {:?}",
256 err
257 );
258 crate::units::update_unit_stats(config_clone, sdc_clone, stats_clone)
259 .await
260 }
261 }
262 });
263 } else {
264 join_set.spawn(crate::units::update_unit_stats(
265 Arc::clone(&config),
266 sdc.clone(),
267 locked_machine_stats.clone(),
268 ));
269 }
270 }
271
272 if config.dbus_stats.enabled {
273 join_set.spawn(crate::dbus_stats::update_dbus_stats(
274 Arc::clone(&config),
275 sdc.clone(),
276 locked_machine_stats.clone(),
277 ));
278 }
279
280 let mut had_error = false;
281 while let Some(res) = join_set.join_next().await {
282 match res {
283 Ok(r) => match r {
284 Ok(_) => (),
285 Err(e) => {
286 had_error = true;
287 error!(
288 "Collection specific failure (container {}): {:?}",
289 machine, e
290 );
291 }
292 },
293 Err(e) => {
294 had_error = true;
295 error!("Join error (container {}): {:?}", machine, e);
296 }
297 }
298 }
299
300 if had_error {
301 evict_failed_connection(&cached_connections, &machine).await;
302 }
303
304 {
305 let mut monitord_stats = locked_monitord_stats.write().await;
306 let machine_stats = locked_machine_stats.read().await;
307 monitord_stats
308 .machines
309 .insert(machine, machine_stats.clone());
310 }
311 }
312
313 Ok(())
314}
315
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use zbus::zvariant::OwnedObjectPath;

    use super::{decide_cache_action, filter_machines, CacheAction};
    use crate::dbus::zbus_machines::ListedMachine;

    /// Build a `container`-class machine entry with a placeholder service and
    /// object path.
    fn container(name: &str) -> ListedMachine {
        ListedMachine {
            name: name.to_string(),
            class: "container".to_string(),
            service: "".to_string(),
            path: OwnedObjectPath::try_from("/sample/object").unwrap(),
        }
    }

    /// Allowlisted machines survive in input order; the blocklisted one is dropped.
    #[test]
    fn test_filter_machines() {
        let machines: Vec<ListedMachine> = ["foo", "bar", "baz"]
            .iter()
            .copied()
            .map(container)
            .collect();
        let allowlist: HashSet<String> = ["foo", "baz"].iter().map(|n| n.to_string()).collect();
        let blocklist: HashSet<String> = ["bar"].iter().map(|n| n.to_string()).collect();

        let filtered = filter_machines(machines, &allowlist, &blocklist);

        let kept_names: Vec<&str> = filtered.iter().map(|m| m.name.as_str()).collect();
        assert_eq!(kept_names, ["foo", "baz"]);
    }

    /// A cached PID equal to the current leader PID means the entry is reused.
    #[test]
    fn test_decide_cache_action_reuse_on_same_pid() {
        let action = decide_cache_action(Some(42), 42);
        assert_eq!(action, CacheAction::Reuse);
    }

    /// A cached PID different from the current leader PID means the entry is replaced.
    #[test]
    fn test_decide_cache_action_replace_on_pid_change() {
        let action = decide_cache_action(Some(42), 99);
        assert_eq!(action, CacheAction::Replace);
    }

    /// No cached PID means a new entry is created.
    #[test]
    fn test_decide_cache_action_create_on_miss() {
        let action = decide_cache_action(None, 42);
        assert_eq!(action, CacheAction::Create);
    }
}