1use std::collections::HashMap;
6use std::fs;
7use std::io;
8use std::sync::Arc;
9
10use thiserror::Error;
11use tokio::sync::RwLock;
12use tracing::error;
13use uzers::get_user_by_uid;
14use zbus::fdo::{DBusProxy, StatsProxy};
15use zbus::names::BusName;
16use zvariant::{Dict, OwnedValue, Value};
17
18use crate::MachineStats;
19
/// Errors that can be returned while collecting D-Bus statistics.
///
/// Both variants carry `#[from]`, so the `?` operator converts the underlying
/// zbus error types into this enum automatically.
#[derive(Error, Debug)]
pub enum MonitordDbusStatsError {
    /// Wraps a [`zbus::Error`].
    #[error("D-Bus error: {0}")]
    ZbusError(#[from] zbus::Error),
    /// Wraps a [`zbus::fdo::Error`].
    #[error("D-Bus fdo error: {0}")]
    FdoError(#[from] zbus::fdo::Error),
}
27
/// Per-peer accounting record parsed from the
/// `org.bus1.DBus.Debug.Stats.PeerAccounting` entry of the bus statistics.
///
/// Every optional field is `None` when the corresponding dictionary entry is
/// missing or is not of the expected type.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, PartialEq)]
pub struct DBusBrokerPeerAccounting {
    /// Peer identifier: the first (string) field of the peer structure.
    /// Presumably the unique connection name such as `:1.1` — confirm against dbus-broker.
    pub id: String,
    /// Well-known bus name whose owner is `id`, when one was found.
    pub well_known_name: Option<String>,

    /// `UnixUserID` entry of the peer's credentials dictionary.
    pub unix_user_id: Option<u32>,
    /// `ProcessID` entry of the peer's credentials dictionary.
    pub process_id: Option<u32>,
    /// `UnixGroupIDs` entry of the peer's credentials dictionary.
    pub unix_group_ids: Option<Vec<u32>>,
    /// `NameObjects` entry of the peer's stats dictionary.
    pub name_objects: Option<u32>,
    /// `MatchBytes` entry of the peer's stats dictionary.
    pub match_bytes: Option<u32>,
    /// `Matches` entry of the peer's stats dictionary.
    pub matches: Option<u32>,
    /// `ReplyObjects` entry of the peer's stats dictionary.
    pub reply_objects: Option<u32>,
    /// `IncomingBytes` entry of the peer's stats dictionary.
    pub incoming_bytes: Option<u32>,
    /// `IncomingFds` entry of the peer's stats dictionary.
    pub incoming_fds: Option<u32>,
    /// `OutgoingBytes` entry of the peer's stats dictionary.
    pub outgoing_bytes: Option<u32>,
    /// `OutgoingFds` entry of the peer's stats dictionary.
    pub outgoing_fds: Option<u32>,
    /// `ActivationRequestBytes` entry of the peer's stats dictionary.
    pub activation_request_bytes: Option<u32>,
    /// `ActivationRequestFds` entry of the peer's stats dictionary.
    pub activation_request_fds: Option<u32>,
}
73
74impl DBusBrokerPeerAccounting {
75 pub fn has_well_known_name(&self) -> bool {
77 self.well_known_name.is_some()
78 }
79
80 pub fn get_name(&self) -> &str {
82 self.well_known_name.as_deref().unwrap_or(&self.id)
83 }
84
85 pub fn get_cgroup_name(&self) -> Result<String, io::Error> {
86 let pid = self
87 .process_id
88 .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "missing process_id"))?;
89
90 let path = format!("/proc/{}/cgroup", pid);
91 let content = fs::read_to_string(&path)?;
92
93 let cgroup = content.strip_prefix("0::").ok_or_else(|| {
95 io::Error::new(io::ErrorKind::InvalidData, "unexpected cgroup format")
96 })?;
97
98 Ok(cgroup.trim().trim_matches('/').replace('/', "-"))
99 }
100}
101
/// Accounting totals for all peers that share one cgroup.
///
/// The counters mirror those of `DBusBrokerPeerAccounting` and are accumulated
/// peer by peer through `combine_with_peer`. A counter stays `None` until at
/// least one combined peer reports a value for it.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, PartialEq)]
pub struct DBusBrokerCGroupAccounting {
    /// Flattened cgroup name this record aggregates.
    pub name: String,

