monitord/
units.rs

1//! # units module
2//!
3//! All main systemd unit statistics. Counts of types of units, unit states and
4//! queued jobs. We also house service specific statistics and system unit states.
5
6use std::collections::HashMap;
7use std::str::FromStr;
8use std::sync::Arc;
9use std::time::SystemTime;
10use std::time::UNIX_EPOCH;
11
12use anyhow::Result;
13use int_enum::IntEnum;
14use serde_repr::*;
15use struct_field_names_as_array::FieldNamesAsArray;
16use strum_macros::EnumIter;
17use strum_macros::EnumString;
18use tokio::sync::RwLock;
19use tracing::debug;
20use tracing::error;
21use zbus::zvariant::ObjectPath;
22use zbus::zvariant::OwnedObjectPath;
23
24use crate::timer::TimerStats;
25use crate::MachineStats;
26
/// Struct with all the unit count statistics
///
/// The per-type, per-load-state, per-active-state and queued-job counters are
/// incremented once per listed unit by `parse_unit`. `total_units` and the
/// three maps are filled in by `parse_unit_state` / `parse_state`.
#[derive(
    serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, FieldNamesAsArray, PartialEq,
)]
pub struct SystemdUnitStats {
    /// Units whose active state is `active`
    pub active_units: u64,
    /// Units whose name ends with the `automount` type suffix
    pub automount_units: u64,
    /// Units whose name ends with the `device` type suffix
    pub device_units: u64,
    /// Units whose active state is `failed`
    pub failed_units: u64,
    /// Units whose active state is `inactive`
    pub inactive_units: u64,
    /// Units that report a non-zero job id
    pub jobs_queued: u64,
    /// Units whose load state is `loaded`
    pub loaded_units: u64,
    /// Units whose load state is `masked`
    pub masked_units: u64,
    /// Units whose name ends with the `mount` type suffix
    pub mount_units: u64,
    /// Units whose load state is `not-found`
    pub not_found_units: u64,
    /// Units whose name ends with the `path` type suffix
    pub path_units: u64,
    /// Units whose name ends with the `scope` type suffix
    pub scope_units: u64,
    /// Units whose name ends with the `service` type suffix
    pub service_units: u64,
    /// Units whose name ends with the `slice` type suffix
    pub slice_units: u64,
    /// Units whose name ends with the `socket` type suffix
    pub socket_units: u64,
    /// Units whose name ends with the `target` type suffix
    pub target_units: u64,
    /// Units whose name ends with the `timer` type suffix
    pub timer_units: u64,
    // NOTE(review): the two timer_* counters below are never written in this
    // module; presumably `crate::timer::collect_timer_stats` (which receives
    // `&mut SystemdUnitStats`) maintains them - confirm there.
    pub timer_persistent_units: u64,
    pub timer_remain_after_elapse: u64,
    /// Number of units returned by the manager's `list_units` call
    pub total_units: u64,
    /// Per-service statistics keyed by unit name (only for configured services)
    pub service_stats: HashMap<String, ServiceStats>,
    /// Per-timer statistics keyed by unit name
    pub timer_stats: HashMap<String, TimerStats>,
    /// Per-unit active/load state keyed by unit name
    pub unit_states: HashMap<String, UnitStates>,
}
57
/// Selected subset of metrics collected from systemd OrgFreedesktopSystemd1Service
///
/// Populated by `parse_service`. The four `*_timestamp` fields are read
/// through the unit proxy; every other field is read through the service proxy.
#[derive(
    serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, FieldNamesAsArray, PartialEq,
)]
pub struct ServiceStats {
    pub active_enter_timestamp: u64,
    pub active_exit_timestamp: u64,
    pub cpuusage_nsec: u64,
    pub inactive_exit_timestamp: u64,
    pub ioread_bytes: u64,
    pub ioread_operations: u64,
    pub memory_available: u64,
    pub memory_current: u64,
    pub nrestarts: u32,
    /// Number of entries returned by the service proxy's `get_processes` call
    pub processes: u32,
    pub restart_usec: u64,
    pub state_change_timestamp: u64,
    pub status_errno: i32,
    pub tasks_current: u64,
    pub timeout_clean_usec: u64,
    pub watchdog_usec: u64,
}
80
/// Collection of a Unit active and load state: <https://www.freedesktop.org/software/systemd/man/org.freedesktop.systemd1.html>
///
/// One of these is stored per unit name in `SystemdUnitStats::unit_states`
/// by `parse_state`.
#[derive(
    serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, FieldNamesAsArray, PartialEq,
)]
pub struct UnitStates {
    /// Active state parsed from the listed unit (`unknown` if unparsable)
    pub active_state: SystemdUnitActiveState,
    /// Load state parsed from the listed unit (`unknown` if unparsable)
    pub load_state: SystemdUnitLoadState,
    // Result of `is_unit_unhealthy`: masked units are never unhealthy, loaded
    // units are unhealthy unless their active state is `active`, and every
    // other load state (unknown, error, not_found) is unhealthy.
    pub unhealthy: bool,
    // Time in microseconds since the unit state has changed ...
    // Expensive to lookup, so config disable available - Use optional to show that
    // (`None` when `state_stats_time_in_state` is off or no connection was given)
    pub time_in_state_usecs: Option<u64>,
}
95
96// Declare state types
97// Reference: https://www.freedesktop.org/software/systemd/man/org.freedesktop.systemd1.html
98// SubState can be unit-type-specific so can't enum
99
/// Possible systemd unit active states enumerated
///
/// `parse_state` converts the active state string of a listed unit into this
/// enum via `FromStr` (derived by `EnumString`), falling back to `unknown`
/// when the string does not parse. The enum is serialized as its `u8` value.
// Variants are intentionally lowercase rather than CamelCase, hence the allow.
#[allow(non_camel_case_types)]
#[derive(
    Serialize_repr,
    Deserialize_repr,
    Clone,
    Copy,
    Debug,
    Default,
    Eq,
    PartialEq,
    EnumIter,
    EnumString,
    IntEnum,
    strum_macros::Display,
)]
#[repr(u8)]
pub enum SystemdUnitActiveState {
    /// Default value, also used when the reported state cannot be parsed
    #[default]
    unknown = 0,
    active = 1,
    reloading = 2,
    inactive = 3,
    failed = 4,
    activating = 5,
    deactivating = 6,
}
127
/// Possible systemd unit load states enumerated
///
/// `parse_state` converts the load state string of a listed unit into this
/// enum via `FromStr` (derived by `EnumString`) after replacing `-` with `_`
/// (so `not-found` becomes `not_found`), falling back to `unknown` when the
/// string does not parse. The enum is serialized as its `u8` value.
// Variants are intentionally lowercase rather than CamelCase, hence the allow.
#[allow(non_camel_case_types)]
#[derive(
    Serialize_repr,
    Deserialize_repr,
    Clone,
    Copy,
    Debug,
    Default,
    Eq,
    PartialEq,
    EnumIter,
    EnumString,
    IntEnum,
    strum_macros::Display,
)]
#[repr(u8)]
pub enum SystemdUnitLoadState {
    /// Default value, also used when the reported state cannot be parsed
    #[default]
    unknown = 0,
    loaded = 1,
    error = 2,
    masked = 3,
    /// Reported by systemd as `not-found`
    not_found = 4,
}
153
/// Representation of the returned Tuple from list_units - Better typing etc.
///
/// Field order mirrors the element order of the raw tuple; see the `From`
/// implementation for the positional mapping.
#[derive(Debug)]
pub struct ListedUnit {
    /// The primary unit name
    pub name: String,
    /// The human readable description
    pub description: String,
    /// The load state (i.e. whether the unit file has been loaded successfully)
    pub load_state: String,
    /// The active state (i.e. whether the unit is currently started or not)
    pub active_state: String,
    /// The sub state (i.e. unit type more specific state)
    pub sub_state: String,
    /// A unit that is being followed in its state by this unit, if there is
    /// any, otherwise the empty string
    pub follow_unit: String,
    /// The unit object path
    pub unit_object_path: OwnedObjectPath,
    /// If there is a job queued for the unit, the numeric job id, 0 otherwise
    pub job_id: u32,
    /// The job type as string
    pub job_type: String,
    /// The job object path
    pub job_object_path: OwnedObjectPath,
}
168impl
169    From<(
170        String,
171        String,
172        String,
173        String,
174        String,
175        String,
176        OwnedObjectPath,
177        u32,
178        String,
179        OwnedObjectPath,
180    )> for ListedUnit
181{
182    fn from(
183        tuple: (
184            String,
185            String,
186            String,
187            String,
188            String,
189            String,
190            OwnedObjectPath,
191            u32,
192            String,
193            OwnedObjectPath,
194        ),
195    ) -> Self {
196        ListedUnit {
197            name: tuple.0,
198            description: tuple.1,
199            load_state: tuple.2,
200            active_state: tuple.3,
201            sub_state: tuple.4,
202            follow_unit: tuple.5,
203            unit_object_path: tuple.6,
204            job_id: tuple.7,
205            job_type: tuple.8,
206            job_object_path: tuple.9,
207        }
208    }
209}
210
/// Names of all [`ServiceStats`] fields, as generated by the `FieldNamesAsArray` derive
pub const SERVICE_FIELD_NAMES: &[&str] = &ServiceStats::FIELD_NAMES_AS_ARRAY;
/// Names of all [`SystemdUnitStats`] fields, as generated by the `FieldNamesAsArray` derive
pub const UNIT_FIELD_NAMES: &[&str] = &SystemdUnitStats::FIELD_NAMES_AS_ARRAY;
/// Names of all [`UnitStates`] fields, as generated by the `FieldNamesAsArray` derive
pub const UNIT_STATES_FIELD_NAMES: &[&str] = &UnitStates::FIELD_NAMES_AS_ARRAY;
214
215/// Pull out selected systemd service statistics
216async fn parse_service(
217    connection: &zbus::Connection,
218    name: &str,
219    object_path: &OwnedObjectPath,
220) -> Result<ServiceStats> {
221    debug!("Parsing service {} stats", name);
222
223    let sp = Arc::new(
224        crate::dbus::zbus_service::ServiceProxy::builder(connection)
225            .path(object_path.clone())?
226            .build()
227            .await?,
228    );
229    let up = Arc::new(
230        crate::dbus::zbus_unit::UnitProxy::builder(connection)
231            .path(object_path.clone())?
232            .build()
233            .await?,
234    );
235
236    // TODO: Maybe introduce a semaphore to limit how many execute at once
237    let (
238        active_enter_timestamp,
239        active_exit_timestamp,
240        cpuusage_nsec,
241        inactive_exit_timestamp,
242        ioread_bytes,
243        ioread_operations,
244        memory_current,
245        memory_available,
246        nrestarts,
247        processes,
248        restart_usec,
249        state_change_timestamp,
250        status_errno,
251        tasks_current,
252        timeout_clean_usec,
253        watchdog_usec,
254    ) = tokio::join!(
255        tokio::spawn({
256            let spawn_up = up.clone();
257            async move { spawn_up.active_enter_timestamp().await }
258        }),
259        tokio::spawn({
260            let spawn_up = up.clone();
261            async move { spawn_up.active_exit_timestamp().await }
262        }),
263        tokio::spawn({
264            let spawn_sp = sp.clone();
265            async move { spawn_sp.cpuusage_nsec().await }
266        }),
267        tokio::spawn({
268            let spawn_up = up.clone();
269            async move { spawn_up.inactive_exit_timestamp().await }
270        }),
271        tokio::spawn({
272            let spawn_sp = sp.clone();
273            async move { spawn_sp.ioread_bytes().await }
274        }),
275        tokio::spawn({
276            let spawn_sp = sp.clone();
277            async move { spawn_sp.ioread_operations().await }
278        }),
279        tokio::spawn({
280            let spawn_sp = sp.clone();
281            async move { spawn_sp.memory_current().await }
282        }),
283        tokio::spawn({
284            let spawn_sp = sp.clone();
285            async move { spawn_sp.memory_available().await }
286        }),
287        tokio::spawn({
288            let spawn_sp = sp.clone();
289            async move { spawn_sp.nrestarts().await }
290        }),
291        tokio::spawn({
292            let spawn_sp = sp.clone();
293            async move { spawn_sp.get_processes().await }
294        }),
295        tokio::spawn({
296            let spawn_sp = sp.clone();
297            async move { spawn_sp.restart_usec().await }
298        }),
299        tokio::spawn({
300            let spawn_up = up.clone();
301            async move { spawn_up.state_change_timestamp().await }
302        }),
303        tokio::spawn({
304            let spawn_sp = sp.clone();
305            async move { spawn_sp.status_errno().await }
306        }),
307        tokio::spawn({
308            let spawn_sp = sp.clone();
309            async move { spawn_sp.tasks_current().await }
310        }),
311        tokio::spawn({
312            let spawn_sp = sp.clone();
313            async move { spawn_sp.timeout_clean_usec().await }
314        }),
315        tokio::spawn({
316            let spawn_sp = sp.clone();
317            async move { spawn_sp.watchdog_usec().await }
318        }),
319    );
320
321    Ok(ServiceStats {
322        active_enter_timestamp: active_enter_timestamp??,
323        active_exit_timestamp: active_exit_timestamp??,
324        cpuusage_nsec: cpuusage_nsec??,
325        inactive_exit_timestamp: inactive_exit_timestamp??,
326        ioread_bytes: ioread_bytes??,
327        ioread_operations: ioread_operations??,
328        memory_current: memory_current??,
329        memory_available: memory_available??,
330        nrestarts: nrestarts??,
331        processes: processes??.len().try_into()?,
332        restart_usec: restart_usec??,
333        state_change_timestamp: state_change_timestamp??,
334        status_errno: status_errno??,
335        tasks_current: tasks_current??,
336        timeout_clean_usec: timeout_clean_usec??,
337        watchdog_usec: watchdog_usec??,
338    })
339}
340
341/// Check if we're a loaded unit and if so evaluate if we're acitive or not
342/// If we're not
343/// Only potentially mark unhealthy for LOADED units that are not active
344pub fn is_unit_unhealthy(
345    active_state: SystemdUnitActiveState,
346    load_state: SystemdUnitLoadState,
347) -> bool {
348    match load_state {
349        // We're loaded so let's see if we're active or not
350        SystemdUnitLoadState::loaded => !matches!(active_state, SystemdUnitActiveState::active),
351        // An admin can change a unit to be masked on purpose
352        // so we are going to ignore all masked units due to that
353        SystemdUnitLoadState::masked => false,
354        // Otherwise, we're unhealthy
355        _ => true,
356    }
357}
358
359async fn get_time_in_state(
360    connection: Option<&zbus::Connection>,
361    unit: &ListedUnit,
362) -> Result<Option<u64>> {
363    match connection {
364        Some(c) => {
365            let up = crate::dbus::zbus_unit::UnitProxy::builder(c)
366                .path(ObjectPath::from(unit.unit_object_path.clone()))?
367                .build()
368                .await?;
369            let now: u64 = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs() * 1_000_000;
370            let state_change_timestamp = match up.state_change_timestamp().await {
371                Ok(sct) => sct,
372                Err(err) => {
373                    error!(
374                        "Unable to get state_change_timestamp for {} - Setting to 0: {:?}",
375                        &unit.name, err,
376                    );
377                    0
378                }
379            };
380            Ok(Some(now - state_change_timestamp))
381        }
382        None => {
383            error!("No zbus connection passed, but time_in_state_usecs enabled");
384            Ok(None)
385        }
386    }
387}
388
389/// Parse state of a unit into our unit_states hash
390pub async fn parse_state(
391    stats: &mut SystemdUnitStats,
392    unit: &ListedUnit,
393    config: &crate::config::UnitsConfig,
394    connection: Option<&zbus::Connection>,
395) -> Result<()> {
396    if config.state_stats_blocklist.contains(&unit.name) {
397        debug!("Skipping state stats for {} due to blocklist", &unit.name);
398        return Ok(());
399    }
400    if !config.state_stats_allowlist.is_empty()
401        && !config.state_stats_allowlist.contains(&unit.name)
402    {
403        return Ok(());
404    }
405    let active_state = SystemdUnitActiveState::from_str(&unit.active_state)
406        .unwrap_or(SystemdUnitActiveState::unknown);
407    let load_state = SystemdUnitLoadState::from_str(&unit.load_state.replace('-', "_"))
408        .unwrap_or(SystemdUnitLoadState::unknown);
409
410    // Get the state_change_timestamp to determine time in usecs we've been in current state
411    let mut time_in_state_usecs: Option<u64> = None;
412    if config.state_stats_time_in_state {
413        time_in_state_usecs = get_time_in_state(connection, unit).await?;
414    }
415
416    stats.unit_states.insert(
417        unit.name.clone(),
418        UnitStates {
419            active_state,
420            load_state,
421            unhealthy: is_unit_unhealthy(active_state, load_state),
422            time_in_state_usecs,
423        },
424    );
425    Ok(())
426}
427
428/// Parse a unit and add to overall counts of state, type etc.
429fn parse_unit(stats: &mut SystemdUnitStats, unit: &ListedUnit) {
430    // Count unit type
431    match unit.name.rsplit('.').next() {
432        Some("automount") => stats.automount_units += 1,
433        Some("device") => stats.device_units += 1,
434        Some("mount") => stats.mount_units += 1,
435        Some("path") => stats.path_units += 1,
436        Some("scope") => stats.scope_units += 1,
437        Some("service") => stats.service_units += 1,
438        Some("slice") => stats.slice_units += 1,
439        Some("socket") => stats.socket_units += 1,
440        Some("target") => stats.target_units += 1,
441        Some("timer") => stats.timer_units += 1,
442        unknown => debug!("Found unhandled '{:?}' unit type", unknown),
443    };
444    // Count load state
445    match unit.load_state.as_str() {
446        "loaded" => stats.loaded_units += 1,
447        "masked" => stats.masked_units += 1,
448        "not-found" => stats.not_found_units += 1,
449        _ => debug!("{} is not loaded. It's {}", unit.name, unit.load_state),
450    };
451    // Count unit status
452    match unit.active_state.as_str() {
453        "active" => stats.active_units += 1,
454        "failed" => stats.failed_units += 1,
455        "inactive" => stats.inactive_units += 1,
456        unknown => debug!("Found unhandled '{}' unit state", unknown),
457    };
458    // Count jobs queued
459    if unit.job_id != 0 {
460        stats.jobs_queued += 1;
461    }
462}
463
464/// Pull all units from dbus and count how system is setup and behaving
465pub async fn parse_unit_state(
466    config: &crate::config::Config,
467    connection: &zbus::Connection,
468) -> Result<SystemdUnitStats, Box<dyn std::error::Error + Send + Sync>> {
469    if !config.units.state_stats_allowlist.is_empty() {
470        debug!(
471            "Using unit state allowlist: {:?}",
472            config.units.state_stats_allowlist
473        );
474    }
475
476    if !config.units.state_stats_blocklist.is_empty() {
477        debug!(
478            "Using unit state blocklist: {:?}",
479            config.units.state_stats_blocklist,
480        );
481    }
482
483    let mut stats = SystemdUnitStats::default();
484    let p = crate::dbus::zbus_systemd::ManagerProxy::new(connection).await?;
485    let units = p.list_units().await?;
486
487    stats.total_units = units.len() as u64;
488    for unit_raw in units {
489        let unit: ListedUnit = unit_raw.into();
490        // Collect unit types + states counts
491        parse_unit(&mut stats, &unit);
492
493        // Collect per unit state stats - ActiveState + LoadState
494        // Not collecting SubState (yet)
495        if config.units.state_stats {
496            parse_state(&mut stats, &unit, &config.units, Some(connection)).await?;
497        }
498
499        // Collect service stats
500        if config.services.contains(&unit.name) {
501            debug!("Collecting service stats for {:?}", &unit);
502            match parse_service(connection, &unit.name, &unit.unit_object_path).await {
503                Ok(service_stats) => {
504                    stats.service_stats.insert(unit.name.clone(), service_stats);
505                }
506                Err(err) => error!(
507                    "Unable to get service stats for {} {}: {:#?}",
508                    &unit.name, &unit.unit_object_path, err
509                ),
510            }
511        }
512
513        // Collect timer stats
514        if config.timers.enabled && unit.name.contains(".timer") {
515            if config.timers.blocklist.contains(&unit.name) {
516                debug!("Skipping timer stats for {} due to blocklist", &unit.name);
517                continue;
518            }
519            if !config.timers.allowlist.is_empty() && !config.timers.allowlist.contains(&unit.name)
520            {
521                continue;
522            }
523            let timer_stats: Option<TimerStats> =
524                match crate::timer::collect_timer_stats(connection, &mut stats, &unit).await {
525                    Ok(ts) => Some(ts),
526                    Err(err) => {
527                        error!("Failed to get {} stats: {:#?}", &unit.name, err);
528                        None
529                    }
530                };
531            if let Some(ts) = timer_stats {
532                stats.timer_stats.insert(unit.name.clone(), ts);
533            }
534        }
535    }
536    debug!("unit stats: {:?}", stats);
537    Ok(stats)
538}
539
540/// Async wrapper than can update uni stats when passed a locked struct
541pub async fn update_unit_stats(
542    config: crate::config::Config,
543    connection: zbus::Connection,
544    locked_machine_stats: Arc<RwLock<MachineStats>>,
545) -> anyhow::Result<()> {
546    let mut machine_stats = locked_machine_stats.write().await;
547    match parse_unit_state(&config, &connection).await {
548        Ok(units_stats) => machine_stats.units = units_stats,
549        Err(err) => error!("units stats failed: {:?}", err),
550    }
551    Ok(())
552}
553
#[cfg(test)]
mod tests {
    use super::*;
    use strum::IntoEnumIterator;

