1use std::collections::HashMap;
7use std::str::FromStr;
8use std::sync::Arc;
9use std::time::SystemTime;
10use std::time::UNIX_EPOCH;
11
12use struct_field_names_as_array::FieldNamesAsArray;
13use thiserror::Error;
14use tokio::sync::RwLock;
15use tracing::debug;
16use tracing::error;
17use zbus::zvariant::ObjectPath;
18use zbus::zvariant::OwnedObjectPath;
19
/// Errors that can occur while collecting systemd unit statistics.
#[derive(Error, Debug)]
pub enum MonitordUnitsError {
    /// A D-Bus operation failed; wraps the underlying `zbus::Error`.
    #[error("Units D-Bus error: {0}")]
    ZbusError(#[from] zbus::Error),
    /// An integer value did not fit into its target type
    /// (for example the process count converted to `u32`).
    #[error("Integer conversion error: {0}")]
    IntConversion(#[from] std::num::TryFromIntError),
    /// The system clock could not be expressed relative to the Unix epoch.
    #[error("System time error: {0}")]
    SystemTimeError(#[from] std::time::SystemTimeError),
}
29
30use crate::timer::TimerStats;
31use crate::MachineStats;
32
33pub use crate::unit_constants::is_unit_unhealthy;
35pub use crate::unit_constants::SystemdUnitActiveState;
36pub use crate::unit_constants::SystemdUnitLoadState;
37
/// Aggregated statistics over all units reported by the systemd manager,
/// plus optional per-service, per-timer and per-unit-state details.
#[derive(
    serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, FieldNamesAsArray, PartialEq,
)]
pub struct SystemdUnitStats {
    /// Number of listed units whose active state is `activating`.
    pub activating_units: u64,
    /// Number of listed units whose active state is `active`.
    pub active_units: u64,
    /// Number of listed units whose name ends in the `automount` suffix.
    pub automount_units: u64,
    /// Number of listed units whose name ends in the `device` suffix.
    pub device_units: u64,
    /// Number of listed units whose active state is `failed`.
    pub failed_units: u64,
    /// Number of listed units whose active state is `inactive`.
    pub inactive_units: u64,
    /// Number of listed units that carry a non-zero job id.
    pub jobs_queued: u64,
    /// Number of listed units whose load state is `loaded`.
    pub loaded_units: u64,
    /// Number of listed units whose load state is `masked`.
    pub masked_units: u64,
    /// Number of listed units whose name ends in the `mount` suffix.
    pub mount_units: u64,
    /// Number of listed units whose load state is `not-found`.
    pub not_found_units: u64,
    /// Number of listed units whose name ends in the `path` suffix.
    pub path_units: u64,
    /// Number of listed units whose name ends in the `scope` suffix.
    pub scope_units: u64,
    /// Number of listed units whose name ends in the `service` suffix.
    pub service_units: u64,
    /// Number of listed units whose name ends in the `slice` suffix.
    pub slice_units: u64,
    /// Number of listed units whose name ends in the `socket` suffix.
    pub socket_units: u64,
    /// Number of listed units whose name ends in the `target` suffix.
    pub target_units: u64,
    /// Number of listed units whose name ends in the `timer` suffix.
    pub timer_units: u64,
    /// NOTE(review): not written in this file; presumably maintained by
    /// `crate::timer::collect_timer_stats` (count of persistent timers) — confirm.
    pub timer_persistent_units: u64,
    /// NOTE(review): not written in this file; presumably maintained by
    /// `crate::timer::collect_timer_stats` — confirm.
    pub timer_remain_after_elapse: u64,
    /// Total number of units returned by the manager's unit listing.
    pub total_units: u64,
    /// Per-service statistics keyed by unit name, for units named in the
    /// configured services collection.
    pub service_stats: HashMap<String, ServiceStats>,
    /// Per-timer statistics keyed by unit name.
    pub timer_stats: HashMap<String, TimerStats>,
    /// Per-unit state details keyed by unit name, filled in by `parse_state`.
    pub unit_states: HashMap<String, UnitStates>,
}
94
/// Statistics collected for a single service unit by `parse_service`.
///
/// Each field mirrors the proxy getter of the same name unless noted.
/// Units (usec, nsec, bytes) are implied by the property names only.
#[derive(
    serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, FieldNamesAsArray, PartialEq,
)]
pub struct ServiceStats {
    /// Read from the unit proxy's `active_enter_timestamp()`.
    pub active_enter_timestamp: u64,
    /// Read from the unit proxy's `active_exit_timestamp()`.
    pub active_exit_timestamp: u64,
    /// Read from the service proxy's `cpuusage_nsec()`.
    pub cpuusage_nsec: u64,
    /// Read from the unit proxy's `inactive_exit_timestamp()`.
    pub inactive_exit_timestamp: u64,
    /// Read from the service proxy's `ioread_bytes()`.
    pub ioread_bytes: u64,
    /// Read from the service proxy's `ioread_operations()`.
    pub ioread_operations: u64,
    /// Read from the service proxy's `memory_available()`.
    pub memory_available: u64,
    /// Read from the service proxy's `memory_current()`.
    pub memory_current: u64,
    /// Read from the service proxy's `nrestarts()`.
    pub nrestarts: u32,
    /// Number of entries returned by the service proxy's `get_processes()`.
    pub processes: u32,
    /// Read from the service proxy's `restart_usec()`.
    pub restart_usec: u64,
    /// Read from the unit proxy's `state_change_timestamp()`.
    pub state_change_timestamp: u64,
    /// Read from the service proxy's `status_errno()`.
    pub status_errno: i32,
    /// Read from the service proxy's `tasks_current()`.
    pub tasks_current: u64,
    /// Read from the service proxy's `timeout_clean_usec()`.
    pub timeout_clean_usec: u64,
    /// Read from the service proxy's `watchdog_usec()`.
    pub watchdog_usec: u64,
}
134
/// State details recorded for a single unit by `parse_state`.
#[derive(
    serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, FieldNamesAsArray, PartialEq,
)]
pub struct UnitStates {
    /// Active state parsed from the listed unit's active-state string;
    /// falls back to `unknown` when the string cannot be parsed.
    pub active_state: SystemdUnitActiveState,
    /// Load state parsed from the listed unit's load-state string (with `-`
    /// replaced by `_`); falls back to `unknown` when it cannot be parsed.
    pub load_state: SystemdUnitLoadState,
    /// Result of `is_unit_unhealthy` for the active/load state pair above.
    pub unhealthy: bool,
    /// Time spent in the current state, in microseconds. `None` unless
    /// time-in-state collection is enabled and a D-Bus connection is available.
    pub time_in_state_usecs: Option<u64>,
}
152
/// One entry of the systemd manager's unit listing, with named fields in
/// place of the positional 10-tuple it is converted from.
#[derive(Debug)]
pub struct ListedUnit {
    /// Unit name (tuple element 0); its final dot-separated component is used
    /// to classify the unit type.
    pub name: String,
    /// Tuple element 1.
    pub description: String,
    /// Load state string (tuple element 2), e.g. `loaded`, `masked`, `not-found`.
    pub load_state: String,
    /// Active state string (tuple element 3), e.g. `active`, `inactive`, `failed`.
    pub active_state: String,
    /// Tuple element 4.
    pub sub_state: String,
    /// Tuple element 5.
    pub follow_unit: String,
    /// D-Bus object path of the unit (tuple element 6); used to build proxies.
    pub unit_object_path: OwnedObjectPath,
    /// Job id (tuple element 7); a non-zero value is counted as a queued job.
    pub job_id: u32,
    /// Tuple element 8.
    pub job_type: String,
    /// Tuple element 9.
    pub job_object_path: OwnedObjectPath,
}
170impl
171 From<(
172 String,
173 String,
174 String,
175 String,
176 String,
177 String,
178 OwnedObjectPath,
179 u32,
180 String,
181 OwnedObjectPath,
182 )> for ListedUnit
183{
184 fn from(
185 tuple: (
186 String,
187 String,
188 String,
189 String,
190 String,
191 String,
192 OwnedObjectPath,
193 u32,
194 String,
195 OwnedObjectPath,
196 ),
197 ) -> Self {
198 ListedUnit {
199 name: tuple.0,
200 description: tuple.1,
201 load_state: tuple.2,
202 active_state: tuple.3,
203 sub_state: tuple.4,
204 follow_unit: tuple.5,
205 unit_object_path: tuple.6,
206 job_id: tuple.7,
207 job_type: tuple.8,
208 job_object_path: tuple.9,
209 }
210 }
211}
212
/// Field names of [`ServiceStats`] as generated by the `FieldNamesAsArray` derive.
pub const SERVICE_FIELD_NAMES: &[&str] = &ServiceStats::FIELD_NAMES_AS_ARRAY;
/// Field names of [`SystemdUnitStats`] as generated by the `FieldNamesAsArray` derive.
pub const UNIT_FIELD_NAMES: &[&str] = &SystemdUnitStats::FIELD_NAMES_AS_ARRAY;
/// Field names of [`UnitStates`] as generated by the `FieldNamesAsArray` derive.
pub const UNIT_STATES_FIELD_NAMES: &[&str] = &UnitStates::FIELD_NAMES_AS_ARRAY;
216
217async fn parse_service(
219 connection: &zbus::Connection,
220 name: &str,
221 object_path: &OwnedObjectPath,
222) -> Result<ServiceStats, MonitordUnitsError> {
223 debug!("Parsing service {} stats", name);
224
225 let sp = crate::dbus::zbus_service::ServiceProxy::builder(connection)
226 .path(object_path.clone())?
227 .build()
228 .await?;
229 let up = crate::dbus::zbus_unit::UnitProxy::builder(connection)
230 .path(object_path.clone())?
231 .build()
232 .await?;
233
234 let (
237 active_enter_timestamp,
238 active_exit_timestamp,
239 cpuusage_nsec,
240 inactive_exit_timestamp,
241 ioread_bytes,
242 ioread_operations,
243 memory_current,
244 memory_available,
245 nrestarts,
246 processes,
247 restart_usec,
248 state_change_timestamp,
249 status_errno,
250 tasks_current,
251 timeout_clean_usec,
252 watchdog_usec,
253 ) = tokio::join!(
254 up.active_enter_timestamp(),
255 up.active_exit_timestamp(),
256 sp.cpuusage_nsec(),
257 up.inactive_exit_timestamp(),
258 sp.ioread_bytes(),
259 sp.ioread_operations(),
260 sp.memory_current(),
261 sp.memory_available(),
262 sp.nrestarts(),
263 sp.get_processes(),
264 sp.restart_usec(),
265 up.state_change_timestamp(),
266 sp.status_errno(),
267 sp.tasks_current(),
268 sp.timeout_clean_usec(),
269 sp.watchdog_usec(),
270 );
271
272 Ok(ServiceStats {
273 active_enter_timestamp: active_enter_timestamp?,
274 active_exit_timestamp: active_exit_timestamp?,
275 cpuusage_nsec: cpuusage_nsec?,
276 inactive_exit_timestamp: inactive_exit_timestamp?,
277 ioread_bytes: ioread_bytes?,
278 ioread_operations: ioread_operations?,
279 memory_current: memory_current?,
280 memory_available: memory_available?,
281 nrestarts: nrestarts?,
282 processes: processes?.len().try_into()?,
283 restart_usec: restart_usec?,
284 state_change_timestamp: state_change_timestamp?,
285 status_errno: status_errno?,
286 tasks_current: tasks_current?,
287 timeout_clean_usec: timeout_clean_usec?,
288 watchdog_usec: watchdog_usec?,
289 })
290}
291
292async fn get_time_in_state(
293 connection: Option<&zbus::Connection>,
294 unit: &ListedUnit,
295) -> Result<Option<u64>, MonitordUnitsError> {
296 match connection {
297 Some(c) => {
298 let up = crate::dbus::zbus_unit::UnitProxy::builder(c)
299 .path(ObjectPath::from(unit.unit_object_path.clone()))?
300 .build()
301 .await?;
302 let now: u64 = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() * 1_000_000;
303 let state_change_timestamp = match up.state_change_timestamp().await {
304 Ok(sct) => sct,
305 Err(err) => {
306 error!(
307 "Unable to get state_change_timestamp for {} - Setting to 0: {:?}",
308 &unit.name, err,
309 );
310 0
311 }
312 };
313 Ok(Some(now - state_change_timestamp))
314 }
315 None => {
316 error!("No zbus connection passed, but time_in_state_usecs enabled");
317 Ok(None)
318 }
319 }
320}
321
322pub async fn parse_state(
324 stats: &mut SystemdUnitStats,
325 unit: &ListedUnit,
326 config: &crate::config::UnitsConfig,
327 connection: Option<&zbus::Connection>,
328) -> Result<(), MonitordUnitsError> {
329 if config.state_stats_blocklist.contains(&unit.name) {
330 debug!("Skipping state stats for {} due to blocklist", &unit.name);
331 return Ok(());
332 }
333 if !config.state_stats_allowlist.is_empty()
334 && !config.state_stats_allowlist.contains(&unit.name)
335 {
336 return Ok(());
337 }
338 let active_state = SystemdUnitActiveState::from_str(&unit.active_state)
339 .unwrap_or(SystemdUnitActiveState::unknown);
340 let load_state = SystemdUnitLoadState::from_str(&unit.load_state.replace('-', "_"))
341 .unwrap_or(SystemdUnitLoadState::unknown);
342
343 let mut time_in_state_usecs: Option<u64> = None;
345 if config.state_stats_time_in_state {
346 time_in_state_usecs = get_time_in_state(connection, unit).await?;
347 }
348
349 stats.unit_states.insert(
350 unit.name.clone(),
351 UnitStates {
352 active_state,
353 load_state,
354 unhealthy: is_unit_unhealthy(active_state, load_state),
355 time_in_state_usecs,
356 },
357 );
358 Ok(())
359}
360
361fn parse_unit(stats: &mut SystemdUnitStats, unit: &ListedUnit) {
363 match unit.name.rsplit('.').next() {
365 Some("automount") => stats.automount_units += 1,
366 Some("device") => stats.device_units += 1,
367 Some("mount") => stats.mount_units += 1,
368 Some("path") => stats.path_units += 1,
369 Some("scope") => stats.scope_units += 1,
370 Some("service") => stats.service_units += 1,
371 Some("slice") => stats.slice_units += 1,
372 Some("socket") => stats.socket_units += 1,
373 Some("target") => stats.target_units += 1,
374 Some("timer") => stats.timer_units += 1,
375 unknown => debug!("Found unhandled '{:?}' unit type", unknown),
376 };
377 match unit.load_state.as_str() {
379 "loaded" => stats.loaded_units += 1,
380 "masked" => stats.masked_units += 1,
381 "not-found" => stats.not_found_units += 1,
382 _ => debug!("{} is not loaded. It's {}", unit.name, unit.load_state),
383 };
384 match unit.active_state.as_str() {
386 "activating" => stats.activating_units += 1,
387 "active" => stats.active_units += 1,
388 "failed" => stats.failed_units += 1,
389 "inactive" => stats.inactive_units += 1,
390 unknown => debug!("Found unhandled '{}' unit state", unknown),
391 };
392 if unit.job_id != 0 {
394 stats.jobs_queued += 1;
395 }
396}
397
398pub async fn parse_unit_state(
400 config: &crate::config::Config,
401 connection: &zbus::Connection,
402) -> Result<SystemdUnitStats, MonitordUnitsError> {
403 if !config.units.state_stats_allowlist.is_empty() {
404 debug!(
405 "Using unit state allowlist: {:?}",
406 config.units.state_stats_allowlist
407 );
408 }
409
410 if !config.units.state_stats_blocklist.is_empty() {
411 debug!(
412 "Using unit state blocklist: {:?}",
413 config.units.state_stats_blocklist,
414 );
415 }
416
417 let mut stats = SystemdUnitStats::default();
418 let p = crate::dbus::zbus_systemd::ManagerProxy::new(connection).await?;
419 let units = p.list_units().await?;
420
421 stats.total_units = units.len() as u64;
422 for unit_raw in units {
423 let unit: ListedUnit = unit_raw.into();
424 parse_unit(&mut stats, &unit);
426
427 if config.units.state_stats {
430 parse_state(&mut stats, &unit, &config.units, Some(connection)).await?;
431 }
432
433 if config.services.contains(&unit.name) {
435 debug!("Collecting service stats for {:?}", &unit);
436 match parse_service(connection, &unit.name, &unit.unit_object_path).await {
437 Ok(service_stats) => {
438 stats.service_stats.insert(unit.name.clone(), service_stats);
439 }
440 Err(err) => error!(
441 "Unable to get service stats for {} {}: {:#?}",
442 &unit.name, &unit.unit_object_path, err
443 ),
444 }
445 }
446
447 if config.timers.enabled && unit.name.contains(".timer") {
449 if config.timers.blocklist.contains(&unit.name) {
450 debug!("Skipping timer stats for {} due to blocklist", &unit.name);
451 continue;
452 }
453 if !config.timers.allowlist.is_empty() && !config.timers.allowlist.contains(&unit.name)
454 {
455 continue;
456 }
457 let timer_stats: Option<TimerStats> =
458 match crate::timer::collect_timer_stats(connection, &mut stats, &unit).await {
459 Ok(ts) => Some(ts),
460 Err(err) => {
461 error!("Failed to get {} stats: {:#?}", &unit.name, err);
462 None
463 }
464 };
465 if let Some(ts) = timer_stats {
466 stats.timer_stats.insert(unit.name.clone(), ts);
467 }
468 }
469 }
470 debug!("unit stats: {:?}", stats);
471 Ok(stats)
472}
473
474pub async fn update_unit_stats(
476 config: Arc<crate::config::Config>,
477 connection: zbus::Connection,
478 locked_machine_stats: Arc<RwLock<MachineStats>>,
479) -> anyhow::Result<()> {
480 let mut machine_stats = locked_machine_stats.write().await;
481 match parse_unit_state(&config, &connection).await {
482 Ok(units_stats) => machine_stats.units = units_stats,
483 Err(err) => error!("units stats failed: {:?}", err),
484 }
485 Ok(())
486}
487
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;
    use strum::IntoEnumIterator;

    /// Builds a representative loaded-but-inactive timer unit used by the tests.
    fn get_unit_file() -> ListedUnit {
        let unit_object_path =
            ObjectPath::try_from("/org/freedesktop/systemd1/unit/apport_2dautoreport_2etimer")
                .expect("Unable to make an object path")
                .into();
        let job_object_path = ObjectPath::try_from("/").unwrap().into();

        ListedUnit {
            name: "apport-autoreport.timer".to_string(),
            description:
                "Process error reports when automatic reporting is enabled (timer based)"
                    .to_string(),
            load_state: "loaded".to_string(),
            active_state: "inactive".to_string(),
            sub_state: "dead".to_string(),
            follow_unit: String::new(),
            unit_object_path,
            job_id: 0,
            job_type: String::new(),
            job_object_path,
        }
    }

    /// `parse_state` records the unit by default and with an allowlist entry,
    /// and records nothing once the unit is blocklisted.
    #[tokio::test]
    async fn test_state_parse() -> Result<(), MonitordUnitsError> {
        let test_unit_name = String::from("apport-autoreport.timer");
        // Only `unit_states` is expected to be touched; everything else stays default.
        let expected_stats = SystemdUnitStats {
            unit_states: HashMap::from([(
                test_unit_name.clone(),
                UnitStates {
                    active_state: SystemdUnitActiveState::inactive,
                    load_state: SystemdUnitLoadState::loaded,
                    unhealthy: true,
                    time_in_state_usecs: None,
                },
            )]),
            ..Default::default()
        };
        let systemd_unit = get_unit_file();
        let mut config = crate::config::UnitsConfig::default();

        // No lists configured: the unit is recorded.
        let mut stats = SystemdUnitStats::default();
        parse_state(&mut stats, &systemd_unit, &config, None).await?;
        assert_eq!(expected_stats, stats);

        // Unit on the allowlist: still recorded.
        config.state_stats_allowlist = HashSet::from([test_unit_name.clone()]);
        let mut allowlist_stats = SystemdUnitStats::default();
        parse_state(&mut allowlist_stats, &systemd_unit, &config, None).await?;
        assert_eq!(expected_stats, allowlist_stats);

        // Unit on the blocklist as well: the blocklist wins, nothing recorded.
        config.state_stats_blocklist = HashSet::from([test_unit_name]);
        let mut blocklist_stats = SystemdUnitStats::default();
        parse_state(&mut blocklist_stats, &systemd_unit, &config, None).await?;
        assert_eq!(SystemdUnitStats::default(), blocklist_stats);
        Ok(())
    }

    /// `parse_unit` bumps exactly the type, load-state and active-state counters.
    #[test]
    fn test_unit_parse() {
        let expected_stats = SystemdUnitStats {
            inactive_units: 1,
            loaded_units: 1,
            timer_units: 1,
            ..Default::default()
        };
        let mut stats = SystemdUnitStats::default();
        parse_unit(&mut stats, &get_unit_file());
        assert_eq!(expected_stats, stats);
    }

    /// An `activating` unit is counted as activating, not as active or inactive.
    #[test]
    fn test_unit_parse_activating() {
        let activating_unit = ListedUnit {
            active_state: "activating".to_string(),
            ..get_unit_file()
        };
        let mut stats = SystemdUnitStats::default();
        parse_unit(&mut stats, &activating_unit);
        assert_eq!(stats.activating_units, 1);
        assert_eq!(stats.active_units, 0);
        assert_eq!(stats.inactive_units, 0);
    }

    /// Both state enums expose at least one variant through their iterators.
    #[test]
    fn test_iterators() {
        assert!(SystemdUnitActiveState::iter().count() > 0);
        assert!(SystemdUnitLoadState::iter().count() > 0);
    }
}