monitord/
lib.rs

1//! # monitord Crate
2//!
3//! `monitord` is a library to gather statistics about systemd.
4
5use std::sync::Arc;
6
7use std::collections::HashMap;
8use std::time::Duration;
9use std::time::Instant;
10
11use thiserror::Error;
12use tokio::sync::RwLock;
13use tracing::error;
14use tracing::info;
15use tracing::warn;
16
17#[derive(Error, Debug)]
18pub enum MonitordError {
19    #[error("D-Bus connection error: {0}")]
20    ZbusError(#[from] zbus::Error),
21}
22
23pub mod boot;
24pub mod config;
25pub(crate) mod dbus;
26pub mod dbus_stats;
27pub mod json;
28pub mod logging;
29pub mod machines;
30pub mod networkd;
31pub mod pid1;
32pub mod system;
33pub mod timer;
34pub mod unit_constants;
35pub mod units;
36pub mod varlink;
37pub mod varlink_units;
38pub mod verify;
39
/// D-Bus address of the UNIX socket at `/run/dbus/system_bus_socket`.
///
/// NOTE(review): presumably the fallback used when no D-Bus address is configured;
/// confirm against the `config` module.
pub const DEFAULT_DBUS_ADDRESS: &str = "unix:path=/run/dbus/system_bus_socket";
41
42/// Stats collected for a single systemd-nspawn container or VM managed by systemd-machined
43#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
44pub struct MachineStats {
45    /// systemd-networkd interface states inside the container
46    pub networkd: networkd::NetworkdState,
47    /// PID 1 process stats from procfs (using the container's leader PID)
48    pub pid1: Option<pid1::Pid1Stats>,
49    /// Overall systemd system state (e.g. running, degraded) inside the container
50    pub system_state: system::SystemdSystemState,
51    /// Aggregated systemd unit counts and per-service/timer stats inside the container
52    pub units: units::SystemdUnitStats,
53    /// systemd version running inside the container
54    pub version: system::SystemdVersion,
55    /// D-Bus daemon/broker statistics inside the container
56    pub dbus_stats: Option<dbus_stats::DBusStats>,
57    /// Boot blame statistics: slowest units at boot with activation times in seconds
58    #[serde(skip_serializing_if = "Option::is_none")]
59    pub boot_blame: Option<boot::BootBlameStats>,
60    /// Unit verification error statistics
61    pub verify_stats: Option<verify::VerifyStats>,
62}
63
64/// Root struct containing all enabled monitord metrics for the host system and containers
65#[derive(serde::Serialize, serde::Deserialize, Debug, Default, PartialEq)]
66pub struct MonitordStats {
67    /// systemd-networkd interface states and managed interface count
68    pub networkd: networkd::NetworkdState,
69    /// PID 1 (systemd) process stats from procfs: CPU, memory, FDs, tasks
70    pub pid1: Option<pid1::Pid1Stats>,
71    /// Overall systemd manager state (e.g. running, degraded, initializing)
72    pub system_state: system::SystemdSystemState,
73    /// Aggregated systemd unit counts by type/state and per-service/timer detailed metrics
74    pub units: units::SystemdUnitStats,
75    /// Installed systemd version (major.minor.revision.os)
76    pub version: system::SystemdVersion,
77    /// D-Bus daemon/broker statistics (connections, bus names, match rules, per-peer accounting)
78    pub dbus_stats: Option<dbus_stats::DBusStats>,
79    /// Per-container stats keyed by machine name, collected via systemd-machined
80    pub machines: HashMap<String, MachineStats>,
81    /// Boot blame statistics: slowest units at boot with activation times in seconds
82    #[serde(skip_serializing_if = "Option::is_none")]
83    pub boot_blame: Option<boot::BootBlameStats>,
84    /// Unit verification error statistics
85    pub verify_stats: Option<verify::VerifyStats>,
86}
87
88/// Print statistics in the format set in configuration
89pub fn print_stats(
90    key_prefix: &str,
91    output_format: &config::MonitordOutputFormat,
92    stats: &MonitordStats,
93) {
94    match output_format {
95        config::MonitordOutputFormat::Json => println!(
96            "{}",
97            serde_json::to_string(&stats).expect("Invalid JSON serialization")
98        ),
99        config::MonitordOutputFormat::JsonFlat => println!(
100            "{}",
101            json::flatten(stats, key_prefix).expect("Invalid JSON serialization")
102        ),
103        config::MonitordOutputFormat::JsonPretty => println!(
104            "{}",
105            serde_json::to_string_pretty(&stats).expect("Invalid JSON serialization")
106        ),
107    }
108}
109
110/// Main statictic collection function running what's required by configuration in parallel
111/// Takes an optional locked stats struct to update and to output stats to STDOUT or not
112pub async fn stat_collector(
113    config: config::Config,
114    maybe_locked_stats: Option<Arc<RwLock<MonitordStats>>>,
115    output_stats: bool,
116) -> Result<(), MonitordError> {
117    let mut collect_interval_ms: u128 = 0;
118    if config.monitord.daemon {
119        collect_interval_ms = (config.monitord.daemon_stats_refresh_secs * 1000).into();
120    }
121
122    let config = Arc::new(config);
123    let locked_monitord_stats: Arc<RwLock<MonitordStats>> =
124        maybe_locked_stats.unwrap_or(Arc::new(RwLock::new(MonitordStats::default())));
125    let locked_machine_stats: Arc<RwLock<MachineStats>> =
126        Arc::new(RwLock::new(MachineStats::default()));
127    std::env::set_var("DBUS_SYSTEM_BUS_ADDRESS", &config.monitord.dbus_address);
128    let sdc = zbus::connection::Builder::system()?
129        .method_timeout(std::time::Duration::from_secs(config.monitord.dbus_timeout))
130        .build()
131        .await?;
132    let mut join_set = tokio::task::JoinSet::new();
133
134    loop {
135        let collect_start_time = Instant::now();
136        info!("Starting stat collection run");
137
138        // Always collect systemd version
139
140        join_set.spawn(crate::system::update_version(
141            sdc.clone(),
142            locked_machine_stats.clone(),
143        ));
144
145        // Collect pid1 procfs stats
146        if config.pid1.enabled {
147            join_set.spawn(crate::pid1::update_pid1_stats(
148                1,
149                locked_machine_stats.clone(),
150            ));
151        }
152
153        // Run networkd collector if enabled
154        if config.networkd.enabled {
155            join_set.spawn(crate::networkd::update_networkd_stats(
156                config.networkd.link_state_dir.clone(),
157                None,
158                sdc.clone(),
159                locked_machine_stats.clone(),
160            ));
161        }
162
163        // Run system running (SystemState) state collector
164        if config.system_state.enabled {
165            join_set.spawn(crate::system::update_system_stats(
166                sdc.clone(),
167                locked_machine_stats.clone(),
168            ));
169        }
170
171        // Run service collectors if there are services listed in config
172        if config.units.enabled {
173            let config_clone = Arc::clone(&config);
174            let sdc_clone = sdc.clone();
175            let stats_clone = locked_machine_stats.clone();
176            join_set.spawn(async move {
177                if config_clone.varlink.enabled {
178                    let socket_path = crate::varlink_units::METRICS_SOCKET_PATH.to_string();
179                    match crate::varlink_units::update_unit_stats(
180                        Arc::clone(&config_clone),
181                        stats_clone.clone(),
182                        socket_path,
183                    )
184                    .await
185                    {
186                        Ok(()) => return Ok(()),
187                        Err(err) => {
188                            warn!(
189                                "Varlink units stats failed, falling back to D-Bus: {:?}",
190                                err
191                            );
192                        }
193                    }
194                }
195                crate::units::update_unit_stats(config_clone, sdc_clone, stats_clone).await
196            });
197        }
198
199        if config.machines.enabled {
200            join_set.spawn(crate::machines::update_machines_stats(
201                Arc::clone(&config),
202                sdc.clone(),
203                locked_monitord_stats.clone(),
204            ));
205        }
206
207        if config.dbus_stats.enabled {
208            join_set.spawn(crate::dbus_stats::update_dbus_stats(
209                Arc::clone(&config),
210                sdc.clone(),
211                locked_machine_stats.clone(),
212            ));
213        }
214
215        if config.boot_blame.enabled {
216            join_set.spawn(crate::boot::update_boot_blame_stats(
217                Arc::clone(&config),
218                sdc.clone(),
219                locked_machine_stats.clone(),
220            ));
221        }
222
223        if config.verify.enabled {
224            join_set.spawn(crate::verify::update_verify_stats(
225                sdc.clone(),
226                locked_machine_stats.clone(),
227                config.verify.allowlist.clone(),
228                config.verify.blocklist.clone(),
229            ));
230        }
231
232        if join_set.len() == 1 {
233            warn!("No collectors except systemd version scheduled to run. Exiting");
234        }
235
236        // Check all collection for errors and log if one fails
237        while let Some(res) = join_set.join_next().await {
238            match res {
239                Ok(r) => match r {
240                    Ok(_) => (),
241                    Err(e) => {
242                        error!("Collection specific failure: {:?}", e);
243                    }
244                },
245                Err(e) => {
246                    error!("Join error: {:?}", e);
247                }
248            }
249        }
250
251        {
252            // Update monitord stats with machine stats
253            let mut monitord_stats = locked_monitord_stats.write().await;
254            let machine_stats = locked_machine_stats.read().await;
255            monitord_stats.pid1 = machine_stats.pid1.clone();
256            monitord_stats.networkd = machine_stats.networkd.clone();
257            monitord_stats.system_state = machine_stats.system_state;
258            monitord_stats.version = machine_stats.version.clone();
259            monitord_stats.units = machine_stats.units.clone();
260            monitord_stats.dbus_stats = machine_stats.dbus_stats.clone();
261            monitord_stats.boot_blame = machine_stats.boot_blame.clone();
262            monitord_stats.verify_stats = machine_stats.verify_stats.clone();
263        }
264
265        let elapsed_runtime_ms = collect_start_time.elapsed().as_millis();
266
267        info!("stat collection run took {}ms", elapsed_runtime_ms);
268        if output_stats {
269            let monitord_stats = locked_monitord_stats.read().await;
270            print_stats(
271                &config.monitord.key_prefix,
272                &config.monitord.output_format,
273                &monitord_stats,
274            );
275        }
276        if !config.monitord.daemon {
277            break;
278        }
279        let sleep_time_ms = collect_interval_ms - elapsed_runtime_ms;
280        info!("stat collection sleeping for {}s 😴", sleep_time_ms / 1000);
281        tokio::time::sleep(Duration::from_millis(
282            sleep_time_ms
283                .try_into()
284                .expect("Sleep time does not fit into a u64 :O"),
285        ))
286        .await;
287    }
288    Ok(())
289}