    /// Name of the unit produced by `get_unit_file`
    const TEST_UNIT_NAME: &str = "apport-autoreport.timer";

    /// Build the loaded-but-inactive timer unit all tests below operate on
    fn get_unit_file() -> ListedUnit {
        let unit_object_path: OwnedObjectPath =
            ObjectPath::try_from("/org/freedesktop/systemd1/unit/apport_2dautoreport_2etimer")
                .expect("Unable to make an object path")
                .into();
        let job_object_path: OwnedObjectPath = ObjectPath::try_from("/").unwrap().into();

        ListedUnit {
            name: TEST_UNIT_NAME.to_string(),
            description: "Process error reports when automatic reporting is enabled (timer based)"
                .to_string(),
            load_state: "loaded".to_string(),
            active_state: "inactive".to_string(),
            sub_state: "dead".to_string(),
            follow_unit: String::new(),
            unit_object_path,
            job_id: 0,
            job_type: String::new(),
            job_object_path,
        }
    }

    #[test]
    fn test_is_unit_healthy() {
        // (active state, load state, expected `is_unit_unhealthy` result)
        let cases = [
            // Obvious active/loaded is healthy
            (
                SystemdUnitActiveState::active,
                SystemdUnitLoadState::loaded,
                false,
            ),
            // Not active + loaded is not healthy
            (
                SystemdUnitActiveState::activating,
                SystemdUnitLoadState::loaded,
                true,
            ),
            // Masked + anything is never flagged, masking is a deliberate choice
            (
                SystemdUnitActiveState::activating,
                SystemdUnitLoadState::masked,
                false,
            ),
            // Make error + not_found unhealthy too
            (
                SystemdUnitActiveState::deactivating,
                SystemdUnitLoadState::not_found,
                true,
            ),
            // Can never really be active here with error, but check we ignore it
            (
                SystemdUnitActiveState::active,
                SystemdUnitLoadState::error,
                true,
            ),
        ];
        for (active_state, load_state, expected_unhealthy) in cases {
            assert_eq!(
                expected_unhealthy,
                is_unit_unhealthy(active_state, load_state)
            );
        }
    }

