monitord/
lib.rs

1//! # monitord Crate
2//!
3//! `monitord` is a library to gather statistics about systemd.
4
5use std::sync::Arc;
6
7use std::collections::HashMap;
8use std::time::Duration;
9use std::time::Instant;
10
11use thiserror::Error;
12use tokio::sync::RwLock;
13use tracing::error;
14use tracing::info;
15use tracing::warn;
16
17#[derive(Error, Debug)]
18pub enum MonitordError {
19    #[error("D-Bus connection error: {0}")]
20    ZbusError(#[from] zbus::Error),
21}
22
23pub mod boot;
24pub mod config;
25pub(crate) mod dbus;
26pub mod dbus_stats;
27pub mod json;
28pub mod logging;
29pub mod machines;
30pub mod networkd;
31pub mod pid1;
32pub mod system;
33pub mod timer;
34pub mod units;
35pub mod verify;
36
/// Default D-Bus system bus address: the unix socket at `/run/dbus/system_bus_socket`.
pub const DEFAULT_DBUS_ADDRESS: &str = "unix:path=/run/dbus/system_bus_socket";
38
39/// Stats collected for a single systemd-nspawn container or VM managed by systemd-machined
40#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
41pub struct MachineStats {
42    /// systemd-networkd interface states inside the container
43    pub networkd: networkd::NetworkdState,
44    /// PID 1 process stats from procfs (using the container's leader PID)
45    pub pid1: Option<pid1::Pid1Stats>,
46    /// Overall systemd system state (e.g. running, degraded) inside the container
47    pub system_state: system::SystemdSystemState,
48    /// Aggregated systemd unit counts and per-service/timer stats inside the container
49    pub units: units::SystemdUnitStats,
50    /// systemd version running inside the container
51    pub version: system::SystemdVersion,
52    /// D-Bus daemon/broker statistics inside the container
53    pub dbus_stats: Option<dbus_stats::DBusStats>,
54    /// Boot blame statistics: slowest units at boot with activation times in seconds
55    #[serde(skip_serializing_if = "Option::is_none")]
56    pub boot_blame: Option<boot::BootBlameStats>,
57    /// Unit verification error statistics
58    pub verify_stats: Option<verify::VerifyStats>,
59}
60
61/// Root struct containing all enabled monitord metrics for the host system and containers
62#[derive(serde::Serialize, serde::Deserialize, Debug, Default, PartialEq)]
63pub struct MonitordStats {
64    /// systemd-networkd interface states and managed interface count
65    pub networkd: networkd::NetworkdState,
66    /// PID 1 (systemd) process stats from procfs: CPU, memory, FDs, tasks
67    pub pid1: Option<pid1::Pid1Stats>,
68    /// Overall systemd manager state (e.g. running, degraded, initializing)
69    pub system_state: system::SystemdSystemState,
70    /// Aggregated systemd unit counts by type/state and per-service/timer detailed metrics
71    pub units: units::SystemdUnitStats,
72    /// Installed systemd version (major.minor.revision.os)
73    pub version: system::SystemdVersion,
74    /// D-Bus daemon/broker statistics (connections, bus names, match rules, per-peer accounting)
75    pub dbus_stats: Option<dbus_stats::DBusStats>,
76    /// Per-container stats keyed by machine name, collected via systemd-machined
77    pub machines: HashMap<String, MachineStats>,
78    /// Boot blame statistics: slowest units at boot with activation times in seconds
79    #[serde(skip_serializing_if = "Option::is_none")]
80    pub boot_blame: Option<boot::BootBlameStats>,
81    /// Unit verification error statistics
82    pub verify_stats: Option<verify::VerifyStats>,
83}
84
85/// Print statistics in the format set in configuration
86pub fn print_stats(
87    key_prefix: &str,
88    output_format: &config::MonitordOutputFormat,
89    stats: &MonitordStats,
90) {
91    match output_format {
92        config::MonitordOutputFormat::Json => println!(
93            "{}",
94            serde_json::to_string(&stats).expect("Invalid JSON serialization")
95        ),
96        config::MonitordOutputFormat::JsonFlat => println!(
97            "{}",
98            json::flatten(stats, key_prefix).expect("Invalid JSON serialization")
99        ),
100        config::MonitordOutputFormat::JsonPretty => println!(
101            "{}",
102            serde_json::to_string_pretty(&stats).expect("Invalid JSON serialization")
103        ),
104    }
105}
106
107/// Main statictic collection function running what's required by configuration in parallel
108/// Takes an optional locked stats struct to update and to output stats to STDOUT or not
109pub async fn stat_collector(
110    config: config::Config,
111    maybe_locked_stats: Option<Arc<RwLock<MonitordStats>>>,
112    output_stats: bool,
113) -> Result<(), MonitordError> {
114    let mut collect_interval_ms: u128 = 0;
115    if config.monitord.daemon {
116        collect_interval_ms = (config.monitord.daemon_stats_refresh_secs * 1000).into();
117    }
118
119    let config = Arc::new(config);
120    let locked_monitord_stats: Arc<RwLock<MonitordStats>> =
121        maybe_locked_stats.unwrap_or(Arc::new(RwLock::new(MonitordStats::default())));
122    let locked_machine_stats: Arc<RwLock<MachineStats>> =
123        Arc::new(RwLock::new(MachineStats::default()));
124    std::env::set_var("DBUS_SYSTEM_BUS_ADDRESS", &config.monitord.dbus_address);
125    let sdc = zbus::connection::Builder::system()?
126        .method_timeout(std::time::Duration::from_secs(config.monitord.dbus_timeout))
127        .build()
128        .await?;
129    let mut join_set = tokio::task::JoinSet::new();
130
131    loop {
132        let collect_start_time = Instant::now();
133        info!("Starting stat collection run");
134
135        // Always collect systemd version
136
137        join_set.spawn(crate::system::update_version(
138            sdc.clone(),
139            locked_machine_stats.clone(),
140        ));
141
142        // Collect pid1 procfs stats
143        if config.pid1.enabled {
144            join_set.spawn(crate::pid1::update_pid1_stats(
145                1,
146                locked_machine_stats.clone(),
147            ));
148        }
149
150        // Run networkd collector if enabled
151        if config.networkd.enabled {
152            join_set.spawn(crate::networkd::update_networkd_stats(
153                config.networkd.link_state_dir.clone(),
154                None,
155                sdc.clone(),
156                locked_machine_stats.clone(),
157            ));
158        }
159
160        // Run system running (SystemState) state collector
161        if config.system_state.enabled {
162            join_set.spawn(crate::system::update_system_stats(
163                sdc.clone(),
164                locked_machine_stats.clone(),
165            ));
166        }
167
168        // Run service collectors if there are services listed in config
169        if config.units.enabled {
170            join_set.spawn(crate::units::update_unit_stats(
171                Arc::clone(&config),
172                sdc.clone(),
173                locked_machine_stats.clone(),
174            ));
175        }
176
177        if config.machines.enabled {
178            join_set.spawn(crate::machines::update_machines_stats(
179                Arc::clone(&config),
180                sdc.clone(),
181                locked_monitord_stats.clone(),
182            ));
183        }
184
185        if config.dbus_stats.enabled {
186            join_set.spawn(crate::dbus_stats::update_dbus_stats(
187                Arc::clone(&config),
188                sdc.clone(),
189                locked_machine_stats.clone(),
190            ));
191        }
192
193        if config.boot_blame.enabled {
194            join_set.spawn(crate::boot::update_boot_blame_stats(
195                Arc::clone(&config),
196                sdc.clone(),
197                locked_machine_stats.clone(),
198            ));
199        }
200
201        if config.verify.enabled {
202            join_set.spawn(crate::verify::update_verify_stats(
203                sdc.clone(),
204                locked_machine_stats.clone(),
205                config.verify.allowlist.clone(),
206                config.verify.blocklist.clone(),
207            ));
208        }
209
210        if join_set.len() == 1 {
211            warn!("No collectors except systemd version scheduled to run. Exiting");
212        }
213
214        // Check all collection for errors and log if one fails
215        while let Some(res) = join_set.join_next().await {
216            match res {
217                Ok(r) => match r {
218                    Ok(_) => (),
219                    Err(e) => {
220                        error!("Collection specific failure: {:?}", e);
221                    }
222                },
223                Err(e) => {
224                    error!("Join error: {:?}", e);
225                }
226            }
227        }
228
229        {
230            // Update monitord stats with machine stats
231            let mut monitord_stats = locked_monitord_stats.write().await;
232            let machine_stats = locked_machine_stats.read().await;
233            monitord_stats.pid1 = machine_stats.pid1.clone();
234            monitord_stats.networkd = machine_stats.networkd.clone();
235            monitord_stats.system_state = machine_stats.system_state;
236            monitord_stats.version = machine_stats.version.clone();
237            monitord_stats.units = machine_stats.units.clone();
238            monitord_stats.dbus_stats = machine_stats.dbus_stats.clone();
239            monitord_stats.boot_blame = machine_stats.boot_blame.clone();
240            monitord_stats.verify_stats = machine_stats.verify_stats.clone();
241        }
242
243        let elapsed_runtime_ms = collect_start_time.elapsed().as_millis();
244
245        info!("stat collection run took {}ms", elapsed_runtime_ms);
246        if output_stats {
247            let monitord_stats = locked_monitord_stats.read().await;
248            print_stats(
249                &config.monitord.key_prefix,
250                &config.monitord.output_format,
251                &monitord_stats,
252            );
253        }
254        if !config.monitord.daemon {
255            break;
256        }
257        let sleep_time_ms = collect_interval_ms - elapsed_runtime_ms;
258        info!("stat collection sleeping for {}s 😴", sleep_time_ms / 1000);
259        tokio::time::sleep(Duration::from_millis(
260            sleep_time_ms
261                .try_into()
262                .expect("Sleep time does not fit into a u64 :O"),
263        ))
264        .await;
265    }
266    Ok(())
267}