Skip to main content

moqtap_codec/draft18/
message.rs

1//! Draft-18 control message encoding and decoding.
2//!
3//! Key differences from draft-17:
4//! - `Required Request ID Delta` field removed from every request message.
5//! - SUBSCRIBE_NAMESPACE renumbered to 0x50 and `subscribe_options` removed.
6//! - New SUBSCRIBE_TRACKS message (0x51); FORWARD parameter belongs here.
7//! - PUBLISH_OK collapsed into REQUEST_OK (0x07); REQUEST_OK gains a trailing
8//!   Track Properties block (length implicit from message length).
9//! - GOAWAY gains an optional `request_id` (control stream only).
10//! - REQUEST_ERROR gains REDIRECT (0x34) carrying a Redirect structure
11//!   (connect_uri, track_namespace, track_name) appended after reason_phrase.
12//! - PUBLISH_DONE status codes 0x5/0x6 swapped: 0x5 = TOO_FAR_BEHIND,
13//!   0x6 = EXPIRED.
14//! - DELIVERY_TIMEOUT (0x02) renamed to OBJECT_DELIVERY_TIMEOUT;
15//!   new SUBGROUP_DELIVERY_TIMEOUT (0x06) and FILL_TIMEOUT (0x0A).
16//! - New TRACK_NAMESPACE_PREFIX parameter (0x34) for REQUEST_UPDATE
17//!   (length-prefixed encoded TrackNamespace).
18
19pub use crate::error::{
20    CodecError, MAX_GOAWAY_URI_LENGTH, MAX_MESSAGE_LENGTH, MAX_NAMESPACE_TUPLE_SIZE,
21    MAX_REASON_PHRASE_LENGTH,
22};
23use crate::kvp::{KeyValuePair, KvpValue};
24use crate::types::*;
25use crate::varint::VarInt;
26use bytes::{Buf, BufMut};
27
28// ============================================================
29// Parameter encoding helpers for draft-18
30// ============================================================
31
32/// How a parameter value is encoded on the wire.
33#[derive(Debug, Clone, Copy, PartialEq, Eq)]
34enum ParamEncoding {
35    /// Bare varint.
36    Varint,
37    /// Single byte (uint8).
38    Uint8,
39    /// Length-prefixed bytes.
40    LengthPrefixed,
41}
42
43fn param_encoding(key: u64) -> Option<ParamEncoding> {
44    match key {
45        // 0x02 = OBJECT_DELIVERY_TIMEOUT (renamed from DELIVERY_TIMEOUT)
46        // 0x04 = MAX_CACHE_DURATION
47        // 0x06 = SUBGROUP_DELIVERY_TIMEOUT (new in draft-18)
48        // 0x08 = EXPIRES
49        // 0x0A = FILL_TIMEOUT (new in draft-18, FETCH only)
50        // 0x32 = NEW_GROUP_REQUEST
51        0x02 | 0x04 | 0x06 | 0x08 | 0x0A | 0x32 => Some(ParamEncoding::Varint),
52        // 0x10 = FORWARD, 0x20 = SUBSCRIBER_PRIORITY, 0x22 = GROUP_ORDER
53        0x10 | 0x20 | 0x22 => Some(ParamEncoding::Uint8),
54        // 0x03 = AUTHORIZATION_TOKEN
55        // 0x09 = LARGEST_OBJECT (length-prefixed in draft-18; was bare two
56        //         varints in draft-17)
57        // 0x21 = SUBSCRIPTION_FILTER
58        // 0x34 = TRACK_NAMESPACE_PREFIX (new in draft-18)
59        0x03 | 0x09 | 0x21 | 0x34 => Some(ParamEncoding::LengthPrefixed),
60        _ => None,
61    }
62}
63
64/// Decode a count-prefixed list of parameters with delta-encoded types.
65fn decode_parameters(buf: &mut impl Buf) -> Result<Vec<KeyValuePair>, CodecError> {
66    let count = VarInt::decode(buf)?.into_inner() as usize;
67    let mut params = Vec::with_capacity(count);
68    let mut prev_key: u64 = 0;
69
70    for _ in 0..count {
71        let delta = VarInt::decode(buf)?.into_inner();
72        let abs_key = prev_key + delta;
73        prev_key = abs_key;
74
75        let encoding = param_encoding(abs_key).ok_or(CodecError::InvalidField)?;
76
77        let value = match encoding {
78            ParamEncoding::Varint => {
79                let v = VarInt::decode(buf)?;
80                KvpValue::Varint(v)
81            }
82            ParamEncoding::Uint8 => {
83                if buf.remaining() < 1 {
84                    return Err(CodecError::UnexpectedEnd);
85                }
86                let byte = buf.get_u8();
87                KvpValue::Varint(VarInt::from_u64(byte as u64).unwrap())
88            }
89            ParamEncoding::LengthPrefixed => {
90                let len = VarInt::decode(buf)?.into_inner() as usize;
91                let data = read_bytes(buf, len)?;
92                KvpValue::Bytes(data)
93            }
94        };
95
96        params.push(KeyValuePair { key: VarInt::from_u64(abs_key).unwrap(), value });
97    }
98    Ok(params)
99}
100
101/// Encode a count-prefixed list of parameters with delta-encoded types.
102fn encode_parameters(params: &[KeyValuePair], buf: &mut impl BufMut) {
103    VarInt::from_usize(params.len()).encode(buf);
104    let mut prev_key: u64 = 0;
105
106    for p in params {
107        let abs_key = p.key.into_inner();
108        let delta = abs_key - prev_key;
109        prev_key = abs_key;
110        VarInt::from_u64(delta).unwrap().encode(buf);
111
112        let encoding = param_encoding(abs_key);
113        match (&p.value, encoding) {
114            (KvpValue::Varint(v), Some(ParamEncoding::Varint)) => {
115                v.encode(buf);
116            }
117            (KvpValue::Varint(v), Some(ParamEncoding::Uint8)) => {
118                buf.put_u8(v.into_inner() as u8);
119            }
120            (KvpValue::Bytes(b), Some(ParamEncoding::LengthPrefixed)) => {
121                VarInt::from_usize(b.len()).encode(buf);
122                buf.put_slice(b);
123            }
124            _ => {
125                // Fallback: encode as KVP even/odd
126                match &p.value {
127                    KvpValue::Varint(v) => v.encode(buf),
128                    KvpValue::Bytes(b) => {
129                        VarInt::from_usize(b.len()).encode(buf);
130                        buf.put_slice(b);
131                    }
132                }
133            }
134        }
135    }
136}
137
138/// Decode delta-encoded KVPs with even/odd convention (for setup options
139/// and track properties). Read until buffer is exhausted.
140fn decode_kvp_delta(buf: &mut impl Buf) -> Result<Vec<KeyValuePair>, CodecError> {
141    let mut pairs = Vec::new();
142    let mut prev_key: u64 = 0;
143
144    while buf.has_remaining() {
145        let delta = VarInt::decode(buf)?.into_inner();
146        let abs_key = prev_key + delta;
147        prev_key = abs_key;
148
149        let value = if abs_key % 2 == 0 {
150            let v = VarInt::decode(buf)?;
151            KvpValue::Varint(v)
152        } else {
153            let len = VarInt::decode(buf)?.into_inner() as usize;
154            let data = read_bytes(buf, len)?;
155            KvpValue::Bytes(data)
156        };
157
158        pairs.push(KeyValuePair { key: VarInt::from_u64(abs_key).unwrap(), value });
159    }
160    Ok(pairs)
161}
162
163/// Encode delta-encoded KVPs with even/odd convention.
164fn encode_kvp_delta(pairs: &[KeyValuePair], buf: &mut impl BufMut) {
165    let mut prev_key: u64 = 0;
166    for p in pairs {
167        let abs_key = p.key.into_inner();
168        let delta = abs_key - prev_key;
169        prev_key = abs_key;
170        VarInt::from_u64(delta).unwrap().encode(buf);
171        match &p.value {
172            KvpValue::Varint(v) => v.encode(buf),
173            KvpValue::Bytes(b) => {
174                VarInt::from_usize(b.len()).encode(buf);
175                buf.put_slice(b);
176            }
177        }
178    }
179}
180
181// ============================================================
182// Message Types
183// ============================================================
184
185#[derive(Debug, Clone, Copy, PartialEq, Eq)]
186#[repr(u64)]
187pub enum MessageType {
188    RequestUpdate = 0x02,
189    Subscribe = 0x03,
190    SubscribeOk = 0x04,
191    RequestError = 0x05,
192    PublishNamespace = 0x06,
193    /// REQUEST_OK (0x07). PUBLISH_OK is now an alias of this type.
194    RequestOk = 0x07,
195    Namespace = 0x08,
196    PublishDone = 0x0B,
197    TrackStatus = 0x0D,
198    NamespaceDone = 0x0E,
199    PublishBlocked = 0x0F,
200    GoAway = 0x10,
201    Fetch = 0x16,
202    FetchOk = 0x18,
203    Publish = 0x1D,
204    /// SUBSCRIBE_NAMESPACE (renumbered to 0x50 in draft-18).
205    SubscribeNamespace = 0x50,
206    /// SUBSCRIBE_TRACKS (new message in draft-18).
207    SubscribeTracks = 0x51,
208    Setup = 0x2F00,
209}
210
211impl MessageType {
212    pub fn from_id(id: u64) -> Option<Self> {
213        match id {
214            0x02 => Some(MessageType::RequestUpdate),
215            0x03 => Some(MessageType::Subscribe),
216            0x04 => Some(MessageType::SubscribeOk),
217            0x05 => Some(MessageType::RequestError),
218            0x06 => Some(MessageType::PublishNamespace),
219            0x07 => Some(MessageType::RequestOk),
220            0x08 => Some(MessageType::Namespace),
221            0x0B => Some(MessageType::PublishDone),
222            0x0D => Some(MessageType::TrackStatus),
223            0x0E => Some(MessageType::NamespaceDone),
224            0x0F => Some(MessageType::PublishBlocked),
225            0x10 => Some(MessageType::GoAway),
226            0x16 => Some(MessageType::Fetch),
227            0x18 => Some(MessageType::FetchOk),
228            0x1D => Some(MessageType::Publish),
229            0x50 => Some(MessageType::SubscribeNamespace),
230            0x51 => Some(MessageType::SubscribeTracks),
231            0x2F00 => Some(MessageType::Setup),
232            _ => None,
233        }
234    }
235
236    pub fn id(&self) -> u64 {
237        *self as u64
238    }
239}
240
241// ============================================================
242// Session Lifecycle Messages
243// ============================================================
244
245/// Unified SETUP (0x2F00).
246#[derive(Debug, Clone, PartialEq, Eq)]
247pub struct Setup {
248    pub options: Vec<KeyValuePair>,
249}
250
251/// GOAWAY (0x10). Sent on the control stream (with `request_id`) or on an
252/// individual request stream (without `request_id`).
253#[derive(Debug, Clone, PartialEq, Eq)]
254pub struct GoAway {
255    pub new_session_uri: Vec<u8>,
256    pub timeout: VarInt,
257    /// Present only when sent on the control stream — identifies the
258    /// smallest peer Request ID that may not have been processed.
259    pub request_id: Option<VarInt>,
260}
261
262// ============================================================
263// Consolidated Response Messages
264// ============================================================
265
266/// REQUEST_OK (0x07). Used as a generic OK response and as the alias for
267/// PUBLISH_OK / REQUEST_UPDATE_OK / TRACK_STATUS_OK / SUBSCRIBE_NAMESPACE_OK
268/// / PUBLISH_NAMESPACE_OK.
269///
270/// `track_properties` is only populated for TRACK_STATUS_OK; for every
271/// other shape it MUST be empty (length implicit from the message length).
272#[derive(Debug, Clone, PartialEq, Eq)]
273pub struct RequestOk {
274    pub parameters: Vec<KeyValuePair>,
275    pub track_properties: Vec<KeyValuePair>,
276}
277
278/// Optional Redirect structure carried in REQUEST_ERROR with code 0x34.
279#[derive(Debug, Clone, PartialEq, Eq)]
280pub struct Redirect {
281    pub connect_uri: Vec<u8>,
282    pub track_namespace: TrackNamespace,
283    pub track_name: Vec<u8>,
284}
285
286/// REQUEST_ERROR (0x05). Adds an optional Redirect structure when
287/// `error_code` is REDIRECT (0x34).
288#[derive(Debug, Clone, PartialEq, Eq)]
289pub struct RequestError {
290    pub error_code: VarInt,
291    pub retry_interval: VarInt,
292    pub reason_phrase: Vec<u8>,
293    pub redirect: Option<Redirect>,
294}
295
296/// REQUEST_ERROR error codes that gain dedicated meaning in draft-18.
297pub mod request_error_codes {
298    /// New in draft-18: a Mandatory Track Property the receiver does not
299    /// understand.
300    pub const UNSUPPORTED_EXTENSION: u64 = 0x33;
301    /// New in draft-18: response carries a [`super::Redirect`] structure.
302    pub const REDIRECT: u64 = 0x34;
303}
304
305// ============================================================
306// Subscribe Messages
307// ============================================================
308
309#[derive(Debug, Clone, PartialEq, Eq)]
310pub struct Subscribe {
311    pub request_id: VarInt,
312    pub track_namespace: TrackNamespace,
313    pub track_name: Vec<u8>,
314    pub parameters: Vec<KeyValuePair>,
315}
316
317/// SUBSCRIBE_OK (0x04).
318#[derive(Debug, Clone, PartialEq, Eq)]
319pub struct SubscribeOk {
320    pub track_alias: VarInt,
321    pub parameters: Vec<KeyValuePair>,
322    pub track_properties: Vec<KeyValuePair>,
323}
324
325#[derive(Debug, Clone, PartialEq, Eq)]
326pub struct RequestUpdate {
327    pub request_id: VarInt,
328    pub parameters: Vec<KeyValuePair>,
329}
330
331// ============================================================
332// Publish Messages
333// ============================================================
334
335#[derive(Debug, Clone, PartialEq, Eq)]
336pub struct Publish {
337    pub request_id: VarInt,
338    pub track_namespace: TrackNamespace,
339    pub track_name: Vec<u8>,
340    pub track_alias: VarInt,
341    pub parameters: Vec<KeyValuePair>,
342    pub track_properties: Vec<KeyValuePair>,
343}
344
345/// PUBLISH_DONE (0x0B). Status codes 0x5/0x6 are swapped vs draft-17.
346#[derive(Debug, Clone, PartialEq, Eq)]
347pub struct PublishDone {
348    pub status_code: VarInt,
349    pub stream_count: VarInt,
350    pub reason_phrase: Vec<u8>,
351}
352
353/// Numeric values for the [`PublishDone::status_code`] field.
354pub mod publish_done_codes {
355    /// Draft-18: TOO_FAR_BEHIND is 0x05 (was 0x06 in draft-17).
356    pub const TOO_FAR_BEHIND: u64 = 0x05;
357    /// Draft-18: EXPIRED is 0x06 (was 0x05 in draft-17).
358    pub const EXPIRED: u64 = 0x06;
359}
360
361// ============================================================
362// Publish Namespace Messages
363// ============================================================
364
365#[derive(Debug, Clone, PartialEq, Eq)]
366pub struct PublishNamespace {
367    pub request_id: VarInt,
368    pub track_namespace: TrackNamespace,
369    pub parameters: Vec<KeyValuePair>,
370}
371
372// ============================================================
373// Namespace Messages
374// ============================================================
375
376#[derive(Debug, Clone, PartialEq, Eq)]
377pub struct Namespace {
378    pub namespace_suffix: TrackNamespace,
379}
380
381#[derive(Debug, Clone, PartialEq, Eq)]
382pub struct NamespaceDone {
383    pub namespace_suffix: TrackNamespace,
384}
385
386// ============================================================
387// Subscribe Namespace / Tracks Messages
388// ============================================================
389
390/// SUBSCRIBE_NAMESPACE (0x50). Subscribes to NAMESPACE / NAMESPACE_DONE
391/// advertisements for namespaces matching `namespace_prefix`. The
392/// `subscribe_options` byte from draft-17 is removed; namespace subscriptions
393/// only produce NAMESPACE / NAMESPACE_DONE.
394#[derive(Debug, Clone, PartialEq, Eq)]
395pub struct SubscribeNamespace {
396    pub request_id: VarInt,
397    pub namespace_prefix: TrackNamespace,
398    pub parameters: Vec<KeyValuePair>,
399}
400
401/// SUBSCRIBE_TRACKS (0x51, new in draft-18). Subscribes to PUBLISH messages
402/// for tracks whose namespace matches `namespace_prefix`. Carries the
403/// FORWARD parameter (which previously lived on SUBSCRIBE_NAMESPACE).
404#[derive(Debug, Clone, PartialEq, Eq)]
405pub struct SubscribeTracks {
406    pub request_id: VarInt,
407    pub namespace_prefix: TrackNamespace,
408    pub parameters: Vec<KeyValuePair>,
409}
410
411// ============================================================
412// Track Status Messages
413// ============================================================
414
415#[derive(Debug, Clone, PartialEq, Eq)]
416pub struct TrackStatus {
417    pub request_id: VarInt,
418    pub track_namespace: TrackNamespace,
419    pub track_name: Vec<u8>,
420    pub parameters: Vec<KeyValuePair>,
421}
422
423// ============================================================
424// Fetch Messages
425// ============================================================
426
427#[derive(Debug, Clone, Copy, PartialEq, Eq)]
428#[repr(u64)]
429pub enum FetchType {
430    Standalone = 1,
431    RelativeJoining = 2,
432    AbsoluteJoining = 3,
433}
434
435impl FetchType {
436    pub fn from_u64(v: u64) -> Option<Self> {
437        match v {
438            1 => Some(FetchType::Standalone),
439            2 => Some(FetchType::RelativeJoining),
440            3 => Some(FetchType::AbsoluteJoining),
441            _ => None,
442        }
443    }
444}
445
446#[derive(Debug, Clone, PartialEq, Eq)]
447pub struct Fetch {
448    pub request_id: VarInt,
449    pub fetch_type: FetchType,
450    pub fetch_payload: FetchPayload,
451    pub parameters: Vec<KeyValuePair>,
452}
453
454#[derive(Debug, Clone, PartialEq, Eq)]
455pub enum FetchPayload {
456    Standalone {
457        track_namespace: TrackNamespace,
458        track_name: Vec<u8>,
459        start_group: VarInt,
460        start_object: VarInt,
461        end_group: VarInt,
462        end_object: VarInt,
463    },
464    Joining {
465        joining_request_id: VarInt,
466        joining_start: VarInt,
467    },
468}
469
470/// FETCH_OK (0x18). `end_of_track` is uint8.
471#[derive(Debug, Clone, PartialEq, Eq)]
472pub struct FetchOk {
473    pub end_of_track: u8,
474    pub end_group: VarInt,
475    pub end_object: VarInt,
476    pub parameters: Vec<KeyValuePair>,
477    pub track_properties: Vec<KeyValuePair>,
478}
479
480// ============================================================
481// Publish Blocked
482// ============================================================
483
484#[derive(Debug, Clone, PartialEq, Eq)]
485pub struct PublishBlocked {
486    pub namespace_suffix: TrackNamespace,
487    pub track_name: Vec<u8>,
488}
489
490// ============================================================
491// Unified Message Enum
492// ============================================================
493
494#[derive(Debug, Clone, PartialEq, Eq)]
495pub enum ControlMessage {
496    Setup(Setup),
497    GoAway(GoAway),
498    RequestOk(RequestOk),
499    RequestError(RequestError),
500    Subscribe(Subscribe),
501    SubscribeOk(SubscribeOk),
502    RequestUpdate(RequestUpdate),
503    Publish(Publish),
504    PublishDone(PublishDone),
505    PublishNamespace(PublishNamespace),
506    Namespace(Namespace),
507    NamespaceDone(NamespaceDone),
508    SubscribeNamespace(SubscribeNamespace),
509    SubscribeTracks(SubscribeTracks),
510    TrackStatus(TrackStatus),
511    Fetch(Fetch),
512    FetchOk(FetchOk),
513    PublishBlocked(PublishBlocked),
514}
515
516impl ControlMessage {
517    pub fn encode(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
518        let mut payload = Vec::with_capacity(256);
519        self.encode_payload(&mut payload)?;
520
521        if payload.len() > MAX_MESSAGE_LENGTH {
522            return Err(CodecError::MessageTooLong(payload.len()));
523        }
524
525        let msg_type = self.message_type();
526        VarInt::from_usize(msg_type.id() as usize).encode(buf);
527        // Draft-18: 16-bit length (big-endian)
528        buf.put_u16(payload.len() as u16);
529        buf.put_slice(&payload);
530        Ok(())
531    }
532
533    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
534        let type_id = VarInt::decode(buf)?.into_inner();
535        let msg_type =
536            MessageType::from_id(type_id).ok_or(CodecError::UnknownMessageType(type_id))?;
537        // Draft-18: 16-bit length (big-endian)
538        if buf.remaining() < 2 {
539            return Err(CodecError::UnexpectedEnd);
540        }
541        let payload_len = buf.get_u16() as usize;
542        if buf.remaining() < payload_len {
543            return Err(CodecError::UnexpectedEnd);
544        }
545        let payload_bytes = buf.copy_to_bytes(payload_len);
546        let mut payload = &payload_bytes[..];
547        Self::decode_payload(msg_type, &mut payload)
548    }
549
550    fn encode_payload(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
551        match self {
552            ControlMessage::Setup(m) => {
553                encode_kvp_delta(&m.options, buf);
554            }
555            ControlMessage::GoAway(m) => {
556                if m.new_session_uri.len() > MAX_GOAWAY_URI_LENGTH {
557                    return Err(CodecError::GoAwayUriTooLong);
558                }
559                VarInt::from_usize(m.new_session_uri.len()).encode(buf);
560                buf.put_slice(&m.new_session_uri);
561                m.timeout.encode(buf);
562                if let Some(rid) = &m.request_id {
563                    rid.encode(buf);
564                }
565            }
566            ControlMessage::RequestOk(m) => {
567                encode_parameters(&m.parameters, buf);
568                encode_kvp_delta(&m.track_properties, buf);
569            }
570            ControlMessage::RequestError(m) => {
571                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
572                    return Err(CodecError::ReasonPhraseTooLong);
573                }
574                m.error_code.encode(buf);
575                m.retry_interval.encode(buf);
576                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
577                buf.put_slice(&m.reason_phrase);
578                if let Some(r) = &m.redirect {
579                    VarInt::from_usize(r.connect_uri.len()).encode(buf);
580                    buf.put_slice(&r.connect_uri);
581                    r.track_namespace.encode(buf);
582                    VarInt::from_usize(r.track_name.len()).encode(buf);
583                    buf.put_slice(&r.track_name);
584                }
585            }
586            ControlMessage::Subscribe(m) => {
587                m.request_id.encode(buf);
588                m.track_namespace.encode(buf);
589                VarInt::from_usize(m.track_name.len()).encode(buf);
590                buf.put_slice(&m.track_name);
591                encode_parameters(&m.parameters, buf);
592            }
593            ControlMessage::SubscribeOk(m) => {
594                m.track_alias.encode(buf);
595                encode_parameters(&m.parameters, buf);
596                encode_kvp_delta(&m.track_properties, buf);
597            }
598            ControlMessage::RequestUpdate(m) => {
599                m.request_id.encode(buf);
600                encode_parameters(&m.parameters, buf);
601            }
602            ControlMessage::Publish(m) => {
603                m.request_id.encode(buf);
604                m.track_namespace.encode(buf);
605                VarInt::from_usize(m.track_name.len()).encode(buf);
606                buf.put_slice(&m.track_name);
607                m.track_alias.encode(buf);
608                encode_parameters(&m.parameters, buf);
609                encode_kvp_delta(&m.track_properties, buf);
610            }
611            ControlMessage::PublishDone(m) => {
612                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
613                    return Err(CodecError::ReasonPhraseTooLong);
614                }
615                m.status_code.encode(buf);
616                m.stream_count.encode(buf);
617                VarInt::from_usize(m.reason_phrase.len()).encode(buf);
618                buf.put_slice(&m.reason_phrase);
619            }
620            ControlMessage::PublishNamespace(m) => {
621                m.request_id.encode(buf);
622                m.track_namespace.encode(buf);
623                encode_parameters(&m.parameters, buf);
624            }
625            ControlMessage::Namespace(m) => {
626                m.namespace_suffix.encode(buf);
627            }
628            ControlMessage::NamespaceDone(m) => {
629                m.namespace_suffix.encode(buf);
630            }
631            ControlMessage::SubscribeNamespace(m) => {
632                m.request_id.encode(buf);
633                m.namespace_prefix.encode(buf);
634                encode_parameters(&m.parameters, buf);
635            }
636            ControlMessage::SubscribeTracks(m) => {
637                m.request_id.encode(buf);
638                m.namespace_prefix.encode(buf);
639                encode_parameters(&m.parameters, buf);
640            }
641            ControlMessage::TrackStatus(m) => {
642                m.request_id.encode(buf);
643                m.track_namespace.encode(buf);
644                VarInt::from_usize(m.track_name.len()).encode(buf);
645                buf.put_slice(&m.track_name);
646                encode_parameters(&m.parameters, buf);
647            }
648            ControlMessage::Fetch(m) => {
649                m.request_id.encode(buf);
650                VarInt::from_usize(m.fetch_type as usize).encode(buf);
651                match &m.fetch_payload {
652                    FetchPayload::Standalone {
653                        track_namespace,
654                        track_name,
655                        start_group,
656                        start_object,
657                        end_group,
658                        end_object,
659                    } => {
660                        track_namespace.encode(buf);
661                        VarInt::from_usize(track_name.len()).encode(buf);
662                        buf.put_slice(track_name);
663                        start_group.encode(buf);
664                        start_object.encode(buf);
665                        end_group.encode(buf);
666                        end_object.encode(buf);
667                    }
668                    FetchPayload::Joining { joining_request_id, joining_start } => {
669                        joining_request_id.encode(buf);
670                        joining_start.encode(buf);
671                    }
672                }
673                encode_parameters(&m.parameters, buf);
674            }
675            ControlMessage::FetchOk(m) => {
676                buf.put_u8(m.end_of_track);
677                m.end_group.encode(buf);
678                m.end_object.encode(buf);
679                encode_parameters(&m.parameters, buf);
680                encode_kvp_delta(&m.track_properties, buf);
681            }
682            ControlMessage::PublishBlocked(m) => {
683                m.namespace_suffix.encode(buf);
684                VarInt::from_usize(m.track_name.len()).encode(buf);
685                buf.put_slice(&m.track_name);
686            }
687        }
688        Ok(())
689    }
690
691    fn decode_payload(msg_type: MessageType, buf: &mut impl Buf) -> Result<Self, CodecError> {
692        match msg_type {
693            MessageType::Setup => {
694                let options = decode_kvp_delta(buf)?;
695                Ok(ControlMessage::Setup(Setup { options }))
696            }
697            MessageType::GoAway => {
698                let uri_len = VarInt::decode(buf)?.into_inner() as usize;
699                let uri = read_bytes(buf, uri_len)?;
700                let timeout = VarInt::decode(buf)?;
701                let request_id =
702                    if buf.has_remaining() { Some(VarInt::decode(buf)?) } else { None };
703                Ok(ControlMessage::GoAway(GoAway { new_session_uri: uri, timeout, request_id }))
704            }
705            MessageType::RequestOk => {
706                let parameters = decode_parameters(buf)?;
707                let track_properties = decode_kvp_delta(buf)?;
708                Ok(ControlMessage::RequestOk(RequestOk { parameters, track_properties }))
709            }
710            MessageType::RequestError => {
711                let error_code = VarInt::decode(buf)?;
712                let retry_interval = VarInt::decode(buf)?;
713                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
714                let reason_phrase = read_bytes(buf, reason_len)?;
715                let redirect = if error_code.into_inner() == request_error_codes::REDIRECT {
716                    let uri_len = VarInt::decode(buf)?.into_inner() as usize;
717                    let connect_uri = read_bytes(buf, uri_len)?;
718                    let track_namespace = TrackNamespace::decode_allow_empty(buf)?;
719                    let name_len = VarInt::decode(buf)?.into_inner() as usize;
720                    let track_name = read_bytes(buf, name_len)?;
721                    Some(Redirect { connect_uri, track_namespace, track_name })
722                } else {
723                    None
724                };
725                Ok(ControlMessage::RequestError(RequestError {
726                    error_code,
727                    retry_interval,
728                    reason_phrase,
729                    redirect,
730                }))
731            }
732            MessageType::Subscribe => {
733                let request_id = VarInt::decode(buf)?;
734                let track_namespace = TrackNamespace::decode(buf)?;
735                let tn_len = VarInt::decode(buf)?.into_inner() as usize;
736                let track_name = read_bytes(buf, tn_len)?;
737                let parameters = decode_parameters(buf)?;
738                Ok(ControlMessage::Subscribe(Subscribe {
739                    request_id,
740                    track_namespace,
741                    track_name,
742                    parameters,
743                }))
744            }
745            MessageType::SubscribeOk => {
746                let track_alias = VarInt::decode(buf)?;
747                let parameters = decode_parameters(buf)?;
748                let track_properties = decode_kvp_delta(buf)?;
749                Ok(ControlMessage::SubscribeOk(SubscribeOk {
750                    track_alias,
751                    parameters,
752                    track_properties,
753                }))
754            }
755            MessageType::RequestUpdate => {
756                let request_id = VarInt::decode(buf)?;
757                let parameters = decode_parameters(buf)?;
758                Ok(ControlMessage::RequestUpdate(RequestUpdate { request_id, parameters }))
759            }
760            MessageType::Publish => {
761                let request_id = VarInt::decode(buf)?;
762                let track_namespace = TrackNamespace::decode(buf)?;
763                let tn_len = VarInt::decode(buf)?.into_inner() as usize;
764                let track_name = read_bytes(buf, tn_len)?;
765                let track_alias = VarInt::decode(buf)?;
766                let parameters = decode_parameters(buf)?;
767                let track_properties = decode_kvp_delta(buf)?;
768                Ok(ControlMessage::Publish(Publish {
769                    request_id,
770                    track_namespace,
771                    track_name,
772                    track_alias,
773                    parameters,
774                    track_properties,
775                }))
776            }
777            MessageType::PublishDone => {
778                let status_code = VarInt::decode(buf)?;
779                let stream_count = VarInt::decode(buf)?;
780                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
781                let reason_phrase = read_bytes(buf, reason_len)?;
782                Ok(ControlMessage::PublishDone(PublishDone {
783                    status_code,
784                    stream_count,
785                    reason_phrase,
786                }))
787            }
788            MessageType::PublishNamespace => {
789                let request_id = VarInt::decode(buf)?;
790                let track_namespace = TrackNamespace::decode(buf)?;
791                let parameters = decode_parameters(buf)?;
792                Ok(ControlMessage::PublishNamespace(PublishNamespace {
793                    request_id,
794                    track_namespace,
795                    parameters,
796                }))
797            }
798            MessageType::Namespace => {
799                let namespace_suffix = TrackNamespace::decode_allow_empty(buf)?;
800                Ok(ControlMessage::Namespace(Namespace { namespace_suffix }))
801            }
802            MessageType::NamespaceDone => {
803                let namespace_suffix = TrackNamespace::decode_allow_empty(buf)?;
804                Ok(ControlMessage::NamespaceDone(NamespaceDone { namespace_suffix }))
805            }
806            MessageType::SubscribeNamespace => {
807                let request_id = VarInt::decode(buf)?;
808                let namespace_prefix = TrackNamespace::decode_allow_empty(buf)?;
809                let parameters = decode_parameters(buf)?;
810                Ok(ControlMessage::SubscribeNamespace(SubscribeNamespace {
811                    request_id,
812                    namespace_prefix,
813                    parameters,
814                }))
815            }
816            MessageType::SubscribeTracks => {
817                let request_id = VarInt::decode(buf)?;
818                let namespace_prefix = TrackNamespace::decode_allow_empty(buf)?;
819                let parameters = decode_parameters(buf)?;
820                Ok(ControlMessage::SubscribeTracks(SubscribeTracks {
821                    request_id,
822                    namespace_prefix,
823                    parameters,
824                }))
825            }
826            MessageType::TrackStatus => {
827                let request_id = VarInt::decode(buf)?;
828                let track_namespace = TrackNamespace::decode(buf)?;
829                let tn_len = VarInt::decode(buf)?.into_inner() as usize;
830                let track_name = read_bytes(buf, tn_len)?;
831                let parameters = decode_parameters(buf)?;
832                Ok(ControlMessage::TrackStatus(TrackStatus {
833                    request_id,
834                    track_namespace,
835                    track_name,
836                    parameters,
837                }))
838            }
839            MessageType::Fetch => {
840                let request_id = VarInt::decode(buf)?;
841                let fetch_type_val = VarInt::decode(buf)?.into_inner();
842                let fetch_type =
843                    FetchType::from_u64(fetch_type_val).ok_or(CodecError::InvalidField)?;
844                let fetch_payload = match fetch_type {
845                    FetchType::Standalone => {
846                        let track_namespace = TrackNamespace::decode(buf)?;
847                        let tn_len = VarInt::decode(buf)?.into_inner() as usize;
848                        let track_name = read_bytes(buf, tn_len)?;
849                        let start_group = VarInt::decode(buf)?;
850                        let start_object = VarInt::decode(buf)?;
851                        let end_group = VarInt::decode(buf)?;
852                        let end_object = VarInt::decode(buf)?;
853                        FetchPayload::Standalone {
854                            track_namespace,
855                            track_name,
856                            start_group,
857                            start_object,
858                            end_group,
859                            end_object,
860                        }
861                    }
862                    FetchType::RelativeJoining | FetchType::AbsoluteJoining => {
863                        let joining_request_id = VarInt::decode(buf)?;
864                        let joining_start = VarInt::decode(buf)?;
865                        FetchPayload::Joining { joining_request_id, joining_start }
866                    }
867                };
868                let parameters = decode_parameters(buf)?;
869                Ok(ControlMessage::Fetch(Fetch {
870                    request_id,
871                    fetch_type,
872                    fetch_payload,
873                    parameters,
874                }))
875            }
876            MessageType::FetchOk => {
877                if buf.remaining() < 1 {
878                    return Err(CodecError::UnexpectedEnd);
879                }
880                let end_of_track = buf.get_u8();
881                let end_group = VarInt::decode(buf)?;
882                let end_object = VarInt::decode(buf)?;
883                let parameters = decode_parameters(buf)?;
884                let track_properties = decode_kvp_delta(buf)?;
885                Ok(ControlMessage::FetchOk(FetchOk {
886                    end_of_track,
887                    end_group,
888                    end_object,
889                    parameters,
890                    track_properties,
891                }))
892            }
893            MessageType::PublishBlocked => {
894                let namespace_suffix = TrackNamespace::decode_allow_empty(buf)?;
895                let tn_len = VarInt::decode(buf)?.into_inner() as usize;
896                let track_name = read_bytes(buf, tn_len)?;
897                Ok(ControlMessage::PublishBlocked(PublishBlocked { namespace_suffix, track_name }))
898            }
899        }
900    }
901
902    pub fn message_type(&self) -> MessageType {
903        match self {
904            ControlMessage::Setup(_) => MessageType::Setup,
905            ControlMessage::GoAway(_) => MessageType::GoAway,
906            ControlMessage::RequestOk(_) => MessageType::RequestOk,
907            ControlMessage::RequestError(_) => MessageType::RequestError,
908            ControlMessage::Subscribe(_) => MessageType::Subscribe,
909            ControlMessage::SubscribeOk(_) => MessageType::SubscribeOk,
910            ControlMessage::RequestUpdate(_) => MessageType::RequestUpdate,
911            ControlMessage::Publish(_) => MessageType::Publish,
912            ControlMessage::PublishDone(_) => MessageType::PublishDone,
913            ControlMessage::PublishNamespace(_) => MessageType::PublishNamespace,
914            ControlMessage::Namespace(_) => MessageType::Namespace,
915            ControlMessage::NamespaceDone(_) => MessageType::NamespaceDone,
916            ControlMessage::SubscribeNamespace(_) => MessageType::SubscribeNamespace,
917            ControlMessage::SubscribeTracks(_) => MessageType::SubscribeTracks,
918            ControlMessage::TrackStatus(_) => MessageType::TrackStatus,
919            ControlMessage::Fetch(_) => MessageType::Fetch,
920            ControlMessage::FetchOk(_) => MessageType::FetchOk,
921            ControlMessage::PublishBlocked(_) => MessageType::PublishBlocked,
922        }
923    }
924}