    #[tokio::test]
    async fn test_state_parse() -> Result<()> {
        let test_unit_name = TEST_UNIT_NAME.to_string();
        // Only the state map is expected to be touched by parse_state
        let expected_stats = SystemdUnitStats {
            unit_states: HashMap::from([(
                test_unit_name.clone(),
                UnitStates {
                    active_state: SystemdUnitActiveState::inactive,
                    load_state: SystemdUnitLoadState::loaded,
                    unhealthy: true,
                    time_in_state_usecs: None,
                },
            )]),
            ..Default::default()
        };
        let systemd_unit = get_unit_file();
        let mut config = crate::config::UnitsConfig::default();

        // Test no allow list or blocklist
        let mut stats = SystemdUnitStats::default();
        parse_state(&mut stats, &systemd_unit, &config, None).await?;
        assert_eq!(expected_stats, stats);

        // test no blocklist and only allow list - Should equal the same as no lists above
        config.state_stats_allowlist = vec![test_unit_name.clone()];
        let mut allowlist_stats = SystemdUnitStats::default();
        parse_state(&mut allowlist_stats, &systemd_unit, &config, None).await?;
        assert_eq!(expected_stats, allowlist_stats);

        // test blocklist with allow list (show it's preferred)
        config.state_stats_blocklist = vec![test_unit_name];
        let mut blocklist_stats = SystemdUnitStats::default();
        parse_state(&mut blocklist_stats, &systemd_unit, &config, None).await?;
        assert_eq!(SystemdUnitStats::default(), blocklist_stats);
        Ok(())
    }

    #[test]
    fn test_unit_parse() {
        // A loaded, inactive timer bumps exactly these three counters
        let expected_stats = SystemdUnitStats {
            inactive_units: 1,
            loaded_units: 1,
            timer_units: 1,
            ..Default::default()
        };
        let mut stats = SystemdUnitStats::default();
        parse_unit(&mut stats, &get_unit_file());
        assert_eq!(expected_stats, stats);
    }

    #[test]
    fn test_iterators() {
        assert!(SystemdUnitActiveState::iter().next().is_some());
        assert!(SystemdUnitLoadState::iter().next().is_some());
    }
}