1use std::collections::HashMap;
6use std::fs;
7use std::sync::Arc;
8
9use anyhow::Result;
10use tokio::sync::RwLock;
11use tracing::error;
12use users::get_user_by_uid;
13use zbus::fdo::{DBusProxy, StatsProxy};
14use zbus::names::BusName;
15use zvariant::{Dict, OwnedValue, Value};
16
17use crate::MachineStats;
18
19#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, PartialEq)]
24pub struct DBusBrokerPeerAccounting {
25 pub id: String,
26 pub well_known_name: Option<String>,
27
28 pub unix_user_id: Option<u32>,
30 pub process_id: Option<u32>,
31 pub unix_group_ids: Option<Vec<u32>>,
32 pub name_objects: Option<u32>,
37 pub match_bytes: Option<u32>,
38 pub matches: Option<u32>,
39 pub reply_objects: Option<u32>,
40 pub incoming_bytes: Option<u32>,
41 pub incoming_fds: Option<u32>,
42 pub outgoing_bytes: Option<u32>,
43 pub outgoing_fds: Option<u32>,
44 pub activation_request_bytes: Option<u32>,
45 pub activation_request_fds: Option<u32>,
46}
47
48impl DBusBrokerPeerAccounting {
49 pub fn get_name_for_metric(&self) -> String {
50 if let Some(ref well_known) = self.well_known_name {
51 return well_known.clone();
52 }
53
54 let formated_id = self
55 .id
56 .strip_prefix(':')
57 .unwrap_or(&self.id)
58 .replace(',', "-");
59
60 if let Some(ref pid) = self.process_id {
61 let path = format!("/proc/{}/comm", pid);
62 if let Ok(name) = fs::read_to_string(&path) {
63 return format!("{}-{}", name.trim(), formated_id);
66 }
67 }
68
69 formated_id
70 }
71}
72
73#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, PartialEq)]
74pub struct CurMaxPair {
75 pub cur: u32,
76 pub max: u32,
77}
78
79#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, PartialEq)]
80pub struct DBusBrokerUserAccounting {
81 pub uid: u32,
82
83 pub bytes: Option<CurMaxPair>,
85 pub fds: Option<CurMaxPair>,
86 pub matches: Option<CurMaxPair>,
87 pub objects: Option<CurMaxPair>,
88 }
91
92impl DBusBrokerUserAccounting {
93 fn new(uid: u32) -> Self {
94 Self {
95 uid,
96 ..Default::default()
97 }
98 }
99
100 pub fn get_name_for_metric(&self) -> String {
101 match get_user_by_uid(self.uid) {
102 Some(user) => user.name().to_string_lossy().into_owned(),
103 None => self.uid.to_string(),
104 }
105 }
106}
107
108#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, PartialEq)]
109pub struct DBusStats {
110 pub serial: Option<u32>,
111 pub active_connections: Option<u32>,
112 pub incomplete_connections: Option<u32>,
113 pub bus_names: Option<u32>,
114 pub peak_bus_names: Option<u32>,
115 pub peak_bus_names_per_connection: Option<u32>,
116 pub match_rules: Option<u32>,
117 pub peak_match_rules: Option<u32>,
118 pub peak_match_rules_per_connection: Option<u32>,
119
120 pub dbus_broker_peer_accounting: Option<HashMap<String, DBusBrokerPeerAccounting>>,
122 pub dbus_broker_user_accounting: Option<HashMap<u32, DBusBrokerUserAccounting>>,
123}
124
125fn get_u32(dict: &Dict, key: &str) -> Option<u32> {
126 let value_key: Value = key.into();
127 dict.get(&value_key).ok().and_then(|v| match v.flatten() {
128 Some(Value::U32(val)) => Some(*val),
129 _ => None,
130 })
131}
132
133fn get_u32_vec(dict: &Dict, key: &str) -> Option<Vec<u32>> {
134 let value_key: Value = key.into();
135 dict.get(&value_key).ok().and_then(|v| match v.flatten() {
136 Some(Value::Array(array)) => {
137 let vec: Vec<u32> = array
138 .iter()
139 .filter_map(|item| {
140 if let Value::U32(num) = item {
141 Some(*num)
142 } else {
143 None
144 }
145 })
146 .collect();
147
148 Some(vec)
149 }
150 _ => None,
151 })
152}
153
154fn parse_peer_struct(
176 peer_value: &Value,
177 well_known_to_peer_names: &HashMap<String, String>,
178) -> Option<DBusBrokerPeerAccounting> {
179 let peer_struct = match peer_value {
180 Value::Structure(peer_struct) => peer_struct,
181 _ => return None,
182 };
183
184 match peer_struct.fields() {
185 [Value::Str(id), Value::Dict(credentials), Value::Dict(stats), ..] => {
186 Some(DBusBrokerPeerAccounting {
187 id: id.to_string(),
188 well_known_name: well_known_to_peer_names.get(id.as_str()).cloned(),
189 unix_user_id: get_u32(credentials, "UnixUserID"),
190 process_id: get_u32(credentials, "ProcessID"),
191 unix_group_ids: get_u32_vec(credentials, "UnixGroupIDs"),
192 name_objects: get_u32(stats, "NameObjects"),
193 match_bytes: get_u32(stats, "MatchBytes"),
194 matches: get_u32(stats, "Matches"),
195 reply_objects: get_u32(stats, "ReplyObjects"),
196 incoming_bytes: get_u32(stats, "IncomingBytes"),
197 incoming_fds: get_u32(stats, "IncomingFds"),
198 outgoing_bytes: get_u32(stats, "OutgoingBytes"),
199 outgoing_fds: get_u32(stats, "OutgoingFds"),
200 activation_request_bytes: get_u32(stats, "ActivationRequestBytes"),
201 activation_request_fds: get_u32(stats, "ActivationRequestFds"),
202 })
203 }
204 _ => None,
205 }
206}
207
208fn parse_peer_accounting(
209 owned_value: &OwnedValue,
210 well_known_to_peer_names: &HashMap<String, String>,
211) -> Option<HashMap<String, DBusBrokerPeerAccounting>> {
212 let value: &Value = owned_value;
213 let peers_value = match value {
214 Value::Array(peers_value) => peers_value,
215 _ => return None,
216 };
217
218 let result = peers_value
219 .iter()
220 .filter_map(|peer| parse_peer_struct(peer, well_known_to_peer_names))
221 .map(|peer| (peer.id.clone(), peer))
222 .collect();
223
224 Some(result)
225}
226
227fn parse_user_struct(user_value: &Value) -> Option<DBusBrokerUserAccounting> {
258 let user_struct = match user_value {
259 Value::Structure(user_struct) => user_struct,
260 _ => return None,
261 };
262
263 match user_struct.fields() {
264 [Value::U32(uid), Value::Array(user_stats), ..] => {
265 let mut user = DBusBrokerUserAccounting::new(*uid);
266 for user_stat in user_stats.iter() {
267 if let Value::Structure(user_stat) = user_stat {
268 if let [Value::Str(name), Value::U32(cur), Value::U32(max), ..] =
269 user_stat.fields()
270 {
271 let pair = CurMaxPair {
272 cur: *cur,
273 max: *max,
274 };
275 match name.as_str() {
276 "Bytes" => user.bytes = Some(pair),
277 "Fds" => user.fds = Some(pair),
278 "Matches" => user.matches = Some(pair),
279 "Objects" => user.objects = Some(pair),
280 _ => {} }
282 }
283 }
284 }
285
286 Some(user)
287 }
288 _ => None,
289 }
290}
291
292fn parse_user_accounting(
293 owned_value: &OwnedValue,
294) -> Option<HashMap<u32, DBusBrokerUserAccounting>> {
295 let value: &Value = owned_value;
296 let users_value = match value {
297 Value::Array(users_value) => users_value,
298 _ => return None,
299 };
300
301 let result = users_value
302 .iter()
303 .filter_map(parse_user_struct)
304 .map(|user| (user.uid, user))
305 .collect();
306
307 Some(result)
308}
309
310async fn get_well_known_to_peer_names(
311 dbus_proxy: &DBusProxy<'_>,
312) -> Result<HashMap<String, String>, Box<dyn std::error::Error + Send + Sync>> {
313 let dbus_names = dbus_proxy.list_names().await?;
314 let mut result = HashMap::new();
315
316 for owned_busname in dbus_names.iter() {
317 let name: &BusName = owned_busname;
318 if let BusName::WellKnown(_) = name {
319 let owner = dbus_proxy.get_name_owner(name.clone()).await?;
321 result.insert(owner.to_string(), name.to_string());
322 }
323 }
324
325 Ok(result)
326}
327
328pub async fn parse_dbus_stats(
330 connection: &zbus::Connection,
331) -> Result<DBusStats, Box<dyn std::error::Error + Send + Sync>> {
332 let dbus_proxy = DBusProxy::new(connection).await?;
333 let well_known_to_peer_names = get_well_known_to_peer_names(&dbus_proxy).await?;
334
335 let stats_proxy = StatsProxy::new(connection).await?;
336 let stats = stats_proxy.get_stats().await?;
337
338 let dbus_stats = DBusStats {
339 serial: stats.serial(),
340 active_connections: stats.active_connections(),
341 incomplete_connections: stats.incomplete_connections(),
342 bus_names: stats.bus_names(),
343 peak_bus_names: stats.peak_bus_names(),
344 peak_bus_names_per_connection: stats.peak_bus_names_per_connection(),
345 match_rules: stats.match_rules(),
346 peak_match_rules: stats.peak_match_rules(),
347 peak_match_rules_per_connection: stats.peak_match_rules_per_connection(),
348
349 dbus_broker_peer_accounting: stats
351 .rest()
352 .get("org.bus1.DBus.Debug.Stats.PeerAccounting")
353 .map(|peer| parse_peer_accounting(peer, &well_known_to_peer_names))
354 .unwrap_or_default(),
355 dbus_broker_user_accounting: stats
356 .rest()
357 .get("org.bus1.DBus.Debug.Stats.UserAccounting")
358 .map(parse_user_accounting)
359 .unwrap_or_default(),
360 };
361
362 Ok(dbus_stats)
363}
364
365pub async fn update_dbus_stats(
367 connection: zbus::Connection,
368 locked_machine_stats: Arc<RwLock<MachineStats>>,
369) -> anyhow::Result<()> {
370 match parse_dbus_stats(&connection).await {
371 Ok(dbus_stats) => {
372 let mut machine_stats = locked_machine_stats.write().await;
373 machine_stats.dbus_stats = Some(dbus_stats)
374 }
375 Err(err) => error!("dbus stats failed: {:?}", err),
376 }
377 Ok(())
378}