1use std::sync::Arc;
6
7use std::collections::HashMap;
8use std::time::Duration;
9use std::time::Instant;
10
11use thiserror::Error;
12use tokio::sync::RwLock;
13use tracing::error;
14use tracing::info;
15use tracing::warn;
16
/// Errors surfaced by monitord's stat-collection machinery.
#[derive(Error, Debug)]
pub enum MonitordError {
    /// Any failure from the underlying zbus D-Bus library (connection setup,
    /// method calls, ...); converted automatically via `#[from]`.
    #[error("D-Bus connection error: {0}")]
    ZbusError(#[from] zbus::Error),
}
22
23pub mod boot;
24pub mod config;
25pub(crate) mod dbus;
26pub mod dbus_stats;
27pub mod json;
28pub mod logging;
29pub mod machines;
30pub mod networkd;
31pub mod pid1;
32pub mod system;
33pub mod timer;
34pub mod unit_constants;
35pub mod units;
36pub mod varlink;
37pub mod varlink_units;
38pub mod verify;
39
/// Default address of the D-Bus system bus socket, in zbus address format.
pub const DEFAULT_DBUS_ADDRESS: &str = "unix:path=/run/dbus/system_bus_socket";
41
/// Snapshot of systemd statistics for a single machine (the host or, in
/// `MonitordStats`, one machined container). Collectors write into the
/// individual fields; `Option` fields stay `None` when their collector did
/// not run or produced nothing.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
pub struct MachineStats {
    /// networkd link/interface state.
    pub networkd: networkd::NetworkdState,
    /// PID 1 process stats.
    pub pid1: Option<pid1::Pid1Stats>,
    /// Overall systemd system state (e.g. running/degraded).
    pub system_state: system::SystemdSystemState,
    /// Aggregated per-unit stats.
    pub units: units::SystemdUnitStats,
    /// systemd version information.
    pub version: system::SystemdVersion,
    /// D-Bus broker statistics.
    pub dbus_stats: Option<dbus_stats::DBusStats>,
    /// Boot timing ("blame") stats; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub boot_blame: Option<boot::BootBlameStats>,
    /// Unit verification stats.
    // NOTE(review): unlike `boot_blame`, this Option is serialized as `null`
    // when `None` — confirm whether that asymmetry is intentional.
    pub verify_stats: Option<verify::VerifyStats>,
}
63
/// Top-level aggregated stats structure: the host's stats (mirroring the
/// fields of `MachineStats`) plus a map of per-machine stats keyed by
/// machine name. This is what `print_stats` serializes for output.
#[derive(serde::Serialize, serde::Deserialize, Debug, Default, PartialEq)]
pub struct MonitordStats {
    /// Host networkd link/interface state.
    pub networkd: networkd::NetworkdState,
    /// Host PID 1 process stats.
    pub pid1: Option<pid1::Pid1Stats>,
    /// Host systemd system state.
    pub system_state: system::SystemdSystemState,
    /// Host aggregated per-unit stats.
    pub units: units::SystemdUnitStats,
    /// Host systemd version information.
    pub version: system::SystemdVersion,
    /// Host D-Bus broker statistics.
    pub dbus_stats: Option<dbus_stats::DBusStats>,
    /// Per-machine stats, keyed by machine name.
    pub machines: HashMap<String, MachineStats>,
    /// Host boot timing stats; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub boot_blame: Option<boot::BootBlameStats>,
    /// Host unit verification stats.
    // NOTE(review): serialized as `null` when `None`, unlike `boot_blame` —
    // confirm whether that asymmetry is intentional.
    pub verify_stats: Option<verify::VerifyStats>,
}
87
88pub fn print_stats(
90 key_prefix: &str,
91 output_format: &config::MonitordOutputFormat,
92 stats: &MonitordStats,
93) {
94 match output_format {
95 config::MonitordOutputFormat::Json => println!(
96 "{}",
97 serde_json::to_string(&stats).expect("Invalid JSON serialization")
98 ),
99 config::MonitordOutputFormat::JsonFlat => println!(
100 "{}",
101 json::flatten(stats, key_prefix).expect("Invalid JSON serialization")
102 ),
103 config::MonitordOutputFormat::JsonPretty => println!(
104 "{}",
105 serde_json::to_string_pretty(&stats).expect("Invalid JSON serialization")
106 ),
107 }
108}
109
110async fn get_or_create_dbus_connection(
112 config: &config::Config,
113 maybe_connection: Option<zbus::Connection>,
114) -> Result<zbus::Connection, MonitordError> {
115 match maybe_connection {
116 Some(conn) => Ok(conn),
117 None => Ok(zbus::connection::Builder::system()?
118 .method_timeout(std::time::Duration::from_secs(config.monitord.dbus_timeout))
119 .build()
120 .await?),
121 }
122}
123
124pub async fn stat_collector(
130 config: config::Config,
131 maybe_locked_stats: Option<Arc<RwLock<MonitordStats>>>,
132 output_stats: bool,
133 maybe_connection: Option<zbus::Connection>,
134) -> Result<Option<zbus::Connection>, MonitordError> {
135 let mut collect_interval_ms: u128 = 0;
136 if config.monitord.daemon {
137 collect_interval_ms = (config.monitord.daemon_stats_refresh_secs * 1000).into();
138 }
139
140 let config = Arc::new(config);
141 let locked_monitord_stats: Arc<RwLock<MonitordStats>> =
142 maybe_locked_stats.unwrap_or(Arc::new(RwLock::new(MonitordStats::default())));
143 let locked_machine_stats: Arc<RwLock<MachineStats>> =
144 Arc::new(RwLock::new(MachineStats::default()));
145 let cached_machine_connections: Arc<tokio::sync::Mutex<machines::MachineConnections>> =
146 Arc::new(tokio::sync::Mutex::new(HashMap::new()));
147 std::env::set_var("DBUS_SYSTEM_BUS_ADDRESS", &config.monitord.dbus_address);
148 let sdc = get_or_create_dbus_connection(&config, maybe_connection).await?;
149 let mut join_set = tokio::task::JoinSet::new();
150 let mut had_error;
151
152 loop {
153 let collect_start_time = Instant::now();
154 info!("Starting stat collection run");
155
156 join_set.spawn(crate::system::update_version(
159 sdc.clone(),
160 locked_machine_stats.clone(),
161 ));
162
163 if config.pid1.enabled {
165 join_set.spawn(crate::pid1::update_pid1_stats(
166 1,
167 locked_machine_stats.clone(),
168 ));
169 }
170
171 if config.networkd.enabled {
173 join_set.spawn(crate::networkd::update_networkd_stats(
174 config.networkd.link_state_dir.clone(),
175 None,
176 sdc.clone(),
177 locked_machine_stats.clone(),
178 ));
179 }
180
181 if config.system_state.enabled {
183 join_set.spawn(crate::system::update_system_stats(
184 sdc.clone(),
185 locked_machine_stats.clone(),
186 ));
187 }
188
189 if config.units.enabled {
191 let config_clone = Arc::clone(&config);
192 let sdc_clone = sdc.clone();
193 let stats_clone = locked_machine_stats.clone();
194 join_set.spawn(async move {
195 if config_clone.varlink.enabled {
196 let socket_path = crate::varlink_units::METRICS_SOCKET_PATH.to_string();
197 match crate::varlink_units::update_unit_stats(
198 Arc::clone(&config_clone),
199 stats_clone.clone(),
200 socket_path,
201 )
202 .await
203 {
204 Ok(()) => return Ok(()),
205 Err(err) => {
206 warn!(
207 "Varlink units stats failed, falling back to D-Bus: {:?}",
208 err
209 );
210 }
211 }
212 }
213 crate::units::update_unit_stats(config_clone, sdc_clone, stats_clone).await
214 });
215 }
216
217 if config.machines.enabled {
218 join_set.spawn(crate::machines::update_machines_stats(
219 Arc::clone(&config),
220 sdc.clone(),
221 locked_monitord_stats.clone(),
222 cached_machine_connections.clone(),
223 ));
224 }
225
226 if config.dbus_stats.enabled {
227 join_set.spawn(crate::dbus_stats::update_dbus_stats(
228 Arc::clone(&config),
229 sdc.clone(),
230 locked_machine_stats.clone(),
231 ));
232 }
233
234 if config.boot_blame.enabled {
235 join_set.spawn(crate::boot::update_boot_blame_stats(
236 Arc::clone(&config),
237 sdc.clone(),
238 locked_machine_stats.clone(),
239 ));
240 }
241
242 if config.verify.enabled {
243 join_set.spawn(crate::verify::update_verify_stats(
244 sdc.clone(),
245 locked_machine_stats.clone(),
246 config.verify.allowlist.clone(),
247 config.verify.blocklist.clone(),
248 ));
249 }
250
251 if join_set.len() == 1 {
252 warn!("No collectors except systemd version scheduled to run. Exiting");
253 }
254
255 had_error = false;
257 while let Some(res) = join_set.join_next().await {
258 match res {
259 Ok(r) => match r {
260 Ok(_) => (),
261 Err(e) => {
262 had_error = true;
263 error!("Collection specific failure: {:?}", e);
264 }
265 },
266 Err(e) => {
267 had_error = true;
268 error!("Join error: {:?}", e);
269 }
270 }
271 }
272
273 {
274 let mut monitord_stats = locked_monitord_stats.write().await;
276 let machine_stats = locked_machine_stats.read().await;
277 monitord_stats.pid1 = machine_stats.pid1.clone();
278 monitord_stats.networkd = machine_stats.networkd.clone();
279 monitord_stats.system_state = machine_stats.system_state;
280 monitord_stats.version = machine_stats.version.clone();
281 monitord_stats.units = machine_stats.units.clone();
282 monitord_stats.dbus_stats = machine_stats.dbus_stats.clone();
283 monitord_stats.boot_blame = machine_stats.boot_blame.clone();
284 monitord_stats.verify_stats = machine_stats.verify_stats.clone();
285 }
286
287 let elapsed_runtime_ms = collect_start_time.elapsed().as_millis();
288
289 info!("stat collection run took {}ms", elapsed_runtime_ms);
290 if output_stats {
291 let monitord_stats = locked_monitord_stats.read().await;
292 print_stats(
293 &config.monitord.key_prefix,
294 &config.monitord.output_format,
295 &monitord_stats,
296 );
297 }
298 if !config.monitord.daemon {
299 break;
300 }
301 let sleep_time_ms = collect_interval_ms - elapsed_runtime_ms;
302 info!("stat collection sleeping for {}s 😴", sleep_time_ms / 1000);
303 tokio::time::sleep(Duration::from_millis(
304 sleep_time_ms
305 .try_into()
306 .expect("Sleep time does not fit into a u64 :O"),
307 ))
308 .await;
309 }
310 Ok(if had_error { None } else { Some(sdc) })
311}