moqtap_codec/draft18/
data_stream.rs1use bytes::{Buf, BufMut};
23
24use crate::error::CodecError;
25use crate::varint::VarInt;
26
/// Flag tested by `SubgroupHeader::has_properties`; also enables per-object
/// extension parsing in `SubgroupObjectReader`.
const SUBGROUP_PROPERTIES_BIT: u8 = 1 << 0;
/// Two-bit field (bits 1-2) holding the subgroup-ID mode; mode 2 means an
/// explicit subgroup ID follows the group ID.
const SUBGROUP_ID_MODE_MASK: u8 = 0b11 << 1;
/// Flag tested by `SubgroupHeader::is_end_of_group`.
const SUBGROUP_END_OF_GROUP_BIT: u8 = 1 << 3;
/// Must be set in every subgroup header type; `SubgroupHeader::decode`
/// rejects the type byte otherwise.
const SUBGROUP_BASE_BIT: u8 = 1 << 4;
/// When set, no publisher-priority byte is carried in the header.
const SUBGROUP_DEFAULT_PRIORITY_BIT: u8 = 1 << 5;
/// Flag tested by `SubgroupHeader::is_first_object`.
const SUBGROUP_FIRST_OBJECT_BIT: u8 = 1 << 6;
35
36#[derive(Debug, Clone)]
37pub struct SubgroupHeader {
38 pub header_type: u8,
39 pub track_alias: VarInt,
40 pub group_id: VarInt,
41 pub subgroup_id: VarInt,
42 pub publisher_priority: Option<u8>,
43}
44
45impl SubgroupHeader {
46 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
47 if buf.remaining() < 1 {
48 return Err(CodecError::UnexpectedEnd);
49 }
50 let header_type = buf.get_u8();
51
52 if header_type & SUBGROUP_BASE_BIT == 0 {
54 return Err(CodecError::InvalidField);
55 }
56
57 let track_alias = VarInt::decode(buf)?;
58 let group_id = VarInt::decode(buf)?;
59
60 let subgroup_id_mode = (header_type & SUBGROUP_ID_MODE_MASK) >> 1;
61 let subgroup_id = match subgroup_id_mode {
62 0 => VarInt::from_u64(0).unwrap(),
63 2 => VarInt::decode(buf)?,
64 _ => VarInt::from_u64(0).unwrap(),
67 };
68
69 let publisher_priority = if header_type & SUBGROUP_DEFAULT_PRIORITY_BIT == 0 {
70 if buf.remaining() < 1 {
71 return Err(CodecError::UnexpectedEnd);
72 }
73 Some(buf.get_u8())
74 } else {
75 None
76 };
77
78 Ok(SubgroupHeader { header_type, track_alias, group_id, subgroup_id, publisher_priority })
79 }
80
81 pub fn encode(&self, buf: &mut impl BufMut) {
82 buf.put_u8(self.header_type);
83 self.track_alias.encode(buf);
84 self.group_id.encode(buf);
85
86 let subgroup_id_mode = (self.header_type & SUBGROUP_ID_MODE_MASK) >> 1;
87 if subgroup_id_mode == 2 {
88 self.subgroup_id.encode(buf);
89 }
90
91 if self.header_type & SUBGROUP_DEFAULT_PRIORITY_BIT == 0 {
92 buf.put_u8(self.publisher_priority.unwrap_or(128));
93 }
94 }
95
96 pub fn has_properties(&self) -> bool {
97 self.header_type & SUBGROUP_PROPERTIES_BIT != 0
98 }
99
100 pub fn is_end_of_group(&self) -> bool {
101 self.header_type & SUBGROUP_END_OF_GROUP_BIT != 0
102 }
103
104 pub fn is_first_object(&self) -> bool {
108 self.header_type & SUBGROUP_FIRST_OBJECT_BIT != 0
109 }
110}
111
112#[derive(Debug, Clone)]
120pub struct SubgroupObject {
121 pub object_id: VarInt,
122 pub extension_headers: Vec<u8>,
126 pub payload_length: VarInt,
127 pub object_status: Option<VarInt>,
128 pub payload: Vec<u8>,
129}
130
131#[derive(Debug, Clone)]
132pub struct SubgroupObjectReader {
133 extensions_present: bool,
134 prev_object_id: Option<u64>,
135}
136
137impl SubgroupObjectReader {
138 pub fn new(header: &SubgroupHeader) -> Self {
139 Self { extensions_present: header.has_properties(), prev_object_id: None }
140 }
141
142 pub fn read_object(&mut self, buf: &mut impl Buf) -> Result<SubgroupObject, CodecError> {
143 let delta = VarInt::decode(buf)?.into_inner();
144 let object_id_val = match self.prev_object_id {
145 None => delta,
146 Some(prev) => prev
147 .checked_add(1)
148 .and_then(|v| v.checked_add(delta))
149 .ok_or(CodecError::InvalidField)?,
150 };
151 self.prev_object_id = Some(object_id_val);
152 let object_id = VarInt::from_u64(object_id_val).map_err(|_| CodecError::InvalidField)?;
153
154 let extension_headers = if self.extensions_present {
155 let mut out: Vec<u8> = Vec::new();
156 let ext_count = VarInt::decode(buf)?;
157 ext_count.encode(&mut out);
158 let count = ext_count.into_inner();
159 for _ in 0..count {
160 let key = VarInt::decode(buf)?;
161 let vlen = VarInt::decode(buf)?;
162 let vlen_usize = vlen.into_inner() as usize;
163 if buf.remaining() < vlen_usize {
164 return Err(CodecError::UnexpectedEnd);
165 }
166 key.encode(&mut out);
167 vlen.encode(&mut out);
168 let value = buf.copy_to_bytes(vlen_usize);
169 out.extend_from_slice(&value);
170 }
171 out
172 } else {
173 Vec::new()
174 };
175
176 let payload_length_vi = VarInt::decode(buf)?;
177 let payload_length_val = payload_length_vi.into_inner() as usize;
178 let (object_status, payload) = if payload_length_val == 0 {
179 let status = VarInt::decode(buf)?;
180 (Some(status), Vec::new())
181 } else {
182 let payload = crate::types::read_bytes(buf, payload_length_val)?;
183 (None, payload)
184 };
185
186 Ok(SubgroupObject {
187 object_id,
188 extension_headers,
189 payload_length: payload_length_vi,
190 object_status,
191 payload,
192 })
193 }
194
195 pub fn write_object(
196 &mut self,
197 object: &SubgroupObject,
198 buf: &mut impl BufMut,
199 ) -> Result<(), CodecError> {
200 let oid = object.object_id.into_inner();
201 let delta = match self.prev_object_id {
202 None => oid,
203 Some(prev) => oid
204 .checked_sub(prev)
205 .and_then(|v| v.checked_sub(1))
206 .ok_or(CodecError::InvalidField)?,
207 };
208 VarInt::from_u64(delta).map_err(|_| CodecError::InvalidField)?.encode(buf);
209 if self.extensions_present {
210 buf.put_slice(&object.extension_headers);
211 }
212 object.payload_length.encode(buf);
213 if object.payload_length.into_inner() == 0 {
214 if let Some(s) = &object.object_status {
215 s.encode(buf);
216 } else {
217 VarInt::from_u64(0).unwrap().encode(buf);
218 }
219 } else {
220 buf.put_slice(&object.payload);
221 }
222 self.prev_object_id = Some(oid);
223 Ok(())
224 }
225}
226
/// When set, a length-prefixed properties block follows the priority field.
const DATAGRAM_PROPERTIES_BIT: u8 = 1 << 0;
/// Flag tested by `DatagramHeader::is_end_of_group`.
const DATAGRAM_END_OF_GROUP_BIT: u8 = 1 << 1;
/// When set, the object ID is omitted from the wire and taken to be zero.
const DATAGRAM_ZERO_OBJECT_ID_BIT: u8 = 1 << 2;
/// When set, no publisher-priority byte is carried in the datagram.
const DATAGRAM_DEFAULT_PRIORITY_BIT: u8 = 1 << 3;
/// When set, a one-byte object status is carried; tested by
/// `DatagramHeader::has_status`.
const DATAGRAM_STATUS_BIT: u8 = 1 << 5;
234
235#[derive(Debug, Clone)]
236pub struct DatagramHeader {
237 pub datagram_type: u8,
238 pub track_alias: VarInt,
239 pub group_id: VarInt,
240 pub object_id: VarInt,
241 pub publisher_priority: Option<u8>,
242 pub object_status: Option<u8>,
243}
244
245impl DatagramHeader {
246 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
247 if buf.remaining() < 1 {
248 return Err(CodecError::UnexpectedEnd);
249 }
250 let datagram_type = buf.get_u8();
251
252 let track_alias = VarInt::decode(buf)?;
253 let group_id = VarInt::decode(buf)?;
254
255 let object_id = if datagram_type & DATAGRAM_ZERO_OBJECT_ID_BIT != 0 {
256 VarInt::from_usize(0)
257 } else {
258 VarInt::decode(buf)?
259 };
260
261 let publisher_priority = if datagram_type & DATAGRAM_DEFAULT_PRIORITY_BIT == 0 {
262 if buf.remaining() < 1 {
263 return Err(CodecError::UnexpectedEnd);
264 }
265 Some(buf.get_u8())
266 } else {
267 None
268 };
269
270 if datagram_type & DATAGRAM_PROPERTIES_BIT != 0 {
272 let props_len = VarInt::decode(buf)?.into_inner() as usize;
273 if buf.remaining() < props_len {
274 return Err(CodecError::UnexpectedEnd);
275 }
276 buf.advance(props_len);
277 }
278
279 let object_status = if datagram_type & DATAGRAM_STATUS_BIT != 0 {
280 if buf.remaining() < 1 {
281 return Err(CodecError::UnexpectedEnd);
282 }
283 Some(buf.get_u8())
284 } else {
285 None
286 };
287
288 Ok(DatagramHeader {
289 datagram_type,
290 track_alias,
291 group_id,
292 object_id,
293 publisher_priority,
294 object_status,
295 })
296 }
297
298 pub fn encode(&self, buf: &mut impl BufMut) {
299 buf.put_u8(self.datagram_type);
300 self.track_alias.encode(buf);
301 self.group_id.encode(buf);
302
303 if self.datagram_type & DATAGRAM_ZERO_OBJECT_ID_BIT == 0 {
304 self.object_id.encode(buf);
305 }
306
307 if self.datagram_type & DATAGRAM_DEFAULT_PRIORITY_BIT == 0 {
308 buf.put_u8(self.publisher_priority.unwrap_or(128));
309 }
310
311 if self.datagram_type & DATAGRAM_STATUS_BIT != 0 {
312 buf.put_u8(self.object_status.unwrap_or(0));
313 }
314 }
315
316 pub fn is_end_of_group(&self) -> bool {
317 self.datagram_type & DATAGRAM_END_OF_GROUP_BIT != 0
318 }
319
320 pub fn has_status(&self) -> bool {
321 self.datagram_type & DATAGRAM_STATUS_BIT != 0
322 }
323}
324
325const FETCH_STREAM_TYPE: u64 = 0x05;
328
329#[derive(Debug, Clone)]
330pub struct FetchHeader {
331 pub request_id: VarInt,
332}
333
334impl FetchHeader {
335 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
336 let stream_type = VarInt::decode(buf)?.into_inner();
337 if stream_type != FETCH_STREAM_TYPE {
338 return Err(CodecError::InvalidField);
339 }
340 let request_id = VarInt::decode(buf)?;
341 Ok(FetchHeader { request_id })
342 }
343
344 pub fn encode(&self, buf: &mut impl BufMut) {
345 VarInt::from_usize(FETCH_STREAM_TYPE as usize).encode(buf);
346 self.request_id.encode(buf);
347 }
348}