monitord/
dbus_stats.rs

1//! # dbus_stats module
2//!
3//! Handle getting statistics of our Dbus daemon/broker
4
5use std::collections::HashMap;
6use std::fs;
7use std::io;
8use std::sync::Arc;
9
10use anyhow::Result;
11use tokio::sync::RwLock;
12use tracing::error;
13use users::get_user_by_uid;
14use zbus::fdo::{DBusProxy, StatsProxy};
15use zbus::names::BusName;
16use zvariant::{Dict, OwnedValue, Value};
17
18use crate::MachineStats;
19
20// Unfortunately, various DBus daemons (ex: dbus-broker and dbus-daemon)
21// represent stats differently. Moreover, the stats vary across versions of the same daemon.
22// Hence, the code uses flexible approach providing max available information.
23
24#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, PartialEq)]
25pub struct DBusBrokerPeerAccounting {
26    pub id: String,
27    pub well_known_name: Option<String>,
28
29    // credentials
30    pub unix_user_id: Option<u32>,
31    pub process_id: Option<u32>,
32    pub unix_group_ids: Option<Vec<u32>>,
33    // ignoring LinuxSecurityLabel
34    // pub linux_security_label: Option<String>,
35
36    // stats
37    pub name_objects: Option<u32>,
38    pub match_bytes: Option<u32>,
39    pub matches: Option<u32>,
40    pub reply_objects: Option<u32>,
41    pub incoming_bytes: Option<u32>,
42    pub incoming_fds: Option<u32>,
43    pub outgoing_bytes: Option<u32>,
44    pub outgoing_fds: Option<u32>,
45    pub activation_request_bytes: Option<u32>,
46    pub activation_request_fds: Option<u32>,
47}
48
49impl DBusBrokerPeerAccounting {
50    pub fn get_cgroup_name(&self) -> Result<String, io::Error> {
51        let pid = self
52            .process_id
53            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "missing process_id"))?;
54
55        let path = format!("/proc/{}/cgroup", pid);
56        let content = fs::read_to_string(&path)?;
57
58        // ex: 0::/system.slice/metalos.classic.metald.service
59        let cgroup = content.strip_prefix("0::").ok_or_else(|| {
60            io::Error::new(io::ErrorKind::InvalidData, "unexpected cgroup format")
61        })?;
62
63        Ok(cgroup.trim().trim_matches('/').replace('/', "-"))
64    }
65}
66
67/* DBusBrokerCGroupAccounting is not present in org.freedesktop.DBus.Debug.Stats.GetStats output.
68 * We group by cgroup to avoid reporting individual peer stats which blows cardinality.
69 * This approach is not ideal, but good enough to identify abusive clients. */
70#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, PartialEq)]
71pub struct DBusBrokerCGroupAccounting {
72    pub name: String,
73
74    // stats
75    pub name_objects: Option<u32>,
76    pub match_bytes: Option<u32>,
77    pub matches: Option<u32>,
78    pub reply_objects: Option<u32>,
79    pub incoming_bytes: Option<u32>,
80    pub incoming_fds: Option<u32>,
81    pub outgoing_bytes: Option<u32>,
82    pub outgoing_fds: Option<u32>,
83    pub activation_request_bytes: Option<u32>,
84    pub activation_request_fds: Option<u32>,
85}
86
87impl DBusBrokerCGroupAccounting {
88    pub fn combine_with_peer(&mut self, peer: &DBusBrokerPeerAccounting) {
89        fn sum(a: &mut Option<u32>, b: &Option<u32>) {
90            *a = match (a.take(), b) {
91                (Some(x), Some(y)) => Some(x + y),
92                (Some(x), None) => Some(x),
93                (None, Some(y)) => Some(*y),
94                (None, None) => None,
95            };
96        }
97
98        sum(&mut self.name_objects, &peer.name_objects);
99        sum(&mut self.match_bytes, &peer.match_bytes);
100        sum(&mut self.matches, &peer.matches);
101        sum(&mut self.reply_objects, &peer.reply_objects);
102        sum(&mut self.incoming_bytes, &peer.incoming_bytes);
103        sum(&mut self.incoming_fds, &peer.incoming_fds);
104        sum(&mut self.outgoing_bytes, &peer.outgoing_bytes);
105        sum(&mut self.outgoing_fds, &peer.outgoing_fds);
106        sum(
107            &mut self.activation_request_bytes,
108            &peer.activation_request_bytes,
109        );
110        sum(
111            &mut self.activation_request_fds,
112            &peer.activation_request_fds,
113        );
114    }
115}
116
117#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, PartialEq)]
118pub struct CurMaxPair {
119    // dbus-broker maintains current value in an inverted form i.e. usage is max - cur
120    pub cur: u32,
121    pub max: u32,
122}
123
124impl CurMaxPair {
125    pub fn get_usage(&self) -> u32 {
126        // There is a theoretical possibility of max < cur due to various factors.
127        // I'll leave it for now to avoid premature optimizations.
128        self.max - self.cur
129    }
130}
131
132#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, PartialEq)]
133pub struct DBusBrokerUserAccounting {
134    pub uid: u32,
135
136    // aggregated stats
137    pub bytes: Option<CurMaxPair>,
138    pub fds: Option<CurMaxPair>,
139    pub matches: Option<CurMaxPair>,
140    pub objects: Option<CurMaxPair>,
141    // UserUsage provides detailed breakdown of the aggregated numbers.
142    // However, dbus-broker exposes usage as real values (not inverted, see CurMaxPair).
143}
144
145impl DBusBrokerUserAccounting {
146    fn new(uid: u32) -> Self {
147        Self {
148            uid,
149            ..Default::default()
150        }
151    }
152
153    pub fn get_name_for_metric(&self) -> String {
154        match get_user_by_uid(self.uid) {
155            Some(user) => user.name().to_string_lossy().into_owned(),
156            None => self.uid.to_string(),
157        }
158    }
159}
160
161#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, PartialEq)]
162pub struct DBusStats {
163    pub serial: Option<u32>,
164    pub active_connections: Option<u32>,
165    pub incomplete_connections: Option<u32>,
166    pub bus_names: Option<u32>,
167    pub peak_bus_names: Option<u32>,
168    pub peak_bus_names_per_connection: Option<u32>,
169    pub match_rules: Option<u32>,
170    pub peak_match_rules: Option<u32>,
171    pub peak_match_rules_per_connection: Option<u32>,
172
173    // dbus-broker specific stats
174    pub dbus_broker_peer_accounting: Option<HashMap<String, DBusBrokerPeerAccounting>>,
175    pub dbus_broker_user_accounting: Option<HashMap<u32, DBusBrokerUserAccounting>>,
176
177    // config options
178    pub peer_stats: bool,
179    pub cgroup_stats: bool,
180}
181
182impl DBusStats {
183    pub fn peer_accounting(&self) -> Option<&HashMap<String, DBusBrokerPeerAccounting>> {
184        match self.peer_stats {
185            true => self.dbus_broker_peer_accounting.as_ref(),
186            false => None,
187        }
188    }
189
190    pub fn cgroup_accounting(&self) -> Option<HashMap<String, DBusBrokerCGroupAccounting>> {
191        if !self.cgroup_stats {
192            return None;
193        }
194
195        let peer_accounting = self.dbus_broker_peer_accounting.as_ref()?;
196        let mut result: HashMap<String, DBusBrokerCGroupAccounting> = HashMap::new();
197
198        for peer in peer_accounting.values() {
199            let cgroup_name = match peer.get_cgroup_name() {
200                Ok(name) => name,
201                Err(err) => {
202                    error!("Failed to get cgroup name for peer {}: {}", peer.id, err);
203                    continue;
204                }
205            };
206
207            let entry =
208                result
209                    .entry(cgroup_name.clone())
210                    .or_insert_with(|| DBusBrokerCGroupAccounting {
211                        name: cgroup_name,
212                        ..Default::default()
213                    });
214
215            entry.combine_with_peer(peer);
216        }
217
218        Some(result)
219    }
220
221    pub fn user_accounting(&self) -> Option<&HashMap<u32, DBusBrokerUserAccounting>> {
222        self.dbus_broker_user_accounting.as_ref()
223    }
224}
225
226fn get_u32(dict: &Dict, key: &str) -> Option<u32> {
227    let value_key: Value = key.into();
228    dict.get(&value_key).ok().and_then(|v| match v.flatten() {
229        Some(Value::U32(val)) => Some(*val),
230        _ => None,
231    })
232}
233
234fn get_u32_vec(dict: &Dict, key: &str) -> Option<Vec<u32>> {
235    let value_key: Value = key.into();
236    dict.get(&value_key).ok().and_then(|v| match v.flatten() {
237        Some(Value::Array(array)) => {
238            let vec: Vec<u32> = array
239                .iter()
240                .filter_map(|item| {
241                    if let Value::U32(num) = item {
242                        Some(*num)
243                    } else {
244                        None
245                    }
246                })
247                .collect();
248
249            Some(vec)
250        }
251        _ => None,
252    })
253}
254
255/* Parse DBusBrokerPeerAccounting from OwnedValue.
256 * Expected structure:
257 * struct {
258 *     string ":1.2197907"
259 *     array [
260 *         dict entry(
261 *              string "UnixUserID"
262 *              variant uint32 0
263 *         )
264 *         ... other fields
265 *     ]
266 *     array [
267 *         dict entry(
268 *              string "NameObjects"
269 *              uint32 1
270 *         )
271 *         ... other fields
272 *     ]
273 * }
274 */
275
276fn parse_peer_struct(
277    peer_value: &Value,
278    well_known_to_peer_names: &HashMap<String, String>,
279) -> Option<DBusBrokerPeerAccounting> {
280    let peer_struct = match peer_value {
281        Value::Structure(peer_struct) => peer_struct,
282        _ => return None,
283    };
284
285    match peer_struct.fields() {
286        [Value::Str(id), Value::Dict(credentials), Value::Dict(stats), ..] => {
287            Some(DBusBrokerPeerAccounting {
288                id: id.to_string(),
289                well_known_name: well_known_to_peer_names.get(id.as_str()).cloned(),
290                unix_user_id: get_u32(credentials, "UnixUserID"),
291                process_id: get_u32(credentials, "ProcessID"),
292                unix_group_ids: get_u32_vec(credentials, "UnixGroupIDs"),
293                name_objects: get_u32(stats, "NameObjects"),
294                match_bytes: get_u32(stats, "MatchBytes"),
295                matches: get_u32(stats, "Matches"),
296                reply_objects: get_u32(stats, "ReplyObjects"),
297                incoming_bytes: get_u32(stats, "IncomingBytes"),
298                incoming_fds: get_u32(stats, "IncomingFds"),
299                outgoing_bytes: get_u32(stats, "OutgoingBytes"),
300                outgoing_fds: get_u32(stats, "OutgoingFds"),
301                activation_request_bytes: get_u32(stats, "ActivationRequestBytes"),
302                activation_request_fds: get_u32(stats, "ActivationRequestFds"),
303            })
304        }
305        _ => None,
306    }
307}
308
309fn parse_peer_accounting(
310    config: &crate::config::Config,
311    owned_value: &OwnedValue,
312    well_known_to_peer_names: &HashMap<String, String>,
313) -> Option<HashMap<String, DBusBrokerPeerAccounting>> {
314    // need to keep collecting peer stats when cgroup_stats=true
315    // since cgroup_stats is a derivative of peer stats
316    if !config.dbus_stats.peer_stats && !config.dbus_stats.cgroup_stats {
317        return None;
318    }
319
320    let value: &Value = owned_value;
321    let peers_value = match value {
322        Value::Array(peers_value) => peers_value,
323        _ => return None,
324    };
325
326    let result = peers_value
327        .iter()
328        .filter_map(|peer| parse_peer_struct(peer, well_known_to_peer_names))
329        .map(|peer| (peer.id.clone(), peer))
330        .collect();
331
332    Some(result)
333}
334
335/* Parse DBusBrokerUserAccounting from OwnedValue.
336 * Expected structure:
337 * struct {
338 *     uint32 0
339 *     array [
340 *         struct {
341 *             string "Bytes"
342 *             uint32 536843240
343 *             uint32 536870912
344 *         }
345 *         ... more fields
346 *     ]
347 *     # TODO parse usages, ignoring for now
348 *     # see src/bus/driver.c:2258
349 *     # the part below is not parsed
350 *     array [
351 *         dict entry(
352 *             uint32 0
353 *             array [
354 *             dict entry(
355 *                 string "Bytes"
356 *                 uint32 27672
357 *             )
358 *             ... more fields
359 *             ]
360 *         )
361 *     ]
362 * }
363 */
364
365fn parse_user_struct(user_value: &Value) -> Option<DBusBrokerUserAccounting> {
366    let user_struct = match user_value {
367        Value::Structure(user_struct) => user_struct,
368        _ => return None,
369    };
370
371    match user_struct.fields() {
372        [Value::U32(uid), Value::Array(user_stats), ..] => {
373            let mut user = DBusBrokerUserAccounting::new(*uid);
374            for user_stat in user_stats.iter() {
375                if let Value::Structure(user_stat) = user_stat {
376                    if let [Value::Str(name), Value::U32(cur), Value::U32(max), ..] =
377                        user_stat.fields()
378                    {
379                        let pair = CurMaxPair {
380                            cur: *cur,
381                            max: *max,
382                        };
383                        match name.as_str() {
384                            "Bytes" => user.bytes = Some(pair),
385                            "Fds" => user.fds = Some(pair),
386                            "Matches" => user.matches = Some(pair),
387                            "Objects" => user.objects = Some(pair),
388                            _ => {} // ignore other fields
389                        }
390                    }
391                }
392            }
393
394            Some(user)
395        }
396        _ => None,
397    }
398}
399
400fn parse_user_accounting(
401    config: &crate::config::Config,
402    owned_value: &OwnedValue,
403) -> Option<HashMap<u32, DBusBrokerUserAccounting>> {
404    // reject collecting user stats when told so
405    if !config.dbus_stats.user_stats {
406        return None;
407    }
408
409    let value: &Value = owned_value;
410    let users_value = match value {
411        Value::Array(users_value) => users_value,
412        _ => return None,
413    };
414
415    let result = users_value
416        .iter()
417        .filter_map(parse_user_struct)
418        .map(|user| (user.uid, user))
419        .collect();
420
421    Some(result)
422}
423
424async fn get_well_known_to_peer_names(
425    dbus_proxy: &DBusProxy<'_>,
426) -> Result<HashMap<String, String>, Box<dyn std::error::Error + Send + Sync>> {
427    let dbus_names = dbus_proxy.list_names().await?;
428    let mut result = HashMap::new();
429
430    for owned_busname in dbus_names.iter() {
431        let name: &BusName = owned_busname;
432        if let BusName::WellKnown(_) = name {
433            // TODO parallelize
434            let owner = dbus_proxy.get_name_owner(name.clone()).await?;
435            result.insert(owner.to_string(), name.to_string());
436        }
437    }
438
439    Ok(result)
440}
441
442/// Pull all units from dbus and count how system is setup and behaving
443pub async fn parse_dbus_stats(
444    config: &crate::config::Config,
445    connection: &zbus::Connection,
446) -> Result<DBusStats, Box<dyn std::error::Error + Send + Sync>> {
447    let dbus_proxy = DBusProxy::new(connection).await?;
448    let well_known_to_peer_names = get_well_known_to_peer_names(&dbus_proxy).await?;
449
450    let stats_proxy = StatsProxy::new(connection).await?;
451    let stats = stats_proxy.get_stats().await?;
452
453    let dbus_stats = DBusStats {
454        serial: stats.serial(),
455        active_connections: stats.active_connections(),
456        incomplete_connections: stats.incomplete_connections(),
457        bus_names: stats.bus_names(),
458        peak_bus_names: stats.peak_bus_names(),
459        peak_bus_names_per_connection: stats.peak_bus_names_per_connection(),
460        match_rules: stats.match_rules(),
461        peak_match_rules: stats.peak_match_rules(),
462        peak_match_rules_per_connection: stats.peak_match_rules_per_connection(),
463
464        // attempt to parse dbus-broker specific stats
465        dbus_broker_peer_accounting: stats
466            .rest()
467            .get("org.bus1.DBus.Debug.Stats.PeerAccounting")
468            .map(|peer| parse_peer_accounting(config, peer, &well_known_to_peer_names))
469            .unwrap_or_default(),
470        dbus_broker_user_accounting: stats
471            .rest()
472            .get("org.bus1.DBus.Debug.Stats.UserAccounting")
473            .map(|user| parse_user_accounting(config, user))
474            .unwrap_or_default(),
475
476        // have to keep settings since cgroup stats depends on peer stats
477        peer_stats: config.dbus_stats.peer_stats,
478        cgroup_stats: config.dbus_stats.cgroup_stats,
479    };
480
481    Ok(dbus_stats)
482}
483
484/// Async wrapper than can update dbus stats when passed a locked struct
485pub async fn update_dbus_stats(
486    config: crate::config::Config,
487    connection: zbus::Connection,
488    locked_machine_stats: Arc<RwLock<MachineStats>>,
489) -> anyhow::Result<()> {
490    match parse_dbus_stats(&config, &connection).await {
491        Ok(dbus_stats) => {
492            let mut machine_stats = locked_machine_stats.write().await;
493            machine_stats.dbus_stats = Some(dbus_stats)
494        }
495        Err(err) => error!("dbus stats failed: {:?}", err),
496    }
497    Ok(())
498}