// monitord/lib.rs
//! # monitord Crate
//!
//! `monitord` is a library to gather statistics about systemd.
5use std::sync::Arc;
6
7use std::collections::HashMap;
8use std::time::Duration;
9use std::time::Instant;
10
11use thiserror::Error;
12use tokio::sync::RwLock;
13use tracing::debug;
14use tracing::error;
15use tracing::info;
16use tracing::warn;
17
/// Errors surfaced by the monitord library's public entry points.
#[derive(Error, Debug)]
pub enum MonitordError {
    /// Any zbus failure while connecting to or talking over D-Bus.
    #[error("D-Bus connection error: {0}")]
    ZbusError(#[from] zbus::Error),
}
23
24pub mod boot;
25pub mod config;
26pub(crate) mod dbus;
27pub mod dbus_stats;
28pub mod json;
29pub mod logging;
30pub mod machines;
31pub mod networkd;
32pub mod pid1;
33pub mod system;
34pub mod timer;
35pub mod unit_constants;
36pub mod units;
37pub mod varlink;
38pub mod varlink_networkd;
39pub mod varlink_units;
40pub mod verify;
41
42pub const DEFAULT_DBUS_ADDRESS: &str = "unix:path=/run/dbus/system_bus_socket";
43
/// Per-collector timing for a single stat collection run.
///
/// Produced by `spawn_timed` and surfaced to callers via
/// `MonitordStats::collector_timings`.
///
/// `start_offset_ms` is the wall time between the top of the collection cycle and
/// the moment this collector's future was first polled. A non-trivial offset
/// indicates the spawn/scheduling loop or the runtime is delaying first poll,
/// which means collectors are not starting in parallel as intended.
///
/// `elapsed_ms` is the wall time between first poll and completion.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
pub struct CollectorTiming {
    /// Name of the collector (e.g. "units", "pid1", "dbus_stats")
    pub name: String,
    /// Milliseconds from top of the run until the spawned future's first poll.
    /// Should be small (< a few ms) when collectors are truly running in parallel.
    pub start_offset_ms: f64,
    /// Milliseconds from first poll to future completion.
    pub elapsed_ms: f64,
    /// Whether the collector returned Ok.
    pub success: bool,
}
64
/// Stats collected for a single systemd-nspawn container or VM managed by systemd-machined.
///
/// Also reused inside `stat_collector` as the host-side scratch struct that the
/// collectors write into before the results are copied onto `MonitordStats`.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
pub struct MachineStats {
    /// systemd-networkd interface states inside the container
    pub networkd: networkd::NetworkdState,
    /// PID 1 process stats from procfs (using the container's leader PID)
    pub pid1: Option<pid1::Pid1Stats>,
    /// Overall systemd system state (e.g. running, degraded) inside the container
    pub system_state: system::SystemdSystemState,
    /// Aggregated systemd unit counts and per-service/timer stats inside the container
    pub units: units::SystemdUnitStats,
    /// systemd version running inside the container
    pub version: system::SystemdVersion,
    /// D-Bus daemon/broker statistics inside the container
    pub dbus_stats: Option<dbus_stats::DBusStats>,
    /// Boot blame statistics: slowest units at boot with activation times in seconds
    #[serde(skip_serializing_if = "Option::is_none")]
    pub boot_blame: Option<boot::BootBlameStats>,
    /// Unit verification error statistics
    /// NOTE(review): unlike `boot_blame`, this has no `skip_serializing_if`, so an
    /// absent value serializes as `null` — confirm consumers before aligning.
    pub verify_stats: Option<verify::VerifyStats>,
}
86
/// Root struct containing all enabled monitord metrics for the host system and containers.
///
/// This is what `print_stats` serializes and what `stat_collector` updates
/// (behind an `Arc<RwLock<_>>`) at the end of each run.
#[derive(serde::Serialize, serde::Deserialize, Debug, Default, PartialEq)]
pub struct MonitordStats {
    /// systemd-networkd interface states and managed interface count
    pub networkd: networkd::NetworkdState,
    /// PID 1 (systemd) process stats from procfs: CPU, memory, FDs, tasks
    pub pid1: Option<pid1::Pid1Stats>,
    /// Overall systemd manager state (e.g. running, degraded, initializing)
    pub system_state: system::SystemdSystemState,
    /// Aggregated systemd unit counts by type/state and per-service/timer detailed metrics
    pub units: units::SystemdUnitStats,
    /// Installed systemd version (major.minor.revision.os)
    pub version: system::SystemdVersion,
    /// D-Bus daemon/broker statistics (connections, bus names, match rules, per-peer accounting)
    pub dbus_stats: Option<dbus_stats::DBusStats>,
    /// Per-container stats keyed by machine name, collected via systemd-machined
    pub machines: HashMap<String, MachineStats>,
    /// Boot blame statistics: slowest units at boot with activation times in seconds
    #[serde(skip_serializing_if = "Option::is_none")]
    pub boot_blame: Option<boot::BootBlameStats>,
    /// Unit verification error statistics
    pub verify_stats: Option<verify::VerifyStats>,
    /// End-to-end duration of the last stat collection run in milliseconds.
    pub stat_collection_run_time_ms: f64,
    /// Per-collector timings from the last run, sorted slowest first. Empty
    /// before the first run completes. Callers compute parallelism ratio
    /// (sum of `elapsed_ms` / `stat_collection_run_time_ms`) and identify the
    /// gating collector (first entry) directly from this vector.
    pub collector_timings: Vec<CollectorTiming>,
}
117
118/// Print statistics in the format set in configuration
119pub fn print_stats(
120    key_prefix: &str,
121    output_format: &config::MonitordOutputFormat,
122    stats: &MonitordStats,
123) {
124    match output_format {
125        config::MonitordOutputFormat::Json => println!(
126            "{}",
127            serde_json::to_string(&stats).expect("Invalid JSON serialization")
128        ),
129        config::MonitordOutputFormat::JsonFlat => println!(
130            "{}",
131            json::flatten(stats, key_prefix).expect("Invalid JSON serialization")
132        ),
133        config::MonitordOutputFormat::JsonPretty => println!(
134            "{}",
135            serde_json::to_string_pretty(&stats).expect("Invalid JSON serialization")
136        ),
137    }
138}
139
140fn set_stat_collection_run_time(stats: &mut MonitordStats, elapsed_runtime: Duration) {
141    stats.stat_collection_run_time_ms = elapsed_runtime.as_secs_f64() * 1000.0;
142}
143
/// Output produced by every spawned collector future after wrapping with timing:
/// (collector name, collector result, start offset from top of run, elapsed wall time).
type TimedCollectorOutput = (String, anyhow::Result<()>, Duration, Duration);
146
147/// Spawn a collector future onto the join set with timing instrumentation.
148///
149/// The wrapping closure records the moment the future is first polled (relative
150/// to `collect_start`) and the elapsed wall time until it completes. Both
151/// durations and the collector name are returned alongside the original result.
152fn spawn_timed<F>(
153    join_set: &mut tokio::task::JoinSet<TimedCollectorOutput>,
154    name: &'static str,
155    collect_start: Instant,
156    fut: F,
157) where
158    F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
159{
160    join_set.spawn(async move {
161        let task_first_poll = Instant::now();
162        let start_offset = task_first_poll.duration_since(collect_start);
163        let result = fut.await;
164        let elapsed = task_first_poll.elapsed();
165        (name.to_string(), result, start_offset, elapsed)
166    });
167}
168
169/// Reuse an existing D-Bus connection or create a new system bus connection.
170async fn get_or_create_dbus_connection(
171    config: &config::Config,
172    maybe_connection: Option<zbus::Connection>,
173) -> Result<zbus::Connection, MonitordError> {
174    match maybe_connection {
175        Some(conn) => Ok(conn),
176        None => Ok(zbus::connection::Builder::system()?
177            .method_timeout(std::time::Duration::from_secs(config.monitord.dbus_timeout))
178            .build()
179            .await?),
180    }
181}
182
183/// Main statistic collection function running what's required by configuration in parallel
184/// Takes an optional locked stats struct to update and to output stats to STDOUT or not.
185/// Takes an optional D-Bus connection. Returns `Some(connection)` if the
186/// collection cycle completed without errors (meaning the connection is reusable),
187/// `None` if errors occurred.
188pub async fn stat_collector(
189    config: config::Config,
190    maybe_locked_stats: Option<Arc<RwLock<MonitordStats>>>,
191    output_stats: bool,
192    maybe_connection: Option<zbus::Connection>,
193) -> Result<Option<zbus::Connection>, MonitordError> {
194    let mut collect_interval_ms: u128 = 0;
195    if config.monitord.daemon {
196        collect_interval_ms = (config.monitord.daemon_stats_refresh_secs * 1000).into();
197    }
198
199    let config = Arc::new(config);
200    let locked_monitord_stats: Arc<RwLock<MonitordStats>> =
201        maybe_locked_stats.unwrap_or(Arc::new(RwLock::new(MonitordStats::default())));
202    let locked_machine_stats: Arc<RwLock<MachineStats>> =
203        Arc::new(RwLock::new(MachineStats::default()));
204    let cached_machine_connections: Arc<tokio::sync::Mutex<machines::MachineConnections>> =
205        Arc::new(tokio::sync::Mutex::new(HashMap::new()));
206    std::env::set_var("DBUS_SYSTEM_BUS_ADDRESS", &config.monitord.dbus_address);
207    let sdc = get_or_create_dbus_connection(&config, maybe_connection).await?;
208    let mut join_set: tokio::task::JoinSet<TimedCollectorOutput> = tokio::task::JoinSet::new();
209    let mut had_error;
210
211    loop {
212        let collect_start_time = Instant::now();
213        info!("Starting stat collection run");
214
215        // Always collect systemd version
216
217        spawn_timed(
218            &mut join_set,
219            "version",
220            collect_start_time,
221            crate::system::update_version(sdc.clone(), locked_machine_stats.clone()),
222        );
223
224        // Collect pid1 procfs stats
225        if config.pid1.enabled {
226            spawn_timed(
227                &mut join_set,
228                "pid1",
229                collect_start_time,
230                crate::pid1::update_pid1_stats(1, locked_machine_stats.clone()),
231            );
232        }
233
234        // Run networkd collector if enabled
235        if config.networkd.enabled {
236            let config_clone = Arc::clone(&config);
237            let sdc_clone = sdc.clone();
238            let stats_clone = locked_machine_stats.clone();
239            spawn_timed(&mut join_set, "networkd", collect_start_time, async move {
240                if config_clone.varlink.enabled {
241                    let socket_path = crate::varlink_networkd::NETWORK_SOCKET_PATH.to_string();
242                    match crate::varlink_networkd::get_networkd_state(&socket_path).await {
243                        Ok(networkd_stats) => {
244                            let mut machine_stats = stats_clone.write().await;
245                            machine_stats.networkd = networkd_stats;
246                            return Ok(());
247                        }
248                        Err(err) => {
249                            warn!(
250                                "Varlink networkd stats failed, falling back to file-based: {:?}",
251                                err
252                            );
253                        }
254                    }
255                }
256                crate::networkd::update_networkd_stats(
257                    config_clone.networkd.link_state_dir.clone(),
258                    None,
259                    sdc_clone,
260                    stats_clone,
261                )
262                .await
263            });
264        }
265
266        // Run system running (SystemState) state collector
267        if config.system_state.enabled {
268            spawn_timed(
269                &mut join_set,
270                "system_state",
271                collect_start_time,
272                crate::system::update_system_stats(sdc.clone(), locked_machine_stats.clone()),
273            );
274        }
275
276        // Run service collectors if there are services listed in config
277        if config.units.enabled {
278            let config_clone = Arc::clone(&config);
279            let sdc_clone = sdc.clone();
280            let stats_clone = locked_machine_stats.clone();
281            spawn_timed(&mut join_set, "units", collect_start_time, async move {
282                if config_clone.varlink.enabled {
283                    let socket_path = crate::varlink_units::METRICS_SOCKET_PATH.to_string();
284                    match crate::varlink_units::update_unit_stats(
285                        Arc::clone(&config_clone),
286                        stats_clone.clone(),
287                        socket_path,
288                    )
289                    .await
290                    {
291                        Ok(()) => {
292                            // Timer properties are not yet exposed via varlink; collect via D-Bus.
293                            match crate::timer::collect_all_timers_dbus(&sdc_clone, &config_clone)
294                                .await
295                            {
296                                Ok(timer_stats) => {
297                                    let mut ms = stats_clone.write().await;
298                                    ms.units.timer_stats = timer_stats.timer_stats;
299                                    ms.units.timer_persistent_units =
300                                        timer_stats.timer_persistent_units;
301                                    ms.units.timer_remain_after_elapse =
302                                        timer_stats.timer_remain_after_elapse;
303                                }
304                                Err(err) => {
305                                    warn!("Varlink timer stats (D-Bus fallback) failed: {:?}", err);
306                                }
307                            }
308                            return Ok(());
309                        }
310                        Err(err) => {
311                            warn!(
312                                "Varlink units stats failed, falling back to D-Bus: {:?}",
313                                err
314                            );
315                        }
316                    }
317                }
318                crate::units::update_unit_stats(config_clone, sdc_clone, stats_clone).await
319            });
320        }
321
322        if config.machines.enabled {
323            spawn_timed(
324                &mut join_set,
325                "machines",
326                collect_start_time,
327                crate::machines::update_machines_stats(
328                    Arc::clone(&config),
329                    sdc.clone(),
330                    locked_monitord_stats.clone(),
331                    cached_machine_connections.clone(),
332                ),
333            );
334        }
335
336        if config.dbus_stats.enabled {
337            spawn_timed(
338                &mut join_set,
339                "dbus_stats",
340                collect_start_time,
341                crate::dbus_stats::update_dbus_stats(
342                    Arc::clone(&config),
343                    sdc.clone(),
344                    locked_machine_stats.clone(),
345                ),
346            );
347        }
348
349        if config.boot_blame.enabled {
350            spawn_timed(
351                &mut join_set,
352                "boot_blame",
353                collect_start_time,
354                crate::boot::update_boot_blame_stats(
355                    Arc::clone(&config),
356                    sdc.clone(),
357                    locked_machine_stats.clone(),
358                ),
359            );
360        }
361
362        if config.verify.enabled {
363            spawn_timed(
364                &mut join_set,
365                "verify",
366                collect_start_time,
367                crate::verify::update_verify_stats(
368                    sdc.clone(),
369                    locked_machine_stats.clone(),
370                    config.verify.allowlist.clone(),
371                    config.verify.blocklist.clone(),
372                ),
373            );
374        }
375
376        if join_set.len() == 1 {
377            warn!("No collectors except systemd version scheduled to run. Exiting");
378        }
379
380        // Drain join_set, collect per-collector timings + log per-collector failures
381        had_error = false;
382        let mut timings: Vec<CollectorTiming> = Vec::new();
383        while let Some(res) = join_set.join_next().await {
384            match res {
385                Ok((name, collector_result, start_offset, elapsed)) => {
386                    let success = collector_result.is_ok();
387                    if let Err(e) = collector_result {
388                        had_error = true;
389                        error!("Collector '{}' failure: {:?}", name, e);
390                    }
391                    timings.push(CollectorTiming {
392                        name,
393                        start_offset_ms: start_offset.as_secs_f64() * 1000.0,
394                        elapsed_ms: elapsed.as_secs_f64() * 1000.0,
395                        success,
396                    });
397                }
398                Err(e) => {
399                    had_error = true;
400                    error!("Join error: {:?}", e);
401                }
402            }
403        }
404
405        let elapsed_runtime = collect_start_time.elapsed();
406        let elapsed_runtime_ms = elapsed_runtime.as_millis();
407
408        // Sort timings by elapsed desc so the slowest collector is first in the JSON output
409        timings.sort_by(|a, b| {
410            b.elapsed_ms
411                .partial_cmp(&a.elapsed_ms)
412                .unwrap_or(std::cmp::Ordering::Equal)
413        });
414
415        // Per-collector lines log at debug! to keep daemon-mode noise low.
416        // The same data is on MonitordStats::collector_timings for callers that need it.
417        for t in &timings {
418            debug!(
419                "collector '{}' start_offset={:.1}ms elapsed={:.1}ms{}",
420                t.name,
421                t.start_offset_ms,
422                t.elapsed_ms,
423                if t.success { "" } else { " (FAILED)" },
424            );
425        }
426
427        {
428            // Update monitord stats with machine stats
429            let mut monitord_stats = locked_monitord_stats.write().await;
430            let machine_stats = locked_machine_stats.read().await;
431            monitord_stats.pid1 = machine_stats.pid1.clone();
432            monitord_stats.networkd = machine_stats.networkd.clone();
433            monitord_stats.system_state = machine_stats.system_state;
434            monitord_stats.version = machine_stats.version.clone();
435            monitord_stats.units = machine_stats.units.clone();
436            monitord_stats.dbus_stats = machine_stats.dbus_stats.clone();
437            monitord_stats.boot_blame = machine_stats.boot_blame.clone();
438            monitord_stats.verify_stats = machine_stats.verify_stats.clone();
439            set_stat_collection_run_time(&mut monitord_stats, elapsed_runtime);
440            monitord_stats.collector_timings = timings;
441        }
442
443        info!("stat collection run took {}ms", elapsed_runtime_ms);
444        if output_stats {
445            let monitord_stats = locked_monitord_stats.read().await;
446            print_stats(
447                &config.monitord.key_prefix,
448                &config.monitord.output_format,
449                &monitord_stats,
450            );
451        }
452        if !config.monitord.daemon {
453            break;
454        }
455        let sleep_time_ms = collect_interval_ms - elapsed_runtime_ms;
456        info!("stat collection sleeping for {}s 😴", sleep_time_ms / 1000);
457        tokio::time::sleep(Duration::from_millis(
458            sleep_time_ms
459                .try_into()
460                .expect("Sleep time does not fit into a u64 :O"),
461        ))
462        .await;
463    }
464    Ok(if had_error { None } else { Some(sdc) })
465}
466
#[cfg(test)]
mod tests {
    use super::*;

    /// Duration → fractional-millisecond conversion, table-driven.
    #[test]
    fn test_stat_collection_run_time_ms_conversion() {
        let mut stats = MonitordStats::default();
        let cases = [
            (Duration::from_millis(5), 5.0_f64),
            (Duration::from_micros(500), 0.5_f64),
        ];
        for (input, expected) in cases {
            set_stat_collection_run_time(&mut stats, input);
            assert!(
                (stats.stat_collection_run_time_ms - expected).abs() < f64::EPSILON,
                "{:?} should convert to {}ms",
                input,
                expected
            );
        }
    }
}