monitord/
dbus_stats.rs

1//! # dbus_stats module
2//!
3//! Handle getting statistics of our Dbus daemon/broker
4
5use std::collections::HashMap;
6use std::fs;
7use std::sync::Arc;
8
9use anyhow::Result;
10use tokio::sync::RwLock;
11use tracing::error;
12use users::get_user_by_uid;
13use zbus::fdo::{DBusProxy, StatsProxy};
14use zbus::names::BusName;
15use zvariant::{Dict, OwnedValue, Value};
16
17use crate::MachineStats;
18
19// Unfortunately, various DBus daemons (ex: dbus-broker and dbus-daemon)
20// represent stats differently. Moreover, the stats vary across versions of the same daemon.
21// Hence, the code uses flexible approach providing max available information.
22
23#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, PartialEq)]
24pub struct DBusBrokerPeerAccounting {
25    pub id: String,
26    pub well_known_name: Option<String>,
27
28    // credentials
29    pub unix_user_id: Option<u32>,
30    pub process_id: Option<u32>,
31    pub unix_group_ids: Option<Vec<u32>>,
32    // ignoring LinuxSecurityLabel
33    // pub linux_security_label: Option<String>,
34
35    // stats
36    pub name_objects: Option<u32>,
37    pub match_bytes: Option<u32>,
38    pub matches: Option<u32>,
39    pub reply_objects: Option<u32>,
40    pub incoming_bytes: Option<u32>,
41    pub incoming_fds: Option<u32>,
42    pub outgoing_bytes: Option<u32>,
43    pub outgoing_fds: Option<u32>,
44    pub activation_request_bytes: Option<u32>,
45    pub activation_request_fds: Option<u32>,
46}
47
48impl DBusBrokerPeerAccounting {
49    pub fn get_name_for_metric(&self) -> String {
50        if let Some(ref well_known) = self.well_known_name {
51            return well_known.clone();
52        }
53
54        let formated_id = self
55            .id
56            .strip_prefix(':')
57            .unwrap_or(&self.id)
58            .replace(',', "-");
59
60        if let Some(ref pid) = self.process_id {
61            let path = format!("/proc/{}/comm", pid);
62            if let Ok(name) = fs::read_to_string(&path) {
63                // There might be multiple connections from the same process.
64                // As result, need to suffix result with connection_id for uniqueness
65                return format!("{}-{}", name.trim(), formated_id);
66            }
67        }
68
69        formated_id
70    }
71}
72
73#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, PartialEq)]
74pub struct CurMaxPair {
75    pub cur: u32,
76    pub max: u32,
77}
78
79#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, PartialEq)]
80pub struct DBusBrokerUserAccounting {
81    pub uid: u32,
82
83    // stats
84    pub bytes: Option<CurMaxPair>,
85    pub fds: Option<CurMaxPair>,
86    pub matches: Option<CurMaxPair>,
87    pub objects: Option<CurMaxPair>,
88    // TODO UserUsage
89    // see src/util/user.h
90}
91
92impl DBusBrokerUserAccounting {
93    fn new(uid: u32) -> Self {
94        Self {
95            uid,
96            ..Default::default()
97        }
98    }
99
100    pub fn get_name_for_metric(&self) -> String {
101        match get_user_by_uid(self.uid) {
102            Some(user) => user.name().to_string_lossy().into_owned(),
103            None => self.uid.to_string(),
104        }
105    }
106}
107
108#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, PartialEq)]
109pub struct DBusStats {
110    pub serial: Option<u32>,
111    pub active_connections: Option<u32>,
112    pub incomplete_connections: Option<u32>,
113    pub bus_names: Option<u32>,
114    pub peak_bus_names: Option<u32>,
115    pub peak_bus_names_per_connection: Option<u32>,
116    pub match_rules: Option<u32>,
117    pub peak_match_rules: Option<u32>,
118    pub peak_match_rules_per_connection: Option<u32>,
119
120    // dbus-broker specific stats
121    pub dbus_broker_peer_accounting: Option<HashMap<String, DBusBrokerPeerAccounting>>,
122    pub dbus_broker_user_accounting: Option<HashMap<u32, DBusBrokerUserAccounting>>,
123}
124
125fn get_u32(dict: &Dict, key: &str) -> Option<u32> {
126    let value_key: Value = key.into();
127    dict.get(&value_key).ok().and_then(|v| match v.flatten() {
128        Some(Value::U32(val)) => Some(*val),
129        _ => None,
130    })
131}
132
133fn get_u32_vec(dict: &Dict, key: &str) -> Option<Vec<u32>> {
134    let value_key: Value = key.into();
135    dict.get(&value_key).ok().and_then(|v| match v.flatten() {
136        Some(Value::Array(array)) => {
137            let vec: Vec<u32> = array
138                .iter()
139                .filter_map(|item| {
140                    if let Value::U32(num) = item {
141                        Some(*num)
142                    } else {
143                        None
144                    }
145                })
146                .collect();
147
148            Some(vec)
149        }
150        _ => None,
151    })
152}
153
154/* Parse DBusBrokerPeerAccounting from OwnedValue.
155 * Expected structure:
156 * struct {
157 *     string ":1.2197907"
158 *     array [
159 *         dict entry(
160 *              string "UnixUserID"
161 *              variant uint32 0
162 *         )
163 *         ... other fields
164 *     ]
165 *     array [
166 *         dict entry(
167 *              string "NameObjects"
168 *              uint32 1
169 *         )
170 *         ... other fields
171 *     ]
172 * }
173 */
174
175fn parse_peer_struct(
176    peer_value: &Value,
177    well_known_to_peer_names: &HashMap<String, String>,
178) -> Option<DBusBrokerPeerAccounting> {
179    let peer_struct = match peer_value {
180        Value::Structure(peer_struct) => peer_struct,
181        _ => return None,
182    };
183
184    match peer_struct.fields() {
185        [Value::Str(id), Value::Dict(credentials), Value::Dict(stats), ..] => {
186            Some(DBusBrokerPeerAccounting {
187                id: id.to_string(),
188                well_known_name: well_known_to_peer_names.get(id.as_str()).cloned(),
189                unix_user_id: get_u32(credentials, "UnixUserID"),
190                process_id: get_u32(credentials, "ProcessID"),
191                unix_group_ids: get_u32_vec(credentials, "UnixGroupIDs"),
192                name_objects: get_u32(stats, "NameObjects"),
193                match_bytes: get_u32(stats, "MatchBytes"),
194                matches: get_u32(stats, "Matches"),
195                reply_objects: get_u32(stats, "ReplyObjects"),
196                incoming_bytes: get_u32(stats, "IncomingBytes"),
197                incoming_fds: get_u32(stats, "IncomingFds"),
198                outgoing_bytes: get_u32(stats, "OutgoingBytes"),
199                outgoing_fds: get_u32(stats, "OutgoingFds"),
200                activation_request_bytes: get_u32(stats, "ActivationRequestBytes"),
201                activation_request_fds: get_u32(stats, "ActivationRequestFds"),
202            })
203        }
204        _ => None,
205    }
206}
207
208fn parse_peer_accounting(
209    owned_value: &OwnedValue,
210    well_known_to_peer_names: &HashMap<String, String>,
211) -> Option<HashMap<String, DBusBrokerPeerAccounting>> {
212    let value: &Value = owned_value;
213    let peers_value = match value {
214        Value::Array(peers_value) => peers_value,
215        _ => return None,
216    };
217
218    let result = peers_value
219        .iter()
220        .filter_map(|peer| parse_peer_struct(peer, well_known_to_peer_names))
221        .map(|peer| (peer.id.clone(), peer))
222        .collect();
223
224    Some(result)
225}
226
227/* Parse DBusBrokerUserAccounting from OwnedValue.
228 * Expected structure:
229 * struct {
230 *     uint32 0
231 *     array [
232 *         struct {
233 *             string "Bytes"
234 *             uint32 536843240
235 *             uint32 536870912
236 *         }
237 *         ... more fields
238 *     ]
239 *     # TODO parse usages, ignoring for now
240 *     # see src/bus/driver.c:2258
241 *     # the part below is not parsed
242 *     array [
243 *         dict entry(
244 *             uint32 0
245 *             array [
246 *             dict entry(
247 *                 string "Bytes"
248 *                 uint32 27672
249 *             )
250 *             ... more fields
251 *             ]
252 *         )
253 *     ]
254 * }
255 */
256
257fn parse_user_struct(user_value: &Value) -> Option<DBusBrokerUserAccounting> {
258    let user_struct = match user_value {
259        Value::Structure(user_struct) => user_struct,
260        _ => return None,
261    };
262
263    match user_struct.fields() {
264        [Value::U32(uid), Value::Array(user_stats), ..] => {
265            let mut user = DBusBrokerUserAccounting::new(*uid);
266            for user_stat in user_stats.iter() {
267                if let Value::Structure(user_stat) = user_stat {
268                    if let [Value::Str(name), Value::U32(cur), Value::U32(max), ..] =
269                        user_stat.fields()
270                    {
271                        let pair = CurMaxPair {
272                            cur: *cur,
273                            max: *max,
274                        };
275                        match name.as_str() {
276                            "Bytes" => user.bytes = Some(pair),
277                            "Fds" => user.fds = Some(pair),
278                            "Matches" => user.matches = Some(pair),
279                            "Objects" => user.objects = Some(pair),
280                            _ => {} // ignore other fields
281                        }
282                    }
283                }
284            }
285
286            Some(user)
287        }
288        _ => None,
289    }
290}
291
292fn parse_user_accounting(
293    owned_value: &OwnedValue,
294) -> Option<HashMap<u32, DBusBrokerUserAccounting>> {
295    let value: &Value = owned_value;
296    let users_value = match value {
297        Value::Array(users_value) => users_value,
298        _ => return None,
299    };
300
301    let result = users_value
302        .iter()
303        .filter_map(parse_user_struct)
304        .map(|user| (user.uid, user))
305        .collect();
306
307    Some(result)
308}
309
310async fn get_well_known_to_peer_names(
311    dbus_proxy: &DBusProxy<'_>,
312) -> Result<HashMap<String, String>, Box<dyn std::error::Error + Send + Sync>> {
313    let dbus_names = dbus_proxy.list_names().await?;
314    let mut result = HashMap::new();
315
316    for owned_busname in dbus_names.iter() {
317        let name: &BusName = owned_busname;
318        if let BusName::WellKnown(_) = name {
319            // TODO parallelize
320            let owner = dbus_proxy.get_name_owner(name.clone()).await?;
321            result.insert(owner.to_string(), name.to_string());
322        }
323    }
324
325    Ok(result)
326}
327
328/// Pull all units from dbus and count how system is setup and behaving
329pub async fn parse_dbus_stats(
330    connection: &zbus::Connection,
331) -> Result<DBusStats, Box<dyn std::error::Error + Send + Sync>> {
332    let dbus_proxy = DBusProxy::new(connection).await?;
333    let well_known_to_peer_names = get_well_known_to_peer_names(&dbus_proxy).await?;
334
335    let stats_proxy = StatsProxy::new(connection).await?;
336    let stats = stats_proxy.get_stats().await?;
337
338    let dbus_stats = DBusStats {
339        serial: stats.serial(),
340        active_connections: stats.active_connections(),
341        incomplete_connections: stats.incomplete_connections(),
342        bus_names: stats.bus_names(),
343        peak_bus_names: stats.peak_bus_names(),
344        peak_bus_names_per_connection: stats.peak_bus_names_per_connection(),
345        match_rules: stats.match_rules(),
346        peak_match_rules: stats.peak_match_rules(),
347        peak_match_rules_per_connection: stats.peak_match_rules_per_connection(),
348
349        // attempt to parse dbus-broker specific stats
350        dbus_broker_peer_accounting: stats
351            .rest()
352            .get("org.bus1.DBus.Debug.Stats.PeerAccounting")
353            .map(|peer| parse_peer_accounting(peer, &well_known_to_peer_names))
354            .unwrap_or_default(),
355        dbus_broker_user_accounting: stats
356            .rest()
357            .get("org.bus1.DBus.Debug.Stats.UserAccounting")
358            .map(parse_user_accounting)
359            .unwrap_or_default(),
360    };
361
362    Ok(dbus_stats)
363}
364
365/// Async wrapper than can update dbus stats when passed a locked struct
366pub async fn update_dbus_stats(
367    connection: zbus::Connection,
368    locked_machine_stats: Arc<RwLock<MachineStats>>,
369) -> anyhow::Result<()> {
370    match parse_dbus_stats(&connection).await {
371        Ok(dbus_stats) => {
372            let mut machine_stats = locked_machine_stats.write().await;
373            machine_stats.dbus_stats = Some(dbus_stats)
374        }
375        Err(err) => error!("dbus stats failed: {:?}", err),
376    }
377    Ok(())
378}