monitord/
lib.rs

1//! # monitord Crate
2//!
3//! `monitord` is a library to gather statistics about systemd.
4
5use std::sync::Arc;
6
7use std::collections::HashMap;
8use std::time::Duration;
9use std::time::Instant;
10
11use tokio::sync::RwLock;
12use tracing::error;
13use tracing::info;
14use tracing::warn;
15
16pub mod config;
17pub(crate) mod dbus;
18pub mod dbus_stats;
19pub mod json;
20pub mod logging;
21pub mod machines;
22pub mod networkd;
23pub mod pid1;
24pub mod system;
25pub mod timer;
26pub mod units;
27
/// Default D-Bus address: the system bus UNIX socket at `/run/dbus/system_bus_socket`.
pub const DEFAULT_DBUS_ADDRESS: &str = "unix:path=/run/dbus/system_bus_socket";
29
30#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, PartialEq)]
31pub struct MachineStats {
32    pub networkd: networkd::NetworkdState,
33    pub pid1: Option<pid1::Pid1Stats>,
34    pub system_state: system::SystemdSystemState,
35    pub units: units::SystemdUnitStats,
36    pub version: system::SystemdVersion,
37    pub dbus_stats: Option<dbus_stats::DBusStats>,
38}
39
40/// Main monitord stats struct collection all enabled stats
41#[derive(serde::Serialize, serde::Deserialize, Debug, Default, Eq, PartialEq)]
42pub struct MonitordStats {
43    pub networkd: networkd::NetworkdState,
44    pub pid1: Option<pid1::Pid1Stats>,
45    pub system_state: system::SystemdSystemState,
46    pub units: units::SystemdUnitStats,
47    pub version: system::SystemdVersion,
48    pub dbus_stats: Option<dbus_stats::DBusStats>,
49    pub machines: HashMap<String, MachineStats>,
50}
51
52/// Print statistics in the format set in configuration
53pub fn print_stats(
54    key_prefix: &str,
55    output_format: &config::MonitordOutputFormat,
56    stats: &MonitordStats,
57) {
58    match output_format {
59        config::MonitordOutputFormat::Json => println!(
60            "{}",
61            serde_json::to_string(&stats).expect("Invalid JSON serialization")
62        ),
63        config::MonitordOutputFormat::JsonFlat => println!(
64            "{}",
65            json::flatten(stats, &key_prefix.to_string()).expect("Invalid JSON serialization")
66        ),
67        config::MonitordOutputFormat::JsonPretty => println!(
68            "{}",
69            serde_json::to_string_pretty(&stats).expect("Invalid JSON serialization")
70        ),
71    }
72}
73
74/// Main statictic collection function running what's required by configuration in parallel
75/// Takes an optional locked stats struct to update and to output stats to STDOUT or not
76pub async fn stat_collector(
77    config: config::Config,
78    maybe_locked_stats: Option<Arc<RwLock<MonitordStats>>>,
79    output_stats: bool,
80) -> anyhow::Result<()> {
81    let mut collect_interval_ms: u128 = 0;
82    if config.monitord.daemon {
83        collect_interval_ms = (config.monitord.daemon_stats_refresh_secs * 1000).into();
84    }
85
86    let locked_monitord_stats: Arc<RwLock<MonitordStats>> =
87        maybe_locked_stats.unwrap_or(Arc::new(RwLock::new(MonitordStats::default())));
88    let locked_machine_stats: Arc<RwLock<MachineStats>> =
89        Arc::new(RwLock::new(MachineStats::default()));
90    std::env::set_var("DBUS_SYSTEM_BUS_ADDRESS", &config.monitord.dbus_address);
91    let sdc = zbus::connection::Builder::system()?
92        .method_timeout(std::time::Duration::from_secs(config.monitord.dbus_timeout))
93        .build()
94        .await?;
95    let mut join_set = tokio::task::JoinSet::new();
96
97    loop {
98        let collect_start_time = Instant::now();
99        info!("Starting stat collection run");
100
101        // Always collect systemd version
102
103        join_set.spawn(crate::system::update_version(
104            sdc.clone(),
105            locked_machine_stats.clone(),
106        ));
107
108        // Collect pid1 procfs stats
109        if config.pid1.enabled {
110            join_set.spawn(crate::pid1::update_pid1_stats(
111                1,
112                locked_machine_stats.clone(),
113            ));
114        }
115
116        // Run networkd collector if enabled
117        if config.networkd.enabled {
118            let config_clone = config.clone();
119            join_set.spawn(crate::networkd::update_networkd_stats(
120                config_clone.networkd.link_state_dir.clone(),
121                None,
122                sdc.clone(),
123                locked_machine_stats.clone(),
124            ));
125        }
126
127        // Run system running (SystemState) state collector
128        if config.system_state.enabled {
129            join_set.spawn(crate::system::update_system_stats(
130                sdc.clone(),
131                locked_machine_stats.clone(),
132            ));
133        }
134
135        // Run service collectors if there are services listed in config
136        if config.units.enabled {
137            join_set.spawn(crate::units::update_unit_stats(
138                config.clone(),
139                sdc.clone(),
140                locked_machine_stats.clone(),
141            ));
142        }
143
144        if config.machines.enabled {
145            join_set.spawn(crate::machines::update_machines_stats(
146                config.clone(),
147                sdc.clone(),
148                locked_monitord_stats.clone(),
149            ));
150        }
151
152        if config.dbus_stats.enabled {
153            join_set.spawn(crate::dbus_stats::update_dbus_stats(
154                sdc.clone(),
155                locked_machine_stats.clone(),
156            ));
157        }
158
159        if join_set.len() == 1 {
160            warn!("No collectors execpt systemd version scheduled to run. Exiting");
161        }
162
163        // Check all collection for errors and log if one fails
164        while let Some(res) = join_set.join_next().await {
165            match res {
166                Ok(r) => match r {
167                    Ok(_) => (),
168                    Err(e) => {
169                        error!("Collection specific failure: {:?}", e);
170                    }
171                },
172                Err(e) => {
173                    error!("Join error: {:?}", e);
174                }
175            }
176        }
177
178        {
179            // Update monitord stats with machine stats
180            let mut monitord_stats = locked_monitord_stats.write().await;
181            let machine_stats = locked_machine_stats.read().await;
182            monitord_stats.pid1 = machine_stats.pid1.clone();
183            monitord_stats.networkd = machine_stats.networkd.clone();
184            monitord_stats.system_state = machine_stats.system_state;
185            monitord_stats.version = machine_stats.version.clone();
186            monitord_stats.units = machine_stats.units.clone();
187            monitord_stats.dbus_stats = machine_stats.dbus_stats.clone();
188        }
189
190        let elapsed_runtime_ms = collect_start_time.elapsed().as_millis();
191
192        info!("stat collection run took {}ms", elapsed_runtime_ms);
193        if output_stats {
194            let monitord_stats = locked_monitord_stats.read().await;
195            print_stats(
196                &config.monitord.key_prefix,
197                &config.monitord.output_format,
198                &monitord_stats,
199            );
200        }
201        if !config.monitord.daemon {
202            break;
203        }
204        let sleep_time_ms = collect_interval_ms - elapsed_runtime_ms;
205        info!("stat collection sleeping for {}s 😴", sleep_time_ms / 1000);
206        tokio::time::sleep(Duration::from_millis(
207            sleep_time_ms
208                .try_into()
209                .expect("Sleep time does not fit into a u64 :O"),
210        ))
211        .await;
212    }
213    Ok(())
214}