Skip to main content

moqtap_codec/
data_stream.rs

1use crate::kvp::KeyValuePair;
2use crate::message::CodecError;
3use crate::types::*;
4use crate::varint::VarInt;
5use bytes::{Buf, BufMut};
6
7/// Subgroup stream header (unidirectional stream for subscription objects).
8#[derive(Debug, Clone, PartialEq, Eq)]
9pub struct SubgroupHeader {
10    pub track_alias: VarInt,
11    pub group: VarInt,
12    pub subgroup: VarInt,
13    pub publisher_priority: u8,
14}
15
16/// Datagram header (for datagram forwarding preference).
17#[derive(Debug, Clone, PartialEq, Eq)]
18pub struct DatagramHeader {
19    pub track_alias: VarInt,
20    pub group: VarInt,
21    pub object: VarInt,
22    pub publisher_priority: u8,
23}
24
25/// Object header within a subgroup stream or fetch response.
26#[derive(Debug, Clone, PartialEq, Eq)]
27pub struct ObjectHeader {
28    pub object_status: ObjectStatus,
29    /// Present unless status is DoesNotExist.
30    pub payload_length: Option<VarInt>,
31    /// Present when different from track default.
32    pub forwarding_preference: Option<ForwardingPreference>,
33    /// Present for Normal status when dependencies exist.
34    pub dependencies: Option<VarInt>,
35    /// Object extension key-value pairs.
36    pub extensions: Vec<KeyValuePair>,
37}
38
39/// Fetch response stream header.
40#[derive(Debug, Clone, PartialEq, Eq)]
41pub struct FetchHeader {
42    pub track_alias: VarInt,
43    pub group: VarInt,
44    pub subgroup: VarInt,
45    pub publisher_priority: u8,
46}
47
48impl SubgroupHeader {
49    pub fn encode(&self, buf: &mut impl BufMut) {
50        self.track_alias.encode(buf);
51        self.group.encode(buf);
52        self.subgroup.encode(buf);
53        buf.put_u8(self.publisher_priority);
54    }
55
56    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
57        let track_alias = VarInt::decode(buf)?;
58        let group = VarInt::decode(buf)?;
59        let subgroup = VarInt::decode(buf)?;
60        if buf.remaining() < 1 {
61            return Err(CodecError::UnexpectedEnd);
62        }
63        let publisher_priority = buf.get_u8();
64        Ok(Self { track_alias, group, subgroup, publisher_priority })
65    }
66}
67
68impl DatagramHeader {
69    pub fn encode(&self, buf: &mut impl BufMut) {
70        self.track_alias.encode(buf);
71        self.group.encode(buf);
72        self.object.encode(buf);
73        buf.put_u8(self.publisher_priority);
74    }
75
76    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
77        let track_alias = VarInt::decode(buf)?;
78        let group = VarInt::decode(buf)?;
79        let object = VarInt::decode(buf)?;
80        if buf.remaining() < 1 {
81            return Err(CodecError::UnexpectedEnd);
82        }
83        let publisher_priority = buf.get_u8();
84        Ok(Self { track_alias, group, object, publisher_priority })
85    }
86}
87
88impl ObjectHeader {
89    pub fn encode(&self, buf: &mut impl BufMut) {
90        // Encode object status as VarInt
91        VarInt::from_u64(self.object_status as u64).unwrap().encode(buf);
92
93        // Encode flags: bit 0 = has forwarding_preference, bit 1 = has dependencies
94        let mut flags: u64 = 0;
95        if self.forwarding_preference.is_some() {
96            flags |= 0x01;
97        }
98        if self.dependencies.is_some() {
99            flags |= 0x02;
100        }
101        VarInt::from_u64(flags).unwrap().encode(buf);
102
103        // Payload length (present unless DoesNotExist)
104        if self.object_status != ObjectStatus::DoesNotExist {
105            if let Some(pl) = &self.payload_length {
106                pl.encode(buf);
107            }
108        }
109
110        // Optional forwarding preference
111        if let Some(fp) = &self.forwarding_preference {
112            VarInt::from_u64(*fp as u64).unwrap().encode(buf);
113        }
114
115        // Optional dependencies
116        if let Some(deps) = &self.dependencies {
117            deps.encode(buf);
118        }
119
120        // Extensions as KVP list
121        KeyValuePair::encode_list(&self.extensions, buf);
122    }
123
124    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
125        let status_val = VarInt::decode(buf)?.into_inner();
126        let object_status =
127            ObjectStatus::from_u8(status_val as u8).ok_or(CodecError::InvalidField)?;
128
129        let flags = VarInt::decode(buf)?.into_inner();
130        let has_forwarding_preference = flags & 0x01 != 0;
131        let has_dependencies = flags & 0x02 != 0;
132
133        let payload_length = if object_status != ObjectStatus::DoesNotExist {
134            Some(VarInt::decode(buf)?)
135        } else {
136            None
137        };
138
139        let forwarding_preference = if has_forwarding_preference {
140            let fp_val = VarInt::decode(buf)?.into_inner();
141            Some(ForwardingPreference::from_u8(fp_val as u8).ok_or(CodecError::InvalidField)?)
142        } else {
143            None
144        };
145
146        let dependencies = if has_dependencies { Some(VarInt::decode(buf)?) } else { None };
147
148        let extensions = KeyValuePair::decode_list(buf)?;
149
150        Ok(Self { object_status, payload_length, forwarding_preference, dependencies, extensions })
151    }
152}
153
154impl FetchHeader {
155    pub fn encode(&self, buf: &mut impl BufMut) {
156        self.track_alias.encode(buf);
157        self.group.encode(buf);
158        self.subgroup.encode(buf);
159        buf.put_u8(self.publisher_priority);
160    }
161
162    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
163        let track_alias = VarInt::decode(buf)?;
164        let group = VarInt::decode(buf)?;
165        let subgroup = VarInt::decode(buf)?;
166        if buf.remaining() < 1 {
167            return Err(CodecError::UnexpectedEnd);
168        }
169        let publisher_priority = buf.get_u8();
170        Ok(Self { track_alias, group, subgroup, publisher_priority })
171    }
172}