    /// Sum of the peers' `name_objects`.
    pub name_objects: Option<u32>,
    /// Sum of the peers' `match_bytes`.
    pub match_bytes: Option<u32>,
    /// Sum of the peers' `matches`.
    pub matches: Option<u32>,
    /// Sum of the peers' `reply_objects`.
    pub reply_objects: Option<u32>,
    /// Sum of the peers' `incoming_bytes`.
    pub incoming_bytes: Option<u32>,
    /// Sum of the peers' `incoming_fds`.
    pub incoming_fds: Option<u32>,
    /// Sum of the peers' `outgoing_bytes`.
    pub outgoing_bytes: Option<u32>,
    /// Sum of the peers' `outgoing_fds`.
    pub outgoing_fds: Option<u32>,
    /// Sum of the peers' `activation_request_bytes`.
    pub activation_request_bytes: Option<u32>,
    /// Sum of the peers' `activation_request_fds`.
    pub activation_request_fds: Option<u32>,
}
132
133impl DBusBrokerCGroupAccounting {
134 pub fn combine_with_peer(&mut self, peer: &DBusBrokerPeerAccounting) {
135 fn sum(a: &mut Option<u32>, b: &Option<u32>) {
136 *a = match (a.take(), b) {
137 (Some(x), Some(y)) => Some(x + y),
138 (Some(x), None) => Some(x),
139 (None, Some(y)) => Some(*y),
140 (None, None) => None,
141 };
142 }
143
144 sum(&mut self.name_objects, &peer.name_objects);
145 sum(&mut self.match_bytes, &peer.match_bytes);
146 sum(&mut self.matches, &peer.matches);
147 sum(&mut self.reply_objects, &peer.reply_objects);
148 sum(&mut self.incoming_bytes, &peer.incoming_bytes);
149 sum(&mut self.incoming_fds, &peer.incoming_fds);
150 sum(&mut self.outgoing_bytes, &peer.outgoing_bytes);
151 sum(&mut self.outgoing_fds, &peer.outgoing_fds);
152 sum(
153 &mut self.activation_request_bytes,
154 &peer.activation_request_bytes,
155 );
156 sum(
157 &mut self.activation_request_fds,
158 &peer.activation_request_fds,
159 );
160 }
161}
162
/// A `cur`/`max` pair taken from one named entry of a user's accounting data.
///
/// NOTE(review): `get_usage` computes `max - cur`, which suggests `cur` is the
/// remaining amount and `max` the limit — confirm against dbus-broker.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, PartialEq)]
pub struct CurMaxPair {
    /// Second field of the stat structure (`cur`).
    pub cur: u32,
    /// Third field of the stat structure (`max`).
    pub max: u32,
}
172
173impl CurMaxPair {
174 pub fn get_usage(&self) -> u32 {
175 self.max - self.cur
178 }
179}
180
/// Per-user accounting record parsed from the
/// `org.bus1.DBus.Debug.Stats.UserAccounting` entry of the bus statistics.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, PartialEq)]
pub struct DBusBrokerUserAccounting {
    /// Numeric user id the record belongs to.
    pub uid: u32,
    /// User name resolved from `uid`; the decimal uid when no user entry is found.
    pub username: String,

    /// Pair reported under the `Bytes` stat name, if present.
    pub bytes: Option<CurMaxPair>,
    /// Pair reported under the `Fds` stat name, if present.
    pub fds: Option<CurMaxPair>,
    /// Pair reported under the `Matches` stat name, if present.
    pub matches: Option<CurMaxPair>,
    /// Pair reported under the `Objects` stat name, if present.
    pub objects: Option<CurMaxPair>,
}
201
202impl DBusBrokerUserAccounting {
203 fn new(uid: u32) -> Self {
204 let username = match get_user_by_uid(uid) {
205 Some(user) => user.name().to_string_lossy().into_owned(),
206 None => uid.to_string(),
207 };
208
209 Self {
210 uid,
211 username,
212 ..Default::default()
213 }
214 }
215}
216
/// Snapshot of the statistics gathered from the message bus.
///
/// The scalar fields are copied from the accessors of the bus's `GetStats`
/// reply; the three maps hold the optional dbus-broker accounting data and are
/// `None` when disabled in the configuration or absent from the reply.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, PartialEq)]
pub struct DBusStats {
    /// Value of the reply's `serial()`.
    pub serial: Option<u32>,
    /// Value of the reply's `active_connections()`.
    pub active_connections: Option<u32>,
    /// Value of the reply's `incomplete_connections()`.
    pub incomplete_connections: Option<u32>,
    /// Value of the reply's `bus_names()`.
    pub bus_names: Option<u32>,
    /// Value of the reply's `peak_bus_names()`.
    pub peak_bus_names: Option<u32>,
    /// Value of the reply's `peak_bus_names_per_connection()`.
    pub peak_bus_names_per_connection: Option<u32>,
    /// Value of the reply's `match_rules()`.
    pub match_rules: Option<u32>,
    /// Value of the reply's `peak_match_rules()`.
    pub peak_match_rules: Option<u32>,
    /// Value of the reply's `peak_match_rules_per_connection()`.
    pub peak_match_rules_per_connection: Option<u32>,

