1use std::sync::Arc;
6
7use std::collections::HashMap;
8use std::time::Duration;
9use std::time::Instant;
10
11use tokio::sync::RwLock;
12use tracing::error;
13use tracing::info;
14use tracing::warn;
15
16pub mod config;
17pub(crate) mod dbus;
18pub mod dbus_stats;
19pub mod json;
20pub mod logging;
21pub mod machines;
22pub mod networkd;
23pub mod pid1;
24pub mod system;
25pub mod timer;
26pub mod units;
27
28pub const DEFAULT_DBUS_ADDRESS: &str = "unix:path=/run/dbus/system_bus_socket";
29
30#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, PartialEq)]
31pub struct MachineStats {
32 pub networkd: networkd::NetworkdState,
33 pub pid1: Option<pid1::Pid1Stats>,
34 pub system_state: system::SystemdSystemState,
35 pub units: units::SystemdUnitStats,
36 pub version: system::SystemdVersion,
37 pub dbus_stats: Option<dbus_stats::DBusStats>,
38}
39
40#[derive(serde::Serialize, serde::Deserialize, Debug, Default, Eq, PartialEq)]
42pub struct MonitordStats {
43 pub networkd: networkd::NetworkdState,
44 pub pid1: Option<pid1::Pid1Stats>,
45 pub system_state: system::SystemdSystemState,
46 pub units: units::SystemdUnitStats,
47 pub version: system::SystemdVersion,
48 pub dbus_stats: Option<dbus_stats::DBusStats>,
49 pub machines: HashMap<String, MachineStats>,
50}
51
52pub fn print_stats(
54 key_prefix: &str,
55 output_format: &config::MonitordOutputFormat,
56 stats: &MonitordStats,
57) {
58 match output_format {
59 config::MonitordOutputFormat::Json => println!(
60 "{}",
61 serde_json::to_string(&stats).expect("Invalid JSON serialization")
62 ),
63 config::MonitordOutputFormat::JsonFlat => println!(
64 "{}",
65 json::flatten(stats, &key_prefix.to_string()).expect("Invalid JSON serialization")
66 ),
67 config::MonitordOutputFormat::JsonPretty => println!(
68 "{}",
69 serde_json::to_string_pretty(&stats).expect("Invalid JSON serialization")
70 ),
71 }
72}
73
74pub async fn stat_collector(
77 config: config::Config,
78 maybe_locked_stats: Option<Arc<RwLock<MonitordStats>>>,
79 output_stats: bool,
80) -> anyhow::Result<()> {
81 let mut collect_interval_ms: u128 = 0;
82 if config.monitord.daemon {
83 collect_interval_ms = (config.monitord.daemon_stats_refresh_secs * 1000).into();
84 }
85
86 let locked_monitord_stats: Arc<RwLock<MonitordStats>> =
87 maybe_locked_stats.unwrap_or(Arc::new(RwLock::new(MonitordStats::default())));
88 let locked_machine_stats: Arc<RwLock<MachineStats>> =
89 Arc::new(RwLock::new(MachineStats::default()));
90 std::env::set_var("DBUS_SYSTEM_BUS_ADDRESS", &config.monitord.dbus_address);
91 let sdc = zbus::connection::Builder::system()?
92 .method_timeout(std::time::Duration::from_secs(config.monitord.dbus_timeout))
93 .build()
94 .await?;
95 let mut join_set = tokio::task::JoinSet::new();
96
97 loop {
98 let collect_start_time = Instant::now();
99 info!("Starting stat collection run");
100
101 join_set.spawn(crate::system::update_version(
104 sdc.clone(),
105 locked_machine_stats.clone(),
106 ));
107
108 if config.pid1.enabled {
110 join_set.spawn(crate::pid1::update_pid1_stats(
111 1,
112 locked_machine_stats.clone(),
113 ));
114 }
115
116 if config.networkd.enabled {
118 let config_clone = config.clone();
119 join_set.spawn(crate::networkd::update_networkd_stats(
120 config_clone.networkd.link_state_dir.clone(),
121 None,
122 sdc.clone(),
123 locked_machine_stats.clone(),
124 ));
125 }
126
127 if config.system_state.enabled {
129 join_set.spawn(crate::system::update_system_stats(
130 sdc.clone(),
131 locked_machine_stats.clone(),
132 ));
133 }
134
135 if config.units.enabled {
137 join_set.spawn(crate::units::update_unit_stats(
138 config.clone(),
139 sdc.clone(),
140 locked_machine_stats.clone(),
141 ));
142 }
143
144 if config.machines.enabled {
145 join_set.spawn(crate::machines::update_machines_stats(
146 config.clone(),
147 sdc.clone(),
148 locked_monitord_stats.clone(),
149 ));
150 }
151
152 if config.dbus_stats.enabled {
153 join_set.spawn(crate::dbus_stats::update_dbus_stats(
154 sdc.clone(),
155 locked_machine_stats.clone(),
156 ));
157 }
158
159 if join_set.len() == 1 {
160 warn!("No collectors execpt systemd version scheduled to run. Exiting");
161 }
162
163 while let Some(res) = join_set.join_next().await {
165 match res {
166 Ok(r) => match r {
167 Ok(_) => (),
168 Err(e) => {
169 error!("Collection specific failure: {:?}", e);
170 }
171 },
172 Err(e) => {
173 error!("Join error: {:?}", e);
174 }
175 }
176 }
177
178 {
179 let mut monitord_stats = locked_monitord_stats.write().await;
181 let machine_stats = locked_machine_stats.read().await;
182 monitord_stats.pid1 = machine_stats.pid1.clone();
183 monitord_stats.networkd = machine_stats.networkd.clone();
184 monitord_stats.system_state = machine_stats.system_state;
185 monitord_stats.version = machine_stats.version.clone();
186 monitord_stats.units = machine_stats.units.clone();
187 monitord_stats.dbus_stats = machine_stats.dbus_stats.clone();
188 }
189
190 let elapsed_runtime_ms = collect_start_time.elapsed().as_millis();
191
192 info!("stat collection run took {}ms", elapsed_runtime_ms);
193 if output_stats {
194 let monitord_stats = locked_monitord_stats.read().await;
195 print_stats(
196 &config.monitord.key_prefix,
197 &config.monitord.output_format,
198 &monitord_stats,
199 );
200 }
201 if !config.monitord.daemon {
202 break;
203 }
204 let sleep_time_ms = collect_interval_ms - elapsed_runtime_ms;
205 info!("stat collection sleeping for {}s 😴", sleep_time_ms / 1000);
206 tokio::time::sleep(Duration::from_millis(
207 sleep_time_ms
208 .try_into()
209 .expect("Sleep time does not fit into a u64 :O"),
210 ))
211 .await;
212 }
213 Ok(())
214}