1use std::collections::HashMap;
7use std::str::FromStr;
8use std::sync::Arc;
9use std::time::Instant;
10use std::time::SystemTime;
11use std::time::UNIX_EPOCH;
12
13use struct_field_names_as_array::FieldNamesAsArray;
14use thiserror::Error;
15use tokio::sync::RwLock;
16use tracing::debug;
17use tracing::error;
18use zbus::zvariant::ObjectPath;
19use zbus::zvariant::OwnedObjectPath;
20
21#[derive(Error, Debug)]
22pub enum MonitordUnitsError {
23 #[error("Units D-Bus error: {0}")]
24 ZbusError(#[from] zbus::Error),
25 #[error("Integer conversion error: {0}")]
26 IntConversion(#[from] std::num::TryFromIntError),
27 #[error("System time error: {0}")]
28 SystemTimeError(#[from] std::time::SystemTimeError),
29}
30
31use crate::timer::TimerStats;
32use crate::MachineStats;
33
34pub use crate::unit_constants::is_unit_unhealthy;
36pub use crate::unit_constants::SystemdUnitActiveState;
37pub use crate::unit_constants::SystemdUnitLoadState;
38
39#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
44pub struct UnitsCollectionTimings {
45 pub list_units_ms: f64,
47 pub per_unit_loop_ms: f64,
50 pub timer_dbus_fetches: u64,
52 pub state_dbus_fetches: u64,
54 pub service_dbus_fetches: u64,
56}
57
58#[derive(
59 serde::Serialize, serde::Deserialize, Clone, Debug, Default, FieldNamesAsArray, PartialEq,
60)]
61
62pub struct SystemdUnitStats {
65 pub activating_units: u64,
67 pub active_units: u64,
69 pub automount_units: u64,
71 pub device_units: u64,
73 pub failed_units: u64,
75 pub inactive_units: u64,
77 pub jobs_queued: u64,
79 pub loaded_units: u64,
81 pub masked_units: u64,
83 pub mount_units: u64,
85 pub not_found_units: u64,
87 pub path_units: u64,
89 pub scope_units: u64,
91 pub service_units: u64,
93 pub slice_units: u64,
95 pub socket_units: u64,
97 pub target_units: u64,
99 pub timer_units: u64,
101 pub timer_persistent_units: u64,
103 pub timer_remain_after_elapse: u64,
105 pub total_units: u64,
107 pub service_stats: HashMap<String, ServiceStats>,
109 pub timer_stats: HashMap<String, TimerStats>,
111 pub unit_states: HashMap<String, UnitStates>,
113 pub collection_timings: UnitsCollectionTimings,
116}
117
118#[derive(
121 serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, FieldNamesAsArray, PartialEq,
122)]
123pub struct ServiceStats {
124 pub active_enter_timestamp: u64,
126 pub active_exit_timestamp: u64,
128 pub cpuusage_nsec: u64,
130 pub inactive_exit_timestamp: u64,
132 pub ioread_bytes: u64,
134 pub ioread_operations: u64,
136 pub memory_available: u64,
138 pub memory_current: u64,
140 pub nrestarts: u32,
142 pub processes: u32,
144 pub restart_usec: u64,
146 pub state_change_timestamp: u64,
148 pub status_errno: i32,
150 pub tasks_current: u64,
152 pub timeout_clean_usec: u64,
154 pub watchdog_usec: u64,
156}
157
158#[derive(
161 serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, FieldNamesAsArray, PartialEq,
162)]
163pub struct UnitStates {
164 pub active_state: SystemdUnitActiveState,
166 pub load_state: SystemdUnitLoadState,
168 pub unhealthy: bool,
171 pub time_in_state_usecs: Option<u64>,
174}
175
176#[derive(Debug)]
181pub struct ListedUnit {
182 pub name: String, pub description: String, pub load_state: String, pub active_state: String, pub sub_state: String, pub follow_unit: String, pub unit_object_path: OwnedObjectPath, pub job_id: u32, pub job_type: String, pub job_object_path: OwnedObjectPath, }
193impl
194 From<(
195 String,
196 String,
197 String,
198 String,
199 String,
200 String,
201 OwnedObjectPath,
202 u32,
203 String,
204 OwnedObjectPath,
205 )> for ListedUnit
206{
207 fn from(
208 tuple: (
209 String,
210 String,
211 String,
212 String,
213 String,
214 String,
215 OwnedObjectPath,
216 u32,
217 String,
218 OwnedObjectPath,
219 ),
220 ) -> Self {
221 ListedUnit {
222 name: tuple.0,
223 description: tuple.1,
224 load_state: tuple.2,
225 active_state: tuple.3,
226 sub_state: tuple.4,
227 follow_unit: tuple.5,
228 unit_object_path: tuple.6,
229 job_id: tuple.7,
230 job_type: tuple.8,
231 job_object_path: tuple.9,
232 }
233 }
234}
235
236pub const SERVICE_FIELD_NAMES: &[&str] = &ServiceStats::FIELD_NAMES_AS_ARRAY;
237pub const UNIT_FIELD_NAMES: &[&str] = &SystemdUnitStats::FIELD_NAMES_AS_ARRAY;
238pub const UNIT_STATES_FIELD_NAMES: &[&str] = &UnitStates::FIELD_NAMES_AS_ARRAY;
239
240async fn parse_service(
242 connection: &zbus::Connection,
243 name: &str,
244 object_path: &OwnedObjectPath,
245) -> Result<ServiceStats, MonitordUnitsError> {
246 debug!("Parsing service {} stats", name);
247
248 let sp = crate::dbus::zbus_service::ServiceProxy::builder(connection)
249 .cache_properties(zbus::proxy::CacheProperties::No)
250 .path(object_path.clone())?
251 .build()
252 .await?;
253 let up = crate::dbus::zbus_unit::UnitProxy::builder(connection)
254 .cache_properties(zbus::proxy::CacheProperties::No)
255 .path(object_path.clone())?
256 .build()
257 .await?;
258
259 let (
262 active_enter_timestamp,
263 active_exit_timestamp,
264 cpuusage_nsec,
265 inactive_exit_timestamp,
266 ioread_bytes,
267 ioread_operations,
268 memory_current,
269 memory_available,
270 nrestarts,
271 processes,
272 restart_usec,
273 state_change_timestamp,
274 status_errno,
275 tasks_current,
276 timeout_clean_usec,
277 watchdog_usec,
278 ) = tokio::join!(
279 up.active_enter_timestamp(),
280 up.active_exit_timestamp(),
281 sp.cpuusage_nsec(),
282 up.inactive_exit_timestamp(),
283 sp.ioread_bytes(),
284 sp.ioread_operations(),
285 sp.memory_current(),
286 sp.memory_available(),
287 sp.nrestarts(),
288 sp.get_processes(),
289 sp.restart_usec(),
290 up.state_change_timestamp(),
291 sp.status_errno(),
292 sp.tasks_current(),
293 sp.timeout_clean_usec(),
294 sp.watchdog_usec(),
295 );
296
297 Ok(ServiceStats {
298 active_enter_timestamp: active_enter_timestamp?,
299 active_exit_timestamp: active_exit_timestamp?,
300 cpuusage_nsec: cpuusage_nsec?,
301 inactive_exit_timestamp: inactive_exit_timestamp?,
302 ioread_bytes: ioread_bytes?,
303 ioread_operations: ioread_operations?,
304 memory_current: memory_current?,
305 memory_available: memory_available?,
306 nrestarts: nrestarts?,
307 processes: processes?.len().try_into()?,
308 restart_usec: restart_usec?,
309 state_change_timestamp: state_change_timestamp?,
310 status_errno: status_errno?,
311 tasks_current: tasks_current?,
312 timeout_clean_usec: timeout_clean_usec?,
313 watchdog_usec: watchdog_usec?,
314 })
315}
316
317async fn get_time_in_state(
318 connection: Option<&zbus::Connection>,
319 unit: &ListedUnit,
320) -> Result<Option<u64>, MonitordUnitsError> {
321 match connection {
322 Some(c) => {
323 let up = crate::dbus::zbus_unit::UnitProxy::builder(c)
324 .cache_properties(zbus::proxy::CacheProperties::No)
325 .path(ObjectPath::from(unit.unit_object_path.clone()))?
326 .build()
327 .await?;
328 let now: u64 = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() * 1_000_000;
329 let state_change_timestamp = match up.state_change_timestamp().await {
330 Ok(sct) => sct,
331 Err(err) => {
332 error!(
333 "Unable to get state_change_timestamp for {} - Setting to 0: {:?}",
334 &unit.name, err,
335 );
336 0
337 }
338 };
339 Ok(Some(now - state_change_timestamp))
340 }
341 None => {
342 error!("No zbus connection passed, but time_in_state_usecs enabled");
343 Ok(None)
344 }
345 }
346}
347
348pub async fn parse_state(
354 stats: &mut SystemdUnitStats,
355 unit: &ListedUnit,
356 config: &crate::config::UnitsConfig,
357 connection: Option<&zbus::Connection>,
358) -> Result<bool, MonitordUnitsError> {
359 if config.state_stats_blocklist.contains(&unit.name) {
360 debug!("Skipping state stats for {} due to blocklist", &unit.name);
361 return Ok(false);
362 }
363 if !config.state_stats_allowlist.is_empty()
364 && !config.state_stats_allowlist.contains(&unit.name)
365 {
366 return Ok(false);
367 }
368 let active_state = SystemdUnitActiveState::from_str(&unit.active_state)
369 .unwrap_or(SystemdUnitActiveState::unknown);
370 let load_state = SystemdUnitLoadState::from_str(&unit.load_state.replace('-', "_"))
371 .unwrap_or(SystemdUnitLoadState::unknown);
372
373 let mut time_in_state_usecs: Option<u64> = None;
375 let mut did_dbus_fetch = false;
376 if config.state_stats_time_in_state {
377 time_in_state_usecs = get_time_in_state(connection, unit).await?;
378 did_dbus_fetch = connection.is_some();
381 }
382
383 stats.unit_states.insert(
384 unit.name.clone(),
385 UnitStates {
386 active_state,
387 load_state,
388 unhealthy: is_unit_unhealthy(active_state, load_state),
389 time_in_state_usecs,
390 },
391 );
392 Ok(did_dbus_fetch)
393}
394
395fn parse_unit(stats: &mut SystemdUnitStats, unit: &ListedUnit) {
397 match unit.name.rsplit('.').next() {
399 Some("automount") => stats.automount_units += 1,
400 Some("device") => stats.device_units += 1,
401 Some("mount") => stats.mount_units += 1,
402 Some("path") => stats.path_units += 1,
403 Some("scope") => stats.scope_units += 1,
404 Some("service") => stats.service_units += 1,
405 Some("slice") => stats.slice_units += 1,
406 Some("socket") => stats.socket_units += 1,
407 Some("target") => stats.target_units += 1,
408 Some("timer") => stats.timer_units += 1,
409 unknown => debug!("Found unhandled '{:?}' unit type", unknown),
410 };
411 match unit.load_state.as_str() {
413 "loaded" => stats.loaded_units += 1,
414 "masked" => stats.masked_units += 1,
415 "not-found" => stats.not_found_units += 1,
416 _ => debug!("{} is not loaded. It's {}", unit.name, unit.load_state),
417 };
418 match unit.active_state.as_str() {
420 "activating" => stats.activating_units += 1,
421 "active" => stats.active_units += 1,
422 "failed" => stats.failed_units += 1,
423 "inactive" => stats.inactive_units += 1,
424 unknown => debug!("Found unhandled '{}' unit state", unknown),
425 };
426 if unit.job_id != 0 {
428 stats.jobs_queued += 1;
429 }
430}
431
432pub async fn parse_unit_state(
434 config: &crate::config::Config,
435 connection: &zbus::Connection,
436) -> Result<SystemdUnitStats, MonitordUnitsError> {
437 if !config.units.state_stats_allowlist.is_empty() {
438 debug!(
439 "Using unit state allowlist: {:?}",
440 config.units.state_stats_allowlist
441 );
442 }
443
444 if !config.units.state_stats_blocklist.is_empty() {
445 debug!(
446 "Using unit state blocklist: {:?}",
447 config.units.state_stats_blocklist,
448 );
449 }
450
451 let mut stats = SystemdUnitStats::default();
452 let p = crate::dbus::zbus_systemd::ManagerProxy::builder(connection)
453 .cache_properties(zbus::proxy::CacheProperties::No)
454 .build()
455 .await?;
456
457 let list_units_start = Instant::now();
458 let units = p.list_units().await?;
459 let list_units_elapsed = list_units_start.elapsed();
460 stats.collection_timings.list_units_ms = list_units_elapsed.as_secs_f64() * 1000.0;
461
462 stats.total_units = units.len() as u64;
463
464 let per_unit_loop_start = Instant::now();
465 let mut state_dbus_fetches: u64 = 0;
466 let mut service_dbus_fetches: u64 = 0;
467 let mut timer_dbus_fetches: u64 = 0;
468
469 for unit_raw in units {
470 let unit: ListedUnit = unit_raw.into();
471 parse_unit(&mut stats, &unit);
473
474 if config.units.state_stats {
477 let did_dbus_fetch =
478 parse_state(&mut stats, &unit, &config.units, Some(connection)).await?;
479 if did_dbus_fetch {
480 state_dbus_fetches += 1;
481 }
482 }
483
484 if config.services.contains(&unit.name) {
486 debug!("Collecting service stats for {:?}", &unit);
487 match parse_service(connection, &unit.name, &unit.unit_object_path).await {
488 Ok(service_stats) => {
489 stats.service_stats.insert(unit.name.clone(), service_stats);
490 service_dbus_fetches += 1;
491 }
492 Err(err) => error!(
493 "Unable to get service stats for {} {}: {:#?}",
494 &unit.name, &unit.unit_object_path, err
495 ),
496 }
497 }
498
499 if config.timers.enabled && unit.name.contains(".timer") {
501 if config.timers.blocklist.contains(&unit.name) {
502 debug!("Skipping timer stats for {} due to blocklist", &unit.name);
503 continue;
504 }
505 if !config.timers.allowlist.is_empty() && !config.timers.allowlist.contains(&unit.name)
506 {
507 continue;
508 }
509 let timer_stats: Option<TimerStats> =
510 match crate::timer::collect_timer_stats(connection, &mut stats, &unit).await {
511 Ok(ts) => {
512 timer_dbus_fetches += 1;
513 Some(ts)
514 }
515 Err(err) => {
516 error!("Failed to get {} stats: {:#?}", &unit.name, err);
517 None
518 }
519 };
520 if let Some(ts) = timer_stats {
521 stats.timer_stats.insert(unit.name.clone(), ts);
522 }
523 }
524 }
525 let per_unit_loop_elapsed = per_unit_loop_start.elapsed();
526 stats.collection_timings.per_unit_loop_ms = per_unit_loop_elapsed.as_secs_f64() * 1000.0;
527 stats.collection_timings.state_dbus_fetches = state_dbus_fetches;
528 stats.collection_timings.service_dbus_fetches = service_dbus_fetches;
529 stats.collection_timings.timer_dbus_fetches = timer_dbus_fetches;
530
531 debug!("unit stats: {:?}", stats);
532 Ok(stats)
533}
534
535pub async fn update_unit_stats(
537 config: Arc<crate::config::Config>,
538 connection: zbus::Connection,
539 locked_machine_stats: Arc<RwLock<MachineStats>>,
540) -> anyhow::Result<()> {
541 let mut machine_stats = locked_machine_stats.write().await;
542 match parse_unit_state(&config, &connection).await {
543 Ok(units_stats) => machine_stats.units = units_stats,
544 Err(err) => error!("units stats failed: {:?}", err),
545 }
546 Ok(())
547}
548
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;
    use strum::IntoEnumIterator;

    /// Builds the fixture shared by all tests: a loaded, inactive timer
    /// unit with no job attached.
    fn get_unit_file() -> ListedUnit {
        let unit_object_path =
            ObjectPath::try_from("/org/freedesktop/systemd1/unit/apport_2dautoreport_2etimer")
                .expect("Unable to make an object path")
                .into();
        let job_object_path = ObjectPath::try_from("/").unwrap().into();
        ListedUnit {
            name: String::from("apport-autoreport.timer"),
            description: String::from(
                "Process error reports when automatic reporting is enabled (timer based)",
            ),
            load_state: String::from("loaded"),
            active_state: String::from("inactive"),
            sub_state: String::from("dead"),
            follow_unit: String::from(""),
            unit_object_path,
            job_id: 0,
            job_type: String::from(""),
            job_object_path,
        }
    }

    /// `parse_state` records the unit by default and when allowlisted, and
    /// records nothing once the unit is blocklisted. No connection is
    /// passed, so no D-Bus fetch is ever reported.
    #[tokio::test]
    async fn test_state_parse() -> Result<(), MonitordUnitsError> {
        let test_unit_name = String::from("apport-autoreport.timer");
        // Only `unit_states` is expected to differ from the default stats.
        let expected_stats = SystemdUnitStats {
            unit_states: HashMap::from([(
                test_unit_name.clone(),
                UnitStates {
                    active_state: SystemdUnitActiveState::inactive,
                    load_state: SystemdUnitLoadState::loaded,
                    unhealthy: true,
                    time_in_state_usecs: None,
                },
            )]),
            ..Default::default()
        };
        let systemd_unit = get_unit_file();
        let mut config = crate::config::UnitsConfig::default();

        // Default config: the unit is recorded.
        let mut stats = SystemdUnitStats::default();
        let did_fetch = parse_state(&mut stats, &systemd_unit, &config, None).await?;
        assert_eq!(expected_stats, stats);
        assert!(!did_fetch);

        // Allowlisted: the unit is still recorded.
        config.state_stats_allowlist = HashSet::from([test_unit_name.clone()]);
        let mut allowlist_stats = SystemdUnitStats::default();
        let did_fetch = parse_state(&mut allowlist_stats, &systemd_unit, &config, None).await?;
        assert_eq!(expected_stats, allowlist_stats);
        assert!(!did_fetch);

        // Blocklisted: the blocklist wins over the allowlist.
        config.state_stats_blocklist = HashSet::from([test_unit_name]);
        let mut blocklist_stats = SystemdUnitStats::default();
        let expected_blocklist_stats = SystemdUnitStats::default();
        let did_fetch = parse_state(&mut blocklist_stats, &systemd_unit, &config, None).await?;
        assert_eq!(expected_blocklist_stats, blocklist_stats);
        assert!(!did_fetch);
        Ok(())
    }

    /// `parse_unit` bumps exactly the timer, loaded and inactive counters
    /// for the fixture unit.
    #[test]
    fn test_unit_parse() {
        let expected_stats = SystemdUnitStats {
            inactive_units: 1,
            loaded_units: 1,
            timer_units: 1,
            ..Default::default()
        };
        let mut stats = SystemdUnitStats::default();
        let systemd_unit = get_unit_file();
        parse_unit(&mut stats, &systemd_unit);
        assert_eq!(expected_stats, stats);
    }

    /// An `activating` unit is counted only under `activating_units`.
    #[test]
    fn test_unit_parse_activating() {
        let mut activating_unit = get_unit_file();
        activating_unit.active_state = String::from("activating");
        let mut stats = SystemdUnitStats::default();
        parse_unit(&mut stats, &activating_unit);
        assert_eq!(stats.activating_units, 1);
        assert_eq!(stats.active_units, 0);
        assert_eq!(stats.inactive_units, 0);
    }

    /// Both state enums expose at least one variant through `iter()`.
    #[test]
    fn test_iterators() {
        assert!(SystemdUnitActiveState::iter().next().is_some());
        assert!(SystemdUnitLoadState::iter().next().is_some());
    }
}