    /// Per-peer accounting, keyed by peer id.
    pub dbus_broker_peer_accounting: Option<HashMap<String, DBusBrokerPeerAccounting>>,
    /// Per-cgroup accounting, keyed by flattened cgroup name.
    pub dbus_broker_cgroup_accounting: Option<HashMap<String, DBusBrokerCGroupAccounting>>,
    /// Per-user accounting, keyed by uid.
    pub dbus_broker_user_accounting: Option<HashMap<u32, DBusBrokerUserAccounting>>,
}
247
248impl DBusStats {
249 pub fn peer_accounting(&self) -> Option<&HashMap<String, DBusBrokerPeerAccounting>> {
250 self.dbus_broker_peer_accounting.as_ref()
251 }
252
253 pub fn cgroup_accounting(&self) -> Option<&HashMap<String, DBusBrokerCGroupAccounting>> {
254 self.dbus_broker_cgroup_accounting.as_ref()
255 }
256
257 pub fn user_accounting(&self) -> Option<&HashMap<u32, DBusBrokerUserAccounting>> {
258 self.dbus_broker_user_accounting.as_ref()
259 }
260}
261
/// Looks up `key` in `dict` and returns the entry when it is a `Value::U32`.
///
/// Returns `None` when the lookup reports an error, when nothing is found, or
/// when the stored value is of any other type.
fn get_u32(dict: &Dict, key: &str) -> Option<u32> {
    // The lookup is performed with a `Value` key, so wrap the string first.
    let value_key: Value = key.into();
    // `.ok()` discards a lookup error, so it is handled the same as a missing entry.
    dict.get(&value_key).ok().and_then(|v| match v.flatten() {
        Some(Value::U32(val)) => Some(*val),
        _ => None,
    })
}
269
270fn get_u32_vec(dict: &Dict, key: &str) -> Option<Vec<u32>> {
271 let value_key: Value = key.into();
272 dict.get(&value_key).ok().and_then(|v| match v.flatten() {
273 Some(Value::Array(array)) => {
274 let vec: Vec<u32> = array
275 .iter()
276 .filter_map(|item| {
277 if let Value::U32(num) = item {
278 Some(*num)
279 } else {
280 None
281 }
282 })
283 .collect();
284
285 Some(vec)
286 }
287 _ => None,
288 })
289}
290
291fn parse_peer_struct(
313 peer_value: &Value,
314 well_known_to_peer_names: &HashMap<String, String>,
315) -> Option<DBusBrokerPeerAccounting> {
316 let peer_struct = match peer_value {
317 Value::Structure(peer_struct) => peer_struct,
318 _ => return None,
319 };
320
321 match peer_struct.fields() {
322 [Value::Str(id), Value::Dict(credentials), Value::Dict(stats), ..] => {
323 Some(DBusBrokerPeerAccounting {
324 id: id.to_string(),
325 well_known_name: well_known_to_peer_names.get(id.as_str()).cloned(),
326 unix_user_id: get_u32(credentials, "UnixUserID"),
327 process_id: get_u32(credentials, "ProcessID"),
328 unix_group_ids: get_u32_vec(credentials, "UnixGroupIDs"),
329 name_objects: get_u32(stats, "NameObjects"),
330 match_bytes: get_u32(stats, "MatchBytes"),
331 matches: get_u32(stats, "Matches"),
332 reply_objects: get_u32(stats, "ReplyObjects"),
333 incoming_bytes: get_u32(stats, "IncomingBytes"),
334 incoming_fds: get_u32(stats, "IncomingFds"),
335 outgoing_bytes: get_u32(stats, "OutgoingBytes"),
336 outgoing_fds: get_u32(stats, "OutgoingFds"),
337 activation_request_bytes: get_u32(stats, "ActivationRequestBytes"),
338 activation_request_fds: get_u32(stats, "ActivationRequestFds"),
339 })
340 }
341 _ => None,
342 }
343}
344
345async fn parse_peer_accounting(
346 dbus_proxy: &DBusProxy<'_>,
347 config: &crate::config::Config,
348 owned_value: Option<&OwnedValue>,
349) -> Result<Option<Vec<DBusBrokerPeerAccounting>>, MonitordDbusStatsError> {
350 if !config.dbus_stats.peer_stats && !config.dbus_stats.cgroup_stats {
353 return Ok(None);
354 }
355
356 let value: &Value = match owned_value {
357 Some(v) => v,
358 None => return Ok(None),
359 };
360
361 let peers_value = match value {
362 Value::Array(peers_value) => peers_value,
363 _ => return Ok(None),
364 };
365
366 let well_known_to_peer_names = get_well_known_to_peer_names(dbus_proxy).await?;
367
368 let result = peers_value
369 .iter()
370 .filter_map(|peer| parse_peer_struct(peer, &well_known_to_peer_names))
371 .collect();
372
373 Ok(Some(result))
374}
375
376fn filter_and_collect_peer_accounting(
377 config: &crate::config::Config,
378 peers: Option<&Vec<DBusBrokerPeerAccounting>>,
379) -> Option<HashMap<String, DBusBrokerPeerAccounting>> {
380 if !config.dbus_stats.peer_stats {
382 return None;
383 }
384
385 let result = peers?
386 .iter()
387 .filter(|peer| {
388 if config.dbus_stats.peer_well_known_names_only && !peer.has_well_known_name() {
389 return false;
390 }
391
392 let id = peer.id.as_str();
393 let name = peer.get_name();
394 if config.dbus_stats.peer_blocklist.contains(id)
395 || config.dbus_stats.peer_blocklist.contains(name)
396 {
397 return false;
398 }
399
400 if !config.dbus_stats.peer_allowlist.is_empty()
401 && !config.dbus_stats.peer_allowlist.contains(id)
402 && !config.dbus_stats.peer_allowlist.contains(name)
403 {
404 return false;
405 }
406
407 true
408 })
409 .map(|peer| (peer.id.clone(), peer.clone()))
410 .collect();
411
412 Some(result)
413}
414
415fn filter_and_collect_cgroup_accounting(
416 config: &crate::config::Config,
417 peers: Option<&Vec<DBusBrokerPeerAccounting>>,
418) -> Option<HashMap<String, DBusBrokerCGroupAccounting>> {
419 if !config.dbus_stats.cgroup_stats {
421 return None;
422 }
423
424 let mut result: HashMap<String, DBusBrokerCGroupAccounting> = HashMap::new();
425
426 for peer in peers?.iter() {
427 let cgroup_name = match peer.get_cgroup_name() {
428 Ok(name) => name,
429 Err(err) => {
430 error!("Failed to get cgroup name for peer {}: {}", peer.id, err);
431 continue;
432 }
433 };
434
435 if config.dbus_stats.cgroup_blocklist.contains(&cgroup_name) {
436 continue;
437 }
438
439 if !config.dbus_stats.cgroup_allowlist.is_empty()
440 && !config.dbus_stats.cgroup_allowlist.contains(&cgroup_name)
441 {
442 continue;
443 }
444
445 let entry =
446 result
447 .entry(cgroup_name.clone())
448 .or_insert_with(|| DBusBrokerCGroupAccounting {
449 name: cgroup_name,
450 ..Default::default()
451 });
452
453 entry.combine_with_peer(peer);
454 }
455
456 Some(result)
457}
458
459fn parse_user_struct(user_value: &Value) -> Option<DBusBrokerUserAccounting> {
490 let user_struct = match user_value {
491 Value::Structure(user_struct) => user_struct,
492 _ => return None,
493 };
494
495 match user_struct.fields() {
496 [Value::U32(uid), Value::Array(user_stats), ..] => {
497 let mut user = DBusBrokerUserAccounting::new(*uid);
498 for user_stat in user_stats.iter() {
499 if let Value::Structure(user_stat) = user_stat {
500 if let [Value::Str(name), Value::U32(cur), Value::U32(max), ..] =
501 user_stat.fields()
502 {
503 let pair = CurMaxPair {
504 cur: *cur,
505 max: *max,
506 };
507 match name.as_str() {
508 "Bytes" => user.bytes = Some(pair),
509 "Fds" => user.fds = Some(pair),
510 "Matches" => user.matches = Some(pair),
511 "Objects" => user.objects = Some(pair),
512 _ => {} }
514 }
515 }
516 }
517
518 Some(user)
519 }
520 _ => None,
521 }
522}
523
524fn parse_user_accounting(
525 config: &crate::config::Config,
526 owned_value: &OwnedValue,
527) -> Option<HashMap<u32, DBusBrokerUserAccounting>> {
528 if !config.dbus_stats.user_stats {
530 return None;
531 }
532
533 let value: &Value = owned_value;
534 let users_value = match value {
535 Value::Array(users_value) => users_value,
536 _ => return None,
537 };
538
539 let result = users_value
540 .iter()
541 .filter_map(parse_user_struct)
542 .filter(|user| {
543 let uid = user.uid.to_string();
544 if config.dbus_stats.user_blocklist.contains(&uid)
545 || config.dbus_stats.user_blocklist.contains(&user.username)
546 {
547 return false;
548 }
549
550 if !config.dbus_stats.user_allowlist.is_empty()
551 && !config.dbus_stats.user_allowlist.contains(&uid)
552 && !config.dbus_stats.user_allowlist.contains(&user.username)
553 {
554 return false;
555 }
556
557 true
558 })
559 .map(|user| (user.uid, user))
560 .collect();
561
562 Some(result)
563}
564
565async fn get_well_known_to_peer_names(
566 dbus_proxy: &DBusProxy<'_>,
567) -> Result<HashMap<String, String>, MonitordDbusStatsError> {
568 let dbus_names = dbus_proxy.list_names().await?;
569 let mut result = HashMap::new();
570
571 for owned_busname in dbus_names.iter() {
572 let name: &BusName = owned_busname;
573 if let BusName::WellKnown(_) = name {
574 let owner = dbus_proxy.get_name_owner(name.clone()).await?;
576 result.insert(owner.to_string(), name.to_string());
577 }
578 }
579
580 Ok(result)
581}
582
583pub async fn parse_dbus_stats(
585 config: &crate::config::Config,
586 connection: &zbus::Connection,
587) -> Result<DBusStats, MonitordDbusStatsError> {
588 let dbus_proxy = DBusProxy::new(connection).await?;
589
590 let stats_proxy = StatsProxy::new(connection).await?;
591 let stats = stats_proxy.get_stats().await?;
592 let peers = parse_peer_accounting(
593 &dbus_proxy,
594 config,
595 stats.rest().get("org.bus1.DBus.Debug.Stats.PeerAccounting"),
596 )
597 .await?;
598
599 let dbus_stats = DBusStats {
600 serial: stats.serial(),
601 active_connections: stats.active_connections(),
602 incomplete_connections: stats.incomplete_connections(),
603 bus_names: stats.bus_names(),
604 peak_bus_names: stats.peak_bus_names(),
605 peak_bus_names_per_connection: stats.peak_bus_names_per_connection(),
606 match_rules: stats.match_rules(),
607 peak_match_rules: stats.peak_match_rules(),
608 peak_match_rules_per_connection: stats.peak_match_rules_per_connection(),
609
610 dbus_broker_peer_accounting: filter_and_collect_peer_accounting(config, peers.as_ref()),
612 dbus_broker_cgroup_accounting: filter_and_collect_cgroup_accounting(config, peers.as_ref()),
613 dbus_broker_user_accounting: stats
614 .rest()
615 .get("org.bus1.DBus.Debug.Stats.UserAccounting")
616 .map(|user| parse_user_accounting(config, user))
617 .unwrap_or_default(),
618 };
619
620 Ok(dbus_stats)
621}
622
623pub async fn update_dbus_stats(
625 config: Arc<crate::config::Config>,
626 connection: zbus::Connection,
627 locked_machine_stats: Arc<RwLock<MachineStats>>,
628) -> anyhow::Result<()> {
629 match parse_dbus_stats(&config, &connection).await {
630 Ok(dbus_stats) => {
631 let mut machine_stats = locked_machine_stats.write().await;
632 machine_stats.dbus_stats = Some(dbus_stats)
633 }
634 Err(err) => error!("dbus stats failed: {:?}", err),
635 }
636 Ok(())
637}
638
/// Unit tests for the pure parsing and aggregation helpers of this module.
#[cfg(test)]
mod tests {
    use super::*;
    use zvariant::{Array, OwnedValue, Str, Structure, Value};

