monitord/
lib.rs

1//! # monitord Crate
2//!
3//! `monitord` is a library to gather statistics about systemd.
4
5use std::sync::Arc;
6
7use std::collections::HashMap;
8use std::time::Duration;
9use std::time::Instant;
10
11use thiserror::Error;
12use tokio::sync::RwLock;
13use tracing::error;
14use tracing::info;
15use tracing::warn;
16
/// Errors returned by the fallible public functions of this crate.
#[derive(Error, Debug)]
pub enum MonitordError {
    /// A zbus (D-Bus) error. The `#[from]` attribute lets `?` convert a
    /// `zbus::Error` into this variant automatically.
    #[error("D-Bus connection error: {0}")]
    ZbusError(#[from] zbus::Error),
}
22
// Submodules of the crate. All of them are part of the public API except
// `dbus`, which is only visible inside this crate.
pub mod boot;
pub mod config;
pub(crate) mod dbus;
pub mod dbus_stats;
pub mod json;
pub mod logging;
pub mod machines;
pub mod networkd;
pub mod pid1;
pub mod system;
pub mod timer;
pub mod unit_constants;
pub mod units;
pub mod varlink;
pub mod varlink_units;
pub mod verify;
39
/// Default D-Bus address: the UNIX socket at `/run/dbus/system_bus_socket`.
pub const DEFAULT_DBUS_ADDRESS: &str = "unix:path=/run/dbus/system_bus_socket";
41
/// Stats collected for a single systemd-nspawn container or VM managed by systemd-machined
///
/// `stat_collector` also uses one instance of this struct as the scratch area the
/// host collectors write into; its fields are copied into [`MonitordStats`] at the
/// end of every collection run.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
pub struct MachineStats {
    /// systemd-networkd interface states inside the container
    pub networkd: networkd::NetworkdState,
    /// PID 1 process stats from procfs (using the container's leader PID)
    pub pid1: Option<pid1::Pid1Stats>,
    /// Overall systemd system state (e.g. running, degraded) inside the container
    pub system_state: system::SystemdSystemState,
    /// Aggregated systemd unit counts and per-service/timer stats inside the container
    pub units: units::SystemdUnitStats,
    /// systemd version running inside the container
    pub version: system::SystemdVersion,
    /// D-Bus daemon/broker statistics inside the container
    pub dbus_stats: Option<dbus_stats::DBusStats>,
    /// Boot blame statistics: slowest units at boot with activation times in seconds.
    /// Left out of the serialized output entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub boot_blame: Option<boot::BootBlameStats>,
    /// Unit verification error statistics.
    /// Unlike `boot_blame` this field has no `skip_serializing_if`, so it is
    /// still serialized when `None`.
    pub verify_stats: Option<verify::VerifyStats>,
}
63
/// Root struct containing all enabled monitord metrics for the host system and containers
///
/// The host-level fields are overwritten by `stat_collector` after every collection
/// run with the values gathered in its internal [`MachineStats`]; `machines` is not
/// touched by that copy (the machines collector is handed this struct directly).
#[derive(serde::Serialize, serde::Deserialize, Debug, Default, PartialEq)]
pub struct MonitordStats {
    /// systemd-networkd interface states and managed interface count
    pub networkd: networkd::NetworkdState,
    /// PID 1 (systemd) process stats from procfs: CPU, memory, FDs, tasks
    pub pid1: Option<pid1::Pid1Stats>,
    /// Overall systemd manager state (e.g. running, degraded, initializing)
    pub system_state: system::SystemdSystemState,
    /// Aggregated systemd unit counts by type/state and per-service/timer detailed metrics
    pub units: units::SystemdUnitStats,
    /// Installed systemd version (major.minor.revision.os)
    pub version: system::SystemdVersion,
    /// D-Bus daemon/broker statistics (connections, bus names, match rules, per-peer accounting)
    pub dbus_stats: Option<dbus_stats::DBusStats>,
    /// Per-container stats keyed by machine name, collected via systemd-machined
    pub machines: HashMap<String, MachineStats>,
    /// Boot blame statistics: slowest units at boot with activation times in seconds.
    /// Left out of the serialized output entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub boot_blame: Option<boot::BootBlameStats>,
    /// Unit verification error statistics.
    /// Unlike `boot_blame` this field has no `skip_serializing_if`, so it is
    /// still serialized when `None`.
    pub verify_stats: Option<verify::VerifyStats>,
}
87
88/// Print statistics in the format set in configuration
89pub fn print_stats(
90    key_prefix: &str,
91    output_format: &config::MonitordOutputFormat,
92    stats: &MonitordStats,
93) {
94    match output_format {
95        config::MonitordOutputFormat::Json => println!(
96            "{}",
97            serde_json::to_string(&stats).expect("Invalid JSON serialization")
98        ),
99        config::MonitordOutputFormat::JsonFlat => println!(
100            "{}",
101            json::flatten(stats, key_prefix).expect("Invalid JSON serialization")
102        ),
103        config::MonitordOutputFormat::JsonPretty => println!(
104            "{}",
105            serde_json::to_string_pretty(&stats).expect("Invalid JSON serialization")
106        ),
107    }
108}
109
110/// Reuse an existing D-Bus connection or create a new system bus connection.
111async fn get_or_create_dbus_connection(
112    config: &config::Config,
113    maybe_connection: Option<zbus::Connection>,
114) -> Result<zbus::Connection, MonitordError> {
115    match maybe_connection {
116        Some(conn) => Ok(conn),
117        None => Ok(zbus::connection::Builder::system()?
118            .method_timeout(std::time::Duration::from_secs(config.monitord.dbus_timeout))
119            .build()
120            .await?),
121    }
122}
123
124/// Main statictic collection function running what's required by configuration in parallel
125/// Takes an optional locked stats struct to update and to output stats to STDOUT or not.
126/// Takes an optional D-Bus connection. Returns `Some(connection)` if the
127/// collection cycle completed without errors (meaning the connection is reusable),
128/// `None` if errors occurred.
129pub async fn stat_collector(
130    config: config::Config,
131    maybe_locked_stats: Option<Arc<RwLock<MonitordStats>>>,
132    output_stats: bool,
133    maybe_connection: Option<zbus::Connection>,
134) -> Result<Option<zbus::Connection>, MonitordError> {
135    let mut collect_interval_ms: u128 = 0;
136    if config.monitord.daemon {
137        collect_interval_ms = (config.monitord.daemon_stats_refresh_secs * 1000).into();
138    }
139
140    let config = Arc::new(config);
141    let locked_monitord_stats: Arc<RwLock<MonitordStats>> =
142        maybe_locked_stats.unwrap_or(Arc::new(RwLock::new(MonitordStats::default())));
143    let locked_machine_stats: Arc<RwLock<MachineStats>> =
144        Arc::new(RwLock::new(MachineStats::default()));
145    let cached_machine_connections: Arc<tokio::sync::Mutex<machines::MachineConnections>> =
146        Arc::new(tokio::sync::Mutex::new(HashMap::new()));
147    std::env::set_var("DBUS_SYSTEM_BUS_ADDRESS", &config.monitord.dbus_address);
148    let sdc = get_or_create_dbus_connection(&config, maybe_connection).await?;
149    let mut join_set = tokio::task::JoinSet::new();
150    let mut had_error;
151
152    loop {
153        let collect_start_time = Instant::now();
154        info!("Starting stat collection run");
155
156        // Always collect systemd version
157
158        join_set.spawn(crate::system::update_version(
159            sdc.clone(),
160            locked_machine_stats.clone(),
161        ));
162
163        // Collect pid1 procfs stats
164        if config.pid1.enabled {
165            join_set.spawn(crate::pid1::update_pid1_stats(
166                1,
167                locked_machine_stats.clone(),
168            ));
169        }
170
171        // Run networkd collector if enabled
172        if config.networkd.enabled {
173            join_set.spawn(crate::networkd::update_networkd_stats(
174                config.networkd.link_state_dir.clone(),
175                None,
176                sdc.clone(),
177                locked_machine_stats.clone(),
178            ));
179        }
180
181        // Run system running (SystemState) state collector
182        if config.system_state.enabled {
183            join_set.spawn(crate::system::update_system_stats(
184                sdc.clone(),
185                locked_machine_stats.clone(),
186            ));
187        }
188
189        // Run service collectors if there are services listed in config
190        if config.units.enabled {
191            let config_clone = Arc::clone(&config);
192            let sdc_clone = sdc.clone();
193            let stats_clone = locked_machine_stats.clone();
194            join_set.spawn(async move {
195                if config_clone.varlink.enabled {
196                    let socket_path = crate::varlink_units::METRICS_SOCKET_PATH.to_string();
197                    match crate::varlink_units::update_unit_stats(
198                        Arc::clone(&config_clone),
199                        stats_clone.clone(),
200                        socket_path,
201                    )
202                    .await
203                    {
204                        Ok(()) => return Ok(()),
205                        Err(err) => {
206                            warn!(
207                                "Varlink units stats failed, falling back to D-Bus: {:?}",
208                                err
209                            );
210                        }
211                    }
212                }
213                crate::units::update_unit_stats(config_clone, sdc_clone, stats_clone).await
214            });
215        }
216
217        if config.machines.enabled {
218            join_set.spawn(crate::machines::update_machines_stats(
219                Arc::clone(&config),
220                sdc.clone(),
221                locked_monitord_stats.clone(),
222                cached_machine_connections.clone(),
223            ));
224        }
225
226        if config.dbus_stats.enabled {
227            join_set.spawn(crate::dbus_stats::update_dbus_stats(
228                Arc::clone(&config),
229                sdc.clone(),
230                locked_machine_stats.clone(),
231            ));
232        }
233
234        if config.boot_blame.enabled {
235            join_set.spawn(crate::boot::update_boot_blame_stats(
236                Arc::clone(&config),
237                sdc.clone(),
238                locked_machine_stats.clone(),
239            ));
240        }
241
242        if config.verify.enabled {
243            join_set.spawn(crate::verify::update_verify_stats(
244                sdc.clone(),
245                locked_machine_stats.clone(),
246                config.verify.allowlist.clone(),
247                config.verify.blocklist.clone(),
248            ));
249        }
250
251        if join_set.len() == 1 {
252            warn!("No collectors except systemd version scheduled to run. Exiting");
253        }
254
255        // Check all collection for errors and log if one fails
256        had_error = false;
257        while let Some(res) = join_set.join_next().await {
258            match res {
259                Ok(r) => match r {
260                    Ok(_) => (),
261                    Err(e) => {
262                        had_error = true;
263                        error!("Collection specific failure: {:?}", e);
264                    }
265                },
266                Err(e) => {
267                    had_error = true;
268                    error!("Join error: {:?}", e);
269                }
270            }
271        }
272
273        {
274            // Update monitord stats with machine stats
275            let mut monitord_stats = locked_monitord_stats.write().await;
276            let machine_stats = locked_machine_stats.read().await;
277            monitord_stats.pid1 = machine_stats.pid1.clone();
278            monitord_stats.networkd = machine_stats.networkd.clone();
279            monitord_stats.system_state = machine_stats.system_state;
280            monitord_stats.version = machine_stats.version.clone();
281            monitord_stats.units = machine_stats.units.clone();
282            monitord_stats.dbus_stats = machine_stats.dbus_stats.clone();
283            monitord_stats.boot_blame = machine_stats.boot_blame.clone();
284            monitord_stats.verify_stats = machine_stats.verify_stats.clone();
285        }
286
287        let elapsed_runtime_ms = collect_start_time.elapsed().as_millis();
288
289        info!("stat collection run took {}ms", elapsed_runtime_ms);
290        if output_stats {
291            let monitord_stats = locked_monitord_stats.read().await;
292            print_stats(
293                &config.monitord.key_prefix,
294                &config.monitord.output_format,
295                &monitord_stats,
296            );
297        }
298        if !config.monitord.daemon {
299            break;
300        }
301        let sleep_time_ms = collect_interval_ms - elapsed_runtime_ms;
302        info!("stat collection sleeping for {}s 😴", sleep_time_ms / 1000);
303        tokio::time::sleep(Duration::from_millis(
304            sleep_time_ms
305                .try_into()
306                .expect("Sleep time does not fit into a u64 :O"),
307        ))
308        .await;
309    }
310    Ok(if had_error { None } else { Some(sdc) })
311}