Skip to main content

moqtap_codec/draft18/
data_stream.rs

//! Draft-18 data stream header encoding and decoding.
//!
//! Subgroup header type byte: bit 4 must be set; valid type ranges
//! (inclusive) are 0x10..=0x1F, 0x30..=0x3F, 0x50..=0x5F, 0x70..=0x7F.
//!   - bit 0 (0x01): PROPERTIES
//!   - bits 1-2 (0x06): SUBGROUP_ID_MODE (0=zero, 1=first_obj, 2=explicit, 3=reserved)
//!   - bit 3 (0x08): END_OF_GROUP
//!   - bit 5 (0x20): DEFAULT_PRIORITY (no priority byte)
//!   - bit 6 (0x40): FIRST_OBJECT (new in draft-18)
//!
//! Datagram type byte: 0b00X0XXXX (bit 4 always 0)
//!   - bit 0 (0x01): PROPERTIES
//!   - bit 1 (0x02): END_OF_GROUP
//!   - bit 2 (0x04): ZERO_OBJECT_ID (object_id=0, field omitted)
//!   - bit 3 (0x08): DEFAULT_PRIORITY (no priority byte)
//!   - bit 5 (0x20): STATUS (status byte replaces payload)
//!
//! Fetch header: stream type 0x05 + request_id. Fetch objects use
//! delta-encoded Group ID and Object ID (the first object's deltas are
//! interpreted as absolute values).
21
22use bytes::{Buf, BufMut};
23
24use crate::error::CodecError;
25use crate::varint::VarInt;
26
27// ── Subgroup ──────────────────────────────────────────────────
28
29const SUBGROUP_PROPERTIES_BIT: u8 = 0x01;
30const SUBGROUP_ID_MODE_MASK: u8 = 0x06;
31const SUBGROUP_END_OF_GROUP_BIT: u8 = 0x08;
32const SUBGROUP_BASE_BIT: u8 = 0x10;
33const SUBGROUP_DEFAULT_PRIORITY_BIT: u8 = 0x20;
34const SUBGROUP_FIRST_OBJECT_BIT: u8 = 0x40;
35
36#[derive(Debug, Clone)]
37pub struct SubgroupHeader {
38    pub header_type: u8,
39    pub track_alias: VarInt,
40    pub group_id: VarInt,
41    pub subgroup_id: VarInt,
42    pub publisher_priority: Option<u8>,
43}
44
45impl SubgroupHeader {
46    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
47        if buf.remaining() < 1 {
48            return Err(CodecError::UnexpectedEnd);
49        }
50        let header_type = buf.get_u8();
51
52        // Validate: bit 4 must be set
53        if header_type & SUBGROUP_BASE_BIT == 0 {
54            return Err(CodecError::InvalidField);
55        }
56
57        let track_alias = VarInt::decode(buf)?;
58        let group_id = VarInt::decode(buf)?;
59
60        let subgroup_id_mode = (header_type & SUBGROUP_ID_MODE_MASK) >> 1;
61        let subgroup_id = match subgroup_id_mode {
62            0 => VarInt::from_u64(0).unwrap(),
63            2 => VarInt::decode(buf)?,
64            // Modes 1 and 3: mode 1 = first object's ID (resolved later),
65            // mode 3 = reserved. Store 0 for now.
66            _ => VarInt::from_u64(0).unwrap(),
67        };
68
69        let publisher_priority = if header_type & SUBGROUP_DEFAULT_PRIORITY_BIT == 0 {
70            if buf.remaining() < 1 {
71                return Err(CodecError::UnexpectedEnd);
72            }
73            Some(buf.get_u8())
74        } else {
75            None
76        };
77
78        Ok(SubgroupHeader { header_type, track_alias, group_id, subgroup_id, publisher_priority })
79    }
80
81    pub fn encode(&self, buf: &mut impl BufMut) {
82        buf.put_u8(self.header_type);
83        self.track_alias.encode(buf);
84        self.group_id.encode(buf);
85
86        let subgroup_id_mode = (self.header_type & SUBGROUP_ID_MODE_MASK) >> 1;
87        if subgroup_id_mode == 2 {
88            self.subgroup_id.encode(buf);
89        }
90
91        if self.header_type & SUBGROUP_DEFAULT_PRIORITY_BIT == 0 {
92            buf.put_u8(self.publisher_priority.unwrap_or(128));
93        }
94    }
95
96    pub fn has_properties(&self) -> bool {
97        self.header_type & SUBGROUP_PROPERTIES_BIT != 0
98    }
99
100    pub fn is_end_of_group(&self) -> bool {
101        self.header_type & SUBGROUP_END_OF_GROUP_BIT != 0
102    }
103
104    /// `true` when the FIRST_OBJECT bit (0x40) is set, signaling the first
105    /// object on this stream is the original publisher's first object in the
106    /// subgroup. Added in draft-18.
107    pub fn is_first_object(&self) -> bool {
108        self.header_type & SUBGROUP_FIRST_OBJECT_BIT != 0
109    }
110}
111
112// ── Subgroup objects (stateful) ───────────────────────────────
113
114/// One object within a draft-18 subgroup stream. Object IDs are
115/// delta-encoded; whether a per-object "properties" block (the draft-18
116/// equivalent of extension headers) is present depends on the PROPERTIES
117/// bit on the enclosing [`SubgroupHeader`]. Use [`SubgroupObjectReader`]
118/// to encode/decode.
119#[derive(Debug, Clone)]
120pub struct SubgroupObject {
121    pub object_id: VarInt,
122    /// Raw properties bytes (empty unless the subgroup header sets the
123    /// PROPERTIES bit). When present, holds the `ext_count` varint
124    /// followed by each property's `key`, `vlen`, and value.
125    pub extension_headers: Vec<u8>,
126    pub payload_length: VarInt,
127    pub object_status: Option<VarInt>,
128    pub payload: Vec<u8>,
129}
130
131#[derive(Debug, Clone)]
132pub struct SubgroupObjectReader {
133    extensions_present: bool,
134    prev_object_id: Option<u64>,
135}
136
137impl SubgroupObjectReader {
138    pub fn new(header: &SubgroupHeader) -> Self {
139        Self { extensions_present: header.has_properties(), prev_object_id: None }
140    }
141
142    pub fn read_object(&mut self, buf: &mut impl Buf) -> Result<SubgroupObject, CodecError> {
143        let delta = VarInt::decode(buf)?.into_inner();
144        let object_id_val = match self.prev_object_id {
145            None => delta,
146            Some(prev) => prev
147                .checked_add(1)
148                .and_then(|v| v.checked_add(delta))
149                .ok_or(CodecError::InvalidField)?,
150        };
151        self.prev_object_id = Some(object_id_val);
152        let object_id = VarInt::from_u64(object_id_val).map_err(|_| CodecError::InvalidField)?;
153
154        let extension_headers = if self.extensions_present {
155            let mut out: Vec<u8> = Vec::new();
156            let ext_count = VarInt::decode(buf)?;
157            ext_count.encode(&mut out);
158            let count = ext_count.into_inner();
159            for _ in 0..count {
160                let key = VarInt::decode(buf)?;
161                let vlen = VarInt::decode(buf)?;
162                let vlen_usize = vlen.into_inner() as usize;
163                if buf.remaining() < vlen_usize {
164                    return Err(CodecError::UnexpectedEnd);
165                }
166                key.encode(&mut out);
167                vlen.encode(&mut out);
168                let value = buf.copy_to_bytes(vlen_usize);
169                out.extend_from_slice(&value);
170            }
171            out
172        } else {
173            Vec::new()
174        };
175
176        let payload_length_vi = VarInt::decode(buf)?;
177        let payload_length_val = payload_length_vi.into_inner() as usize;
178        let (object_status, payload) = if payload_length_val == 0 {
179            let status = VarInt::decode(buf)?;
180            (Some(status), Vec::new())
181        } else {
182            let payload = crate::types::read_bytes(buf, payload_length_val)?;
183            (None, payload)
184        };
185
186        Ok(SubgroupObject {
187            object_id,
188            extension_headers,
189            payload_length: payload_length_vi,
190            object_status,
191            payload,
192        })
193    }
194
195    pub fn write_object(
196        &mut self,
197        object: &SubgroupObject,
198        buf: &mut impl BufMut,
199    ) -> Result<(), CodecError> {
200        let oid = object.object_id.into_inner();
201        let delta = match self.prev_object_id {
202            None => oid,
203            Some(prev) => oid
204                .checked_sub(prev)
205                .and_then(|v| v.checked_sub(1))
206                .ok_or(CodecError::InvalidField)?,
207        };
208        VarInt::from_u64(delta).map_err(|_| CodecError::InvalidField)?.encode(buf);
209        if self.extensions_present {
210            buf.put_slice(&object.extension_headers);
211        }
212        object.payload_length.encode(buf);
213        if object.payload_length.into_inner() == 0 {
214            if let Some(s) = &object.object_status {
215                s.encode(buf);
216            } else {
217                VarInt::from_u64(0).unwrap().encode(buf);
218            }
219        } else {
220            buf.put_slice(&object.payload);
221        }
222        self.prev_object_id = Some(oid);
223        Ok(())
224    }
225}
226
227// ── Datagram ──────────────────────────────────────────────────
228
229const DATAGRAM_PROPERTIES_BIT: u8 = 0x01;
230const DATAGRAM_END_OF_GROUP_BIT: u8 = 0x02;
231const DATAGRAM_ZERO_OBJECT_ID_BIT: u8 = 0x04;
232const DATAGRAM_DEFAULT_PRIORITY_BIT: u8 = 0x08;
233const DATAGRAM_STATUS_BIT: u8 = 0x20;
234
235#[derive(Debug, Clone)]
236pub struct DatagramHeader {
237    pub datagram_type: u8,
238    pub track_alias: VarInt,
239    pub group_id: VarInt,
240    pub object_id: VarInt,
241    pub publisher_priority: Option<u8>,
242    pub object_status: Option<u8>,
243}
244
245impl DatagramHeader {
246    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
247        if buf.remaining() < 1 {
248            return Err(CodecError::UnexpectedEnd);
249        }
250        let datagram_type = buf.get_u8();
251
252        let track_alias = VarInt::decode(buf)?;
253        let group_id = VarInt::decode(buf)?;
254
255        let object_id = if datagram_type & DATAGRAM_ZERO_OBJECT_ID_BIT != 0 {
256            VarInt::from_usize(0)
257        } else {
258            VarInt::decode(buf)?
259        };
260
261        let publisher_priority = if datagram_type & DATAGRAM_DEFAULT_PRIORITY_BIT == 0 {
262            if buf.remaining() < 1 {
263                return Err(CodecError::UnexpectedEnd);
264            }
265            Some(buf.get_u8())
266        } else {
267            None
268        };
269
270        // Skip properties if present
271        if datagram_type & DATAGRAM_PROPERTIES_BIT != 0 {
272            let props_len = VarInt::decode(buf)?.into_inner() as usize;
273            if buf.remaining() < props_len {
274                return Err(CodecError::UnexpectedEnd);
275            }
276            buf.advance(props_len);
277        }
278
279        let object_status = if datagram_type & DATAGRAM_STATUS_BIT != 0 {
280            if buf.remaining() < 1 {
281                return Err(CodecError::UnexpectedEnd);
282            }
283            Some(buf.get_u8())
284        } else {
285            None
286        };
287
288        Ok(DatagramHeader {
289            datagram_type,
290            track_alias,
291            group_id,
292            object_id,
293            publisher_priority,
294            object_status,
295        })
296    }
297
298    pub fn encode(&self, buf: &mut impl BufMut) {
299        buf.put_u8(self.datagram_type);
300        self.track_alias.encode(buf);
301        self.group_id.encode(buf);
302
303        if self.datagram_type & DATAGRAM_ZERO_OBJECT_ID_BIT == 0 {
304            self.object_id.encode(buf);
305        }
306
307        if self.datagram_type & DATAGRAM_DEFAULT_PRIORITY_BIT == 0 {
308            buf.put_u8(self.publisher_priority.unwrap_or(128));
309        }
310
311        if self.datagram_type & DATAGRAM_STATUS_BIT != 0 {
312            buf.put_u8(self.object_status.unwrap_or(0));
313        }
314    }
315
316    pub fn is_end_of_group(&self) -> bool {
317        self.datagram_type & DATAGRAM_END_OF_GROUP_BIT != 0
318    }
319
320    pub fn has_status(&self) -> bool {
321        self.datagram_type & DATAGRAM_STATUS_BIT != 0
322    }
323}
324
325// ── Fetch Header ──────────────────────────────────────────────
326
327const FETCH_STREAM_TYPE: u64 = 0x05;
328
329#[derive(Debug, Clone)]
330pub struct FetchHeader {
331    pub request_id: VarInt,
332}
333
334impl FetchHeader {
335    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
336        let stream_type = VarInt::decode(buf)?.into_inner();
337        if stream_type != FETCH_STREAM_TYPE {
338            return Err(CodecError::InvalidField);
339        }
340        let request_id = VarInt::decode(buf)?;
341        Ok(FetchHeader { request_id })
342    }
343
344    pub fn encode(&self, buf: &mut impl BufMut) {
345        VarInt::from_usize(FETCH_STREAM_TYPE as usize).encode(buf);
346        self.request_id.encode(buf);
347    }
348}