veecle_telemetry/
protocol.rs

1//! Telemetry protocol types and message definitions.
2//!
3//! This module defines the core data structures used for telemetry message exchange.
4//! It includes message types for logging, tracing, and time synchronization, as well
5//! as supporting types for execution tracking and attribute management.
6//!
7//! # Message Types
8//!
9//! The protocol supports several categories of telemetry messages:
10//!
11//! - **Log Messages** - Structured logging with severity levels and attributes
12//! - **Tracing Messages** - Distributed tracing with spans, events, and links
13//! - **Time Sync Messages** - Time synchronization between systems
14//!
15//! # Thread Tracking
16//!
17//! Each message is associated with an [`ThreadId`] that uniquely identifies
18//! the thread it came from (globally unique across all processes).
19//! This allows telemetry data from multiple threads to be correlated and analyzed separately.
20//!
21//! # Serialization
22//!
23//! All protocol types implement [`serde::Serialize`] and optionally [`serde::Deserialize`]
24//! (when the `alloc` feature is enabled) for easy serialization to various formats.
25
26#[cfg(feature = "alloc")]
27use alloc::vec::Vec;
28use core::fmt;
29use core::num::NonZeroU64;
30use core::str::FromStr;
31
32use serde::{Deserialize, Serialize};
33
34use crate::SpanContext;
35pub use crate::id::{ProcessId, SpanId};
36#[cfg(feature = "alloc")]
37use crate::to_static::ToStatic;
38use crate::types::{ListType, StringType, list_from_slice};
39use crate::value::KeyValue;
40
41/// A specialised form of [`list_from_slice`] for attributes.
42pub fn attribute_list_from_slice<'a>(slice: &'a [KeyValue<'a>]) -> AttributeListType<'a> {
43    list_from_slice::<KeyValue<'a>>(slice)
44}
45
46/// Type alias for a list of attributes.
47pub type AttributeListType<'a> = ListType<'a, KeyValue<'a>>;
48
49#[cfg(feature = "alloc")]
50impl ToStatic for AttributeListType<'_> {
51    type Static = AttributeListType<'static>;
52
53    fn to_static(&self) -> Self::Static {
54        self.iter()
55            .map(|item| item.to_static())
56            .collect::<Vec<_>>()
57            .into()
58    }
59}
60
61/// A globally-unique id identifying a thread within a specific process.
62///
63/// The primary purpose of this id is to allow the consumer of telemetry messages to associate
64/// spans with the callstack they came from to reconstruct parent-child relationships. On a normal
65/// operating system this is the thread, on other systems it should be whatever is the closest
66/// equivalent, e.g. for FreeRTOS it would be a task. On a single threaded bare-metal system it
67/// would be a constant as there is only the one callstack.
68#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
69pub struct ThreadId {
70    /// The globally-unique id for the process this thread is within.
71    pub process: ProcessId,
72
73    /// The process-unique id for this thread within the process.
74    raw: NonZeroU64,
75}
76
77impl ThreadId {
78    /// Creates a [`ThreadId`] from a raw value.
79    ///
80    /// Extra care needs to be taken that this is not a constant value or re-used within this
81    /// process in any way.
82    pub const fn from_raw(process: ProcessId, raw: NonZeroU64) -> Self {
83        Self { process, raw }
84    }
85
86    /// Creates a [`ThreadId`] for the current thread, using OS specific means to acquire it.
87    #[cfg(feature = "enable")]
88    pub(crate) fn current(process: ProcessId) -> Self {
89        #[cfg_attr(not(feature = "std"), expect(unreachable_code))]
90        Self::from_raw(process, {
91            #[cfg(feature = "std")]
92            {
93                use veecle_osal_std::thread::{Thread, ThreadAbstraction};
94                Thread::current_thread_id()
95            }
96
97            #[cfg(not(feature = "std"))]
98            {
99                panic!("not yet supported")
100            }
101        })
102    }
103}
104
105impl fmt::Display for ThreadId {
106    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
107        let Self { process, raw } = self;
108        write!(f, "{process}:{raw:032x}")
109    }
110}
111
112/// Errors that can occur while parsing [`ThreadId`] from a string.
113#[derive(Clone, Debug)]
114pub enum ParseThreadIdError {
115    /// The string is missing a `:` separator.
116    MissingSeparator,
117
118    /// The embedded [`ProcessId`] failed to parse.
119    InvalidProcessId(core::num::ParseIntError),
120
121    /// The embedded [`ThreadId`] failed to parse.
122    InvalidThreadId(core::num::ParseIntError),
123
124    /// The embedded [`ThreadId`] had a zero value.
125    ZeroThreadId,
126}
127
128impl fmt::Display for ParseThreadIdError {
129    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130        match self {
131            Self::MissingSeparator => f.write_str("missing ':' separator"),
132            Self::InvalidProcessId(_) => f.write_str("failed to parse process id"),
133            Self::InvalidThreadId(_) => f.write_str("failed to parse thread id"),
134            Self::ZeroThreadId => f.write_str("zero thread id"),
135        }
136    }
137}
138
139impl core::error::Error for ParseThreadIdError {
140    fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
141        match self {
142            Self::MissingSeparator => None,
143            Self::InvalidProcessId(error) => Some(error),
144            Self::InvalidThreadId(error) => Some(error),
145            Self::ZeroThreadId => None,
146        }
147    }
148}
149
150impl FromStr for ThreadId {
151    type Err = ParseThreadIdError;
152
153    fn from_str(s: &str) -> Result<Self, Self::Err> {
154        let Some((process, thread)) = s.split_once(":") else {
155            return Err(ParseThreadIdError::MissingSeparator);
156        };
157        let process = ProcessId::from_str(process).map_err(ParseThreadIdError::InvalidProcessId)?;
158        let thread = NonZeroU64::new(
159            u64::from_str_radix(thread, 16).map_err(ParseThreadIdError::InvalidThreadId)?,
160        )
161        .ok_or(ParseThreadIdError::ZeroThreadId)?;
162        Ok(Self::from_raw(process, thread))
163    }
164}
165
166impl serde::Serialize for ThreadId {
167    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
168    where
169        S: serde::Serializer,
170    {
171        let mut bytes = [0u8; 49];
172
173        hex::encode_to_slice(self.process.to_raw().to_le_bytes(), &mut bytes[..32]).unwrap();
174        bytes[32] = b':';
175        hex::encode_to_slice(self.raw.get().to_le_bytes(), &mut bytes[33..]).unwrap();
176
177        serializer.serialize_str(str::from_utf8(&bytes).unwrap())
178    }
179}
180
181impl<'de> serde::Deserialize<'de> for ThreadId {
182    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
183    where
184        D: serde::Deserializer<'de>,
185    {
186        use serde::de::Error;
187
188        let string = <&str>::deserialize(deserializer)?;
189
190        if string.len() != 49 {
191            return Err(D::Error::invalid_length(
192                string.len(),
193                &"expected 49 byte string",
194            ));
195        }
196
197        let bytes = string.as_bytes();
198
199        if bytes[32] != b':' {
200            return Err(D::Error::invalid_value(
201                serde::de::Unexpected::Str(string),
202                &"expected : separator at byte 32",
203            ));
204        }
205
206        let mut process = [0; 16];
207        hex::decode_to_slice(&bytes[..32], &mut process).map_err(D::Error::custom)?;
208        let process = ProcessId::from_raw(u128::from_le_bytes(process));
209
210        let mut thread = [0; 8];
211        hex::decode_to_slice(&bytes[33..], &mut thread).map_err(D::Error::custom)?;
212        let thread = NonZeroU64::new(u64::from_le_bytes(thread))
213            .ok_or_else(|| D::Error::custom("zero thread id"))?;
214
215        Ok(Self::from_raw(process, thread))
216    }
217}
218
219/// A telemetry message associated with a specific execution thread.
220///
221/// This structure wraps a telemetry message with its execution context,
222/// allowing messages from different executions to be properly correlated.
223#[derive(Clone, Debug, Serialize)]
224#[cfg_attr(feature = "alloc", derive(Deserialize))]
225pub struct InstanceMessage<'a> {
226    /// The thread this message belongs to
227    pub thread_id: ThreadId,
228
229    /// The telemetry message content
230    #[serde(borrow)]
231    pub message: TelemetryMessage<'a>,
232}
233
234#[cfg(feature = "alloc")]
235impl ToStatic for InstanceMessage<'_> {
236    type Static = InstanceMessage<'static>;
237
238    fn to_static(&self) -> Self::Static {
239        InstanceMessage {
240            thread_id: self.thread_id,
241            message: self.message.to_static(),
242        }
243    }
244}
245
246/// An enumeration of all possible telemetry message types.
247///
248/// This enum represents the different categories of telemetry data that can be
249/// collected and exported by the system.
250#[derive(Clone, Debug, Serialize)]
251#[cfg_attr(feature = "alloc", derive(Deserialize))]
252pub enum TelemetryMessage<'a> {
253    /// A structured log message with severity and attributes
254    Log(#[serde(borrow)] LogMessage<'a>),
255    /// A time synchronization message for clock coordination
256    TimeSync(TimeSyncMessage),
257    /// A distributed tracing message (spans, events, links)
258    Tracing(#[serde(borrow)] TracingMessage<'a>),
259}
260
261#[cfg(feature = "alloc")]
262impl ToStatic for TelemetryMessage<'_> {
263    type Static = TelemetryMessage<'static>;
264
265    fn to_static(&self) -> Self::Static {
266        match self {
267            TelemetryMessage::Log(msg) => TelemetryMessage::Log(msg.to_static()),
268            TelemetryMessage::TimeSync(msg) => TelemetryMessage::TimeSync(msg.clone()),
269            TelemetryMessage::Tracing(msg) => TelemetryMessage::Tracing(msg.to_static()),
270        }
271    }
272}
273
274/// Log message severity levels.
275///
276/// These levels follow standard logging conventions, ordered from most verbose
277/// to most critical.
278#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
279pub enum Severity {
280    /// The "trace" level.
281    ///
282    /// Designates very low priority, often extremely verbose, information.
283    Trace,
284    /// The "debug" level.
285    ///
286    /// Designates lower priority information.
287    Debug,
288    /// The "info" level.
289    ///
290    /// Designates useful information.
291    Info,
292    /// The "warn" level.
293    ///
294    /// Designates hazardous situations.
295    Warn,
296    /// The "error" level.
297    ///
298    /// Designates very serious errors.
299    Error,
300    /// The "fatal" level.
301    ///
302    /// Designates critical failures that might crash the program.
303    Fatal,
304}
305
306/// A structured log message with severity, timestamp, and attributes.
307///
308/// Log messages can be optionally correlated with traces by including trace and span IDs when available.
309#[derive(Clone, Debug, Serialize)]
310#[cfg_attr(feature = "alloc", derive(Deserialize))]
311pub struct LogMessage<'a> {
312    /// Timestamp in nanoseconds since Unix epoch (or system start)
313    pub time_unix_nano: u64,
314    /// The severity level of this log message
315    pub severity: Severity,
316
317    /// The message body
318    #[serde(borrow)]
319    pub body: StringType<'a>,
320
321    /// Key-value attributes providing additional context
322    #[serde(borrow)]
323    pub attributes: AttributeListType<'a>,
324}
325
326#[cfg(feature = "alloc")]
327impl ToStatic for LogMessage<'_> {
328    type Static = LogMessage<'static>;
329
330    fn to_static(&self) -> Self::Static {
331        LogMessage {
332            time_unix_nano: self.time_unix_nano,
333            severity: self.severity,
334            body: self.body.to_static(),
335            attributes: self.attributes.to_static(),
336        }
337    }
338}
339
340/// A time synchronization message for coordinating clocks between systems.
341///
342/// This message contains both local time and absolute time information,
343/// allowing downstream systems to correlate local timestamps with real-world time.
344#[derive(Clone, Debug, Serialize, Deserialize)]
345pub struct TimeSyncMessage {
346    /// Local timestamp in system-specific units
347    pub local_timestamp: u64,
348    /// Time since Unix epoch in nanoseconds
349    pub since_epoch: u64,
350}
351
352/// Messages related to distributed tracing operations.
353///
354/// This enum encompasses all the different types of tracing messages that can be
355/// generated during span lifecycle management and tracing operations.
356#[derive(Clone, Debug, Serialize)]
357#[cfg_attr(feature = "alloc", derive(Deserialize))]
358pub enum TracingMessage<'a> {
359    /// A new span has been created
360    CreateSpan(#[serde(borrow)] SpanCreateMessage<'a>),
361    /// A span has been entered (made current)
362    EnterSpan(SpanEnterMessage),
363    /// A span has been exited (no longer current)
364    ExitSpan(SpanExitMessage),
365    /// A span has been closed (completed)
366    CloseSpan(SpanCloseMessage),
367    /// An event has been added to a span
368    AddEvent(#[serde(borrow)] SpanAddEventMessage<'a>),
369    /// A link has been added to a span
370    AddLink(SpanAddLinkMessage),
371    /// An attribute has been set on a span
372    SetAttribute(#[serde(borrow)] SpanSetAttributeMessage<'a>),
373}
374
375#[cfg(feature = "alloc")]
376impl ToStatic for TracingMessage<'_> {
377    type Static = TracingMessage<'static>;
378
379    fn to_static(&self) -> Self::Static {
380        match self {
381            TracingMessage::CreateSpan(msg) => TracingMessage::CreateSpan(msg.to_static()),
382            TracingMessage::EnterSpan(msg) => TracingMessage::EnterSpan(*msg),
383            TracingMessage::ExitSpan(msg) => TracingMessage::ExitSpan(*msg),
384            TracingMessage::CloseSpan(msg) => TracingMessage::CloseSpan(*msg),
385            TracingMessage::AddEvent(msg) => TracingMessage::AddEvent(msg.to_static()),
386            TracingMessage::AddLink(msg) => TracingMessage::AddLink(*msg),
387            TracingMessage::SetAttribute(msg) => TracingMessage::SetAttribute(msg.to_static()),
388        }
389    }
390}
391
392/// Message indicating the creation of a new span.
393///
394/// This message provides all the information needed to create a new span
395/// in the trace, including its identity, timing, and initial attributes.
396#[derive(Clone, Debug, Serialize)]
397#[cfg_attr(feature = "alloc", derive(Deserialize))]
398pub struct SpanCreateMessage<'a> {
399    /// The unique identifier (within the associated process) for this span.
400    pub span_id: SpanId,
401
402    /// The name of the span
403    #[serde(borrow)]
404    pub name: StringType<'a>,
405
406    /// Timestamp when the span was started
407    pub start_time_unix_nano: u64,
408
409    /// Initial attributes attached to the span
410    #[serde(borrow)]
411    pub attributes: AttributeListType<'a>,
412}
413
414#[cfg(feature = "alloc")]
415impl ToStatic for SpanCreateMessage<'_> {
416    type Static = SpanCreateMessage<'static>;
417
418    fn to_static(&self) -> Self::Static {
419        SpanCreateMessage {
420            span_id: self.span_id,
421            name: self.name.to_static(),
422            start_time_unix_nano: self.start_time_unix_nano,
423            attributes: self.attributes.to_static(),
424        }
425    }
426}
427
428/// Message indicating a span has been entered.
429#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
430pub struct SpanEnterMessage {
431    /// The span being entered
432    pub span_id: SpanId,
433
434    /// Timestamp when the span was entered
435    pub time_unix_nano: u64,
436}
437
438/// Message indicating a span has been exited.
439#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
440pub struct SpanExitMessage {
441    /// The span being exited
442    pub span_id: SpanId,
443
444    /// Timestamp when the span was exited
445    pub time_unix_nano: u64,
446}
447
448/// Message indicating a span has been closed (completed).
449#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
450pub struct SpanCloseMessage {
451    /// The span being closed
452    pub span_id: SpanId,
453
454    /// Timestamp when the span was closed
455    pub end_time_unix_nano: u64,
456}
457
458/// Message indicating an attribute has been set on a span.
459#[derive(Clone, Debug, Serialize, Deserialize)]
460pub struct SpanSetAttributeMessage<'a> {
461    /// The span the attribute is being set on, if [`None`] then this applies to the "current span"
462    /// as determined by tracking [`SpanEnterMessage`] and [`SpanExitMessage`] pairs.
463    pub span_id: Option<SpanId>,
464
465    /// The attribute being set
466    #[serde(borrow)]
467    pub attribute: KeyValue<'a>,
468}
469
470#[cfg(feature = "alloc")]
471impl ToStatic for SpanSetAttributeMessage<'_> {
472    type Static = SpanSetAttributeMessage<'static>;
473
474    fn to_static(&self) -> Self::Static {
475        SpanSetAttributeMessage {
476            span_id: self.span_id,
477            attribute: self.attribute.to_static(),
478        }
479    }
480}
481
482/// Message indicating an event has been added to a span.
483#[derive(Clone, Debug, Serialize)]
484#[cfg_attr(feature = "alloc", derive(Deserialize))]
485pub struct SpanAddEventMessage<'a> {
486    /// The span the event is being added to, if [`None`] then this applies to the "current span"
487    /// as determined by tracking [`SpanEnterMessage`] and [`SpanExitMessage`] pairs.
488    pub span_id: Option<SpanId>,
489
490    /// The name of the event
491    #[serde(borrow)]
492    pub name: StringType<'a>,
493
494    /// Timestamp when the event occurred
495    pub time_unix_nano: u64,
496
497    /// Attributes providing additional context for the event
498    #[serde(borrow)]
499    pub attributes: AttributeListType<'a>,
500}
501
502#[cfg(feature = "alloc")]
503impl ToStatic for SpanAddEventMessage<'_> {
504    type Static = SpanAddEventMessage<'static>;
505
506    fn to_static(&self) -> Self::Static {
507        SpanAddEventMessage {
508            span_id: self.span_id,
509            name: self.name.to_static(),
510            time_unix_nano: self.time_unix_nano,
511            attributes: self.attributes.to_static(),
512        }
513    }
514}
515
516/// Message indicating a link has been added to a span.
517///
518/// Links connect spans across different traces, representing relationships
519/// that are not parent-child hierarchies.
520#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
521pub struct SpanAddLinkMessage {
522    /// The span the link is being added to, if [`None`] then this applies to the "current span"
523    /// as determined by tracking [`SpanEnterMessage`] and [`SpanExitMessage`] pairs.
524    pub span_id: Option<SpanId>,
525
526    /// The span context being linked to
527    pub link: SpanContext,
528}
529
530#[cfg(test)]
531#[cfg_attr(coverage_nightly, coverage(off))]
532mod tests {
533    use alloc::format;
534    #[cfg(feature = "alloc")]
535    use alloc::string::String;
536
537    use super::*;
538
539    #[test]
540    fn thread_id_format_from_str_roundtrip() {
541        let test_cases = [
542            ThreadId::from_raw(ProcessId::from_raw(0), NonZeroU64::new(1).unwrap()),
543            ThreadId::from_raw(
544                ProcessId::from_raw(0x123456789ABCDEF0FEDCBA9876543210),
545                NonZeroU64::new(0xFEDCBA9876543210).unwrap(),
546            ),
547            ThreadId::from_raw(
548                ProcessId::from_raw(u128::MAX),
549                NonZeroU64::new(u64::MAX).unwrap(),
550            ),
551            ThreadId::from_raw(ProcessId::from_raw(1), NonZeroU64::new(1).unwrap()),
552        ];
553
554        for thread_id in test_cases {
555            let formatted = format!("{thread_id}");
556            let parsed = formatted.parse::<ThreadId>().unwrap();
557            assert_eq!(
558                thread_id,
559                parsed,
560                "Failed roundtrip for {:#x}:{:#x}",
561                thread_id.process.to_raw(),
562                thread_id.raw,
563            );
564        }
565    }
566
567    #[test]
568    fn thread_id_serde_roundtrip() {
569        let test_cases = [
570            ThreadId::from_raw(ProcessId::from_raw(0), NonZeroU64::new(1).unwrap()),
571            ThreadId::from_raw(
572                ProcessId::from_raw(0x123456789ABCDEF0FEDCBA9876543210),
573                NonZeroU64::new(0xFEDCBA9876543210).unwrap(),
574            ),
575            ThreadId::from_raw(
576                ProcessId::from_raw(u128::MAX),
577                NonZeroU64::new(u64::MAX).unwrap(),
578            ),
579            ThreadId::from_raw(ProcessId::from_raw(1), NonZeroU64::new(1).unwrap()),
580        ];
581
582        for original in test_cases {
583            let json = serde_json::to_string(&original).unwrap();
584            let deserialized: ThreadId = serde_json::from_str(&json).unwrap();
585            assert_eq!(original, deserialized);
586        }
587    }
588
589    #[test]
590    fn string_type_conversions() {
591        let static_str: StringType<'static> = "static".into();
592
593        let _event = SpanAddEventMessage {
594            span_id: Some(SpanId(0)),
595            name: static_str,
596            time_unix_nano: 0,
597            attributes: attribute_list_from_slice(&[]),
598        };
599
600        let borrowed_str: StringType = "borrowed".into();
601
602        let _event = SpanAddEventMessage {
603            span_id: Some(SpanId(0)),
604            name: borrowed_str,
605            time_unix_nano: 0,
606            attributes: attribute_list_from_slice(&[]),
607        };
608    }
609
610    #[cfg(any(feature = "std", feature = "alloc"))]
611    #[test]
612    fn string_type_with_owned_strings() {
613        let string = String::from("owned");
614        let owned: StringType<'static> = StringType::from(string);
615
616        let _event = SpanAddEventMessage {
617            span_id: Some(SpanId(0)),
618            name: owned,
619            time_unix_nano: 0,
620            attributes: attribute_list_from_slice(&[]),
621        };
622    }
623
624    #[cfg(feature = "alloc")]
625    #[test]
626    fn to_static_conversion() {
627        use alloc::string::String;
628
629        use crate::value::Value;
630
631        // Create some data with non-static lifetime
632        let borrowed_name_str = "test_span";
633        let borrowed_name: StringType = borrowed_name_str.into();
634
635        let owned_key = String::from("test_key");
636        let owned_value = String::from("test_value");
637        let attribute = KeyValue {
638            key: owned_key.as_str().into(),
639            value: Value::String(owned_value.as_str().into()),
640        };
641
642        let attributes = [attribute];
643        let span_event = SpanAddEventMessage {
644            span_id: Some(SpanId(0)),
645            name: borrowed_name,
646            time_unix_nano: 0,
647            attributes: attribute_list_from_slice(&attributes),
648        };
649
650        let tracing_message = TracingMessage::AddEvent(span_event);
651        let telemetry_message = TelemetryMessage::Tracing(tracing_message);
652        let instance_message = InstanceMessage {
653            thread_id: ThreadId::from_raw(ProcessId::from_raw(999), NonZeroU64::new(111).unwrap()),
654            message: telemetry_message,
655        };
656
657        let static_message: InstanceMessage<'static> = instance_message.to_static();
658
659        // Verify the conversion worked - the static message should have the same data
660        if let TelemetryMessage::Tracing(TracingMessage::AddEvent(span_event)) =
661            &static_message.message
662        {
663            assert_eq!(span_event.name.as_ref(), "test_span");
664        } else {
665            panic!("Expected CreateSpan message");
666        }
667    }
668}