1use std::sync::Arc;
6
7use std::collections::HashMap;
8use std::time::Duration;
9use std::time::Instant;
10
11use thiserror::Error;
12use tokio::sync::RwLock;
13use tracing::error;
14use tracing::info;
15use tracing::warn;
16
/// Errors returned by monitord's top-level entry points such as [`stat_collector`].
#[derive(Error, Debug)]
pub enum MonitordError {
    /// Failure talking to D-Bus. Any `zbus::Error` converts into this variant
    /// via `From`, so `?` can be used directly on zbus calls.
    #[error("D-Bus connection error: {0}")]
    ZbusError(#[from] zbus::Error),
}
22
23pub mod boot;
24pub mod config;
25pub(crate) mod dbus;
26pub mod dbus_stats;
27pub mod json;
28pub mod logging;
29pub mod machines;
30pub mod networkd;
31pub mod pid1;
32pub mod system;
33pub mod timer;
34pub mod units;
35pub mod verify;
36
/// D-Bus address pointing at the system bus unix socket
/// `/run/dbus/system_bus_socket`.
// NOTE(review): presumably the default for `config.monitord.dbus_address`;
// confirm against the config module.
pub const DEFAULT_DBUS_ADDRESS: &str = "unix:path=/run/dbus/system_bus_socket";
38
/// Stats collected for a single systemd instance (the host, or one machine).
///
/// Holds the same fields as [`MonitordStats`] minus the nested `machines` map.
/// [`stat_collector`] uses one of these as scratch space for the host collectors
/// and copies it into [`MonitordStats`] after each run.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
pub struct MachineStats {
    /// systemd-networkd state (written by `networkd::update_networkd_stats`).
    pub networkd: networkd::NetworkdState,
    /// PID 1 process stats (written by `pid1::update_pid1_stats` when enabled).
    pub pid1: Option<pid1::Pid1Stats>,
    /// Overall systemd system state (written by `system::update_system_stats`).
    pub system_state: system::SystemdSystemState,
    /// Aggregated unit statistics (written by `units::update_unit_stats`).
    pub units: units::SystemdUnitStats,
    /// systemd version (written by `system::update_version` on every run).
    pub version: system::SystemdVersion,
    /// D-Bus daemon statistics (written by `dbus_stats::update_dbus_stats` when enabled).
    pub dbus_stats: Option<dbus_stats::DBusStats>,
    /// Boot blame statistics; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub boot_blame: Option<boot::BootBlameStats>,
    /// Unit verification results (written by `verify::update_verify_stats` when enabled).
    pub verify_stats: Option<verify::VerifyStats>,
}
60
/// Top-level stats document produced by [`stat_collector`] and printed by
/// [`print_stats`].
///
/// The host's stats live in the top-level fields; per-machine stats are nested
/// under `machines`.
#[derive(serde::Serialize, serde::Deserialize, Debug, Default, PartialEq)]
pub struct MonitordStats {
    /// systemd-networkd state of the host.
    pub networkd: networkd::NetworkdState,
    /// PID 1 process stats of the host (`None` unless that collector populated it).
    pub pid1: Option<pid1::Pid1Stats>,
    /// Overall systemd system state of the host.
    pub system_state: system::SystemdSystemState,
    /// Aggregated unit statistics of the host.
    pub units: units::SystemdUnitStats,
    /// systemd version of the host.
    pub version: system::SystemdVersion,
    /// D-Bus daemon statistics of the host.
    pub dbus_stats: Option<dbus_stats::DBusStats>,
    /// Stats per machine, written directly by `machines::update_machines_stats`.
    // NOTE(review): keys are presumably machine names — confirm in machines.rs.
    pub machines: HashMap<String, MachineStats>,
    /// Boot blame statistics of the host; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub boot_blame: Option<boot::BootBlameStats>,
    /// Unit verification results of the host.
    pub verify_stats: Option<verify::VerifyStats>,
}
84
85pub fn print_stats(
87 key_prefix: &str,
88 output_format: &config::MonitordOutputFormat,
89 stats: &MonitordStats,
90) {
91 match output_format {
92 config::MonitordOutputFormat::Json => println!(
93 "{}",
94 serde_json::to_string(&stats).expect("Invalid JSON serialization")
95 ),
96 config::MonitordOutputFormat::JsonFlat => println!(
97 "{}",
98 json::flatten(stats, key_prefix).expect("Invalid JSON serialization")
99 ),
100 config::MonitordOutputFormat::JsonPretty => println!(
101 "{}",
102 serde_json::to_string_pretty(&stats).expect("Invalid JSON serialization")
103 ),
104 }
105}
106
107pub async fn stat_collector(
110 config: config::Config,
111 maybe_locked_stats: Option<Arc<RwLock<MonitordStats>>>,
112 output_stats: bool,
113) -> Result<(), MonitordError> {
114 let mut collect_interval_ms: u128 = 0;
115 if config.monitord.daemon {
116 collect_interval_ms = (config.monitord.daemon_stats_refresh_secs * 1000).into();
117 }
118
119 let config = Arc::new(config);
120 let locked_monitord_stats: Arc<RwLock<MonitordStats>> =
121 maybe_locked_stats.unwrap_or(Arc::new(RwLock::new(MonitordStats::default())));
122 let locked_machine_stats: Arc<RwLock<MachineStats>> =
123 Arc::new(RwLock::new(MachineStats::default()));
124 std::env::set_var("DBUS_SYSTEM_BUS_ADDRESS", &config.monitord.dbus_address);
125 let sdc = zbus::connection::Builder::system()?
126 .method_timeout(std::time::Duration::from_secs(config.monitord.dbus_timeout))
127 .build()
128 .await?;
129 let mut join_set = tokio::task::JoinSet::new();
130
131 loop {
132 let collect_start_time = Instant::now();
133 info!("Starting stat collection run");
134
135 join_set.spawn(crate::system::update_version(
138 sdc.clone(),
139 locked_machine_stats.clone(),
140 ));
141
142 if config.pid1.enabled {
144 join_set.spawn(crate::pid1::update_pid1_stats(
145 1,
146 locked_machine_stats.clone(),
147 ));
148 }
149
150 if config.networkd.enabled {
152 join_set.spawn(crate::networkd::update_networkd_stats(
153 config.networkd.link_state_dir.clone(),
154 None,
155 sdc.clone(),
156 locked_machine_stats.clone(),
157 ));
158 }
159
160 if config.system_state.enabled {
162 join_set.spawn(crate::system::update_system_stats(
163 sdc.clone(),
164 locked_machine_stats.clone(),
165 ));
166 }
167
168 if config.units.enabled {
170 join_set.spawn(crate::units::update_unit_stats(
171 Arc::clone(&config),
172 sdc.clone(),
173 locked_machine_stats.clone(),
174 ));
175 }
176
177 if config.machines.enabled {
178 join_set.spawn(crate::machines::update_machines_stats(
179 Arc::clone(&config),
180 sdc.clone(),
181 locked_monitord_stats.clone(),
182 ));
183 }
184
185 if config.dbus_stats.enabled {
186 join_set.spawn(crate::dbus_stats::update_dbus_stats(
187 Arc::clone(&config),
188 sdc.clone(),
189 locked_machine_stats.clone(),
190 ));
191 }
192
193 if config.boot_blame.enabled {
194 join_set.spawn(crate::boot::update_boot_blame_stats(
195 Arc::clone(&config),
196 sdc.clone(),
197 locked_machine_stats.clone(),
198 ));
199 }
200
201 if config.verify.enabled {
202 join_set.spawn(crate::verify::update_verify_stats(
203 sdc.clone(),
204 locked_machine_stats.clone(),
205 config.verify.allowlist.clone(),
206 config.verify.blocklist.clone(),
207 ));
208 }
209
210 if join_set.len() == 1 {
211 warn!("No collectors except systemd version scheduled to run. Exiting");
212 }
213
214 while let Some(res) = join_set.join_next().await {
216 match res {
217 Ok(r) => match r {
218 Ok(_) => (),
219 Err(e) => {
220 error!("Collection specific failure: {:?}", e);
221 }
222 },
223 Err(e) => {
224 error!("Join error: {:?}", e);
225 }
226 }
227 }
228
229 {
230 let mut monitord_stats = locked_monitord_stats.write().await;
232 let machine_stats = locked_machine_stats.read().await;
233 monitord_stats.pid1 = machine_stats.pid1.clone();
234 monitord_stats.networkd = machine_stats.networkd.clone();
235 monitord_stats.system_state = machine_stats.system_state;
236 monitord_stats.version = machine_stats.version.clone();
237 monitord_stats.units = machine_stats.units.clone();
238 monitord_stats.dbus_stats = machine_stats.dbus_stats.clone();
239 monitord_stats.boot_blame = machine_stats.boot_blame.clone();
240 monitord_stats.verify_stats = machine_stats.verify_stats.clone();
241 }
242
243 let elapsed_runtime_ms = collect_start_time.elapsed().as_millis();
244
245 info!("stat collection run took {}ms", elapsed_runtime_ms);
246 if output_stats {
247 let monitord_stats = locked_monitord_stats.read().await;
248 print_stats(
249 &config.monitord.key_prefix,
250 &config.monitord.output_format,
251 &monitord_stats,
252 );
253 }
254 if !config.monitord.daemon {
255 break;
256 }
257 let sleep_time_ms = collect_interval_ms - elapsed_runtime_ms;
258 info!("stat collection sleeping for {}s 😴", sleep_time_ms / 1000);
259 tokio::time::sleep(Duration::from_millis(
260 sleep_time_ms
261 .try_into()
262 .expect("Sleep time does not fit into a u64 :O"),
263 ))
264 .await;
265 }
266 Ok(())
267}