1use std::sync::Arc;
6
7use std::collections::HashMap;
8use std::time::Duration;
9use std::time::Instant;
10
11use thiserror::Error;
12use tokio::sync::RwLock;
13use tracing::error;
14use tracing::info;
15use tracing::warn;
16
17#[derive(Error, Debug)]
18pub enum MonitordError {
19 #[error("D-Bus connection error: {0}")]
20 ZbusError(#[from] zbus::Error),
21}
22
// Submodules of this crate. Everything is exported publicly except `dbus`,
// which is only visible inside the crate.
pub mod boot;
pub mod config;
pub(crate) mod dbus;
pub mod dbus_stats;
pub mod json;
pub mod logging;
pub mod machines;
pub mod networkd;
pub mod pid1;
pub mod system;
pub mod timer;
pub mod unit_constants;
pub mod units;
pub mod varlink;
pub mod varlink_units;
pub mod verify;
39
/// Default D-Bus address: the unix socket at `/run/dbus/system_bus_socket`.
pub const DEFAULT_DBUS_ADDRESS: &str = "unix:path=/run/dbus/system_bus_socket";
41
42#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
44pub struct MachineStats {
45 pub networkd: networkd::NetworkdState,
47 pub pid1: Option<pid1::Pid1Stats>,
49 pub system_state: system::SystemdSystemState,
51 pub units: units::SystemdUnitStats,
53 pub version: system::SystemdVersion,
55 pub dbus_stats: Option<dbus_stats::DBusStats>,
57 #[serde(skip_serializing_if = "Option::is_none")]
59 pub boot_blame: Option<boot::BootBlameStats>,
60 pub verify_stats: Option<verify::VerifyStats>,
62}
63
64#[derive(serde::Serialize, serde::Deserialize, Debug, Default, PartialEq)]
66pub struct MonitordStats {
67 pub networkd: networkd::NetworkdState,
69 pub pid1: Option<pid1::Pid1Stats>,
71 pub system_state: system::SystemdSystemState,
73 pub units: units::SystemdUnitStats,
75 pub version: system::SystemdVersion,
77 pub dbus_stats: Option<dbus_stats::DBusStats>,
79 pub machines: HashMap<String, MachineStats>,
81 #[serde(skip_serializing_if = "Option::is_none")]
83 pub boot_blame: Option<boot::BootBlameStats>,
84 pub verify_stats: Option<verify::VerifyStats>,
86}
87
88pub fn print_stats(
90 key_prefix: &str,
91 output_format: &config::MonitordOutputFormat,
92 stats: &MonitordStats,
93) {
94 match output_format {
95 config::MonitordOutputFormat::Json => println!(
96 "{}",
97 serde_json::to_string(&stats).expect("Invalid JSON serialization")
98 ),
99 config::MonitordOutputFormat::JsonFlat => println!(
100 "{}",
101 json::flatten(stats, key_prefix).expect("Invalid JSON serialization")
102 ),
103 config::MonitordOutputFormat::JsonPretty => println!(
104 "{}",
105 serde_json::to_string_pretty(&stats).expect("Invalid JSON serialization")
106 ),
107 }
108}
109
110pub async fn stat_collector(
113 config: config::Config,
114 maybe_locked_stats: Option<Arc<RwLock<MonitordStats>>>,
115 output_stats: bool,
116) -> Result<(), MonitordError> {
117 let mut collect_interval_ms: u128 = 0;
118 if config.monitord.daemon {
119 collect_interval_ms = (config.monitord.daemon_stats_refresh_secs * 1000).into();
120 }
121
122 let config = Arc::new(config);
123 let locked_monitord_stats: Arc<RwLock<MonitordStats>> =
124 maybe_locked_stats.unwrap_or(Arc::new(RwLock::new(MonitordStats::default())));
125 let locked_machine_stats: Arc<RwLock<MachineStats>> =
126 Arc::new(RwLock::new(MachineStats::default()));
127 std::env::set_var("DBUS_SYSTEM_BUS_ADDRESS", &config.monitord.dbus_address);
128 let sdc = zbus::connection::Builder::system()?
129 .method_timeout(std::time::Duration::from_secs(config.monitord.dbus_timeout))
130 .build()
131 .await?;
132 let mut join_set = tokio::task::JoinSet::new();
133
134 loop {
135 let collect_start_time = Instant::now();
136 info!("Starting stat collection run");
137
138 join_set.spawn(crate::system::update_version(
141 sdc.clone(),
142 locked_machine_stats.clone(),
143 ));
144
145 if config.pid1.enabled {
147 join_set.spawn(crate::pid1::update_pid1_stats(
148 1,
149 locked_machine_stats.clone(),
150 ));
151 }
152
153 if config.networkd.enabled {
155 join_set.spawn(crate::networkd::update_networkd_stats(
156 config.networkd.link_state_dir.clone(),
157 None,
158 sdc.clone(),
159 locked_machine_stats.clone(),
160 ));
161 }
162
163 if config.system_state.enabled {
165 join_set.spawn(crate::system::update_system_stats(
166 sdc.clone(),
167 locked_machine_stats.clone(),
168 ));
169 }
170
171 if config.units.enabled {
173 let config_clone = Arc::clone(&config);
174 let sdc_clone = sdc.clone();
175 let stats_clone = locked_machine_stats.clone();
176 join_set.spawn(async move {
177 if config_clone.varlink.enabled {
178 let socket_path = crate::varlink_units::METRICS_SOCKET_PATH.to_string();
179 match crate::varlink_units::update_unit_stats(
180 Arc::clone(&config_clone),
181 stats_clone.clone(),
182 socket_path,
183 )
184 .await
185 {
186 Ok(()) => return Ok(()),
187 Err(err) => {
188 warn!(
189 "Varlink units stats failed, falling back to D-Bus: {:?}",
190 err
191 );
192 }
193 }
194 }
195 crate::units::update_unit_stats(config_clone, sdc_clone, stats_clone).await
196 });
197 }
198
199 if config.machines.enabled {
200 join_set.spawn(crate::machines::update_machines_stats(
201 Arc::clone(&config),
202 sdc.clone(),
203 locked_monitord_stats.clone(),
204 ));
205 }
206
207 if config.dbus_stats.enabled {
208 join_set.spawn(crate::dbus_stats::update_dbus_stats(
209 Arc::clone(&config),
210 sdc.clone(),
211 locked_machine_stats.clone(),
212 ));
213 }
214
215 if config.boot_blame.enabled {
216 join_set.spawn(crate::boot::update_boot_blame_stats(
217 Arc::clone(&config),
218 sdc.clone(),
219 locked_machine_stats.clone(),
220 ));
221 }
222
223 if config.verify.enabled {
224 join_set.spawn(crate::verify::update_verify_stats(
225 sdc.clone(),
226 locked_machine_stats.clone(),
227 config.verify.allowlist.clone(),
228 config.verify.blocklist.clone(),
229 ));
230 }
231
232 if join_set.len() == 1 {
233 warn!("No collectors except systemd version scheduled to run. Exiting");
234 }
235
236 while let Some(res) = join_set.join_next().await {
238 match res {
239 Ok(r) => match r {
240 Ok(_) => (),
241 Err(e) => {
242 error!("Collection specific failure: {:?}", e);
243 }
244 },
245 Err(e) => {
246 error!("Join error: {:?}", e);
247 }
248 }
249 }
250
251 {
252 let mut monitord_stats = locked_monitord_stats.write().await;
254 let machine_stats = locked_machine_stats.read().await;
255 monitord_stats.pid1 = machine_stats.pid1.clone();
256 monitord_stats.networkd = machine_stats.networkd.clone();
257 monitord_stats.system_state = machine_stats.system_state;
258 monitord_stats.version = machine_stats.version.clone();
259 monitord_stats.units = machine_stats.units.clone();
260 monitord_stats.dbus_stats = machine_stats.dbus_stats.clone();
261 monitord_stats.boot_blame = machine_stats.boot_blame.clone();
262 monitord_stats.verify_stats = machine_stats.verify_stats.clone();
263 }
264
265 let elapsed_runtime_ms = collect_start_time.elapsed().as_millis();
266
267 info!("stat collection run took {}ms", elapsed_runtime_ms);
268 if output_stats {
269 let monitord_stats = locked_monitord_stats.read().await;
270 print_stats(
271 &config.monitord.key_prefix,
272 &config.monitord.output_format,
273 &monitord_stats,
274 );
275 }
276 if !config.monitord.daemon {
277 break;
278 }
279 let sleep_time_ms = collect_interval_ms - elapsed_runtime_ms;
280 info!("stat collection sleeping for {}s 😴", sleep_time_ms / 1000);
281 tokio::time::sleep(Duration::from_millis(
282 sleep_time_ms
283 .try_into()
284 .expect("Sleep time does not fit into a u64 :O"),
285 ))
286 .await;
287 }
288 Ok(())
289}