1use std::sync::Arc;
6
7use std::collections::HashMap;
8use std::time::Duration;
9use std::time::Instant;
10
11use thiserror::Error;
12use tokio::sync::RwLock;
13use tracing::debug;
14use tracing::error;
15use tracing::info;
16use tracing::warn;
17
/// Error type returned by the collection entry points in this file.
#[derive(Error, Debug)]
pub enum MonitordError {
    /// Wraps a `zbus::Error`; the `#[from]` attribute lets `?` convert a
    /// `zbus::Error` into this variant automatically.
    #[error("D-Bus connection error: {0}")]
    ZbusError(#[from] zbus::Error),
}
23
// Submodules of this crate. `dbus` is crate-private; every other module is
// part of the public API.
pub mod boot;
pub mod config;
pub(crate) mod dbus;
pub mod dbus_stats;
pub mod json;
pub mod logging;
pub mod machines;
pub mod networkd;
pub mod pid1;
pub mod system;
pub mod timer;
pub mod unit_constants;
pub mod units;
pub mod varlink;
pub mod varlink_networkd;
pub mod varlink_units;
pub mod verify;
41
/// Default D-Bus address: the Unix socket path of the system bus.
pub const DEFAULT_DBUS_ADDRESS: &str = "unix:path=/run/dbus/system_bus_socket";
43
/// Timing record for a single collector task within one collection run.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
pub struct CollectorTiming {
    /// Name the collector was spawned under (e.g. `"units"`, `"pid1"`).
    pub name: String,
    /// Milliseconds between the start of the collection run and the first
    /// poll of this collector's task.
    pub start_offset_ms: f64,
    /// Milliseconds between the task's first poll and its completion.
    pub elapsed_ms: f64,
    /// `true` when the collector returned `Ok`.
    pub success: bool,
}
64
/// Stats gathered for a single machine.
///
/// `stat_collector` hands a shared instance of this struct to the collectors
/// and copies the result into [`MonitordStats`]; it is also the value type of
/// `MonitordStats::machines`.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
pub struct MachineStats {
    /// systemd-networkd state.
    pub networkd: networkd::NetworkdState,
    /// PID 1 process stats; `None` until a collector stores a value.
    pub pid1: Option<pid1::Pid1Stats>,
    /// Overall systemd system state.
    pub system_state: system::SystemdSystemState,
    /// Unit stats (including timer and unit-file data when collected).
    pub units: units::SystemdUnitStats,
    /// systemd version.
    pub version: system::SystemdVersion,
    /// D-Bus daemon stats; `None` until a collector stores a value.
    pub dbus_stats: Option<dbus_stats::DBusStats>,
    /// Boot blame stats; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub boot_blame: Option<boot::BootBlameStats>,
    /// Unit verification stats; `None` until a collector stores a value.
    pub verify_stats: Option<verify::VerifyStats>,
}
86
/// Top-level stats structure that `stat_collector` fills in and
/// [`print_stats`] serializes.
#[derive(serde::Serialize, serde::Deserialize, Debug, Default, PartialEq)]
pub struct MonitordStats {
    /// systemd-networkd state.
    pub networkd: networkd::NetworkdState,
    /// PID 1 process stats; `None` until a collector stores a value.
    pub pid1: Option<pid1::Pid1Stats>,
    /// Overall systemd system state.
    pub system_state: system::SystemdSystemState,
    /// Unit stats (including timer and unit-file data when collected).
    pub units: units::SystemdUnitStats,
    /// systemd version.
    pub version: system::SystemdVersion,
    /// D-Bus daemon stats; `None` until a collector stores a value.
    pub dbus_stats: Option<dbus_stats::DBusStats>,
    /// Per-machine stats; presumably keyed by machine name (confirm against
    /// the `machines` module).
    pub machines: HashMap<String, MachineStats>,
    /// Boot blame stats; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub boot_blame: Option<boot::BootBlameStats>,
    /// Unit verification stats; `None` until a collector stores a value.
    pub verify_stats: Option<verify::VerifyStats>,
    /// Wall-clock duration of the most recent collection run, in
    /// milliseconds (fractional).
    pub stat_collection_run_time_ms: f64,
    /// Per-collector timings from the most recent run, sorted by
    /// `elapsed_ms` in descending order.
    pub collector_timings: Vec<CollectorTiming>,
}
117
118pub fn print_stats(
120 key_prefix: &str,
121 output_format: &config::MonitordOutputFormat,
122 stats: &MonitordStats,
123) {
124 match output_format {
125 config::MonitordOutputFormat::Json => println!(
126 "{}",
127 serde_json::to_string(&stats).expect("Invalid JSON serialization")
128 ),
129 config::MonitordOutputFormat::JsonFlat => println!(
130 "{}",
131 json::flatten(stats, key_prefix).expect("Invalid JSON serialization")
132 ),
133 config::MonitordOutputFormat::JsonPretty => println!(
134 "{}",
135 serde_json::to_string_pretty(&stats).expect("Invalid JSON serialization")
136 ),
137 }
138}
139
140fn set_stat_collection_run_time(stats: &mut MonitordStats, elapsed_runtime: Duration) {
141 stats.stat_collection_run_time_ms = elapsed_runtime.as_secs_f64() * 1000.0;
142}
143
/// Output of a timed collector task, in order: collector name, the
/// collector's result, offset from the start of the collection run to the
/// task's first poll, and time from that first poll to completion.
type TimedCollectorOutput = (String, anyhow::Result<()>, Duration, Duration);
146
147fn spawn_timed<F>(
153 join_set: &mut tokio::task::JoinSet<TimedCollectorOutput>,
154 name: &'static str,
155 collect_start: Instant,
156 fut: F,
157) where
158 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
159{
160 join_set.spawn(async move {
161 let task_first_poll = Instant::now();
162 let start_offset = task_first_poll.duration_since(collect_start);
163 let result = fut.await;
164 let elapsed = task_first_poll.elapsed();
165 (name.to_string(), result, start_offset, elapsed)
166 });
167}
168
169async fn get_or_create_dbus_connection(
171 config: &config::Config,
172 maybe_connection: Option<zbus::Connection>,
173) -> Result<zbus::Connection, MonitordError> {
174 match maybe_connection {
175 Some(conn) => Ok(conn),
176 None => Ok(zbus::connection::Builder::system()?
177 .method_timeout(std::time::Duration::from_secs(config.monitord.dbus_timeout))
178 .build()
179 .await?),
180 }
181}
182
183pub async fn stat_collector(
189 config: config::Config,
190 maybe_locked_stats: Option<Arc<RwLock<MonitordStats>>>,
191 output_stats: bool,
192 maybe_connection: Option<zbus::Connection>,
193) -> Result<Option<zbus::Connection>, MonitordError> {
194 let mut collect_interval_ms: u128 = 0;
195 if config.monitord.daemon {
196 collect_interval_ms = (config.monitord.daemon_stats_refresh_secs * 1000).into();
197 }
198
199 let config = Arc::new(config);
200 let locked_monitord_stats: Arc<RwLock<MonitordStats>> =
201 maybe_locked_stats.unwrap_or(Arc::new(RwLock::new(MonitordStats::default())));
202 let locked_machine_stats: Arc<RwLock<MachineStats>> =
203 Arc::new(RwLock::new(MachineStats::default()));
204 let cached_machine_connections: Arc<tokio::sync::Mutex<machines::MachineConnections>> =
205 Arc::new(tokio::sync::Mutex::new(HashMap::new()));
206 std::env::set_var("DBUS_SYSTEM_BUS_ADDRESS", &config.monitord.dbus_address);
207 let sdc = get_or_create_dbus_connection(&config, maybe_connection).await?;
208 let mut join_set: tokio::task::JoinSet<TimedCollectorOutput> = tokio::task::JoinSet::new();
209 let mut had_error;
210
211 loop {
212 let collect_start_time = Instant::now();
213 info!("Starting stat collection run");
214
215 spawn_timed(
218 &mut join_set,
219 "version",
220 collect_start_time,
221 crate::system::update_version(sdc.clone(), locked_machine_stats.clone()),
222 );
223
224 if config.pid1.enabled {
226 spawn_timed(
227 &mut join_set,
228 "pid1",
229 collect_start_time,
230 crate::pid1::update_pid1_stats(1, locked_machine_stats.clone()),
231 );
232 }
233
234 if config.networkd.enabled {
236 let config_clone = Arc::clone(&config);
237 let sdc_clone = sdc.clone();
238 let stats_clone = locked_machine_stats.clone();
239 spawn_timed(&mut join_set, "networkd", collect_start_time, async move {
240 if config_clone.varlink.enabled {
241 let socket_path = crate::varlink_networkd::NETWORK_SOCKET_PATH.to_string();
242 match crate::varlink_networkd::get_networkd_state(&socket_path).await {
243 Ok(networkd_stats) => {
244 let mut machine_stats = stats_clone.write().await;
245 machine_stats.networkd = networkd_stats;
246 return Ok(());
247 }
248 Err(err) => {
249 warn!(
250 "Varlink networkd stats failed, falling back to file-based: {:?}",
251 err
252 );
253 }
254 }
255 }
256 crate::networkd::update_networkd_stats(
257 config_clone.networkd.link_state_dir.clone(),
258 None,
259 sdc_clone,
260 stats_clone,
261 )
262 .await
263 });
264 }
265
266 if config.system_state.enabled {
268 spawn_timed(
269 &mut join_set,
270 "system_state",
271 collect_start_time,
272 crate::system::update_system_stats(sdc.clone(), locked_machine_stats.clone()),
273 );
274 }
275
276 if config.units.enabled {
278 let config_clone = Arc::clone(&config);
279 let sdc_clone = sdc.clone();
280 let stats_clone = locked_machine_stats.clone();
281 spawn_timed(&mut join_set, "units", collect_start_time, async move {
282 if config_clone.varlink.enabled {
283 let socket_path = crate::varlink_units::METRICS_SOCKET_PATH.to_string();
284 match crate::varlink_units::update_unit_stats(
285 Arc::clone(&config_clone),
286 stats_clone.clone(),
287 socket_path,
288 )
289 .await
290 {
291 Ok(()) => {
292 match crate::timer::collect_all_timers_dbus(&sdc_clone, &config_clone)
294 .await
295 {
296 Ok(timer_stats) => {
297 let mut ms = stats_clone.write().await;
298 ms.units.timer_stats = timer_stats.timer_stats;
299 ms.units.timer_persistent_units =
300 timer_stats.timer_persistent_units;
301 ms.units.timer_remain_after_elapse =
302 timer_stats.timer_remain_after_elapse;
303 }
304 Err(err) => {
305 warn!("Varlink timer stats (D-Bus fallback) failed: {:?}", err);
306 }
307 }
308 if config_clone.units.unit_files {
309 let unit_files = crate::units::collect_unit_files_stats("").await;
310 let mut ms = stats_clone.write().await;
311 ms.units.unit_files = unit_files;
312 }
313 return Ok(());
314 }
315 Err(err) => {
316 warn!(
317 "Varlink units stats failed, falling back to D-Bus: {:?}",
318 err
319 );
320 }
321 }
322 }
323 crate::units::update_unit_stats(config_clone, sdc_clone, stats_clone, String::new())
324 .await
325 });
326 }
327
328 if config.machines.enabled {
329 spawn_timed(
330 &mut join_set,
331 "machines",
332 collect_start_time,
333 crate::machines::update_machines_stats(
334 Arc::clone(&config),
335 sdc.clone(),
336 locked_monitord_stats.clone(),
337 cached_machine_connections.clone(),
338 ),
339 );
340 }
341
342 if config.dbus_stats.enabled {
343 spawn_timed(
344 &mut join_set,
345 "dbus_stats",
346 collect_start_time,
347 crate::dbus_stats::update_dbus_stats(
348 Arc::clone(&config),
349 sdc.clone(),
350 locked_machine_stats.clone(),
351 ),
352 );
353 }
354
355 if config.boot_blame.enabled {
356 spawn_timed(
357 &mut join_set,
358 "boot_blame",
359 collect_start_time,
360 crate::boot::update_boot_blame_stats(
361 Arc::clone(&config),
362 sdc.clone(),
363 locked_machine_stats.clone(),
364 ),
365 );
366 }
367
368 if config.verify.enabled {
369 spawn_timed(
370 &mut join_set,
371 "verify",
372 collect_start_time,
373 crate::verify::update_verify_stats(
374 sdc.clone(),
375 locked_machine_stats.clone(),
376 config.verify.allowlist.clone(),
377 config.verify.blocklist.clone(),
378 ),
379 );
380 }
381
382 if join_set.len() == 1 {
383 warn!("No collectors except systemd version scheduled to run. Exiting");
384 }
385
386 had_error = false;
388 let mut timings: Vec<CollectorTiming> = Vec::new();
389 while let Some(res) = join_set.join_next().await {
390 match res {
391 Ok((name, collector_result, start_offset, elapsed)) => {
392 let success = collector_result.is_ok();
393 if let Err(e) = collector_result {
394 had_error = true;
395 error!("Collector '{}' failure: {:?}", name, e);
396 }
397 timings.push(CollectorTiming {
398 name,
399 start_offset_ms: start_offset.as_secs_f64() * 1000.0,
400 elapsed_ms: elapsed.as_secs_f64() * 1000.0,
401 success,
402 });
403 }
404 Err(e) => {
405 had_error = true;
406 error!("Join error: {:?}", e);
407 }
408 }
409 }
410
411 let elapsed_runtime = collect_start_time.elapsed();
412 let elapsed_runtime_ms = elapsed_runtime.as_millis();
413
414 timings.sort_by(|a, b| {
416 b.elapsed_ms
417 .partial_cmp(&a.elapsed_ms)
418 .unwrap_or(std::cmp::Ordering::Equal)
419 });
420
421 for t in &timings {
424 debug!(
425 "collector '{}' start_offset={:.1}ms elapsed={:.1}ms{}",
426 t.name,
427 t.start_offset_ms,
428 t.elapsed_ms,
429 if t.success { "" } else { " (FAILED)" },
430 );
431 }
432
433 {
434 let mut monitord_stats = locked_monitord_stats.write().await;
436 let machine_stats = locked_machine_stats.read().await;
437 monitord_stats.pid1 = machine_stats.pid1.clone();
438 monitord_stats.networkd = machine_stats.networkd.clone();
439 monitord_stats.system_state = machine_stats.system_state;
440 monitord_stats.version = machine_stats.version.clone();
441 monitord_stats.units = machine_stats.units.clone();
442 monitord_stats.dbus_stats = machine_stats.dbus_stats.clone();
443 monitord_stats.boot_blame = machine_stats.boot_blame.clone();
444 monitord_stats.verify_stats = machine_stats.verify_stats.clone();
445 set_stat_collection_run_time(&mut monitord_stats, elapsed_runtime);
446 monitord_stats.collector_timings = timings;
447 }
448
449 info!("stat collection run took {}ms", elapsed_runtime_ms);
450 if output_stats {
451 let monitord_stats = locked_monitord_stats.read().await;
452 print_stats(
453 &config.monitord.key_prefix,
454 &config.monitord.output_format,
455 &monitord_stats,
456 );
457 }
458 if !config.monitord.daemon {
459 break;
460 }
461 let sleep_time_ms = collect_interval_ms - elapsed_runtime_ms;
462 info!("stat collection sleeping for {}s 😴", sleep_time_ms / 1000);
463 tokio::time::sleep(Duration::from_millis(
464 sleep_time_ms
465 .try_into()
466 .expect("Sleep time does not fit into a u64 :O"),
467 ))
468 .await;
469 }
470 Ok(if had_error { None } else { Some(sdc) })
471}
472
#[cfg(test)]
mod tests {
    use super::*;

    /// `set_stat_collection_run_time` must store the duration as fractional
    /// milliseconds, keeping sub-millisecond precision.
    #[test]
    fn test_stat_collection_run_time_ms_conversion() {
        let mut stats = MonitordStats::default();

        // Whole milliseconds convert exactly.
        let five_millis = Duration::from_millis(5);
        set_stat_collection_run_time(&mut stats, five_millis);
        assert_eq!(stats.stat_collection_run_time_ms, 5.0);

        // Sub-millisecond durations keep their fractional part.
        let half_milli = Duration::from_micros(500);
        set_stat_collection_run_time(&mut stats, half_milli);
        let deviation = (stats.stat_collection_run_time_ms - 0.5).abs();
        assert!(deviation < f64::EPSILON);
    }
}