1use std::collections::HashMap;
6use std::fs;
7use std::io;
8use std::sync::Arc;
9
10use anyhow::Result;
11use tokio::sync::RwLock;
12use tracing::error;
13use users::get_user_by_uid;
14use zbus::fdo::{DBusProxy, StatsProxy};
15use zbus::names::BusName;
16use zvariant::{Dict, OwnedValue, Value};
17
18use crate::MachineStats;
19
20#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, PartialEq)]
25pub struct DBusBrokerPeerAccounting {
26 pub id: String,
27 pub well_known_name: Option<String>,
28
29 pub unix_user_id: Option<u32>,
31 pub process_id: Option<u32>,
32 pub unix_group_ids: Option<Vec<u32>>,
33 pub name_objects: Option<u32>,
38 pub match_bytes: Option<u32>,
39 pub matches: Option<u32>,
40 pub reply_objects: Option<u32>,
41 pub incoming_bytes: Option<u32>,
42 pub incoming_fds: Option<u32>,
43 pub outgoing_bytes: Option<u32>,
44 pub outgoing_fds: Option<u32>,
45 pub activation_request_bytes: Option<u32>,
46 pub activation_request_fds: Option<u32>,
47}
48
49impl DBusBrokerPeerAccounting {
50 pub fn get_cgroup_name(&self) -> Result<String, io::Error> {
51 let pid = self
52 .process_id
53 .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "missing process_id"))?;
54
55 let path = format!("/proc/{}/cgroup", pid);
56 let content = fs::read_to_string(&path)?;
57
58 let cgroup = content.strip_prefix("0::").ok_or_else(|| {
60 io::Error::new(io::ErrorKind::InvalidData, "unexpected cgroup format")
61 })?;
62
63 Ok(cgroup.trim().trim_matches('/').replace('/', "-"))
64 }
65}
66
67#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, PartialEq)]
71pub struct DBusBrokerCGroupAccounting {
72 pub name: String,
73
74 pub name_objects: Option<u32>,
76 pub match_bytes: Option<u32>,
77 pub matches: Option<u32>,
78 pub reply_objects: Option<u32>,
79 pub incoming_bytes: Option<u32>,
80 pub incoming_fds: Option<u32>,
81 pub outgoing_bytes: Option<u32>,
82 pub outgoing_fds: Option<u32>,
83 pub activation_request_bytes: Option<u32>,
84 pub activation_request_fds: Option<u32>,
85}
86
87impl DBusBrokerCGroupAccounting {
88 pub fn combine_with_peer(&mut self, peer: &DBusBrokerPeerAccounting) {
89 fn sum(a: &mut Option<u32>, b: &Option<u32>) {
90 *a = match (a.take(), b) {
91 (Some(x), Some(y)) => Some(x + y),
92 (Some(x), None) => Some(x),
93 (None, Some(y)) => Some(*y),
94 (None, None) => None,
95 };
96 }
97
98 sum(&mut self.name_objects, &peer.name_objects);
99 sum(&mut self.match_bytes, &peer.match_bytes);
100 sum(&mut self.matches, &peer.matches);
101 sum(&mut self.reply_objects, &peer.reply_objects);
102 sum(&mut self.incoming_bytes, &peer.incoming_bytes);
103 sum(&mut self.incoming_fds, &peer.incoming_fds);
104 sum(&mut self.outgoing_bytes, &peer.outgoing_bytes);
105 sum(&mut self.outgoing_fds, &peer.outgoing_fds);
106 sum(
107 &mut self.activation_request_bytes,
108 &peer.activation_request_bytes,
109 );
110 sum(
111 &mut self.activation_request_fds,
112 &peer.activation_request_fds,
113 );
114 }
115}
116
117#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, PartialEq)]
118pub struct CurMaxPair {
119 pub cur: u32,
121 pub max: u32,
122}
123
124impl CurMaxPair {
125 pub fn get_usage(&self) -> u32 {
126 self.max - self.cur
129 }
130}
131
132#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, PartialEq)]
133pub struct DBusBrokerUserAccounting {
134 pub uid: u32,
135
136 pub bytes: Option<CurMaxPair>,
138 pub fds: Option<CurMaxPair>,
139 pub matches: Option<CurMaxPair>,
140 pub objects: Option<CurMaxPair>,
141 }
144
145impl DBusBrokerUserAccounting {
146 fn new(uid: u32) -> Self {
147 Self {
148 uid,
149 ..Default::default()
150 }
151 }
152
153 pub fn get_name_for_metric(&self) -> String {
154 match get_user_by_uid(self.uid) {
155 Some(user) => user.name().to_string_lossy().into_owned(),
156 None => self.uid.to_string(),
157 }
158 }
159}
160
161#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, PartialEq)]
162pub struct DBusStats {
163 pub serial: Option<u32>,
164 pub active_connections: Option<u32>,
165 pub incomplete_connections: Option<u32>,
166 pub bus_names: Option<u32>,
167 pub peak_bus_names: Option<u32>,
168 pub peak_bus_names_per_connection: Option<u32>,
169 pub match_rules: Option<u32>,
170 pub peak_match_rules: Option<u32>,
171 pub peak_match_rules_per_connection: Option<u32>,
172
173 pub dbus_broker_peer_accounting: Option<HashMap<String, DBusBrokerPeerAccounting>>,
175 pub dbus_broker_user_accounting: Option<HashMap<u32, DBusBrokerUserAccounting>>,
176
177 pub peer_stats: bool,
179 pub cgroup_stats: bool,
180}
181
182impl DBusStats {
183 pub fn peer_accounting(&self) -> Option<&HashMap<String, DBusBrokerPeerAccounting>> {
184 match self.peer_stats {
185 true => self.dbus_broker_peer_accounting.as_ref(),
186 false => None,
187 }
188 }
189
190 pub fn cgroup_accounting(&self) -> Option<HashMap<String, DBusBrokerCGroupAccounting>> {
191 if !self.cgroup_stats {
192 return None;
193 }
194
195 let peer_accounting = self.dbus_broker_peer_accounting.as_ref()?;
196 let mut result: HashMap<String, DBusBrokerCGroupAccounting> = HashMap::new();
197
198 for peer in peer_accounting.values() {
199 let cgroup_name = match peer.get_cgroup_name() {
200 Ok(name) => name,
201 Err(err) => {
202 error!("Failed to get cgroup name for peer {}: {}", peer.id, err);
203 continue;
204 }
205 };
206
207 let entry =
208 result
209 .entry(cgroup_name.clone())
210 .or_insert_with(|| DBusBrokerCGroupAccounting {
211 name: cgroup_name,
212 ..Default::default()
213 });
214
215 entry.combine_with_peer(peer);
216 }
217
218 Some(result)
219 }
220
221 pub fn user_accounting(&self) -> Option<&HashMap<u32, DBusBrokerUserAccounting>> {
222 self.dbus_broker_user_accounting.as_ref()
223 }
224}
225
226fn get_u32(dict: &Dict, key: &str) -> Option<u32> {
227 let value_key: Value = key.into();
228 dict.get(&value_key).ok().and_then(|v| match v.flatten() {
229 Some(Value::U32(val)) => Some(*val),
230 _ => None,
231 })
232}
233
234fn get_u32_vec(dict: &Dict, key: &str) -> Option<Vec<u32>> {
235 let value_key: Value = key.into();
236 dict.get(&value_key).ok().and_then(|v| match v.flatten() {
237 Some(Value::Array(array)) => {
238 let vec: Vec<u32> = array
239 .iter()
240 .filter_map(|item| {
241 if let Value::U32(num) = item {
242 Some(*num)
243 } else {
244 None
245 }
246 })
247 .collect();
248
249 Some(vec)
250 }
251 _ => None,
252 })
253}
254
255fn parse_peer_struct(
277 peer_value: &Value,
278 well_known_to_peer_names: &HashMap<String, String>,
279) -> Option<DBusBrokerPeerAccounting> {
280 let peer_struct = match peer_value {
281 Value::Structure(peer_struct) => peer_struct,
282 _ => return None,
283 };
284
285 match peer_struct.fields() {
286 [Value::Str(id), Value::Dict(credentials), Value::Dict(stats), ..] => {
287 Some(DBusBrokerPeerAccounting {
288 id: id.to_string(),
289 well_known_name: well_known_to_peer_names.get(id.as_str()).cloned(),
290 unix_user_id: get_u32(credentials, "UnixUserID"),
291 process_id: get_u32(credentials, "ProcessID"),
292 unix_group_ids: get_u32_vec(credentials, "UnixGroupIDs"),
293 name_objects: get_u32(stats, "NameObjects"),
294 match_bytes: get_u32(stats, "MatchBytes"),
295 matches: get_u32(stats, "Matches"),
296 reply_objects: get_u32(stats, "ReplyObjects"),
297 incoming_bytes: get_u32(stats, "IncomingBytes"),
298 incoming_fds: get_u32(stats, "IncomingFds"),
299 outgoing_bytes: get_u32(stats, "OutgoingBytes"),
300 outgoing_fds: get_u32(stats, "OutgoingFds"),
301 activation_request_bytes: get_u32(stats, "ActivationRequestBytes"),
302 activation_request_fds: get_u32(stats, "ActivationRequestFds"),
303 })
304 }
305 _ => None,
306 }
307}
308
309fn parse_peer_accounting(
310 config: &crate::config::Config,
311 owned_value: &OwnedValue,
312 well_known_to_peer_names: &HashMap<String, String>,
313) -> Option<HashMap<String, DBusBrokerPeerAccounting>> {
314 if !config.dbus_stats.peer_stats && !config.dbus_stats.cgroup_stats {
317 return None;
318 }
319
320 let value: &Value = owned_value;
321 let peers_value = match value {
322 Value::Array(peers_value) => peers_value,
323 _ => return None,
324 };
325
326 let result = peers_value
327 .iter()
328 .filter_map(|peer| parse_peer_struct(peer, well_known_to_peer_names))
329 .map(|peer| (peer.id.clone(), peer))
330 .collect();
331
332 Some(result)
333}
334
335fn parse_user_struct(user_value: &Value) -> Option<DBusBrokerUserAccounting> {
366 let user_struct = match user_value {
367 Value::Structure(user_struct) => user_struct,
368 _ => return None,
369 };
370
371 match user_struct.fields() {
372 [Value::U32(uid), Value::Array(user_stats), ..] => {
373 let mut user = DBusBrokerUserAccounting::new(*uid);
374 for user_stat in user_stats.iter() {
375 if let Value::Structure(user_stat) = user_stat {
376 if let [Value::Str(name), Value::U32(cur), Value::U32(max), ..] =
377 user_stat.fields()
378 {
379 let pair = CurMaxPair {
380 cur: *cur,
381 max: *max,
382 };
383 match name.as_str() {
384 "Bytes" => user.bytes = Some(pair),
385 "Fds" => user.fds = Some(pair),
386 "Matches" => user.matches = Some(pair),
387 "Objects" => user.objects = Some(pair),
388 _ => {} }
390 }
391 }
392 }
393
394 Some(user)
395 }
396 _ => None,
397 }
398}
399
400fn parse_user_accounting(
401 config: &crate::config::Config,
402 owned_value: &OwnedValue,
403) -> Option<HashMap<u32, DBusBrokerUserAccounting>> {
404 if !config.dbus_stats.user_stats {
406 return None;
407 }
408
409 let value: &Value = owned_value;
410 let users_value = match value {
411 Value::Array(users_value) => users_value,
412 _ => return None,
413 };
414
415 let result = users_value
416 .iter()
417 .filter_map(parse_user_struct)
418 .map(|user| (user.uid, user))
419 .collect();
420
421 Some(result)
422}
423
424async fn get_well_known_to_peer_names(
425 dbus_proxy: &DBusProxy<'_>,
426) -> Result<HashMap<String, String>, Box<dyn std::error::Error + Send + Sync>> {
427 let dbus_names = dbus_proxy.list_names().await?;
428 let mut result = HashMap::new();
429
430 for owned_busname in dbus_names.iter() {
431 let name: &BusName = owned_busname;
432 if let BusName::WellKnown(_) = name {
433 let owner = dbus_proxy.get_name_owner(name.clone()).await?;
435 result.insert(owner.to_string(), name.to_string());
436 }
437 }
438
439 Ok(result)
440}
441
442pub async fn parse_dbus_stats(
444 config: &crate::config::Config,
445 connection: &zbus::Connection,
446) -> Result<DBusStats, Box<dyn std::error::Error + Send + Sync>> {
447 let dbus_proxy = DBusProxy::new(connection).await?;
448 let well_known_to_peer_names = get_well_known_to_peer_names(&dbus_proxy).await?;
449
450 let stats_proxy = StatsProxy::new(connection).await?;
451 let stats = stats_proxy.get_stats().await?;
452
453 let dbus_stats = DBusStats {
454 serial: stats.serial(),
455 active_connections: stats.active_connections(),
456 incomplete_connections: stats.incomplete_connections(),
457 bus_names: stats.bus_names(),
458 peak_bus_names: stats.peak_bus_names(),
459 peak_bus_names_per_connection: stats.peak_bus_names_per_connection(),
460 match_rules: stats.match_rules(),
461 peak_match_rules: stats.peak_match_rules(),
462 peak_match_rules_per_connection: stats.peak_match_rules_per_connection(),
463
464 dbus_broker_peer_accounting: stats
466 .rest()
467 .get("org.bus1.DBus.Debug.Stats.PeerAccounting")
468 .map(|peer| parse_peer_accounting(config, peer, &well_known_to_peer_names))
469 .unwrap_or_default(),
470 dbus_broker_user_accounting: stats
471 .rest()
472 .get("org.bus1.DBus.Debug.Stats.UserAccounting")
473 .map(|user| parse_user_accounting(config, user))
474 .unwrap_or_default(),
475
476 peer_stats: config.dbus_stats.peer_stats,
478 cgroup_stats: config.dbus_stats.cgroup_stats,
479 };
480
481 Ok(dbus_stats)
482}
483
484pub async fn update_dbus_stats(
486 config: crate::config::Config,
487 connection: zbus::Connection,
488 locked_machine_stats: Arc<RwLock<MachineStats>>,
489) -> anyhow::Result<()> {
490 match parse_dbus_stats(&config, &connection).await {
491 Ok(dbus_stats) => {
492 let mut machine_stats = locked_machine_stats.write().await;
493 machine_stats.dbus_stats = Some(dbus_stats)
494 }
495 Err(err) => error!("dbus stats failed: {:?}", err),
496 }
497 Ok(())
498}