1use std::collections::HashMap;
7use std::str::FromStr;
8use std::sync::Arc;
9use std::time::SystemTime;
10use std::time::UNIX_EPOCH;
11
12use struct_field_names_as_array::FieldNamesAsArray;
13use thiserror::Error;
14use tokio::sync::RwLock;
15use tracing::debug;
16use tracing::error;
17use zbus::zvariant::ObjectPath;
18use zbus::zvariant::OwnedObjectPath;
19
20#[derive(Error, Debug)]
21pub enum MonitordUnitsError {
22 #[error("Units D-Bus error: {0}")]
23 ZbusError(#[from] zbus::Error),
24 #[error("Integer conversion error: {0}")]
25 IntConversion(#[from] std::num::TryFromIntError),
26 #[error("System time error: {0}")]
27 SystemTimeError(#[from] std::time::SystemTimeError),
28}
29
30use crate::timer::TimerStats;
31use crate::MachineStats;
32
33pub use crate::unit_constants::is_unit_unhealthy;
35pub use crate::unit_constants::SystemdUnitActiveState;
36pub use crate::unit_constants::SystemdUnitLoadState;
37
38#[derive(
39 serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, FieldNamesAsArray, PartialEq,
40)]
41
42pub struct SystemdUnitStats {
45 pub activating_units: u64,
47 pub active_units: u64,
49 pub automount_units: u64,
51 pub device_units: u64,
53 pub failed_units: u64,
55 pub inactive_units: u64,
57 pub jobs_queued: u64,
59 pub loaded_units: u64,
61 pub masked_units: u64,
63 pub mount_units: u64,
65 pub not_found_units: u64,
67 pub path_units: u64,
69 pub scope_units: u64,
71 pub service_units: u64,
73 pub slice_units: u64,
75 pub socket_units: u64,
77 pub target_units: u64,
79 pub timer_units: u64,
81 pub timer_persistent_units: u64,
83 pub timer_remain_after_elapse: u64,
85 pub total_units: u64,
87 pub service_stats: HashMap<String, ServiceStats>,
89 pub timer_stats: HashMap<String, TimerStats>,
91 pub unit_states: HashMap<String, UnitStates>,
93}
94
95#[derive(
98 serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, FieldNamesAsArray, PartialEq,
99)]
100pub struct ServiceStats {
101 pub active_enter_timestamp: u64,
103 pub active_exit_timestamp: u64,
105 pub cpuusage_nsec: u64,
107 pub inactive_exit_timestamp: u64,
109 pub ioread_bytes: u64,
111 pub ioread_operations: u64,
113 pub memory_available: u64,
115 pub memory_current: u64,
117 pub nrestarts: u32,
119 pub processes: u32,
121 pub restart_usec: u64,
123 pub state_change_timestamp: u64,
125 pub status_errno: i32,
127 pub tasks_current: u64,
129 pub timeout_clean_usec: u64,
131 pub watchdog_usec: u64,
133}
134
135#[derive(
138 serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, FieldNamesAsArray, PartialEq,
139)]
140pub struct UnitStates {
141 pub active_state: SystemdUnitActiveState,
143 pub load_state: SystemdUnitLoadState,
145 pub unhealthy: bool,
148 pub time_in_state_usecs: Option<u64>,
151}
152
153#[derive(Debug)]
158pub struct ListedUnit {
159 pub name: String, pub description: String, pub load_state: String, pub active_state: String, pub sub_state: String, pub follow_unit: String, pub unit_object_path: OwnedObjectPath, pub job_id: u32, pub job_type: String, pub job_object_path: OwnedObjectPath, }
170impl
171 From<(
172 String,
173 String,
174 String,
175 String,
176 String,
177 String,
178 OwnedObjectPath,
179 u32,
180 String,
181 OwnedObjectPath,
182 )> for ListedUnit
183{
184 fn from(
185 tuple: (
186 String,
187 String,
188 String,
189 String,
190 String,
191 String,
192 OwnedObjectPath,
193 u32,
194 String,
195 OwnedObjectPath,
196 ),
197 ) -> Self {
198 ListedUnit {
199 name: tuple.0,
200 description: tuple.1,
201 load_state: tuple.2,
202 active_state: tuple.3,
203 sub_state: tuple.4,
204 follow_unit: tuple.5,
205 unit_object_path: tuple.6,
206 job_id: tuple.7,
207 job_type: tuple.8,
208 job_object_path: tuple.9,
209 }
210 }
211}
212
213pub const SERVICE_FIELD_NAMES: &[&str] = &ServiceStats::FIELD_NAMES_AS_ARRAY;
214pub const UNIT_FIELD_NAMES: &[&str] = &SystemdUnitStats::FIELD_NAMES_AS_ARRAY;
215pub const UNIT_STATES_FIELD_NAMES: &[&str] = &UnitStates::FIELD_NAMES_AS_ARRAY;
216
217async fn parse_service(
219 connection: &zbus::Connection,
220 name: &str,
221 object_path: &OwnedObjectPath,
222) -> Result<ServiceStats, MonitordUnitsError> {
223 debug!("Parsing service {} stats", name);
224
225 let sp = crate::dbus::zbus_service::ServiceProxy::builder(connection)
226 .cache_properties(zbus::proxy::CacheProperties::No)
227 .path(object_path.clone())?
228 .build()
229 .await?;
230 let up = crate::dbus::zbus_unit::UnitProxy::builder(connection)
231 .cache_properties(zbus::proxy::CacheProperties::No)
232 .path(object_path.clone())?
233 .build()
234 .await?;
235
236 let (
239 active_enter_timestamp,
240 active_exit_timestamp,
241 cpuusage_nsec,
242 inactive_exit_timestamp,
243 ioread_bytes,
244 ioread_operations,
245 memory_current,
246 memory_available,
247 nrestarts,
248 processes,
249 restart_usec,
250 state_change_timestamp,
251 status_errno,
252 tasks_current,
253 timeout_clean_usec,
254 watchdog_usec,
255 ) = tokio::join!(
256 up.active_enter_timestamp(),
257 up.active_exit_timestamp(),
258 sp.cpuusage_nsec(),
259 up.inactive_exit_timestamp(),
260 sp.ioread_bytes(),
261 sp.ioread_operations(),
262 sp.memory_current(),
263 sp.memory_available(),
264 sp.nrestarts(),
265 sp.get_processes(),
266 sp.restart_usec(),
267 up.state_change_timestamp(),
268 sp.status_errno(),
269 sp.tasks_current(),
270 sp.timeout_clean_usec(),
271 sp.watchdog_usec(),
272 );
273
274 Ok(ServiceStats {
275 active_enter_timestamp: active_enter_timestamp?,
276 active_exit_timestamp: active_exit_timestamp?,
277 cpuusage_nsec: cpuusage_nsec?,
278 inactive_exit_timestamp: inactive_exit_timestamp?,
279 ioread_bytes: ioread_bytes?,
280 ioread_operations: ioread_operations?,
281 memory_current: memory_current?,
282 memory_available: memory_available?,
283 nrestarts: nrestarts?,
284 processes: processes?.len().try_into()?,
285 restart_usec: restart_usec?,
286 state_change_timestamp: state_change_timestamp?,
287 status_errno: status_errno?,
288 tasks_current: tasks_current?,
289 timeout_clean_usec: timeout_clean_usec?,
290 watchdog_usec: watchdog_usec?,
291 })
292}
293
294async fn get_time_in_state(
295 connection: Option<&zbus::Connection>,
296 unit: &ListedUnit,
297) -> Result<Option<u64>, MonitordUnitsError> {
298 match connection {
299 Some(c) => {
300 let up = crate::dbus::zbus_unit::UnitProxy::builder(c)
301 .cache_properties(zbus::proxy::CacheProperties::No)
302 .path(ObjectPath::from(unit.unit_object_path.clone()))?
303 .build()
304 .await?;
305 let now: u64 = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() * 1_000_000;
306 let state_change_timestamp = match up.state_change_timestamp().await {
307 Ok(sct) => sct,
308 Err(err) => {
309 error!(
310 "Unable to get state_change_timestamp for {} - Setting to 0: {:?}",
311 &unit.name, err,
312 );
313 0
314 }
315 };
316 Ok(Some(now - state_change_timestamp))
317 }
318 None => {
319 error!("No zbus connection passed, but time_in_state_usecs enabled");
320 Ok(None)
321 }
322 }
323}
324
325pub async fn parse_state(
327 stats: &mut SystemdUnitStats,
328 unit: &ListedUnit,
329 config: &crate::config::UnitsConfig,
330 connection: Option<&zbus::Connection>,
331) -> Result<(), MonitordUnitsError> {
332 if config.state_stats_blocklist.contains(&unit.name) {
333 debug!("Skipping state stats for {} due to blocklist", &unit.name);
334 return Ok(());
335 }
336 if !config.state_stats_allowlist.is_empty()
337 && !config.state_stats_allowlist.contains(&unit.name)
338 {
339 return Ok(());
340 }
341 let active_state = SystemdUnitActiveState::from_str(&unit.active_state)
342 .unwrap_or(SystemdUnitActiveState::unknown);
343 let load_state = SystemdUnitLoadState::from_str(&unit.load_state.replace('-', "_"))
344 .unwrap_or(SystemdUnitLoadState::unknown);
345
346 let mut time_in_state_usecs: Option<u64> = None;
348 if config.state_stats_time_in_state {
349 time_in_state_usecs = get_time_in_state(connection, unit).await?;
350 }
351
352 stats.unit_states.insert(
353 unit.name.clone(),
354 UnitStates {
355 active_state,
356 load_state,
357 unhealthy: is_unit_unhealthy(active_state, load_state),
358 time_in_state_usecs,
359 },
360 );
361 Ok(())
362}
363
364fn parse_unit(stats: &mut SystemdUnitStats, unit: &ListedUnit) {
366 match unit.name.rsplit('.').next() {
368 Some("automount") => stats.automount_units += 1,
369 Some("device") => stats.device_units += 1,
370 Some("mount") => stats.mount_units += 1,
371 Some("path") => stats.path_units += 1,
372 Some("scope") => stats.scope_units += 1,
373 Some("service") => stats.service_units += 1,
374 Some("slice") => stats.slice_units += 1,
375 Some("socket") => stats.socket_units += 1,
376 Some("target") => stats.target_units += 1,
377 Some("timer") => stats.timer_units += 1,
378 unknown => debug!("Found unhandled '{:?}' unit type", unknown),
379 };
380 match unit.load_state.as_str() {
382 "loaded" => stats.loaded_units += 1,
383 "masked" => stats.masked_units += 1,
384 "not-found" => stats.not_found_units += 1,
385 _ => debug!("{} is not loaded. It's {}", unit.name, unit.load_state),
386 };
387 match unit.active_state.as_str() {
389 "activating" => stats.activating_units += 1,
390 "active" => stats.active_units += 1,
391 "failed" => stats.failed_units += 1,
392 "inactive" => stats.inactive_units += 1,
393 unknown => debug!("Found unhandled '{}' unit state", unknown),
394 };
395 if unit.job_id != 0 {
397 stats.jobs_queued += 1;
398 }
399}
400
401pub async fn parse_unit_state(
403 config: &crate::config::Config,
404 connection: &zbus::Connection,
405) -> Result<SystemdUnitStats, MonitordUnitsError> {
406 if !config.units.state_stats_allowlist.is_empty() {
407 debug!(
408 "Using unit state allowlist: {:?}",
409 config.units.state_stats_allowlist
410 );
411 }
412
413 if !config.units.state_stats_blocklist.is_empty() {
414 debug!(
415 "Using unit state blocklist: {:?}",
416 config.units.state_stats_blocklist,
417 );
418 }
419
420 let mut stats = SystemdUnitStats::default();
421 let p = crate::dbus::zbus_systemd::ManagerProxy::builder(connection)
422 .cache_properties(zbus::proxy::CacheProperties::No)
423 .build()
424 .await?;
425 let units = p.list_units().await?;
426
427 stats.total_units = units.len() as u64;
428 for unit_raw in units {
429 let unit: ListedUnit = unit_raw.into();
430 parse_unit(&mut stats, &unit);
432
433 if config.units.state_stats {
436 parse_state(&mut stats, &unit, &config.units, Some(connection)).await?;
437 }
438
439 if config.services.contains(&unit.name) {
441 debug!("Collecting service stats for {:?}", &unit);
442 match parse_service(connection, &unit.name, &unit.unit_object_path).await {
443 Ok(service_stats) => {
444 stats.service_stats.insert(unit.name.clone(), service_stats);
445 }
446 Err(err) => error!(
447 "Unable to get service stats for {} {}: {:#?}",
448 &unit.name, &unit.unit_object_path, err
449 ),
450 }
451 }
452
453 if config.timers.enabled && unit.name.contains(".timer") {
455 if config.timers.blocklist.contains(&unit.name) {
456 debug!("Skipping timer stats for {} due to blocklist", &unit.name);
457 continue;
458 }
459 if !config.timers.allowlist.is_empty() && !config.timers.allowlist.contains(&unit.name)
460 {
461 continue;
462 }
463 let timer_stats: Option<TimerStats> =
464 match crate::timer::collect_timer_stats(connection, &mut stats, &unit).await {
465 Ok(ts) => Some(ts),
466 Err(err) => {
467 error!("Failed to get {} stats: {:#?}", &unit.name, err);
468 None
469 }
470 };
471 if let Some(ts) = timer_stats {
472 stats.timer_stats.insert(unit.name.clone(), ts);
473 }
474 }
475 }
476 debug!("unit stats: {:?}", stats);
477 Ok(stats)
478}
479
480pub async fn update_unit_stats(
482 config: Arc<crate::config::Config>,
483 connection: zbus::Connection,
484 locked_machine_stats: Arc<RwLock<MachineStats>>,
485) -> anyhow::Result<()> {
486 let mut machine_stats = locked_machine_stats.write().await;
487 match parse_unit_state(&config, &connection).await {
488 Ok(units_stats) => machine_stats.units = units_stats,
489 Err(err) => error!("units stats failed: {:?}", err),
490 }
491 Ok(())
492}
493
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;
    use strum::IntoEnumIterator;

    /// Builds the listed-unit fixture shared by the tests: a loaded, inactive timer unit
    /// with no queued job.
    fn get_unit_file() -> ListedUnit {
        let unit_object_path =
            ObjectPath::try_from("/org/freedesktop/systemd1/unit/apport_2dautoreport_2etimer")
                .expect("Unable to make an object path")
                .into();
        let job_object_path = ObjectPath::try_from("/").unwrap().into();

        ListedUnit {
            name: String::from("apport-autoreport.timer"),
            description: String::from(
                "Process error reports when automatic reporting is enabled (timer based)",
            ),
            load_state: String::from("loaded"),
            active_state: String::from("inactive"),
            sub_state: String::from("dead"),
            follow_unit: String::from(""),
            unit_object_path,
            job_id: 0,
            job_type: String::from(""),
            job_object_path,
        }
    }

    /// `parse_state` records the unit by default and when allowlisted, and records nothing
    /// once the unit is blocklisted (even if it is also allowlisted).
    #[tokio::test]
    async fn test_state_parse() -> Result<(), MonitordUnitsError> {
        let test_unit_name = String::from("apport-autoreport.timer");
        // Only `unit_states` is expected to differ from the default.
        let expected_stats = SystemdUnitStats {
            unit_states: HashMap::from([(
                test_unit_name.clone(),
                UnitStates {
                    active_state: SystemdUnitActiveState::inactive,
                    load_state: SystemdUnitLoadState::loaded,
                    unhealthy: true,
                    time_in_state_usecs: None,
                },
            )]),
            ..Default::default()
        };
        let systemd_unit = get_unit_file();
        let mut config = crate::config::UnitsConfig::default();

        // Default config: the unit is recorded.
        let mut stats = SystemdUnitStats::default();
        parse_state(&mut stats, &systemd_unit, &config, None).await?;
        assert_eq!(expected_stats, stats);

        // Unit on the allowlist: still recorded.
        config.state_stats_allowlist = HashSet::from([test_unit_name.clone()]);
        let mut allowlist_stats = SystemdUnitStats::default();
        parse_state(&mut allowlist_stats, &systemd_unit, &config, None).await?;
        assert_eq!(expected_stats, allowlist_stats);

        // Unit additionally on the blocklist: nothing is recorded.
        config.state_stats_blocklist = HashSet::from([test_unit_name]);
        let mut blocklist_stats = SystemdUnitStats::default();
        parse_state(&mut blocklist_stats, &systemd_unit, &config, None).await?;
        assert_eq!(SystemdUnitStats::default(), blocklist_stats);
        Ok(())
    }

    /// `parse_unit` bumps exactly the type, load-state and active-state counters that
    /// match the fixture.
    #[test]
    fn test_unit_parse() {
        let expected_stats = SystemdUnitStats {
            inactive_units: 1,
            loaded_units: 1,
            timer_units: 1,
            ..Default::default()
        };
        let mut stats = SystemdUnitStats::default();
        parse_unit(&mut stats, &get_unit_file());
        assert_eq!(expected_stats, stats);
    }

    /// An `activating` unit is counted as activating only.
    #[test]
    fn test_unit_parse_activating() {
        let activating_unit = ListedUnit {
            active_state: String::from("activating"),
            ..get_unit_file()
        };
        let mut stats = SystemdUnitStats::default();
        parse_unit(&mut stats, &activating_unit);
        assert_eq!(stats.activating_units, 1);
        assert_eq!(stats.active_units, 0);
        assert_eq!(stats.inactive_units, 0);
    }

    /// Both state enums expose at least one variant through their iterators.
    #[test]
    fn test_iterators() {
        assert!(SystemdUnitActiveState::iter().next().is_some());
        assert!(SystemdUnitLoadState::iter().next().is_some());
    }
}