    /// `get_usage` is `max - cur` for the ordinary `cur <= max` case.
    #[test]
    fn test_cur_max_pair_usage() {
        let p = CurMaxPair { cur: 10, max: 100 };
        assert_eq!(p.get_usage(), 90);
    }

    /// Covers all four `Some`/`None` combinations of the per-counter summing.
    #[test]
    fn test_combine_with_peer_option_summing() {
        let mut cg = DBusBrokerCGroupAccounting {
            name: "cg1".to_string(),
            name_objects: Some(5),
            match_bytes: None,
            matches: Some(3),
            reply_objects: None,
            incoming_bytes: Some(10),
            incoming_fds: None,
            outgoing_bytes: Some(7),
            outgoing_fds: Some(2),
            activation_request_bytes: None,
            activation_request_fds: Some(1),
        };

        let peer = DBusBrokerPeerAccounting {
            id: ":1.1".to_string(),
            well_known_name: Some("com.example".to_string()),
            unix_user_id: Some(1000),
            process_id: Some(1234),
            unix_group_ids: Some(vec![1000]),
            name_objects: Some(2),
            match_bytes: Some(4),
            matches: None,
            reply_objects: Some(1),
            incoming_bytes: None,
            incoming_fds: Some(5),
            outgoing_bytes: Some(3),
            outgoing_fds: None,
            activation_request_bytes: Some(8),
            activation_request_fds: None,
        };

        cg.combine_with_peer(&peer);

        // Both sides set: values are added.
        assert_eq!(cg.name_objects, Some(7));
        // Only one side set: that value is kept.
        assert_eq!(cg.match_bytes, Some(4));
        assert_eq!(cg.matches, Some(3));
        assert_eq!(cg.reply_objects, Some(1));
        assert_eq!(cg.incoming_bytes, Some(10));
        assert_eq!(cg.incoming_fds, Some(5));
        assert_eq!(cg.outgoing_bytes, Some(10));
        assert_eq!(cg.outgoing_fds, Some(2));
        assert_eq!(cg.activation_request_bytes, Some(8));
        assert_eq!(cg.activation_request_fds, Some(1));
    }

