1pub use crate::error::{
20 CodecError, MAX_GOAWAY_URI_LENGTH, MAX_MESSAGE_LENGTH, MAX_NAMESPACE_TUPLE_SIZE,
21 MAX_REASON_PHRASE_LENGTH,
22};
23use crate::kvp::{KeyValuePair, KvpValue};
24use crate::types::*;
25use crate::varint::VarInt;
26use bytes::{Buf, BufMut};
27
28#[derive(Debug, Clone, Copy, PartialEq, Eq)]
34enum ParamEncoding {
35 Varint,
37 Uint8,
39 LengthPrefixed,
41}
42
43fn param_encoding(key: u64) -> Option<ParamEncoding> {
44 match key {
45 0x02 | 0x04 | 0x06 | 0x08 | 0x0A | 0x32 => Some(ParamEncoding::Varint),
52 0x10 | 0x20 | 0x22 => Some(ParamEncoding::Uint8),
54 0x03 | 0x09 | 0x21 | 0x34 => Some(ParamEncoding::LengthPrefixed),
60 _ => None,
61 }
62}
63
64fn decode_parameters(buf: &mut impl Buf) -> Result<Vec<KeyValuePair>, CodecError> {
66 let count = VarInt::decode(buf)?.into_inner() as usize;
67 let mut params = Vec::with_capacity(count);
68 let mut prev_key: u64 = 0;
69
70 for _ in 0..count {
71 let delta = VarInt::decode(buf)?.into_inner();
72 let abs_key = prev_key + delta;
73 prev_key = abs_key;
74
75 let encoding = param_encoding(abs_key).ok_or(CodecError::InvalidField)?;
76
77 let value = match encoding {
78 ParamEncoding::Varint => {
79 let v = VarInt::decode(buf)?;
80 KvpValue::Varint(v)
81 }
82 ParamEncoding::Uint8 => {
83 if buf.remaining() < 1 {
84 return Err(CodecError::UnexpectedEnd);
85 }
86 let byte = buf.get_u8();
87 KvpValue::Varint(VarInt::from_u64(byte as u64).unwrap())
88 }
89 ParamEncoding::LengthPrefixed => {
90 let len = VarInt::decode(buf)?.into_inner() as usize;
91 let data = read_bytes(buf, len)?;
92 KvpValue::Bytes(data)
93 }
94 };
95
96 params.push(KeyValuePair { key: VarInt::from_u64(abs_key).unwrap(), value });
97 }
98 Ok(params)
99}
100
101fn encode_parameters(params: &[KeyValuePair], buf: &mut impl BufMut) {
103 VarInt::from_usize(params.len()).encode(buf);
104 let mut prev_key: u64 = 0;
105
106 for p in params {
107 let abs_key = p.key.into_inner();
108 let delta = abs_key - prev_key;
109 prev_key = abs_key;
110 VarInt::from_u64(delta).unwrap().encode(buf);
111
112 let encoding = param_encoding(abs_key);
113 match (&p.value, encoding) {
114 (KvpValue::Varint(v), Some(ParamEncoding::Varint)) => {
115 v.encode(buf);
116 }
117 (KvpValue::Varint(v), Some(ParamEncoding::Uint8)) => {
118 buf.put_u8(v.into_inner() as u8);
119 }
120 (KvpValue::Bytes(b), Some(ParamEncoding::LengthPrefixed)) => {
121 VarInt::from_usize(b.len()).encode(buf);
122 buf.put_slice(b);
123 }
124 _ => {
125 match &p.value {
127 KvpValue::Varint(v) => v.encode(buf),
128 KvpValue::Bytes(b) => {
129 VarInt::from_usize(b.len()).encode(buf);
130 buf.put_slice(b);
131 }
132 }
133 }
134 }
135 }
136}
137
138fn decode_kvp_delta(buf: &mut impl Buf) -> Result<Vec<KeyValuePair>, CodecError> {
141 let mut pairs = Vec::new();
142 let mut prev_key: u64 = 0;
143
144 while buf.has_remaining() {
145 let delta = VarInt::decode(buf)?.into_inner();
146 let abs_key = prev_key + delta;
147 prev_key = abs_key;
148
149 let value = if abs_key % 2 == 0 {
150 let v = VarInt::decode(buf)?;
151 KvpValue::Varint(v)
152 } else {
153 let len = VarInt::decode(buf)?.into_inner() as usize;
154 let data = read_bytes(buf, len)?;
155 KvpValue::Bytes(data)
156 };
157
158 pairs.push(KeyValuePair { key: VarInt::from_u64(abs_key).unwrap(), value });
159 }
160 Ok(pairs)
161}
162
163fn encode_kvp_delta(pairs: &[KeyValuePair], buf: &mut impl BufMut) {
165 let mut prev_key: u64 = 0;
166 for p in pairs {
167 let abs_key = p.key.into_inner();
168 let delta = abs_key - prev_key;
169 prev_key = abs_key;
170 VarInt::from_u64(delta).unwrap().encode(buf);
171 match &p.value {
172 KvpValue::Varint(v) => v.encode(buf),
173 KvpValue::Bytes(b) => {
174 VarInt::from_usize(b.len()).encode(buf);
175 buf.put_slice(b);
176 }
177 }
178 }
179}
180
181#[derive(Debug, Clone, Copy, PartialEq, Eq)]
186#[repr(u64)]
187pub enum MessageType {
188 RequestUpdate = 0x02,
189 Subscribe = 0x03,
190 SubscribeOk = 0x04,
191 RequestError = 0x05,
192 PublishNamespace = 0x06,
193 RequestOk = 0x07,
195 Namespace = 0x08,
196 PublishDone = 0x0B,
197 TrackStatus = 0x0D,
198 NamespaceDone = 0x0E,
199 PublishBlocked = 0x0F,
200 GoAway = 0x10,
201 Fetch = 0x16,
202 FetchOk = 0x18,
203 Publish = 0x1D,
204 SubscribeNamespace = 0x50,
206 SubscribeTracks = 0x51,
208 Setup = 0x2F00,
209}
210
211impl MessageType {
212 pub fn from_id(id: u64) -> Option<Self> {
213 match id {
214 0x02 => Some(MessageType::RequestUpdate),
215 0x03 => Some(MessageType::Subscribe),
216 0x04 => Some(MessageType::SubscribeOk),
217 0x05 => Some(MessageType::RequestError),
218 0x06 => Some(MessageType::PublishNamespace),
219 0x07 => Some(MessageType::RequestOk),
220 0x08 => Some(MessageType::Namespace),
221 0x0B => Some(MessageType::PublishDone),
222 0x0D => Some(MessageType::TrackStatus),
223 0x0E => Some(MessageType::NamespaceDone),
224 0x0F => Some(MessageType::PublishBlocked),
225 0x10 => Some(MessageType::GoAway),
226 0x16 => Some(MessageType::Fetch),
227 0x18 => Some(MessageType::FetchOk),
228 0x1D => Some(MessageType::Publish),
229 0x50 => Some(MessageType::SubscribeNamespace),
230 0x51 => Some(MessageType::SubscribeTracks),
231 0x2F00 => Some(MessageType::Setup),
232 _ => None,
233 }
234 }
235
236 pub fn id(&self) -> u64 {
237 *self as u64
238 }
239}
240
241#[derive(Debug, Clone, PartialEq, Eq)]
247pub struct Setup {
248 pub options: Vec<KeyValuePair>,
249}
250
251#[derive(Debug, Clone, PartialEq, Eq)]
254pub struct GoAway {
255 pub new_session_uri: Vec<u8>,
256 pub timeout: VarInt,
257 pub request_id: Option<VarInt>,
260}
261
262#[derive(Debug, Clone, PartialEq, Eq)]
273pub struct RequestOk {
274 pub parameters: Vec<KeyValuePair>,
275 pub track_properties: Vec<KeyValuePair>,
276}
277
278#[derive(Debug, Clone, PartialEq, Eq)]
280pub struct Redirect {
281 pub connect_uri: Vec<u8>,
282 pub track_namespace: TrackNamespace,
283 pub track_name: Vec<u8>,
284}
285
286#[derive(Debug, Clone, PartialEq, Eq)]
289pub struct RequestError {
290 pub error_code: VarInt,
291 pub retry_interval: VarInt,
292 pub reason_phrase: Vec<u8>,
293 pub redirect: Option<Redirect>,
294}
295
296pub mod request_error_codes {
298 pub const UNSUPPORTED_EXTENSION: u64 = 0x33;
301 pub const REDIRECT: u64 = 0x34;
303}
304
305#[derive(Debug, Clone, PartialEq, Eq)]
310pub struct Subscribe {
311 pub request_id: VarInt,
312 pub track_namespace: TrackNamespace,
313 pub track_name: Vec<u8>,
314 pub parameters: Vec<KeyValuePair>,
315}
316
317#[derive(Debug, Clone, PartialEq, Eq)]
319pub struct SubscribeOk {
320 pub track_alias: VarInt,
321 pub parameters: Vec<KeyValuePair>,
322 pub track_properties: Vec<KeyValuePair>,
323}
324
325#[derive(Debug, Clone, PartialEq, Eq)]
326pub struct RequestUpdate {
327 pub request_id: VarInt,
328 pub parameters: Vec<KeyValuePair>,
329}
330
331#[derive(Debug, Clone, PartialEq, Eq)]
336pub struct Publish {
337 pub request_id: VarInt,
338 pub track_namespace: TrackNamespace,
339 pub track_name: Vec<u8>,
340 pub track_alias: VarInt,
341 pub parameters: Vec<KeyValuePair>,
342 pub track_properties: Vec<KeyValuePair>,
343}
344
345#[derive(Debug, Clone, PartialEq, Eq)]
347pub struct PublishDone {
348 pub status_code: VarInt,
349 pub stream_count: VarInt,
350 pub reason_phrase: Vec<u8>,
351}
352
353pub mod publish_done_codes {
355 pub const TOO_FAR_BEHIND: u64 = 0x05;
357 pub const EXPIRED: u64 = 0x06;
359}
360
361#[derive(Debug, Clone, PartialEq, Eq)]
366pub struct PublishNamespace {
367 pub request_id: VarInt,
368 pub track_namespace: TrackNamespace,
369 pub parameters: Vec<KeyValuePair>,
370}
371
372#[derive(Debug, Clone, PartialEq, Eq)]
377pub struct Namespace {
378 pub namespace_suffix: TrackNamespace,
379}
380
381#[derive(Debug, Clone, PartialEq, Eq)]
382pub struct NamespaceDone {
383 pub namespace_suffix: TrackNamespace,
384}
385
386#[derive(Debug, Clone, PartialEq, Eq)]
395pub struct SubscribeNamespace {
396 pub request_id: VarInt,
397 pub namespace_prefix: TrackNamespace,
398 pub parameters: Vec<KeyValuePair>,
399}
400
401#[derive(Debug, Clone, PartialEq, Eq)]
405pub struct SubscribeTracks {
406 pub request_id: VarInt,
407 pub namespace_prefix: TrackNamespace,
408 pub parameters: Vec<KeyValuePair>,
409}
410
411#[derive(Debug, Clone, PartialEq, Eq)]
416pub struct TrackStatus {
417 pub request_id: VarInt,
418 pub track_namespace: TrackNamespace,
419 pub track_name: Vec<u8>,
420 pub parameters: Vec<KeyValuePair>,
421}
422
423#[derive(Debug, Clone, Copy, PartialEq, Eq)]
428#[repr(u64)]
429pub enum FetchType {
430 Standalone = 1,
431 RelativeJoining = 2,
432 AbsoluteJoining = 3,
433}
434
435impl FetchType {
436 pub fn from_u64(v: u64) -> Option<Self> {
437 match v {
438 1 => Some(FetchType::Standalone),
439 2 => Some(FetchType::RelativeJoining),
440 3 => Some(FetchType::AbsoluteJoining),
441 _ => None,
442 }
443 }
444}
445
446#[derive(Debug, Clone, PartialEq, Eq)]
447pub struct Fetch {
448 pub request_id: VarInt,
449 pub fetch_type: FetchType,
450 pub fetch_payload: FetchPayload,
451 pub parameters: Vec<KeyValuePair>,
452}
453
454#[derive(Debug, Clone, PartialEq, Eq)]
455pub enum FetchPayload {
456 Standalone {
457 track_namespace: TrackNamespace,
458 track_name: Vec<u8>,
459 start_group: VarInt,
460 start_object: VarInt,
461 end_group: VarInt,
462 end_object: VarInt,
463 },
464 Joining {
465 joining_request_id: VarInt,
466 joining_start: VarInt,
467 },
468}
469
470#[derive(Debug, Clone, PartialEq, Eq)]
472pub struct FetchOk {
473 pub end_of_track: u8,
474 pub end_group: VarInt,
475 pub end_object: VarInt,
476 pub parameters: Vec<KeyValuePair>,
477 pub track_properties: Vec<KeyValuePair>,
478}
479
480#[derive(Debug, Clone, PartialEq, Eq)]
485pub struct PublishBlocked {
486 pub namespace_suffix: TrackNamespace,
487 pub track_name: Vec<u8>,
488}
489
490#[derive(Debug, Clone, PartialEq, Eq)]
495pub enum ControlMessage {
496 Setup(Setup),
497 GoAway(GoAway),
498 RequestOk(RequestOk),
499 RequestError(RequestError),
500 Subscribe(Subscribe),
501 SubscribeOk(SubscribeOk),
502 RequestUpdate(RequestUpdate),
503 Publish(Publish),
504 PublishDone(PublishDone),
505 PublishNamespace(PublishNamespace),
506 Namespace(Namespace),
507 NamespaceDone(NamespaceDone),
508 SubscribeNamespace(SubscribeNamespace),
509 SubscribeTracks(SubscribeTracks),
510 TrackStatus(TrackStatus),
511 Fetch(Fetch),
512 FetchOk(FetchOk),
513 PublishBlocked(PublishBlocked),
514}
515
516impl ControlMessage {
517 pub fn encode(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
518 let mut payload = Vec::with_capacity(256);
519 self.encode_payload(&mut payload)?;
520
521 if payload.len() > MAX_MESSAGE_LENGTH {
522 return Err(CodecError::MessageTooLong(payload.len()));
523 }
524
525 let msg_type = self.message_type();
526 VarInt::from_usize(msg_type.id() as usize).encode(buf);
527 buf.put_u16(payload.len() as u16);
529 buf.put_slice(&payload);
530 Ok(())
531 }
532
533 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
534 let type_id = VarInt::decode(buf)?.into_inner();
535 let msg_type =
536 MessageType::from_id(type_id).ok_or(CodecError::UnknownMessageType(type_id))?;
537 if buf.remaining() < 2 {
539 return Err(CodecError::UnexpectedEnd);
540 }
541 let payload_len = buf.get_u16() as usize;
542 if buf.remaining() < payload_len {
543 return Err(CodecError::UnexpectedEnd);
544 }
545 let payload_bytes = buf.copy_to_bytes(payload_len);
546 let mut payload = &payload_bytes[..];
547 Self::decode_payload(msg_type, &mut payload)
548 }
549
550 fn encode_payload(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
551 match self {
552 ControlMessage::Setup(m) => {
553 encode_kvp_delta(&m.options, buf);
554 }
555 ControlMessage::GoAway(m) => {
556 if m.new_session_uri.len() > MAX_GOAWAY_URI_LENGTH {
557 return Err(CodecError::GoAwayUriTooLong);
558 }
559 VarInt::from_usize(m.new_session_uri.len()).encode(buf);
560 buf.put_slice(&m.new_session_uri);
561 m.timeout.encode(buf);
562 if let Some(rid) = &m.request_id {
563 rid.encode(buf);
564 }
565 }
566 ControlMessage::RequestOk(m) => {
567 encode_parameters(&m.parameters, buf);
568 encode_kvp_delta(&m.track_properties, buf);
569 }
570 ControlMessage::RequestError(m) => {
571 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
572 return Err(CodecError::ReasonPhraseTooLong);
573 }
574 m.error_code.encode(buf);
575 m.retry_interval.encode(buf);
576 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
577 buf.put_slice(&m.reason_phrase);
578 if let Some(r) = &m.redirect {
579 VarInt::from_usize(r.connect_uri.len()).encode(buf);
580 buf.put_slice(&r.connect_uri);
581 r.track_namespace.encode(buf);
582 VarInt::from_usize(r.track_name.len()).encode(buf);
583 buf.put_slice(&r.track_name);
584 }
585 }
586 ControlMessage::Subscribe(m) => {
587 m.request_id.encode(buf);
588 m.track_namespace.encode(buf);
589 VarInt::from_usize(m.track_name.len()).encode(buf);
590 buf.put_slice(&m.track_name);
591 encode_parameters(&m.parameters, buf);
592 }
593 ControlMessage::SubscribeOk(m) => {
594 m.track_alias.encode(buf);
595 encode_parameters(&m.parameters, buf);
596 encode_kvp_delta(&m.track_properties, buf);
597 }
598 ControlMessage::RequestUpdate(m) => {
599 m.request_id.encode(buf);
600 encode_parameters(&m.parameters, buf);
601 }
602 ControlMessage::Publish(m) => {
603 m.request_id.encode(buf);
604 m.track_namespace.encode(buf);
605 VarInt::from_usize(m.track_name.len()).encode(buf);
606 buf.put_slice(&m.track_name);
607 m.track_alias.encode(buf);
608 encode_parameters(&m.parameters, buf);
609 encode_kvp_delta(&m.track_properties, buf);
610 }
611 ControlMessage::PublishDone(m) => {
612 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
613 return Err(CodecError::ReasonPhraseTooLong);
614 }
615 m.status_code.encode(buf);
616 m.stream_count.encode(buf);
617 VarInt::from_usize(m.reason_phrase.len()).encode(buf);
618 buf.put_slice(&m.reason_phrase);
619 }
620 ControlMessage::PublishNamespace(m) => {
621 m.request_id.encode(buf);
622 m.track_namespace.encode(buf);
623 encode_parameters(&m.parameters, buf);
624 }
625 ControlMessage::Namespace(m) => {
626 m.namespace_suffix.encode(buf);
627 }
628 ControlMessage::NamespaceDone(m) => {
629 m.namespace_suffix.encode(buf);
630 }
631 ControlMessage::SubscribeNamespace(m) => {
632 m.request_id.encode(buf);
633 m.namespace_prefix.encode(buf);
634 encode_parameters(&m.parameters, buf);
635 }
636 ControlMessage::SubscribeTracks(m) => {
637 m.request_id.encode(buf);
638 m.namespace_prefix.encode(buf);
639 encode_parameters(&m.parameters, buf);
640 }
641 ControlMessage::TrackStatus(m) => {
642 m.request_id.encode(buf);
643 m.track_namespace.encode(buf);
644 VarInt::from_usize(m.track_name.len()).encode(buf);
645 buf.put_slice(&m.track_name);
646 encode_parameters(&m.parameters, buf);
647 }
648 ControlMessage::Fetch(m) => {
649 m.request_id.encode(buf);
650 VarInt::from_usize(m.fetch_type as usize).encode(buf);
651 match &m.fetch_payload {
652 FetchPayload::Standalone {
653 track_namespace,
654 track_name,
655 start_group,
656 start_object,
657 end_group,
658 end_object,
659 } => {
660 track_namespace.encode(buf);
661 VarInt::from_usize(track_name.len()).encode(buf);
662 buf.put_slice(track_name);
663 start_group.encode(buf);
664 start_object.encode(buf);
665 end_group.encode(buf);
666 end_object.encode(buf);
667 }
668 FetchPayload::Joining { joining_request_id, joining_start } => {
669 joining_request_id.encode(buf);
670 joining_start.encode(buf);
671 }
672 }
673 encode_parameters(&m.parameters, buf);
674 }
675 ControlMessage::FetchOk(m) => {
676 buf.put_u8(m.end_of_track);
677 m.end_group.encode(buf);
678 m.end_object.encode(buf);
679 encode_parameters(&m.parameters, buf);
680 encode_kvp_delta(&m.track_properties, buf);
681 }
682 ControlMessage::PublishBlocked(m) => {
683 m.namespace_suffix.encode(buf);
684 VarInt::from_usize(m.track_name.len()).encode(buf);
685 buf.put_slice(&m.track_name);
686 }
687 }
688 Ok(())
689 }
690
691 fn decode_payload(msg_type: MessageType, buf: &mut impl Buf) -> Result<Self, CodecError> {
692 match msg_type {
693 MessageType::Setup => {
694 let options = decode_kvp_delta(buf)?;
695 Ok(ControlMessage::Setup(Setup { options }))
696 }
697 MessageType::GoAway => {
698 let uri_len = VarInt::decode(buf)?.into_inner() as usize;
699 let uri = read_bytes(buf, uri_len)?;
700 let timeout = VarInt::decode(buf)?;
701 let request_id =
702 if buf.has_remaining() { Some(VarInt::decode(buf)?) } else { None };
703 Ok(ControlMessage::GoAway(GoAway { new_session_uri: uri, timeout, request_id }))
704 }
705 MessageType::RequestOk => {
706 let parameters = decode_parameters(buf)?;
707 let track_properties = decode_kvp_delta(buf)?;
708 Ok(ControlMessage::RequestOk(RequestOk { parameters, track_properties }))
709 }
710 MessageType::RequestError => {
711 let error_code = VarInt::decode(buf)?;
712 let retry_interval = VarInt::decode(buf)?;
713 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
714 let reason_phrase = read_bytes(buf, reason_len)?;
715 let redirect = if error_code.into_inner() == request_error_codes::REDIRECT {
716 let uri_len = VarInt::decode(buf)?.into_inner() as usize;
717 let connect_uri = read_bytes(buf, uri_len)?;
718 let track_namespace = TrackNamespace::decode_allow_empty(buf)?;
719 let name_len = VarInt::decode(buf)?.into_inner() as usize;
720 let track_name = read_bytes(buf, name_len)?;
721 Some(Redirect { connect_uri, track_namespace, track_name })
722 } else {
723 None
724 };
725 Ok(ControlMessage::RequestError(RequestError {
726 error_code,
727 retry_interval,
728 reason_phrase,
729 redirect,
730 }))
731 }
732 MessageType::Subscribe => {
733 let request_id = VarInt::decode(buf)?;
734 let track_namespace = TrackNamespace::decode(buf)?;
735 let tn_len = VarInt::decode(buf)?.into_inner() as usize;
736 let track_name = read_bytes(buf, tn_len)?;
737 let parameters = decode_parameters(buf)?;
738 Ok(ControlMessage::Subscribe(Subscribe {
739 request_id,
740 track_namespace,
741 track_name,
742 parameters,
743 }))
744 }
745 MessageType::SubscribeOk => {
746 let track_alias = VarInt::decode(buf)?;
747 let parameters = decode_parameters(buf)?;
748 let track_properties = decode_kvp_delta(buf)?;
749 Ok(ControlMessage::SubscribeOk(SubscribeOk {
750 track_alias,
751 parameters,
752 track_properties,
753 }))
754 }
755 MessageType::RequestUpdate => {
756 let request_id = VarInt::decode(buf)?;
757 let parameters = decode_parameters(buf)?;
758 Ok(ControlMessage::RequestUpdate(RequestUpdate { request_id, parameters }))
759 }
760 MessageType::Publish => {
761 let request_id = VarInt::decode(buf)?;
762 let track_namespace = TrackNamespace::decode(buf)?;
763 let tn_len = VarInt::decode(buf)?.into_inner() as usize;
764 let track_name = read_bytes(buf, tn_len)?;
765 let track_alias = VarInt::decode(buf)?;
766 let parameters = decode_parameters(buf)?;
767 let track_properties = decode_kvp_delta(buf)?;
768 Ok(ControlMessage::Publish(Publish {
769 request_id,
770 track_namespace,
771 track_name,
772 track_alias,
773 parameters,
774 track_properties,
775 }))
776 }
777 MessageType::PublishDone => {
778 let status_code = VarInt::decode(buf)?;
779 let stream_count = VarInt::decode(buf)?;
780 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
781 let reason_phrase = read_bytes(buf, reason_len)?;
782 Ok(ControlMessage::PublishDone(PublishDone {
783 status_code,
784 stream_count,
785 reason_phrase,
786 }))
787 }
788 MessageType::PublishNamespace => {
789 let request_id = VarInt::decode(buf)?;
790 let track_namespace = TrackNamespace::decode(buf)?;
791 let parameters = decode_parameters(buf)?;
792 Ok(ControlMessage::PublishNamespace(PublishNamespace {
793 request_id,
794 track_namespace,
795 parameters,
796 }))
797 }
798 MessageType::Namespace => {
799 let namespace_suffix = TrackNamespace::decode_allow_empty(buf)?;
800 Ok(ControlMessage::Namespace(Namespace { namespace_suffix }))
801 }
802 MessageType::NamespaceDone => {
803 let namespace_suffix = TrackNamespace::decode_allow_empty(buf)?;
804 Ok(ControlMessage::NamespaceDone(NamespaceDone { namespace_suffix }))
805 }
806 MessageType::SubscribeNamespace => {
807 let request_id = VarInt::decode(buf)?;
808 let namespace_prefix = TrackNamespace::decode_allow_empty(buf)?;
809 let parameters = decode_parameters(buf)?;
810 Ok(ControlMessage::SubscribeNamespace(SubscribeNamespace {
811 request_id,
812 namespace_prefix,
813 parameters,
814 }))
815 }
816 MessageType::SubscribeTracks => {
817 let request_id = VarInt::decode(buf)?;
818 let namespace_prefix = TrackNamespace::decode_allow_empty(buf)?;
819 let parameters = decode_parameters(buf)?;
820 Ok(ControlMessage::SubscribeTracks(SubscribeTracks {
821 request_id,
822 namespace_prefix,
823 parameters,
824 }))
825 }
826 MessageType::TrackStatus => {
827 let request_id = VarInt::decode(buf)?;
828 let track_namespace = TrackNamespace::decode(buf)?;
829 let tn_len = VarInt::decode(buf)?.into_inner() as usize;
830 let track_name = read_bytes(buf, tn_len)?;
831 let parameters = decode_parameters(buf)?;
832 Ok(ControlMessage::TrackStatus(TrackStatus {
833 request_id,
834 track_namespace,
835 track_name,
836 parameters,
837 }))
838 }
839 MessageType::Fetch => {
840 let request_id = VarInt::decode(buf)?;
841 let fetch_type_val = VarInt::decode(buf)?.into_inner();
842 let fetch_type =
843 FetchType::from_u64(fetch_type_val).ok_or(CodecError::InvalidField)?;
844 let fetch_payload = match fetch_type {
845 FetchType::Standalone => {
846 let track_namespace = TrackNamespace::decode(buf)?;
847 let tn_len = VarInt::decode(buf)?.into_inner() as usize;
848 let track_name = read_bytes(buf, tn_len)?;
849 let start_group = VarInt::decode(buf)?;
850 let start_object = VarInt::decode(buf)?;
851 let end_group = VarInt::decode(buf)?;
852 let end_object = VarInt::decode(buf)?;
853 FetchPayload::Standalone {
854 track_namespace,
855 track_name,
856 start_group,
857 start_object,
858 end_group,
859 end_object,
860 }
861 }
862 FetchType::RelativeJoining | FetchType::AbsoluteJoining => {
863 let joining_request_id = VarInt::decode(buf)?;
864 let joining_start = VarInt::decode(buf)?;
865 FetchPayload::Joining { joining_request_id, joining_start }
866 }
867 };
868 let parameters = decode_parameters(buf)?;
869 Ok(ControlMessage::Fetch(Fetch {
870 request_id,
871 fetch_type,
872 fetch_payload,
873 parameters,
874 }))
875 }
876 MessageType::FetchOk => {
877 if buf.remaining() < 1 {
878 return Err(CodecError::UnexpectedEnd);
879 }
880 let end_of_track = buf.get_u8();
881 let end_group = VarInt::decode(buf)?;
882 let end_object = VarInt::decode(buf)?;
883 let parameters = decode_parameters(buf)?;
884 let track_properties = decode_kvp_delta(buf)?;
885 Ok(ControlMessage::FetchOk(FetchOk {
886 end_of_track,
887 end_group,
888 end_object,
889 parameters,
890 track_properties,
891 }))
892 }
893 MessageType::PublishBlocked => {
894 let namespace_suffix = TrackNamespace::decode_allow_empty(buf)?;
895 let tn_len = VarInt::decode(buf)?.into_inner() as usize;
896 let track_name = read_bytes(buf, tn_len)?;
897 Ok(ControlMessage::PublishBlocked(PublishBlocked { namespace_suffix, track_name }))
898 }
899 }
900 }
901
902 pub fn message_type(&self) -> MessageType {
903 match self {
904 ControlMessage::Setup(_) => MessageType::Setup,
905 ControlMessage::GoAway(_) => MessageType::GoAway,
906 ControlMessage::RequestOk(_) => MessageType::RequestOk,
907 ControlMessage::RequestError(_) => MessageType::RequestError,
908 ControlMessage::Subscribe(_) => MessageType::Subscribe,
909 ControlMessage::SubscribeOk(_) => MessageType::SubscribeOk,
910 ControlMessage::RequestUpdate(_) => MessageType::RequestUpdate,
911 ControlMessage::Publish(_) => MessageType::Publish,
912 ControlMessage::PublishDone(_) => MessageType::PublishDone,
913 ControlMessage::PublishNamespace(_) => MessageType::PublishNamespace,
914 ControlMessage::Namespace(_) => MessageType::Namespace,
915 ControlMessage::NamespaceDone(_) => MessageType::NamespaceDone,
916 ControlMessage::SubscribeNamespace(_) => MessageType::SubscribeNamespace,
917 ControlMessage::SubscribeTracks(_) => MessageType::SubscribeTracks,
918 ControlMessage::TrackStatus(_) => MessageType::TrackStatus,
919 ControlMessage::Fetch(_) => MessageType::Fetch,
920 ControlMessage::FetchOk(_) => MessageType::FetchOk,
921 ControlMessage::PublishBlocked(_) => MessageType::PublishBlocked,
922 }
923 }
924}