1use std::sync::Arc;
6
7use std::collections::HashMap;
8use std::time::Duration;
9use std::time::Instant;
10
11use thiserror::Error;
12use tokio::sync::RwLock;
13use tracing::debug;
14use tracing::error;
15use tracing::info;
16use tracing::warn;
17
18#[derive(Error, Debug)]
19pub enum MonitordError {
20 #[error("D-Bus connection error: {0}")]
21 ZbusError(#[from] zbus::Error),
22}
23
24pub mod boot;
25pub mod config;
26pub(crate) mod dbus;
27pub mod dbus_stats;
28pub mod json;
29pub mod logging;
30pub mod machines;
31pub mod networkd;
32pub mod pid1;
33pub mod system;
34pub mod timer;
35pub mod unit_constants;
36pub mod units;
37pub mod varlink;
38pub mod varlink_networkd;
39pub mod varlink_units;
40pub mod verify;
41
/// Default D-Bus system bus address: a `unix:path=` socket at `/run/dbus/system_bus_socket`.
pub const DEFAULT_DBUS_ADDRESS: &str = "unix:path=/run/dbus/system_bus_socket";
43
44#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
53pub struct CollectorTiming {
54 pub name: String,
56 pub start_offset_ms: f64,
59 pub elapsed_ms: f64,
61 pub success: bool,
63}
64
65#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
67pub struct MachineStats {
68 pub networkd: networkd::NetworkdState,
70 pub pid1: Option<pid1::Pid1Stats>,
72 pub system_state: system::SystemdSystemState,
74 pub units: units::SystemdUnitStats,
76 pub version: system::SystemdVersion,
78 pub dbus_stats: Option<dbus_stats::DBusStats>,
80 #[serde(skip_serializing_if = "Option::is_none")]
82 pub boot_blame: Option<boot::BootBlameStats>,
83 pub verify_stats: Option<verify::VerifyStats>,
85}
86
87#[derive(serde::Serialize, serde::Deserialize, Debug, Default, PartialEq)]
89pub struct MonitordStats {
90 pub networkd: networkd::NetworkdState,
92 pub pid1: Option<pid1::Pid1Stats>,
94 pub system_state: system::SystemdSystemState,
96 pub units: units::SystemdUnitStats,
98 pub version: system::SystemdVersion,
100 pub dbus_stats: Option<dbus_stats::DBusStats>,
102 pub machines: HashMap<String, MachineStats>,
104 #[serde(skip_serializing_if = "Option::is_none")]
106 pub boot_blame: Option<boot::BootBlameStats>,
107 pub verify_stats: Option<verify::VerifyStats>,
109 pub stat_collection_run_time_ms: f64,
111 pub collector_timings: Vec<CollectorTiming>,
116}
117
118pub fn print_stats(
120 key_prefix: &str,
121 output_format: &config::MonitordOutputFormat,
122 stats: &MonitordStats,
123) {
124 match output_format {
125 config::MonitordOutputFormat::Json => println!(
126 "{}",
127 serde_json::to_string(&stats).expect("Invalid JSON serialization")
128 ),
129 config::MonitordOutputFormat::JsonFlat => println!(
130 "{}",
131 json::flatten(stats, key_prefix).expect("Invalid JSON serialization")
132 ),
133 config::MonitordOutputFormat::JsonPretty => println!(
134 "{}",
135 serde_json::to_string_pretty(&stats).expect("Invalid JSON serialization")
136 ),
137 }
138}
139
140fn set_stat_collection_run_time(stats: &mut MonitordStats, elapsed_runtime: Duration) {
141 stats.stat_collection_run_time_ms = elapsed_runtime.as_secs_f64() * 1000.0;
142}
143
144type TimedCollectorOutput = (String, anyhow::Result<()>, Duration, Duration);
146
147fn spawn_timed<F>(
153 join_set: &mut tokio::task::JoinSet<TimedCollectorOutput>,
154 name: &'static str,
155 collect_start: Instant,
156 fut: F,
157) where
158 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
159{
160 join_set.spawn(async move {
161 let task_first_poll = Instant::now();
162 let start_offset = task_first_poll.duration_since(collect_start);
163 let result = fut.await;
164 let elapsed = task_first_poll.elapsed();
165 (name.to_string(), result, start_offset, elapsed)
166 });
167}
168
169async fn get_or_create_dbus_connection(
171 config: &config::Config,
172 maybe_connection: Option<zbus::Connection>,
173) -> Result<zbus::Connection, MonitordError> {
174 match maybe_connection {
175 Some(conn) => Ok(conn),
176 None => Ok(zbus::connection::Builder::system()?
177 .method_timeout(std::time::Duration::from_secs(config.monitord.dbus_timeout))
178 .build()
179 .await?),
180 }
181}
182
183pub async fn stat_collector(
189 config: config::Config,
190 maybe_locked_stats: Option<Arc<RwLock<MonitordStats>>>,
191 output_stats: bool,
192 maybe_connection: Option<zbus::Connection>,
193) -> Result<Option<zbus::Connection>, MonitordError> {
194 let mut collect_interval_ms: u128 = 0;
195 if config.monitord.daemon {
196 collect_interval_ms = (config.monitord.daemon_stats_refresh_secs * 1000).into();
197 }
198
199 let config = Arc::new(config);
200 let locked_monitord_stats: Arc<RwLock<MonitordStats>> =
201 maybe_locked_stats.unwrap_or(Arc::new(RwLock::new(MonitordStats::default())));
202 let locked_machine_stats: Arc<RwLock<MachineStats>> =
203 Arc::new(RwLock::new(MachineStats::default()));
204 let cached_machine_connections: Arc<tokio::sync::Mutex<machines::MachineConnections>> =
205 Arc::new(tokio::sync::Mutex::new(HashMap::new()));
206 std::env::set_var("DBUS_SYSTEM_BUS_ADDRESS", &config.monitord.dbus_address);
207 let sdc = get_or_create_dbus_connection(&config, maybe_connection).await?;
208 let mut join_set: tokio::task::JoinSet<TimedCollectorOutput> = tokio::task::JoinSet::new();
209 let mut had_error;
210
211 loop {
212 let collect_start_time = Instant::now();
213 info!("Starting stat collection run");
214
215 spawn_timed(
218 &mut join_set,
219 "version",
220 collect_start_time,
221 crate::system::update_version(sdc.clone(), locked_machine_stats.clone()),
222 );
223
224 if config.pid1.enabled {
226 spawn_timed(
227 &mut join_set,
228 "pid1",
229 collect_start_time,
230 crate::pid1::update_pid1_stats(1, locked_machine_stats.clone()),
231 );
232 }
233
234 if config.networkd.enabled {
236 let config_clone = Arc::clone(&config);
237 let sdc_clone = sdc.clone();
238 let stats_clone = locked_machine_stats.clone();
239 spawn_timed(&mut join_set, "networkd", collect_start_time, async move {
240 if config_clone.varlink.enabled {
241 let socket_path = crate::varlink_networkd::NETWORK_SOCKET_PATH.to_string();
242 match crate::varlink_networkd::get_networkd_state(&socket_path).await {
243 Ok(networkd_stats) => {
244 let mut machine_stats = stats_clone.write().await;
245 machine_stats.networkd = networkd_stats;
246 return Ok(());
247 }
248 Err(err) => {
249 warn!(
250 "Varlink networkd stats failed, falling back to file-based: {:?}",
251 err
252 );
253 }
254 }
255 }
256 crate::networkd::update_networkd_stats(
257 config_clone.networkd.link_state_dir.clone(),
258 None,
259 sdc_clone,
260 stats_clone,
261 )
262 .await
263 });
264 }
265
266 if config.system_state.enabled {
268 spawn_timed(
269 &mut join_set,
270 "system_state",
271 collect_start_time,
272 crate::system::update_system_stats(sdc.clone(), locked_machine_stats.clone()),
273 );
274 }
275
276 if config.units.enabled {
278 let config_clone = Arc::clone(&config);
279 let sdc_clone = sdc.clone();
280 let stats_clone = locked_machine_stats.clone();
281 spawn_timed(&mut join_set, "units", collect_start_time, async move {
282 if config_clone.varlink.enabled {
283 let socket_path = crate::varlink_units::METRICS_SOCKET_PATH.to_string();
284 match crate::varlink_units::update_unit_stats(
285 Arc::clone(&config_clone),
286 stats_clone.clone(),
287 socket_path,
288 )
289 .await
290 {
291 Ok(()) => {
292 match crate::timer::collect_all_timers_dbus(&sdc_clone, &config_clone)
294 .await
295 {
296 Ok(timer_stats) => {
297 let mut ms = stats_clone.write().await;
298 ms.units.timer_stats = timer_stats.timer_stats;
299 ms.units.timer_persistent_units =
300 timer_stats.timer_persistent_units;
301 ms.units.timer_remain_after_elapse =
302 timer_stats.timer_remain_after_elapse;
303 }
304 Err(err) => {
305 warn!("Varlink timer stats (D-Bus fallback) failed: {:?}", err);
306 }
307 }
308 return Ok(());
309 }
310 Err(err) => {
311 warn!(
312 "Varlink units stats failed, falling back to D-Bus: {:?}",
313 err
314 );
315 }
316 }
317 }
318 crate::units::update_unit_stats(config_clone, sdc_clone, stats_clone).await
319 });
320 }
321
322 if config.machines.enabled {
323 spawn_timed(
324 &mut join_set,
325 "machines",
326 collect_start_time,
327 crate::machines::update_machines_stats(
328 Arc::clone(&config),
329 sdc.clone(),
330 locked_monitord_stats.clone(),
331 cached_machine_connections.clone(),
332 ),
333 );
334 }
335
336 if config.dbus_stats.enabled {
337 spawn_timed(
338 &mut join_set,
339 "dbus_stats",
340 collect_start_time,
341 crate::dbus_stats::update_dbus_stats(
342 Arc::clone(&config),
343 sdc.clone(),
344 locked_machine_stats.clone(),
345 ),
346 );
347 }
348
349 if config.boot_blame.enabled {
350 spawn_timed(
351 &mut join_set,
352 "boot_blame",
353 collect_start_time,
354 crate::boot::update_boot_blame_stats(
355 Arc::clone(&config),
356 sdc.clone(),
357 locked_machine_stats.clone(),
358 ),
359 );
360 }
361
362 if config.verify.enabled {
363 spawn_timed(
364 &mut join_set,
365 "verify",
366 collect_start_time,
367 crate::verify::update_verify_stats(
368 sdc.clone(),
369 locked_machine_stats.clone(),
370 config.verify.allowlist.clone(),
371 config.verify.blocklist.clone(),
372 ),
373 );
374 }
375
376 if join_set.len() == 1 {
377 warn!("No collectors except systemd version scheduled to run. Exiting");
378 }
379
380 had_error = false;
382 let mut timings: Vec<CollectorTiming> = Vec::new();
383 while let Some(res) = join_set.join_next().await {
384 match res {
385 Ok((name, collector_result, start_offset, elapsed)) => {
386 let success = collector_result.is_ok();
387 if let Err(e) = collector_result {
388 had_error = true;
389 error!("Collector '{}' failure: {:?}", name, e);
390 }
391 timings.push(CollectorTiming {
392 name,
393 start_offset_ms: start_offset.as_secs_f64() * 1000.0,
394 elapsed_ms: elapsed.as_secs_f64() * 1000.0,
395 success,
396 });
397 }
398 Err(e) => {
399 had_error = true;
400 error!("Join error: {:?}", e);
401 }
402 }
403 }
404
405 let elapsed_runtime = collect_start_time.elapsed();
406 let elapsed_runtime_ms = elapsed_runtime.as_millis();
407
408 timings.sort_by(|a, b| {
410 b.elapsed_ms
411 .partial_cmp(&a.elapsed_ms)
412 .unwrap_or(std::cmp::Ordering::Equal)
413 });
414
415 for t in &timings {
418 debug!(
419 "collector '{}' start_offset={:.1}ms elapsed={:.1}ms{}",
420 t.name,
421 t.start_offset_ms,
422 t.elapsed_ms,
423 if t.success { "" } else { " (FAILED)" },
424 );
425 }
426
427 {
428 let mut monitord_stats = locked_monitord_stats.write().await;
430 let machine_stats = locked_machine_stats.read().await;
431 monitord_stats.pid1 = machine_stats.pid1.clone();
432 monitord_stats.networkd = machine_stats.networkd.clone();
433 monitord_stats.system_state = machine_stats.system_state;
434 monitord_stats.version = machine_stats.version.clone();
435 monitord_stats.units = machine_stats.units.clone();
436 monitord_stats.dbus_stats = machine_stats.dbus_stats.clone();
437 monitord_stats.boot_blame = machine_stats.boot_blame.clone();
438 monitord_stats.verify_stats = machine_stats.verify_stats.clone();
439 set_stat_collection_run_time(&mut monitord_stats, elapsed_runtime);
440 monitord_stats.collector_timings = timings;
441 }
442
443 info!("stat collection run took {}ms", elapsed_runtime_ms);
444 if output_stats {
445 let monitord_stats = locked_monitord_stats.read().await;
446 print_stats(
447 &config.monitord.key_prefix,
448 &config.monitord.output_format,
449 &monitord_stats,
450 );
451 }
452 if !config.monitord.daemon {
453 break;
454 }
455 let sleep_time_ms = collect_interval_ms - elapsed_runtime_ms;
456 info!("stat collection sleeping for {}s 😴", sleep_time_ms / 1000);
457 tokio::time::sleep(Duration::from_millis(
458 sleep_time_ms
459 .try_into()
460 .expect("Sleep time does not fit into a u64 :O"),
461 ))
462 .await;
463 }
464 Ok(if had_error { None } else { Some(sdc) })
465}
466
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_stat_collection_run_time_ms_conversion() {
        let mut stats = MonitordStats::default();

        // Whole milliseconds convert exactly.
        set_stat_collection_run_time(&mut stats, Duration::from_millis(5));
        assert_eq!(stats.stat_collection_run_time_ms, 5.0);

        // Sub-millisecond durations keep their fractional part.
        set_stat_collection_run_time(&mut stats, Duration::from_micros(500));
        let deviation = (stats.stat_collection_run_time_ms - 0.5).abs();
        assert!(deviation < f64::EPSILON);
    }
}