    /// Checks the config gate, the empty-array case and the non-array case.
    #[test]
    fn test_parse_user_accounting_gating_and_parse() {
        let mut cfg = crate::config::Config::default();
        // With user stats disabled nothing is parsed, whatever the input.
        cfg.dbus_stats.user_stats = false;
        let empty_val = Value::Array(Array::from(Vec::<Value>::new()));
        let empty_owned = OwnedValue::try_from(empty_val).expect("owned value conversion");
        assert!(parse_user_accounting(&cfg, &empty_owned).is_none());

        cfg.dbus_stats.user_stats = true;
        // An empty array parses to an empty map, not `None`.
        let empty_val = Value::Array(Array::from(Vec::<Value>::new()));
        let owned = OwnedValue::try_from(empty_val).expect("should convert empty array");
        let parsed = parse_user_accounting(&cfg, &owned).expect("should parse empty");
        assert_eq!(parsed.len(), 0);

        // A value that is not an array is rejected.
        let non_array = OwnedValue::try_from(Value::U32(0)).expect("should convert u32 value");
        assert!(parse_user_accounting(&cfg, &non_array).is_none());
    }

    /// A structure whose first field is not a `u32` uid does not parse.
    #[test]
    fn test_parse_user_struct_invalid_returns_none() {
        let invalid = Value::Structure(Structure::from((
            // A string where the uid is expected.
            Value::Str(Str::from_static("not_uid")),
            Value::U32(10),
            Value::U32(20),
        )));
        assert!(parse_user_struct(&invalid).is_none());
    }

    /// The user name falls back to the decimal uid when the lookup finds no user.
    #[test]
    fn test_user_username_fallback() {
        // NOTE(review): assumes uid 999_999 does not exist on the host running the test.
        let mut user = DBusBrokerUserAccounting::new(999_999);
        user.bytes = Some(CurMaxPair { cur: 5, max: 10 });
        assert_eq!(&user.username, "999999");
    }
}