//! moqtap_codec/message.rs
//!
//! Control message definitions and their byte-level encoding and decoding.

1use crate::kvp::KeyValuePair;
2use crate::types::*;
3use crate::varint::VarInt;
4use bytes::{Buf, BufMut};
5
/// Upper bound on a control message payload, in bytes (2^16 - 1).
pub const MAX_MESSAGE_LENGTH: usize = u16::MAX as usize;
/// Upper bound on a reason phrase, in bytes (1 KiB).
pub const MAX_REASON_PHRASE_LENGTH: usize = 1 << 10;
/// Upper bound on the GOAWAY new-session URI, in bytes (8 KiB).
pub const MAX_GOAWAY_URI_LENGTH: usize = 8 << 10;
/// Upper bound on a full track name, in bytes (4 KiB).
pub const MAX_FULL_TRACK_NAME_LENGTH: usize = 4 << 10;
/// Upper bound on the number of elements in a track namespace tuple.
pub const MAX_NAMESPACE_TUPLE_SIZE: usize = 32;
16
/// Control message type IDs (draft-14).
///
/// Each variant's discriminant is the numeric ID that identifies the message
/// on the wire. The values 0x1B and 0x1C have no variant in this enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u64)]
pub enum MessageType {
    SubscribeUpdate = 0x02,
    Subscribe = 0x03,
    SubscribeOk = 0x04,
    SubscribeError = 0x05,
    PublishNamespace = 0x06,
    PublishNamespaceOk = 0x07,
    PublishNamespaceError = 0x08,
    PublishNamespaceDone = 0x09,
    Unsubscribe = 0x0A,
    PublishDone = 0x0B,
    PublishNamespaceCancel = 0x0C,
    TrackStatus = 0x0D,
    TrackStatusOk = 0x0E,
    TrackStatusError = 0x0F,
    GoAway = 0x10,
    SubscribeNamespace = 0x11,
    SubscribeNamespaceOk = 0x12,
    SubscribeNamespaceError = 0x13,
    UnsubscribeNamespace = 0x14,
    MaxRequestId = 0x15,
    Fetch = 0x16,
    FetchCancel = 0x17,
    FetchOk = 0x18,
    FetchError = 0x19,
    RequestsBlocked = 0x1A,
    Publish = 0x1D,
    PublishOk = 0x1E,
    PublishError = 0x1F,
    ClientSetup = 0x20,
    ServerSetup = 0x21,
}

impl MessageType {
    /// Maps a numeric message type ID to its variant.
    ///
    /// Returns `None` when `id` does not correspond to any variant.
    pub fn from_id(id: u64) -> Option<Self> {
        let kind = match id {
            0x02 => Self::SubscribeUpdate,
            0x03 => Self::Subscribe,
            0x04 => Self::SubscribeOk,
            0x05 => Self::SubscribeError,
            0x06 => Self::PublishNamespace,
            0x07 => Self::PublishNamespaceOk,
            0x08 => Self::PublishNamespaceError,
            0x09 => Self::PublishNamespaceDone,
            0x0A => Self::Unsubscribe,
            0x0B => Self::PublishDone,
            0x0C => Self::PublishNamespaceCancel,
            0x0D => Self::TrackStatus,
            0x0E => Self::TrackStatusOk,
            0x0F => Self::TrackStatusError,
            0x10 => Self::GoAway,
            0x11 => Self::SubscribeNamespace,
            0x12 => Self::SubscribeNamespaceOk,
            0x13 => Self::SubscribeNamespaceError,
            0x14 => Self::UnsubscribeNamespace,
            0x15 => Self::MaxRequestId,
            0x16 => Self::Fetch,
            0x17 => Self::FetchCancel,
            0x18 => Self::FetchOk,
            0x19 => Self::FetchError,
            0x1A => Self::RequestsBlocked,
            0x1D => Self::Publish,
            0x1E => Self::PublishOk,
            0x1F => Self::PublishError,
            0x20 => Self::ClientSetup,
            0x21 => Self::ServerSetup,
            _ => return None,
        };
        Some(kind)
    }

    /// Returns the numeric ID of this message type (its `u64` discriminant).
    pub fn id(&self) -> u64 {
        let kind: Self = *self;
        kind as u64
    }
}
94
95// ============================================================
96// Session Lifecycle Messages
97// ============================================================
98
/// Fields of a `ClientSetup` control message.
///
/// Encoded as a varint version count, each version in order, then the
/// parameter list.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClientSetup {
    /// Supported versions, written in order after a varint count.
    pub supported_versions: Vec<VarInt>,
    /// Key-value parameters written after the version list.
    pub parameters: Vec<KeyValuePair>,
}
104
/// Fields of a `ServerSetup` control message.
///
/// Encoded as the selected version followed by the parameter list.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerSetup {
    /// The single version carried by this message.
    pub selected_version: VarInt,
    /// Key-value parameters written after the version.
    pub parameters: Vec<KeyValuePair>,
}
110
/// Fields of a `GoAway` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GoAway {
    /// Raw URI bytes, encoded with a varint length prefix. Encoding fails with
    /// `CodecError::GoAwayUriTooLong` when longer than `MAX_GOAWAY_URI_LENGTH`.
    pub new_session_uri: Vec<u8>,
}
115
/// Fields of a `MaxRequestId` control message: a single varint.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MaxRequestIdMsg {
    /// The request ID value carried by the message.
    pub request_id: VarInt,
}
120
/// Fields of a `RequestsBlocked` control message: a single varint.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RequestsBlocked {
    /// The maximum request ID value carried by the message.
    pub maximum_request_id: VarInt,
}
125
126// ============================================================
127// Subscribe Messages
128// ============================================================
129
/// Fields of a `Subscribe` control message.
///
/// Fields are encoded in declaration order. The decoder decides whether
/// `start_location` and `end_group` are present purely from `filter_type`,
/// so the three must agree for a value to round-trip.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Subscribe {
    pub request_id: VarInt,
    pub track_namespace: TrackNamespace,
    /// Raw track name bytes, encoded with a varint length prefix.
    pub track_name: Vec<u8>,
    /// Written as a single byte.
    pub subscriber_priority: u8,
    /// Written as a single byte.
    pub group_order: GroupOrder,
    /// Written as a single byte; the decoder accepts only 0 and 1.
    pub forward: Forward,
    /// Written as a single byte.
    pub filter_type: FilterType,
    /// Present only for AbsoluteStart and AbsoluteRange filter types.
    pub start_location: Option<Location>,
    /// Present only for AbsoluteRange filter type.
    pub end_group: Option<VarInt>,
    pub parameters: Vec<KeyValuePair>,
}
145
/// Fields of a `SubscribeOk` control message.
///
/// Fields are encoded in declaration order. The decoder reads
/// `largest_location` only when `content_exists` is `HasLargestLocation`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeOk {
    pub request_id: VarInt,
    pub track_alias: VarInt,
    pub expires: VarInt,
    /// Written as a single byte.
    pub group_order: GroupOrder,
    /// Written as a single byte; the decoder accepts only 0 and 1.
    pub content_exists: ContentExists,
    /// Present only when content_exists == HasLargestLocation.
    pub largest_location: Option<Location>,
    pub parameters: Vec<KeyValuePair>,
}
157
/// Fields of a `SubscribeError` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeError {
    pub request_id: VarInt,
    pub error_code: VarInt,
    /// Raw reason bytes with a varint length prefix. Encoding fails with
    /// `CodecError::ReasonPhraseTooLong` above `MAX_REASON_PHRASE_LENGTH` bytes.
    pub reason_phrase: Vec<u8>,
}
164
/// Fields of a `SubscribeUpdate` control message.
///
/// Fields are encoded in declaration order; unlike `Subscribe`, the start
/// location and end group are always present.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeUpdate {
    pub request_id: VarInt,
    pub subscription_request_id: VarInt,
    pub start_location: Location,
    pub end_group: VarInt,
    /// Written as a single byte.
    pub subscriber_priority: u8,
    /// Written as a single byte; the decoder accepts only 0 and 1.
    pub forward: Forward,
    pub parameters: Vec<KeyValuePair>,
}
175
/// Fields of an `Unsubscribe` control message: a single request ID varint.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Unsubscribe {
    pub request_id: VarInt,
}
180
181// ============================================================
182// Publish Messages
183// ============================================================
184
/// Fields of a `Publish` control message, encoded in declaration order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Publish {
    pub request_id: VarInt,
    pub track_namespace: TrackNamespace,
    /// Raw track name bytes, encoded with a varint length prefix.
    pub track_name: Vec<u8>,
    /// Written as a single byte.
    pub forward: Forward,
    pub parameters: Vec<KeyValuePair>,
}
193
/// Fields of a `PublishOk` control message, encoded in declaration order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublishOk {
    pub request_id: VarInt,
    pub track_alias: VarInt,
    /// Written as a single byte.
    pub forward: Forward,
    pub parameters: Vec<KeyValuePair>,
}
201
/// Fields of a `PublishError` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublishError {
    pub request_id: VarInt,
    pub error_code: VarInt,
    /// Raw reason bytes with a varint length prefix. Encoding fails with
    /// `CodecError::ReasonPhraseTooLong` above `MAX_REASON_PHRASE_LENGTH` bytes.
    pub reason_phrase: Vec<u8>,
}
208
/// Fields of a `PublishDone` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublishDone {
    pub request_id: VarInt,
    pub status_code: VarInt,
    /// Raw reason bytes with a varint length prefix. Encoding fails with
    /// `CodecError::ReasonPhraseTooLong` above `MAX_REASON_PHRASE_LENGTH` bytes.
    pub reason_phrase: Vec<u8>,
}
215
216// ============================================================
217// Publish Namespace Messages
218// ============================================================
219
/// Fields of a `PublishNamespace` control message, encoded in declaration order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublishNamespaceMsg {
    pub request_id: VarInt,
    pub track_namespace: TrackNamespace,
    pub parameters: Vec<KeyValuePair>,
}
226
/// Fields of a `PublishNamespaceOk` control message: request ID then parameters.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublishNamespaceOk {
    pub request_id: VarInt,
    pub parameters: Vec<KeyValuePair>,
}
232
/// Fields of a `PublishNamespaceError` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublishNamespaceError {
    pub request_id: VarInt,
    pub error_code: VarInt,
    /// Raw reason bytes with a varint length prefix. Encoding fails with
    /// `CodecError::ReasonPhraseTooLong` above `MAX_REASON_PHRASE_LENGTH` bytes.
    pub reason_phrase: Vec<u8>,
}
239
/// Fields of a `PublishNamespaceDone` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublishNamespaceDone {
    pub request_id: VarInt,
    /// Raw reason bytes with a varint length prefix. Encoding fails with
    /// `CodecError::ReasonPhraseTooLong` above `MAX_REASON_PHRASE_LENGTH` bytes.
    pub reason_phrase: Vec<u8>,
}
245
/// Fields of a `PublishNamespaceCancel` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublishNamespaceCancel {
    pub request_id: VarInt,
    /// Raw reason bytes with a varint length prefix. Encoding fails with
    /// `CodecError::ReasonPhraseTooLong` above `MAX_REASON_PHRASE_LENGTH` bytes.
    pub reason_phrase: Vec<u8>,
}
251
252// ============================================================
253// Subscribe Namespace Messages
254// ============================================================
255
/// Fields of a `SubscribeNamespace` control message, encoded in declaration order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeNamespace {
    pub request_id: VarInt,
    pub track_namespace: TrackNamespace,
    pub parameters: Vec<KeyValuePair>,
}
262
/// Fields of a `SubscribeNamespaceOk` control message: request ID then parameters.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeNamespaceOk {
    pub request_id: VarInt,
    pub parameters: Vec<KeyValuePair>,
}
268
/// Fields of a `SubscribeNamespaceError` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscribeNamespaceError {
    pub request_id: VarInt,
    pub error_code: VarInt,
    /// Raw reason bytes with a varint length prefix. Encoding fails with
    /// `CodecError::ReasonPhraseTooLong` above `MAX_REASON_PHRASE_LENGTH` bytes.
    pub reason_phrase: Vec<u8>,
}
275
/// Fields of an `UnsubscribeNamespace` control message: request ID then namespace.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UnsubscribeNamespace {
    pub request_id: VarInt,
    pub track_namespace: TrackNamespace,
}
281
282// ============================================================
283// Fetch Messages
284// ============================================================
285
/// Fields of a `Fetch` control message, encoded in declaration order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Fetch {
    pub request_id: VarInt,
    pub track_namespace: TrackNamespace,
    /// Raw track name bytes, encoded with a varint length prefix.
    pub track_name: Vec<u8>,
    pub start_group: VarInt,
    pub start_object: VarInt,
    /// Encoded as a varint: 0 when `None`, otherwise the value plus one.
    pub end_group: Option<VarInt>,
    /// Encoded as a varint: 0 when `None`, otherwise the value plus one.
    pub priority: Option<u8>,
    pub parameters: Vec<KeyValuePair>,
}
297
/// Fields of a `FetchOk` control message, encoded in declaration order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchOk {
    pub request_id: VarInt,
    /// Written as a single byte.
    pub group_order: GroupOrder,
    pub parameters: Vec<KeyValuePair>,
}
304
/// Fields of a `FetchError` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchError {
    pub request_id: VarInt,
    pub error_code: VarInt,
    /// Raw reason bytes with a varint length prefix. Encoding fails with
    /// `CodecError::ReasonPhraseTooLong` above `MAX_REASON_PHRASE_LENGTH` bytes.
    pub reason_phrase: Vec<u8>,
}
311
/// Fields of a `FetchCancel` control message: a single request ID varint.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FetchCancel {
    pub request_id: VarInt,
}
316
317// ============================================================
318// Track Status Messages
319// ============================================================
320
/// Fields of a `TrackStatus` control message, encoded in declaration order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TrackStatusMsg {
    pub request_id: VarInt,
    pub track_namespace: TrackNamespace,
    /// Raw track name bytes, encoded with a varint length prefix.
    pub track_name: Vec<u8>,
    pub parameters: Vec<KeyValuePair>,
}
328
/// Fields of a `TrackStatusOk` control message, encoded in declaration order.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TrackStatusOk {
    pub request_id: VarInt,
    pub status_code: VarInt,
    /// Encoded as a one-byte presence flag (0 or 1), followed by the location
    /// itself when the flag is 1.
    pub largest_location: Option<Location>,
    pub parameters: Vec<KeyValuePair>,
}
336
/// Fields of a `TrackStatusError` control message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TrackStatusError {
    pub request_id: VarInt,
    pub error_code: VarInt,
    /// Raw reason bytes with a varint length prefix. Encoding fails with
    /// `CodecError::ReasonPhraseTooLong` above `MAX_REASON_PHRASE_LENGTH` bytes.
    pub reason_phrase: Vec<u8>,
}
343
344// ============================================================
345// Unified Message Enum
346// ============================================================
347
/// A single control message of any supported kind.
///
/// Each variant wraps the struct holding that message's fields. Use
/// `ControlMessage::encode` and `ControlMessage::decode` to convert to and
/// from the framed byte representation (type ID, payload length, payload).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ControlMessage {
    ClientSetup(ClientSetup),
    ServerSetup(ServerSetup),
    GoAway(GoAway),
    MaxRequestId(MaxRequestIdMsg),
    RequestsBlocked(RequestsBlocked),
    Subscribe(Subscribe),
    SubscribeOk(SubscribeOk),
    SubscribeError(SubscribeError),
    SubscribeUpdate(SubscribeUpdate),
    Unsubscribe(Unsubscribe),
    Publish(Publish),
    PublishOk(PublishOk),
    PublishError(PublishError),
    PublishDone(PublishDone),
    PublishNamespace(PublishNamespaceMsg),
    PublishNamespaceOk(PublishNamespaceOk),
    PublishNamespaceError(PublishNamespaceError),
    PublishNamespaceDone(PublishNamespaceDone),
    PublishNamespaceCancel(PublishNamespaceCancel),
    SubscribeNamespace(SubscribeNamespace),
    SubscribeNamespaceOk(SubscribeNamespaceOk),
    SubscribeNamespaceError(SubscribeNamespaceError),
    UnsubscribeNamespace(UnsubscribeNamespace),
    Fetch(Fetch),
    FetchOk(FetchOk),
    FetchError(FetchError),
    FetchCancel(FetchCancel),
    TrackStatus(TrackStatusMsg),
    TrackStatusOk(TrackStatusOk),
    TrackStatusError(TrackStatusError),
}
381
/// Errors produced while encoding or decoding control messages.
#[derive(Debug, thiserror::Error, PartialEq, Eq, Clone)]
pub enum CodecError {
    /// The decoded type ID has no matching `MessageType`; carries the raw ID.
    #[error("unknown message type: 0x{0:x}")]
    UnknownMessageType(u64),
    /// Fewer bytes remain in the input than a length prefix or fixed-size
    /// field requires.
    #[error("insufficient bytes")]
    UnexpectedEnd,
    /// A message length exceeds `MAX_MESSAGE_LENGTH`; carries the length in bytes.
    #[error("message too long: {0} bytes (max {MAX_MESSAGE_LENGTH})")]
    MessageTooLong(usize),
    /// A varint could not be processed; converted automatically from
    /// `VarIntError` by `?`.
    #[error("varint error: {0}")]
    VarInt(#[from] crate::varint::VarIntError),
    /// A key-value pair could not be processed; converted automatically from
    /// `KvpError` by `?`.
    #[error("kvp error: {0}")]
    Kvp(#[from] crate::kvp::KvpError),
    /// A field held a value outside its accepted set (for example a
    /// single-byte enum field with an unrecognized value).
    #[error("invalid field value")]
    InvalidField,
    /// A namespace tuple has an unsupported element count; carries that count.
    #[error("namespace tuple must have 1-{MAX_NAMESPACE_TUPLE_SIZE} elements, got {0}")]
    InvalidNamespaceTupleSize(usize),
    /// A full track name is longer than `MAX_FULL_TRACK_NAME_LENGTH` bytes.
    #[error("full track name exceeds {MAX_FULL_TRACK_NAME_LENGTH} bytes")]
    TrackNameTooLong,
    /// A reason phrase is longer than `MAX_REASON_PHRASE_LENGTH` bytes.
    #[error("reason phrase exceeds {MAX_REASON_PHRASE_LENGTH} bytes")]
    ReasonPhraseTooLong,
    /// A GOAWAY URI is longer than `MAX_GOAWAY_URI_LENGTH` bytes.
    #[error("GOAWAY URI exceeds {MAX_GOAWAY_URI_LENGTH} bytes")]
    GoAwayUriTooLong,
}
405
406impl ControlMessage {
407    /// Encode this control message to bytes (including type ID and length prefix).
408    pub fn encode(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
409        // First encode the payload to a temporary buffer
410        let mut payload = Vec::new();
411        self.encode_payload(&mut payload)?;
412
413        // Write message type ID, payload length, payload
414        VarInt::from_u64(self.message_type().id()).unwrap().encode(buf);
415        VarInt::from_u64(payload.len() as u64).unwrap().encode(buf);
416        buf.put_slice(&payload);
417        Ok(())
418    }
419
420    /// Decode a control message from bytes (reads type ID and length prefix first).
421    pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
422        let type_id = VarInt::decode(buf)?.into_inner();
423        let msg_type =
424            MessageType::from_id(type_id).ok_or(CodecError::UnknownMessageType(type_id))?;
425        let payload_len = VarInt::decode(buf)?.into_inner() as usize;
426        if buf.remaining() < payload_len {
427            return Err(CodecError::UnexpectedEnd);
428        }
429        let payload_bytes = buf.copy_to_bytes(payload_len);
430        let mut payload = &payload_bytes[..];
431        Self::decode_payload(msg_type, &mut payload)
432    }
433
434    fn encode_payload(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
435        match self {
436            ControlMessage::ClientSetup(m) => {
437                VarInt::from_u64(m.supported_versions.len() as u64).unwrap().encode(buf);
438                for v in &m.supported_versions {
439                    v.encode(buf);
440                }
441                KeyValuePair::encode_list(&m.parameters, buf);
442            }
443            ControlMessage::ServerSetup(m) => {
444                m.selected_version.encode(buf);
445                KeyValuePair::encode_list(&m.parameters, buf);
446            }
447            ControlMessage::GoAway(m) => {
448                if m.new_session_uri.len() > MAX_GOAWAY_URI_LENGTH {
449                    return Err(CodecError::GoAwayUriTooLong);
450                }
451                VarInt::from_u64(m.new_session_uri.len() as u64).unwrap().encode(buf);
452                buf.put_slice(&m.new_session_uri);
453            }
454            ControlMessage::MaxRequestId(m) => {
455                m.request_id.encode(buf);
456            }
457            ControlMessage::RequestsBlocked(m) => {
458                m.maximum_request_id.encode(buf);
459            }
460            ControlMessage::Subscribe(m) => {
461                m.request_id.encode(buf);
462                m.track_namespace.encode(buf);
463                VarInt::from_u64(m.track_name.len() as u64).unwrap().encode(buf);
464                buf.put_slice(&m.track_name);
465                buf.put_u8(m.subscriber_priority);
466                buf.put_u8(m.group_order as u8);
467                buf.put_u8(m.forward as u8);
468                buf.put_u8(m.filter_type as u8);
469                if let Some(loc) = &m.start_location {
470                    loc.encode(buf);
471                }
472                if let Some(eg) = &m.end_group {
473                    eg.encode(buf);
474                }
475                KeyValuePair::encode_list(&m.parameters, buf);
476            }
477            ControlMessage::SubscribeOk(m) => {
478                m.request_id.encode(buf);
479                m.track_alias.encode(buf);
480                m.expires.encode(buf);
481                buf.put_u8(m.group_order as u8);
482                buf.put_u8(m.content_exists as u8);
483                if let Some(loc) = &m.largest_location {
484                    loc.encode(buf);
485                }
486                KeyValuePair::encode_list(&m.parameters, buf);
487            }
488            ControlMessage::SubscribeError(m) => {
489                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
490                    return Err(CodecError::ReasonPhraseTooLong);
491                }
492                m.request_id.encode(buf);
493                m.error_code.encode(buf);
494                VarInt::from_u64(m.reason_phrase.len() as u64).unwrap().encode(buf);
495                buf.put_slice(&m.reason_phrase);
496            }
497            ControlMessage::SubscribeUpdate(m) => {
498                m.request_id.encode(buf);
499                m.subscription_request_id.encode(buf);
500                m.start_location.encode(buf);
501                m.end_group.encode(buf);
502                buf.put_u8(m.subscriber_priority);
503                buf.put_u8(m.forward as u8);
504                KeyValuePair::encode_list(&m.parameters, buf);
505            }
506            ControlMessage::Unsubscribe(m) => {
507                m.request_id.encode(buf);
508            }
509            ControlMessage::Publish(m) => {
510                m.request_id.encode(buf);
511                m.track_namespace.encode(buf);
512                VarInt::from_u64(m.track_name.len() as u64).unwrap().encode(buf);
513                buf.put_slice(&m.track_name);
514                buf.put_u8(m.forward as u8);
515                KeyValuePair::encode_list(&m.parameters, buf);
516            }
517            ControlMessage::PublishOk(m) => {
518                m.request_id.encode(buf);
519                m.track_alias.encode(buf);
520                buf.put_u8(m.forward as u8);
521                KeyValuePair::encode_list(&m.parameters, buf);
522            }
523            ControlMessage::PublishError(m) => {
524                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
525                    return Err(CodecError::ReasonPhraseTooLong);
526                }
527                m.request_id.encode(buf);
528                m.error_code.encode(buf);
529                VarInt::from_u64(m.reason_phrase.len() as u64).unwrap().encode(buf);
530                buf.put_slice(&m.reason_phrase);
531            }
532            ControlMessage::PublishDone(m) => {
533                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
534                    return Err(CodecError::ReasonPhraseTooLong);
535                }
536                m.request_id.encode(buf);
537                m.status_code.encode(buf);
538                VarInt::from_u64(m.reason_phrase.len() as u64).unwrap().encode(buf);
539                buf.put_slice(&m.reason_phrase);
540            }
541            ControlMessage::PublishNamespace(m) => {
542                m.request_id.encode(buf);
543                m.track_namespace.encode(buf);
544                KeyValuePair::encode_list(&m.parameters, buf);
545            }
546            ControlMessage::PublishNamespaceOk(m) => {
547                m.request_id.encode(buf);
548                KeyValuePair::encode_list(&m.parameters, buf);
549            }
550            ControlMessage::PublishNamespaceError(m) => {
551                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
552                    return Err(CodecError::ReasonPhraseTooLong);
553                }
554                m.request_id.encode(buf);
555                m.error_code.encode(buf);
556                VarInt::from_u64(m.reason_phrase.len() as u64).unwrap().encode(buf);
557                buf.put_slice(&m.reason_phrase);
558            }
559            ControlMessage::PublishNamespaceDone(m) => {
560                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
561                    return Err(CodecError::ReasonPhraseTooLong);
562                }
563                m.request_id.encode(buf);
564                VarInt::from_u64(m.reason_phrase.len() as u64).unwrap().encode(buf);
565                buf.put_slice(&m.reason_phrase);
566            }
567            ControlMessage::PublishNamespaceCancel(m) => {
568                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
569                    return Err(CodecError::ReasonPhraseTooLong);
570                }
571                m.request_id.encode(buf);
572                VarInt::from_u64(m.reason_phrase.len() as u64).unwrap().encode(buf);
573                buf.put_slice(&m.reason_phrase);
574            }
575            ControlMessage::SubscribeNamespace(m) => {
576                m.request_id.encode(buf);
577                m.track_namespace.encode(buf);
578                KeyValuePair::encode_list(&m.parameters, buf);
579            }
580            ControlMessage::SubscribeNamespaceOk(m) => {
581                m.request_id.encode(buf);
582                KeyValuePair::encode_list(&m.parameters, buf);
583            }
584            ControlMessage::SubscribeNamespaceError(m) => {
585                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
586                    return Err(CodecError::ReasonPhraseTooLong);
587                }
588                m.request_id.encode(buf);
589                m.error_code.encode(buf);
590                VarInt::from_u64(m.reason_phrase.len() as u64).unwrap().encode(buf);
591                buf.put_slice(&m.reason_phrase);
592            }
593            ControlMessage::UnsubscribeNamespace(m) => {
594                m.request_id.encode(buf);
595                m.track_namespace.encode(buf);
596            }
597            ControlMessage::Fetch(m) => {
598                m.request_id.encode(buf);
599                m.track_namespace.encode(buf);
600                VarInt::from_u64(m.track_name.len() as u64).unwrap().encode(buf);
601                buf.put_slice(&m.track_name);
602                m.start_group.encode(buf);
603                m.start_object.encode(buf);
604                // end_group: 0 = absent, value+1 = present
605                match &m.end_group {
606                    None => VarInt::from_u64(0).unwrap().encode(buf),
607                    Some(v) => VarInt::from_u64(v.into_inner() + 1).unwrap().encode(buf),
608                }
609                // priority: 0 = absent, value+1 = present
610                match &m.priority {
611                    None => VarInt::from_u64(0).unwrap().encode(buf),
612                    Some(v) => VarInt::from_u64(*v as u64 + 1).unwrap().encode(buf),
613                }
614                KeyValuePair::encode_list(&m.parameters, buf);
615            }
616            ControlMessage::FetchOk(m) => {
617                m.request_id.encode(buf);
618                buf.put_u8(m.group_order as u8);
619                KeyValuePair::encode_list(&m.parameters, buf);
620            }
621            ControlMessage::FetchError(m) => {
622                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
623                    return Err(CodecError::ReasonPhraseTooLong);
624                }
625                m.request_id.encode(buf);
626                m.error_code.encode(buf);
627                VarInt::from_u64(m.reason_phrase.len() as u64).unwrap().encode(buf);
628                buf.put_slice(&m.reason_phrase);
629            }
630            ControlMessage::FetchCancel(m) => {
631                m.request_id.encode(buf);
632            }
633            ControlMessage::TrackStatus(m) => {
634                m.request_id.encode(buf);
635                m.track_namespace.encode(buf);
636                VarInt::from_u64(m.track_name.len() as u64).unwrap().encode(buf);
637                buf.put_slice(&m.track_name);
638                KeyValuePair::encode_list(&m.parameters, buf);
639            }
640            ControlMessage::TrackStatusOk(m) => {
641                m.request_id.encode(buf);
642                m.status_code.encode(buf);
643                // content_exists flag for largest_location
644                match &m.largest_location {
645                    None => buf.put_u8(0),
646                    Some(loc) => {
647                        buf.put_u8(1);
648                        loc.encode(buf);
649                    }
650                }
651                KeyValuePair::encode_list(&m.parameters, buf);
652            }
653            ControlMessage::TrackStatusError(m) => {
654                if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
655                    return Err(CodecError::ReasonPhraseTooLong);
656                }
657                m.request_id.encode(buf);
658                m.error_code.encode(buf);
659                VarInt::from_u64(m.reason_phrase.len() as u64).unwrap().encode(buf);
660                buf.put_slice(&m.reason_phrase);
661            }
662        }
663        Ok(())
664    }
665
666    fn decode_payload(msg_type: MessageType, buf: &mut impl Buf) -> Result<Self, CodecError> {
667        match msg_type {
668            MessageType::ClientSetup => {
669                let num_versions = VarInt::decode(buf)?.into_inner() as usize;
670                let mut supported_versions = Vec::with_capacity(num_versions);
671                for _ in 0..num_versions {
672                    supported_versions.push(VarInt::decode(buf)?);
673                }
674                let parameters = KeyValuePair::decode_list(buf)?;
675                Ok(ControlMessage::ClientSetup(ClientSetup { supported_versions, parameters }))
676            }
677            MessageType::ServerSetup => {
678                let selected_version = VarInt::decode(buf)?;
679                let parameters = KeyValuePair::decode_list(buf)?;
680                Ok(ControlMessage::ServerSetup(ServerSetup { selected_version, parameters }))
681            }
682            MessageType::GoAway => {
683                let uri_len = VarInt::decode(buf)?.into_inner() as usize;
684                if buf.remaining() < uri_len {
685                    return Err(CodecError::UnexpectedEnd);
686                }
687                let mut uri = vec![0u8; uri_len];
688                buf.copy_to_slice(&mut uri);
689                Ok(ControlMessage::GoAway(GoAway { new_session_uri: uri }))
690            }
691            MessageType::MaxRequestId => {
692                let request_id = VarInt::decode(buf)?;
693                Ok(ControlMessage::MaxRequestId(MaxRequestIdMsg { request_id }))
694            }
695            MessageType::RequestsBlocked => {
696                let maximum_request_id = VarInt::decode(buf)?;
697                Ok(ControlMessage::RequestsBlocked(RequestsBlocked { maximum_request_id }))
698            }
699            MessageType::Subscribe => {
700                let request_id = VarInt::decode(buf)?;
701                let track_namespace = TrackNamespace::decode(buf)?;
702                let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
703                if buf.remaining() < track_name_len {
704                    return Err(CodecError::UnexpectedEnd);
705                }
706                let mut track_name = vec![0u8; track_name_len];
707                buf.copy_to_slice(&mut track_name);
708                if buf.remaining() < 4 {
709                    return Err(CodecError::UnexpectedEnd);
710                }
711                let subscriber_priority = buf.get_u8();
712                let group_order =
713                    GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
714                let forward_val = buf.get_u8();
715                let forward = match forward_val {
716                    0 => Forward::DontForward,
717                    1 => Forward::Forward,
718                    _ => return Err(CodecError::InvalidField),
719                };
720                let filter_val = buf.get_u8();
721                let filter_type =
722                    FilterType::from_u8(filter_val).ok_or(CodecError::InvalidField)?;
723                let start_location = match filter_type {
724                    FilterType::AbsoluteStart | FilterType::AbsoluteRange => {
725                        Some(Location::decode(buf)?)
726                    }
727                    _ => None,
728                };
729                let end_group = match filter_type {
730                    FilterType::AbsoluteRange => Some(VarInt::decode(buf)?),
731                    _ => None,
732                };
733                let parameters = KeyValuePair::decode_list(buf)?;
734                Ok(ControlMessage::Subscribe(Subscribe {
735                    request_id,
736                    track_namespace,
737                    track_name,
738                    subscriber_priority,
739                    group_order,
740                    forward,
741                    filter_type,
742                    start_location,
743                    end_group,
744                    parameters,
745                }))
746            }
747            MessageType::SubscribeOk => {
748                let request_id = VarInt::decode(buf)?;
749                let track_alias = VarInt::decode(buf)?;
750                let expires = VarInt::decode(buf)?;
751                if buf.remaining() < 2 {
752                    return Err(CodecError::UnexpectedEnd);
753                }
754                let group_order =
755                    GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
756                let content_exists_val = buf.get_u8();
757                let content_exists = match content_exists_val {
758                    0 => ContentExists::NoLargestLocation,
759                    1 => ContentExists::HasLargestLocation,
760                    _ => return Err(CodecError::InvalidField),
761                };
762                let largest_location = if content_exists == ContentExists::HasLargestLocation {
763                    Some(Location::decode(buf)?)
764                } else {
765                    None
766                };
767                let parameters = KeyValuePair::decode_list(buf)?;
768                Ok(ControlMessage::SubscribeOk(SubscribeOk {
769                    request_id,
770                    track_alias,
771                    expires,
772                    group_order,
773                    content_exists,
774                    largest_location,
775                    parameters,
776                }))
777            }
778            MessageType::SubscribeError => {
779                let request_id = VarInt::decode(buf)?;
780                let error_code = VarInt::decode(buf)?;
781                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
782                if buf.remaining() < reason_len {
783                    return Err(CodecError::UnexpectedEnd);
784                }
785                let mut reason_phrase = vec![0u8; reason_len];
786                buf.copy_to_slice(&mut reason_phrase);
787                Ok(ControlMessage::SubscribeError(SubscribeError {
788                    request_id,
789                    error_code,
790                    reason_phrase,
791                }))
792            }
793            MessageType::SubscribeUpdate => {
794                let request_id = VarInt::decode(buf)?;
795                let subscription_request_id = VarInt::decode(buf)?;
796                let start_location = Location::decode(buf)?;
797                let end_group = VarInt::decode(buf)?;
798                if buf.remaining() < 2 {
799                    return Err(CodecError::UnexpectedEnd);
800                }
801                let subscriber_priority = buf.get_u8();
802                let forward_val = buf.get_u8();
803                let forward = match forward_val {
804                    0 => Forward::DontForward,
805                    1 => Forward::Forward,
806                    _ => return Err(CodecError::InvalidField),
807                };
808                let parameters = KeyValuePair::decode_list(buf)?;
809                Ok(ControlMessage::SubscribeUpdate(SubscribeUpdate {
810                    request_id,
811                    subscription_request_id,
812                    start_location,
813                    end_group,
814                    subscriber_priority,
815                    forward,
816                    parameters,
817                }))
818            }
819            MessageType::Unsubscribe => {
820                let request_id = VarInt::decode(buf)?;
821                Ok(ControlMessage::Unsubscribe(Unsubscribe { request_id }))
822            }
823            MessageType::Publish => {
824                let request_id = VarInt::decode(buf)?;
825                let track_namespace = TrackNamespace::decode(buf)?;
826                let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
827                if buf.remaining() < track_name_len {
828                    return Err(CodecError::UnexpectedEnd);
829                }
830                let mut track_name = vec![0u8; track_name_len];
831                buf.copy_to_slice(&mut track_name);
832                if buf.remaining() < 1 {
833                    return Err(CodecError::UnexpectedEnd);
834                }
835                let forward_val = buf.get_u8();
836                let forward = match forward_val {
837                    0 => Forward::DontForward,
838                    1 => Forward::Forward,
839                    _ => return Err(CodecError::InvalidField),
840                };
841                let parameters = KeyValuePair::decode_list(buf)?;
842                Ok(ControlMessage::Publish(Publish {
843                    request_id,
844                    track_namespace,
845                    track_name,
846                    forward,
847                    parameters,
848                }))
849            }
850            MessageType::PublishOk => {
851                let request_id = VarInt::decode(buf)?;
852                let track_alias = VarInt::decode(buf)?;
853                if buf.remaining() < 1 {
854                    return Err(CodecError::UnexpectedEnd);
855                }
856                let forward_val = buf.get_u8();
857                let forward = match forward_val {
858                    0 => Forward::DontForward,
859                    1 => Forward::Forward,
860                    _ => return Err(CodecError::InvalidField),
861                };
862                let parameters = KeyValuePair::decode_list(buf)?;
863                Ok(ControlMessage::PublishOk(PublishOk {
864                    request_id,
865                    track_alias,
866                    forward,
867                    parameters,
868                }))
869            }
870            MessageType::PublishError => {
871                let request_id = VarInt::decode(buf)?;
872                let error_code = VarInt::decode(buf)?;
873                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
874                if buf.remaining() < reason_len {
875                    return Err(CodecError::UnexpectedEnd);
876                }
877                let mut reason_phrase = vec![0u8; reason_len];
878                buf.copy_to_slice(&mut reason_phrase);
879                Ok(ControlMessage::PublishError(PublishError {
880                    request_id,
881                    error_code,
882                    reason_phrase,
883                }))
884            }
885            MessageType::PublishDone => {
886                let request_id = VarInt::decode(buf)?;
887                let status_code = VarInt::decode(buf)?;
888                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
889                if buf.remaining() < reason_len {
890                    return Err(CodecError::UnexpectedEnd);
891                }
892                let mut reason_phrase = vec![0u8; reason_len];
893                buf.copy_to_slice(&mut reason_phrase);
894                Ok(ControlMessage::PublishDone(PublishDone {
895                    request_id,
896                    status_code,
897                    reason_phrase,
898                }))
899            }
900            MessageType::PublishNamespace => {
901                let request_id = VarInt::decode(buf)?;
902                let track_namespace = TrackNamespace::decode(buf)?;
903                let parameters = KeyValuePair::decode_list(buf)?;
904                Ok(ControlMessage::PublishNamespace(PublishNamespaceMsg {
905                    request_id,
906                    track_namespace,
907                    parameters,
908                }))
909            }
910            MessageType::PublishNamespaceOk => {
911                let request_id = VarInt::decode(buf)?;
912                let parameters = KeyValuePair::decode_list(buf)?;
913                Ok(ControlMessage::PublishNamespaceOk(PublishNamespaceOk {
914                    request_id,
915                    parameters,
916                }))
917            }
918            MessageType::PublishNamespaceError => {
919                let request_id = VarInt::decode(buf)?;
920                let error_code = VarInt::decode(buf)?;
921                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
922                if buf.remaining() < reason_len {
923                    return Err(CodecError::UnexpectedEnd);
924                }
925                let mut reason_phrase = vec![0u8; reason_len];
926                buf.copy_to_slice(&mut reason_phrase);
927                Ok(ControlMessage::PublishNamespaceError(PublishNamespaceError {
928                    request_id,
929                    error_code,
930                    reason_phrase,
931                }))
932            }
933            MessageType::PublishNamespaceDone => {
934                let request_id = VarInt::decode(buf)?;
935                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
936                if buf.remaining() < reason_len {
937                    return Err(CodecError::UnexpectedEnd);
938                }
939                let mut reason_phrase = vec![0u8; reason_len];
940                buf.copy_to_slice(&mut reason_phrase);
941                Ok(ControlMessage::PublishNamespaceDone(PublishNamespaceDone {
942                    request_id,
943                    reason_phrase,
944                }))
945            }
946            MessageType::PublishNamespaceCancel => {
947                let request_id = VarInt::decode(buf)?;
948                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
949                if buf.remaining() < reason_len {
950                    return Err(CodecError::UnexpectedEnd);
951                }
952                let mut reason_phrase = vec![0u8; reason_len];
953                buf.copy_to_slice(&mut reason_phrase);
954                Ok(ControlMessage::PublishNamespaceCancel(PublishNamespaceCancel {
955                    request_id,
956                    reason_phrase,
957                }))
958            }
959            MessageType::SubscribeNamespace => {
960                let request_id = VarInt::decode(buf)?;
961                let track_namespace = TrackNamespace::decode(buf)?;
962                let parameters = KeyValuePair::decode_list(buf)?;
963                Ok(ControlMessage::SubscribeNamespace(SubscribeNamespace {
964                    request_id,
965                    track_namespace,
966                    parameters,
967                }))
968            }
969            MessageType::SubscribeNamespaceOk => {
970                let request_id = VarInt::decode(buf)?;
971                let parameters = KeyValuePair::decode_list(buf)?;
972                Ok(ControlMessage::SubscribeNamespaceOk(SubscribeNamespaceOk {
973                    request_id,
974                    parameters,
975                }))
976            }
977            MessageType::SubscribeNamespaceError => {
978                let request_id = VarInt::decode(buf)?;
979                let error_code = VarInt::decode(buf)?;
980                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
981                if buf.remaining() < reason_len {
982                    return Err(CodecError::UnexpectedEnd);
983                }
984                let mut reason_phrase = vec![0u8; reason_len];
985                buf.copy_to_slice(&mut reason_phrase);
986                Ok(ControlMessage::SubscribeNamespaceError(SubscribeNamespaceError {
987                    request_id,
988                    error_code,
989                    reason_phrase,
990                }))
991            }
992            MessageType::UnsubscribeNamespace => {
993                let request_id = VarInt::decode(buf)?;
994                let track_namespace = TrackNamespace::decode(buf)?;
995                Ok(ControlMessage::UnsubscribeNamespace(UnsubscribeNamespace {
996                    request_id,
997                    track_namespace,
998                }))
999            }
1000            MessageType::Fetch => {
1001                let request_id = VarInt::decode(buf)?;
1002                let track_namespace = TrackNamespace::decode(buf)?;
1003                let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
1004                if buf.remaining() < track_name_len {
1005                    return Err(CodecError::UnexpectedEnd);
1006                }
1007                let mut track_name = vec![0u8; track_name_len];
1008                buf.copy_to_slice(&mut track_name);
1009                let start_group = VarInt::decode(buf)?;
1010                let start_object = VarInt::decode(buf)?;
1011                let end_group_raw = VarInt::decode(buf)?.into_inner();
1012                let end_group = if end_group_raw == 0 {
1013                    None
1014                } else {
1015                    Some(VarInt::from_u64(end_group_raw - 1).unwrap())
1016                };
1017                let priority_raw = VarInt::decode(buf)?.into_inner();
1018                let priority =
1019                    if priority_raw == 0 { None } else { Some((priority_raw - 1) as u8) };
1020                let parameters = KeyValuePair::decode_list(buf)?;
1021                Ok(ControlMessage::Fetch(Fetch {
1022                    request_id,
1023                    track_namespace,
1024                    track_name,
1025                    start_group,
1026                    start_object,
1027                    end_group,
1028                    priority,
1029                    parameters,
1030                }))
1031            }
1032            MessageType::FetchOk => {
1033                let request_id = VarInt::decode(buf)?;
1034                if buf.remaining() < 1 {
1035                    return Err(CodecError::UnexpectedEnd);
1036                }
1037                let group_order =
1038                    GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
1039                let parameters = KeyValuePair::decode_list(buf)?;
1040                Ok(ControlMessage::FetchOk(FetchOk { request_id, group_order, parameters }))
1041            }
1042            MessageType::FetchError => {
1043                let request_id = VarInt::decode(buf)?;
1044                let error_code = VarInt::decode(buf)?;
1045                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1046                if buf.remaining() < reason_len {
1047                    return Err(CodecError::UnexpectedEnd);
1048                }
1049                let mut reason_phrase = vec![0u8; reason_len];
1050                buf.copy_to_slice(&mut reason_phrase);
1051                Ok(ControlMessage::FetchError(FetchError { request_id, error_code, reason_phrase }))
1052            }
1053            MessageType::FetchCancel => {
1054                let request_id = VarInt::decode(buf)?;
1055                Ok(ControlMessage::FetchCancel(FetchCancel { request_id }))
1056            }
1057            MessageType::TrackStatus => {
1058                let request_id = VarInt::decode(buf)?;
1059                let track_namespace = TrackNamespace::decode(buf)?;
1060                let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
1061                if buf.remaining() < track_name_len {
1062                    return Err(CodecError::UnexpectedEnd);
1063                }
1064                let mut track_name = vec![0u8; track_name_len];
1065                buf.copy_to_slice(&mut track_name);
1066                let parameters = KeyValuePair::decode_list(buf)?;
1067                Ok(ControlMessage::TrackStatus(TrackStatusMsg {
1068                    request_id,
1069                    track_namespace,
1070                    track_name,
1071                    parameters,
1072                }))
1073            }
1074            MessageType::TrackStatusOk => {
1075                let request_id = VarInt::decode(buf)?;
1076                let status_code = VarInt::decode(buf)?;
1077                if buf.remaining() < 1 {
1078                    return Err(CodecError::UnexpectedEnd);
1079                }
1080                let content_exists = buf.get_u8();
1081                let largest_location =
1082                    if content_exists == 1 { Some(Location::decode(buf)?) } else { None };
1083                let parameters = KeyValuePair::decode_list(buf)?;
1084                Ok(ControlMessage::TrackStatusOk(TrackStatusOk {
1085                    request_id,
1086                    status_code,
1087                    largest_location,
1088                    parameters,
1089                }))
1090            }
1091            MessageType::TrackStatusError => {
1092                let request_id = VarInt::decode(buf)?;
1093                let error_code = VarInt::decode(buf)?;
1094                let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1095                if buf.remaining() < reason_len {
1096                    return Err(CodecError::UnexpectedEnd);
1097                }
1098                let mut reason_phrase = vec![0u8; reason_len];
1099                buf.copy_to_slice(&mut reason_phrase);
1100                Ok(ControlMessage::TrackStatusError(TrackStatusError {
1101                    request_id,
1102                    error_code,
1103                    reason_phrase,
1104                }))
1105            }
1106        }
1107    }
1108
1109    /// Get the message type ID for this message.
1110    pub fn message_type(&self) -> MessageType {
1111        match self {
1112            ControlMessage::ClientSetup(_) => MessageType::ClientSetup,
1113            ControlMessage::ServerSetup(_) => MessageType::ServerSetup,
1114            ControlMessage::GoAway(_) => MessageType::GoAway,
1115            ControlMessage::MaxRequestId(_) => MessageType::MaxRequestId,
1116            ControlMessage::RequestsBlocked(_) => MessageType::RequestsBlocked,
1117            ControlMessage::Subscribe(_) => MessageType::Subscribe,
1118            ControlMessage::SubscribeOk(_) => MessageType::SubscribeOk,
1119            ControlMessage::SubscribeError(_) => MessageType::SubscribeError,
1120            ControlMessage::SubscribeUpdate(_) => MessageType::SubscribeUpdate,
1121            ControlMessage::Unsubscribe(_) => MessageType::Unsubscribe,
1122            ControlMessage::Publish(_) => MessageType::Publish,
1123            ControlMessage::PublishOk(_) => MessageType::PublishOk,
1124            ControlMessage::PublishError(_) => MessageType::PublishError,
1125            ControlMessage::PublishDone(_) => MessageType::PublishDone,
1126            ControlMessage::PublishNamespace(_) => MessageType::PublishNamespace,
1127            ControlMessage::PublishNamespaceOk(_) => MessageType::PublishNamespaceOk,
1128            ControlMessage::PublishNamespaceError(_) => MessageType::PublishNamespaceError,
1129            ControlMessage::PublishNamespaceDone(_) => MessageType::PublishNamespaceDone,
1130            ControlMessage::PublishNamespaceCancel(_) => MessageType::PublishNamespaceCancel,
1131            ControlMessage::SubscribeNamespace(_) => MessageType::SubscribeNamespace,
1132            ControlMessage::SubscribeNamespaceOk(_) => MessageType::SubscribeNamespaceOk,
1133            ControlMessage::SubscribeNamespaceError(_) => MessageType::SubscribeNamespaceError,
1134            ControlMessage::UnsubscribeNamespace(_) => MessageType::UnsubscribeNamespace,
1135            ControlMessage::Fetch(_) => MessageType::Fetch,
1136            ControlMessage::FetchOk(_) => MessageType::FetchOk,
1137            ControlMessage::FetchError(_) => MessageType::FetchError,
1138            ControlMessage::FetchCancel(_) => MessageType::FetchCancel,
1139            ControlMessage::TrackStatus(_) => MessageType::TrackStatus,
1140            ControlMessage::TrackStatusOk(_) => MessageType::TrackStatusOk,
1141            ControlMessage::TrackStatusError(_) => MessageType::TrackStatusError,
1142        }
1143    }
1144}