1use crate::kvp::KeyValuePair;
2use crate::types::*;
3use crate::varint::VarInt;
4use bytes::{Buf, BufMut};
5
6pub const MAX_MESSAGE_LENGTH: usize = 65535;
8pub const MAX_REASON_PHRASE_LENGTH: usize = 1024;
10pub const MAX_GOAWAY_URI_LENGTH: usize = 8192;
12pub const MAX_FULL_TRACK_NAME_LENGTH: usize = 4096;
14pub const MAX_NAMESPACE_TUPLE_SIZE: usize = 32;
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq)]
19#[repr(u64)]
20pub enum MessageType {
21 SubscribeUpdate = 0x02,
22 Subscribe = 0x03,
23 SubscribeOk = 0x04,
24 SubscribeError = 0x05,
25 PublishNamespace = 0x06,
26 PublishNamespaceOk = 0x07,
27 PublishNamespaceError = 0x08,
28 PublishNamespaceDone = 0x09,
29 Unsubscribe = 0x0A,
30 PublishDone = 0x0B,
31 PublishNamespaceCancel = 0x0C,
32 TrackStatus = 0x0D,
33 TrackStatusOk = 0x0E,
34 TrackStatusError = 0x0F,
35 GoAway = 0x10,
36 SubscribeNamespace = 0x11,
37 SubscribeNamespaceOk = 0x12,
38 SubscribeNamespaceError = 0x13,
39 UnsubscribeNamespace = 0x14,
40 MaxRequestId = 0x15,
41 Fetch = 0x16,
42 FetchCancel = 0x17,
43 FetchOk = 0x18,
44 FetchError = 0x19,
45 RequestsBlocked = 0x1A,
46 Publish = 0x1D,
47 PublishOk = 0x1E,
48 PublishError = 0x1F,
49 ClientSetup = 0x20,
50 ServerSetup = 0x21,
51}
52
53impl MessageType {
54 pub fn from_id(id: u64) -> Option<Self> {
55 match id {
56 0x02 => Some(MessageType::SubscribeUpdate),
57 0x03 => Some(MessageType::Subscribe),
58 0x04 => Some(MessageType::SubscribeOk),
59 0x05 => Some(MessageType::SubscribeError),
60 0x06 => Some(MessageType::PublishNamespace),
61 0x07 => Some(MessageType::PublishNamespaceOk),
62 0x08 => Some(MessageType::PublishNamespaceError),
63 0x09 => Some(MessageType::PublishNamespaceDone),
64 0x0A => Some(MessageType::Unsubscribe),
65 0x0B => Some(MessageType::PublishDone),
66 0x0C => Some(MessageType::PublishNamespaceCancel),
67 0x0D => Some(MessageType::TrackStatus),
68 0x0E => Some(MessageType::TrackStatusOk),
69 0x0F => Some(MessageType::TrackStatusError),
70 0x10 => Some(MessageType::GoAway),
71 0x11 => Some(MessageType::SubscribeNamespace),
72 0x12 => Some(MessageType::SubscribeNamespaceOk),
73 0x13 => Some(MessageType::SubscribeNamespaceError),
74 0x14 => Some(MessageType::UnsubscribeNamespace),
75 0x15 => Some(MessageType::MaxRequestId),
76 0x16 => Some(MessageType::Fetch),
77 0x17 => Some(MessageType::FetchCancel),
78 0x18 => Some(MessageType::FetchOk),
79 0x19 => Some(MessageType::FetchError),
80 0x1A => Some(MessageType::RequestsBlocked),
81 0x1D => Some(MessageType::Publish),
82 0x1E => Some(MessageType::PublishOk),
83 0x1F => Some(MessageType::PublishError),
84 0x20 => Some(MessageType::ClientSetup),
85 0x21 => Some(MessageType::ServerSetup),
86 _ => None,
87 }
88 }
89
90 pub fn id(&self) -> u64 {
91 *self as u64
92 }
93}
94
95#[derive(Debug, Clone, PartialEq, Eq)]
100pub struct ClientSetup {
101 pub supported_versions: Vec<VarInt>,
102 pub parameters: Vec<KeyValuePair>,
103}
104
105#[derive(Debug, Clone, PartialEq, Eq)]
106pub struct ServerSetup {
107 pub selected_version: VarInt,
108 pub parameters: Vec<KeyValuePair>,
109}
110
111#[derive(Debug, Clone, PartialEq, Eq)]
112pub struct GoAway {
113 pub new_session_uri: Vec<u8>,
114}
115
116#[derive(Debug, Clone, PartialEq, Eq)]
117pub struct MaxRequestIdMsg {
118 pub request_id: VarInt,
119}
120
121#[derive(Debug, Clone, PartialEq, Eq)]
122pub struct RequestsBlocked {
123 pub maximum_request_id: VarInt,
124}
125
126#[derive(Debug, Clone, PartialEq, Eq)]
131pub struct Subscribe {
132 pub request_id: VarInt,
133 pub track_namespace: TrackNamespace,
134 pub track_name: Vec<u8>,
135 pub subscriber_priority: u8,
136 pub group_order: GroupOrder,
137 pub forward: Forward,
138 pub filter_type: FilterType,
139 pub start_location: Option<Location>,
141 pub end_group: Option<VarInt>,
143 pub parameters: Vec<KeyValuePair>,
144}
145
146#[derive(Debug, Clone, PartialEq, Eq)]
147pub struct SubscribeOk {
148 pub request_id: VarInt,
149 pub track_alias: VarInt,
150 pub expires: VarInt,
151 pub group_order: GroupOrder,
152 pub content_exists: ContentExists,
153 pub largest_location: Option<Location>,
155 pub parameters: Vec<KeyValuePair>,
156}
157
158#[derive(Debug, Clone, PartialEq, Eq)]
159pub struct SubscribeError {
160 pub request_id: VarInt,
161 pub error_code: VarInt,
162 pub reason_phrase: Vec<u8>,
163}
164
165#[derive(Debug, Clone, PartialEq, Eq)]
166pub struct SubscribeUpdate {
167 pub request_id: VarInt,
168 pub subscription_request_id: VarInt,
169 pub start_location: Location,
170 pub end_group: VarInt,
171 pub subscriber_priority: u8,
172 pub forward: Forward,
173 pub parameters: Vec<KeyValuePair>,
174}
175
176#[derive(Debug, Clone, PartialEq, Eq)]
177pub struct Unsubscribe {
178 pub request_id: VarInt,
179}
180
181#[derive(Debug, Clone, PartialEq, Eq)]
186pub struct Publish {
187 pub request_id: VarInt,
188 pub track_namespace: TrackNamespace,
189 pub track_name: Vec<u8>,
190 pub forward: Forward,
191 pub parameters: Vec<KeyValuePair>,
192}
193
194#[derive(Debug, Clone, PartialEq, Eq)]
195pub struct PublishOk {
196 pub request_id: VarInt,
197 pub track_alias: VarInt,
198 pub forward: Forward,
199 pub parameters: Vec<KeyValuePair>,
200}
201
202#[derive(Debug, Clone, PartialEq, Eq)]
203pub struct PublishError {
204 pub request_id: VarInt,
205 pub error_code: VarInt,
206 pub reason_phrase: Vec<u8>,
207}
208
209#[derive(Debug, Clone, PartialEq, Eq)]
210pub struct PublishDone {
211 pub request_id: VarInt,
212 pub status_code: VarInt,
213 pub reason_phrase: Vec<u8>,
214}
215
216#[derive(Debug, Clone, PartialEq, Eq)]
221pub struct PublishNamespaceMsg {
222 pub request_id: VarInt,
223 pub track_namespace: TrackNamespace,
224 pub parameters: Vec<KeyValuePair>,
225}
226
227#[derive(Debug, Clone, PartialEq, Eq)]
228pub struct PublishNamespaceOk {
229 pub request_id: VarInt,
230 pub parameters: Vec<KeyValuePair>,
231}
232
233#[derive(Debug, Clone, PartialEq, Eq)]
234pub struct PublishNamespaceError {
235 pub request_id: VarInt,
236 pub error_code: VarInt,
237 pub reason_phrase: Vec<u8>,
238}
239
240#[derive(Debug, Clone, PartialEq, Eq)]
241pub struct PublishNamespaceDone {
242 pub request_id: VarInt,
243 pub reason_phrase: Vec<u8>,
244}
245
246#[derive(Debug, Clone, PartialEq, Eq)]
247pub struct PublishNamespaceCancel {
248 pub request_id: VarInt,
249 pub reason_phrase: Vec<u8>,
250}
251
252#[derive(Debug, Clone, PartialEq, Eq)]
257pub struct SubscribeNamespace {
258 pub request_id: VarInt,
259 pub track_namespace: TrackNamespace,
260 pub parameters: Vec<KeyValuePair>,
261}
262
263#[derive(Debug, Clone, PartialEq, Eq)]
264pub struct SubscribeNamespaceOk {
265 pub request_id: VarInt,
266 pub parameters: Vec<KeyValuePair>,
267}
268
269#[derive(Debug, Clone, PartialEq, Eq)]
270pub struct SubscribeNamespaceError {
271 pub request_id: VarInt,
272 pub error_code: VarInt,
273 pub reason_phrase: Vec<u8>,
274}
275
276#[derive(Debug, Clone, PartialEq, Eq)]
277pub struct UnsubscribeNamespace {
278 pub request_id: VarInt,
279 pub track_namespace: TrackNamespace,
280}
281
282#[derive(Debug, Clone, PartialEq, Eq)]
287pub struct Fetch {
288 pub request_id: VarInt,
289 pub track_namespace: TrackNamespace,
290 pub track_name: Vec<u8>,
291 pub start_group: VarInt,
292 pub start_object: VarInt,
293 pub end_group: Option<VarInt>,
294 pub priority: Option<u8>,
295 pub parameters: Vec<KeyValuePair>,
296}
297
298#[derive(Debug, Clone, PartialEq, Eq)]
299pub struct FetchOk {
300 pub request_id: VarInt,
301 pub group_order: GroupOrder,
302 pub parameters: Vec<KeyValuePair>,
303}
304
305#[derive(Debug, Clone, PartialEq, Eq)]
306pub struct FetchError {
307 pub request_id: VarInt,
308 pub error_code: VarInt,
309 pub reason_phrase: Vec<u8>,
310}
311
312#[derive(Debug, Clone, PartialEq, Eq)]
313pub struct FetchCancel {
314 pub request_id: VarInt,
315}
316
317#[derive(Debug, Clone, PartialEq, Eq)]
322pub struct TrackStatusMsg {
323 pub request_id: VarInt,
324 pub track_namespace: TrackNamespace,
325 pub track_name: Vec<u8>,
326 pub parameters: Vec<KeyValuePair>,
327}
328
329#[derive(Debug, Clone, PartialEq, Eq)]
330pub struct TrackStatusOk {
331 pub request_id: VarInt,
332 pub status_code: VarInt,
333 pub largest_location: Option<Location>,
334 pub parameters: Vec<KeyValuePair>,
335}
336
337#[derive(Debug, Clone, PartialEq, Eq)]
338pub struct TrackStatusError {
339 pub request_id: VarInt,
340 pub error_code: VarInt,
341 pub reason_phrase: Vec<u8>,
342}
343
344#[derive(Debug, Clone, PartialEq, Eq)]
349pub enum ControlMessage {
350 ClientSetup(ClientSetup),
351 ServerSetup(ServerSetup),
352 GoAway(GoAway),
353 MaxRequestId(MaxRequestIdMsg),
354 RequestsBlocked(RequestsBlocked),
355 Subscribe(Subscribe),
356 SubscribeOk(SubscribeOk),
357 SubscribeError(SubscribeError),
358 SubscribeUpdate(SubscribeUpdate),
359 Unsubscribe(Unsubscribe),
360 Publish(Publish),
361 PublishOk(PublishOk),
362 PublishError(PublishError),
363 PublishDone(PublishDone),
364 PublishNamespace(PublishNamespaceMsg),
365 PublishNamespaceOk(PublishNamespaceOk),
366 PublishNamespaceError(PublishNamespaceError),
367 PublishNamespaceDone(PublishNamespaceDone),
368 PublishNamespaceCancel(PublishNamespaceCancel),
369 SubscribeNamespace(SubscribeNamespace),
370 SubscribeNamespaceOk(SubscribeNamespaceOk),
371 SubscribeNamespaceError(SubscribeNamespaceError),
372 UnsubscribeNamespace(UnsubscribeNamespace),
373 Fetch(Fetch),
374 FetchOk(FetchOk),
375 FetchError(FetchError),
376 FetchCancel(FetchCancel),
377 TrackStatus(TrackStatusMsg),
378 TrackStatusOk(TrackStatusOk),
379 TrackStatusError(TrackStatusError),
380}
381
382#[derive(Debug, thiserror::Error, PartialEq, Eq, Clone)]
383pub enum CodecError {
384 #[error("unknown message type: 0x{0:x}")]
385 UnknownMessageType(u64),
386 #[error("insufficient bytes")]
387 UnexpectedEnd,
388 #[error("message too long: {0} bytes (max {MAX_MESSAGE_LENGTH})")]
389 MessageTooLong(usize),
390 #[error("varint error: {0}")]
391 VarInt(#[from] crate::varint::VarIntError),
392 #[error("kvp error: {0}")]
393 Kvp(#[from] crate::kvp::KvpError),
394 #[error("invalid field value")]
395 InvalidField,
396 #[error("namespace tuple must have 1-{MAX_NAMESPACE_TUPLE_SIZE} elements, got {0}")]
397 InvalidNamespaceTupleSize(usize),
398 #[error("full track name exceeds {MAX_FULL_TRACK_NAME_LENGTH} bytes")]
399 TrackNameTooLong,
400 #[error("reason phrase exceeds {MAX_REASON_PHRASE_LENGTH} bytes")]
401 ReasonPhraseTooLong,
402 #[error("GOAWAY URI exceeds {MAX_GOAWAY_URI_LENGTH} bytes")]
403 GoAwayUriTooLong,
404}
405
406impl ControlMessage {
407 pub fn encode(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
409 let mut payload = Vec::new();
411 self.encode_payload(&mut payload)?;
412
413 VarInt::from_u64(self.message_type().id()).unwrap().encode(buf);
415 VarInt::from_u64(payload.len() as u64).unwrap().encode(buf);
416 buf.put_slice(&payload);
417 Ok(())
418 }
419
420 pub fn decode(buf: &mut impl Buf) -> Result<Self, CodecError> {
422 let type_id = VarInt::decode(buf)?.into_inner();
423 let msg_type =
424 MessageType::from_id(type_id).ok_or(CodecError::UnknownMessageType(type_id))?;
425 let payload_len = VarInt::decode(buf)?.into_inner() as usize;
426 if buf.remaining() < payload_len {
427 return Err(CodecError::UnexpectedEnd);
428 }
429 let payload_bytes = buf.copy_to_bytes(payload_len);
430 let mut payload = &payload_bytes[..];
431 Self::decode_payload(msg_type, &mut payload)
432 }
433
434 fn encode_payload(&self, buf: &mut impl BufMut) -> Result<(), CodecError> {
435 match self {
436 ControlMessage::ClientSetup(m) => {
437 VarInt::from_u64(m.supported_versions.len() as u64).unwrap().encode(buf);
438 for v in &m.supported_versions {
439 v.encode(buf);
440 }
441 KeyValuePair::encode_list(&m.parameters, buf);
442 }
443 ControlMessage::ServerSetup(m) => {
444 m.selected_version.encode(buf);
445 KeyValuePair::encode_list(&m.parameters, buf);
446 }
447 ControlMessage::GoAway(m) => {
448 if m.new_session_uri.len() > MAX_GOAWAY_URI_LENGTH {
449 return Err(CodecError::GoAwayUriTooLong);
450 }
451 VarInt::from_u64(m.new_session_uri.len() as u64).unwrap().encode(buf);
452 buf.put_slice(&m.new_session_uri);
453 }
454 ControlMessage::MaxRequestId(m) => {
455 m.request_id.encode(buf);
456 }
457 ControlMessage::RequestsBlocked(m) => {
458 m.maximum_request_id.encode(buf);
459 }
460 ControlMessage::Subscribe(m) => {
461 m.request_id.encode(buf);
462 m.track_namespace.encode(buf);
463 VarInt::from_u64(m.track_name.len() as u64).unwrap().encode(buf);
464 buf.put_slice(&m.track_name);
465 buf.put_u8(m.subscriber_priority);
466 buf.put_u8(m.group_order as u8);
467 buf.put_u8(m.forward as u8);
468 buf.put_u8(m.filter_type as u8);
469 if let Some(loc) = &m.start_location {
470 loc.encode(buf);
471 }
472 if let Some(eg) = &m.end_group {
473 eg.encode(buf);
474 }
475 KeyValuePair::encode_list(&m.parameters, buf);
476 }
477 ControlMessage::SubscribeOk(m) => {
478 m.request_id.encode(buf);
479 m.track_alias.encode(buf);
480 m.expires.encode(buf);
481 buf.put_u8(m.group_order as u8);
482 buf.put_u8(m.content_exists as u8);
483 if let Some(loc) = &m.largest_location {
484 loc.encode(buf);
485 }
486 KeyValuePair::encode_list(&m.parameters, buf);
487 }
488 ControlMessage::SubscribeError(m) => {
489 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
490 return Err(CodecError::ReasonPhraseTooLong);
491 }
492 m.request_id.encode(buf);
493 m.error_code.encode(buf);
494 VarInt::from_u64(m.reason_phrase.len() as u64).unwrap().encode(buf);
495 buf.put_slice(&m.reason_phrase);
496 }
497 ControlMessage::SubscribeUpdate(m) => {
498 m.request_id.encode(buf);
499 m.subscription_request_id.encode(buf);
500 m.start_location.encode(buf);
501 m.end_group.encode(buf);
502 buf.put_u8(m.subscriber_priority);
503 buf.put_u8(m.forward as u8);
504 KeyValuePair::encode_list(&m.parameters, buf);
505 }
506 ControlMessage::Unsubscribe(m) => {
507 m.request_id.encode(buf);
508 }
509 ControlMessage::Publish(m) => {
510 m.request_id.encode(buf);
511 m.track_namespace.encode(buf);
512 VarInt::from_u64(m.track_name.len() as u64).unwrap().encode(buf);
513 buf.put_slice(&m.track_name);
514 buf.put_u8(m.forward as u8);
515 KeyValuePair::encode_list(&m.parameters, buf);
516 }
517 ControlMessage::PublishOk(m) => {
518 m.request_id.encode(buf);
519 m.track_alias.encode(buf);
520 buf.put_u8(m.forward as u8);
521 KeyValuePair::encode_list(&m.parameters, buf);
522 }
523 ControlMessage::PublishError(m) => {
524 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
525 return Err(CodecError::ReasonPhraseTooLong);
526 }
527 m.request_id.encode(buf);
528 m.error_code.encode(buf);
529 VarInt::from_u64(m.reason_phrase.len() as u64).unwrap().encode(buf);
530 buf.put_slice(&m.reason_phrase);
531 }
532 ControlMessage::PublishDone(m) => {
533 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
534 return Err(CodecError::ReasonPhraseTooLong);
535 }
536 m.request_id.encode(buf);
537 m.status_code.encode(buf);
538 VarInt::from_u64(m.reason_phrase.len() as u64).unwrap().encode(buf);
539 buf.put_slice(&m.reason_phrase);
540 }
541 ControlMessage::PublishNamespace(m) => {
542 m.request_id.encode(buf);
543 m.track_namespace.encode(buf);
544 KeyValuePair::encode_list(&m.parameters, buf);
545 }
546 ControlMessage::PublishNamespaceOk(m) => {
547 m.request_id.encode(buf);
548 KeyValuePair::encode_list(&m.parameters, buf);
549 }
550 ControlMessage::PublishNamespaceError(m) => {
551 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
552 return Err(CodecError::ReasonPhraseTooLong);
553 }
554 m.request_id.encode(buf);
555 m.error_code.encode(buf);
556 VarInt::from_u64(m.reason_phrase.len() as u64).unwrap().encode(buf);
557 buf.put_slice(&m.reason_phrase);
558 }
559 ControlMessage::PublishNamespaceDone(m) => {
560 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
561 return Err(CodecError::ReasonPhraseTooLong);
562 }
563 m.request_id.encode(buf);
564 VarInt::from_u64(m.reason_phrase.len() as u64).unwrap().encode(buf);
565 buf.put_slice(&m.reason_phrase);
566 }
567 ControlMessage::PublishNamespaceCancel(m) => {
568 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
569 return Err(CodecError::ReasonPhraseTooLong);
570 }
571 m.request_id.encode(buf);
572 VarInt::from_u64(m.reason_phrase.len() as u64).unwrap().encode(buf);
573 buf.put_slice(&m.reason_phrase);
574 }
575 ControlMessage::SubscribeNamespace(m) => {
576 m.request_id.encode(buf);
577 m.track_namespace.encode(buf);
578 KeyValuePair::encode_list(&m.parameters, buf);
579 }
580 ControlMessage::SubscribeNamespaceOk(m) => {
581 m.request_id.encode(buf);
582 KeyValuePair::encode_list(&m.parameters, buf);
583 }
584 ControlMessage::SubscribeNamespaceError(m) => {
585 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
586 return Err(CodecError::ReasonPhraseTooLong);
587 }
588 m.request_id.encode(buf);
589 m.error_code.encode(buf);
590 VarInt::from_u64(m.reason_phrase.len() as u64).unwrap().encode(buf);
591 buf.put_slice(&m.reason_phrase);
592 }
593 ControlMessage::UnsubscribeNamespace(m) => {
594 m.request_id.encode(buf);
595 m.track_namespace.encode(buf);
596 }
597 ControlMessage::Fetch(m) => {
598 m.request_id.encode(buf);
599 m.track_namespace.encode(buf);
600 VarInt::from_u64(m.track_name.len() as u64).unwrap().encode(buf);
601 buf.put_slice(&m.track_name);
602 m.start_group.encode(buf);
603 m.start_object.encode(buf);
604 match &m.end_group {
606 None => VarInt::from_u64(0).unwrap().encode(buf),
607 Some(v) => VarInt::from_u64(v.into_inner() + 1).unwrap().encode(buf),
608 }
609 match &m.priority {
611 None => VarInt::from_u64(0).unwrap().encode(buf),
612 Some(v) => VarInt::from_u64(*v as u64 + 1).unwrap().encode(buf),
613 }
614 KeyValuePair::encode_list(&m.parameters, buf);
615 }
616 ControlMessage::FetchOk(m) => {
617 m.request_id.encode(buf);
618 buf.put_u8(m.group_order as u8);
619 KeyValuePair::encode_list(&m.parameters, buf);
620 }
621 ControlMessage::FetchError(m) => {
622 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
623 return Err(CodecError::ReasonPhraseTooLong);
624 }
625 m.request_id.encode(buf);
626 m.error_code.encode(buf);
627 VarInt::from_u64(m.reason_phrase.len() as u64).unwrap().encode(buf);
628 buf.put_slice(&m.reason_phrase);
629 }
630 ControlMessage::FetchCancel(m) => {
631 m.request_id.encode(buf);
632 }
633 ControlMessage::TrackStatus(m) => {
634 m.request_id.encode(buf);
635 m.track_namespace.encode(buf);
636 VarInt::from_u64(m.track_name.len() as u64).unwrap().encode(buf);
637 buf.put_slice(&m.track_name);
638 KeyValuePair::encode_list(&m.parameters, buf);
639 }
640 ControlMessage::TrackStatusOk(m) => {
641 m.request_id.encode(buf);
642 m.status_code.encode(buf);
643 match &m.largest_location {
645 None => buf.put_u8(0),
646 Some(loc) => {
647 buf.put_u8(1);
648 loc.encode(buf);
649 }
650 }
651 KeyValuePair::encode_list(&m.parameters, buf);
652 }
653 ControlMessage::TrackStatusError(m) => {
654 if m.reason_phrase.len() > MAX_REASON_PHRASE_LENGTH {
655 return Err(CodecError::ReasonPhraseTooLong);
656 }
657 m.request_id.encode(buf);
658 m.error_code.encode(buf);
659 VarInt::from_u64(m.reason_phrase.len() as u64).unwrap().encode(buf);
660 buf.put_slice(&m.reason_phrase);
661 }
662 }
663 Ok(())
664 }
665
666 fn decode_payload(msg_type: MessageType, buf: &mut impl Buf) -> Result<Self, CodecError> {
667 match msg_type {
668 MessageType::ClientSetup => {
669 let num_versions = VarInt::decode(buf)?.into_inner() as usize;
670 let mut supported_versions = Vec::with_capacity(num_versions);
671 for _ in 0..num_versions {
672 supported_versions.push(VarInt::decode(buf)?);
673 }
674 let parameters = KeyValuePair::decode_list(buf)?;
675 Ok(ControlMessage::ClientSetup(ClientSetup { supported_versions, parameters }))
676 }
677 MessageType::ServerSetup => {
678 let selected_version = VarInt::decode(buf)?;
679 let parameters = KeyValuePair::decode_list(buf)?;
680 Ok(ControlMessage::ServerSetup(ServerSetup { selected_version, parameters }))
681 }
682 MessageType::GoAway => {
683 let uri_len = VarInt::decode(buf)?.into_inner() as usize;
684 if buf.remaining() < uri_len {
685 return Err(CodecError::UnexpectedEnd);
686 }
687 let mut uri = vec![0u8; uri_len];
688 buf.copy_to_slice(&mut uri);
689 Ok(ControlMessage::GoAway(GoAway { new_session_uri: uri }))
690 }
691 MessageType::MaxRequestId => {
692 let request_id = VarInt::decode(buf)?;
693 Ok(ControlMessage::MaxRequestId(MaxRequestIdMsg { request_id }))
694 }
695 MessageType::RequestsBlocked => {
696 let maximum_request_id = VarInt::decode(buf)?;
697 Ok(ControlMessage::RequestsBlocked(RequestsBlocked { maximum_request_id }))
698 }
699 MessageType::Subscribe => {
700 let request_id = VarInt::decode(buf)?;
701 let track_namespace = TrackNamespace::decode(buf)?;
702 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
703 if buf.remaining() < track_name_len {
704 return Err(CodecError::UnexpectedEnd);
705 }
706 let mut track_name = vec![0u8; track_name_len];
707 buf.copy_to_slice(&mut track_name);
708 if buf.remaining() < 4 {
709 return Err(CodecError::UnexpectedEnd);
710 }
711 let subscriber_priority = buf.get_u8();
712 let group_order =
713 GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
714 let forward_val = buf.get_u8();
715 let forward = match forward_val {
716 0 => Forward::DontForward,
717 1 => Forward::Forward,
718 _ => return Err(CodecError::InvalidField),
719 };
720 let filter_val = buf.get_u8();
721 let filter_type =
722 FilterType::from_u8(filter_val).ok_or(CodecError::InvalidField)?;
723 let start_location = match filter_type {
724 FilterType::AbsoluteStart | FilterType::AbsoluteRange => {
725 Some(Location::decode(buf)?)
726 }
727 _ => None,
728 };
729 let end_group = match filter_type {
730 FilterType::AbsoluteRange => Some(VarInt::decode(buf)?),
731 _ => None,
732 };
733 let parameters = KeyValuePair::decode_list(buf)?;
734 Ok(ControlMessage::Subscribe(Subscribe {
735 request_id,
736 track_namespace,
737 track_name,
738 subscriber_priority,
739 group_order,
740 forward,
741 filter_type,
742 start_location,
743 end_group,
744 parameters,
745 }))
746 }
747 MessageType::SubscribeOk => {
748 let request_id = VarInt::decode(buf)?;
749 let track_alias = VarInt::decode(buf)?;
750 let expires = VarInt::decode(buf)?;
751 if buf.remaining() < 2 {
752 return Err(CodecError::UnexpectedEnd);
753 }
754 let group_order =
755 GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
756 let content_exists_val = buf.get_u8();
757 let content_exists = match content_exists_val {
758 0 => ContentExists::NoLargestLocation,
759 1 => ContentExists::HasLargestLocation,
760 _ => return Err(CodecError::InvalidField),
761 };
762 let largest_location = if content_exists == ContentExists::HasLargestLocation {
763 Some(Location::decode(buf)?)
764 } else {
765 None
766 };
767 let parameters = KeyValuePair::decode_list(buf)?;
768 Ok(ControlMessage::SubscribeOk(SubscribeOk {
769 request_id,
770 track_alias,
771 expires,
772 group_order,
773 content_exists,
774 largest_location,
775 parameters,
776 }))
777 }
778 MessageType::SubscribeError => {
779 let request_id = VarInt::decode(buf)?;
780 let error_code = VarInt::decode(buf)?;
781 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
782 if buf.remaining() < reason_len {
783 return Err(CodecError::UnexpectedEnd);
784 }
785 let mut reason_phrase = vec![0u8; reason_len];
786 buf.copy_to_slice(&mut reason_phrase);
787 Ok(ControlMessage::SubscribeError(SubscribeError {
788 request_id,
789 error_code,
790 reason_phrase,
791 }))
792 }
793 MessageType::SubscribeUpdate => {
794 let request_id = VarInt::decode(buf)?;
795 let subscription_request_id = VarInt::decode(buf)?;
796 let start_location = Location::decode(buf)?;
797 let end_group = VarInt::decode(buf)?;
798 if buf.remaining() < 2 {
799 return Err(CodecError::UnexpectedEnd);
800 }
801 let subscriber_priority = buf.get_u8();
802 let forward_val = buf.get_u8();
803 let forward = match forward_val {
804 0 => Forward::DontForward,
805 1 => Forward::Forward,
806 _ => return Err(CodecError::InvalidField),
807 };
808 let parameters = KeyValuePair::decode_list(buf)?;
809 Ok(ControlMessage::SubscribeUpdate(SubscribeUpdate {
810 request_id,
811 subscription_request_id,
812 start_location,
813 end_group,
814 subscriber_priority,
815 forward,
816 parameters,
817 }))
818 }
819 MessageType::Unsubscribe => {
820 let request_id = VarInt::decode(buf)?;
821 Ok(ControlMessage::Unsubscribe(Unsubscribe { request_id }))
822 }
823 MessageType::Publish => {
824 let request_id = VarInt::decode(buf)?;
825 let track_namespace = TrackNamespace::decode(buf)?;
826 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
827 if buf.remaining() < track_name_len {
828 return Err(CodecError::UnexpectedEnd);
829 }
830 let mut track_name = vec![0u8; track_name_len];
831 buf.copy_to_slice(&mut track_name);
832 if buf.remaining() < 1 {
833 return Err(CodecError::UnexpectedEnd);
834 }
835 let forward_val = buf.get_u8();
836 let forward = match forward_val {
837 0 => Forward::DontForward,
838 1 => Forward::Forward,
839 _ => return Err(CodecError::InvalidField),
840 };
841 let parameters = KeyValuePair::decode_list(buf)?;
842 Ok(ControlMessage::Publish(Publish {
843 request_id,
844 track_namespace,
845 track_name,
846 forward,
847 parameters,
848 }))
849 }
850 MessageType::PublishOk => {
851 let request_id = VarInt::decode(buf)?;
852 let track_alias = VarInt::decode(buf)?;
853 if buf.remaining() < 1 {
854 return Err(CodecError::UnexpectedEnd);
855 }
856 let forward_val = buf.get_u8();
857 let forward = match forward_val {
858 0 => Forward::DontForward,
859 1 => Forward::Forward,
860 _ => return Err(CodecError::InvalidField),
861 };
862 let parameters = KeyValuePair::decode_list(buf)?;
863 Ok(ControlMessage::PublishOk(PublishOk {
864 request_id,
865 track_alias,
866 forward,
867 parameters,
868 }))
869 }
870 MessageType::PublishError => {
871 let request_id = VarInt::decode(buf)?;
872 let error_code = VarInt::decode(buf)?;
873 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
874 if buf.remaining() < reason_len {
875 return Err(CodecError::UnexpectedEnd);
876 }
877 let mut reason_phrase = vec![0u8; reason_len];
878 buf.copy_to_slice(&mut reason_phrase);
879 Ok(ControlMessage::PublishError(PublishError {
880 request_id,
881 error_code,
882 reason_phrase,
883 }))
884 }
885 MessageType::PublishDone => {
886 let request_id = VarInt::decode(buf)?;
887 let status_code = VarInt::decode(buf)?;
888 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
889 if buf.remaining() < reason_len {
890 return Err(CodecError::UnexpectedEnd);
891 }
892 let mut reason_phrase = vec![0u8; reason_len];
893 buf.copy_to_slice(&mut reason_phrase);
894 Ok(ControlMessage::PublishDone(PublishDone {
895 request_id,
896 status_code,
897 reason_phrase,
898 }))
899 }
900 MessageType::PublishNamespace => {
901 let request_id = VarInt::decode(buf)?;
902 let track_namespace = TrackNamespace::decode(buf)?;
903 let parameters = KeyValuePair::decode_list(buf)?;
904 Ok(ControlMessage::PublishNamespace(PublishNamespaceMsg {
905 request_id,
906 track_namespace,
907 parameters,
908 }))
909 }
910 MessageType::PublishNamespaceOk => {
911 let request_id = VarInt::decode(buf)?;
912 let parameters = KeyValuePair::decode_list(buf)?;
913 Ok(ControlMessage::PublishNamespaceOk(PublishNamespaceOk {
914 request_id,
915 parameters,
916 }))
917 }
918 MessageType::PublishNamespaceError => {
919 let request_id = VarInt::decode(buf)?;
920 let error_code = VarInt::decode(buf)?;
921 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
922 if buf.remaining() < reason_len {
923 return Err(CodecError::UnexpectedEnd);
924 }
925 let mut reason_phrase = vec![0u8; reason_len];
926 buf.copy_to_slice(&mut reason_phrase);
927 Ok(ControlMessage::PublishNamespaceError(PublishNamespaceError {
928 request_id,
929 error_code,
930 reason_phrase,
931 }))
932 }
933 MessageType::PublishNamespaceDone => {
934 let request_id = VarInt::decode(buf)?;
935 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
936 if buf.remaining() < reason_len {
937 return Err(CodecError::UnexpectedEnd);
938 }
939 let mut reason_phrase = vec![0u8; reason_len];
940 buf.copy_to_slice(&mut reason_phrase);
941 Ok(ControlMessage::PublishNamespaceDone(PublishNamespaceDone {
942 request_id,
943 reason_phrase,
944 }))
945 }
946 MessageType::PublishNamespaceCancel => {
947 let request_id = VarInt::decode(buf)?;
948 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
949 if buf.remaining() < reason_len {
950 return Err(CodecError::UnexpectedEnd);
951 }
952 let mut reason_phrase = vec![0u8; reason_len];
953 buf.copy_to_slice(&mut reason_phrase);
954 Ok(ControlMessage::PublishNamespaceCancel(PublishNamespaceCancel {
955 request_id,
956 reason_phrase,
957 }))
958 }
959 MessageType::SubscribeNamespace => {
960 let request_id = VarInt::decode(buf)?;
961 let track_namespace = TrackNamespace::decode(buf)?;
962 let parameters = KeyValuePair::decode_list(buf)?;
963 Ok(ControlMessage::SubscribeNamespace(SubscribeNamespace {
964 request_id,
965 track_namespace,
966 parameters,
967 }))
968 }
969 MessageType::SubscribeNamespaceOk => {
970 let request_id = VarInt::decode(buf)?;
971 let parameters = KeyValuePair::decode_list(buf)?;
972 Ok(ControlMessage::SubscribeNamespaceOk(SubscribeNamespaceOk {
973 request_id,
974 parameters,
975 }))
976 }
977 MessageType::SubscribeNamespaceError => {
978 let request_id = VarInt::decode(buf)?;
979 let error_code = VarInt::decode(buf)?;
980 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
981 if buf.remaining() < reason_len {
982 return Err(CodecError::UnexpectedEnd);
983 }
984 let mut reason_phrase = vec![0u8; reason_len];
985 buf.copy_to_slice(&mut reason_phrase);
986 Ok(ControlMessage::SubscribeNamespaceError(SubscribeNamespaceError {
987 request_id,
988 error_code,
989 reason_phrase,
990 }))
991 }
992 MessageType::UnsubscribeNamespace => {
993 let request_id = VarInt::decode(buf)?;
994 let track_namespace = TrackNamespace::decode(buf)?;
995 Ok(ControlMessage::UnsubscribeNamespace(UnsubscribeNamespace {
996 request_id,
997 track_namespace,
998 }))
999 }
1000 MessageType::Fetch => {
1001 let request_id = VarInt::decode(buf)?;
1002 let track_namespace = TrackNamespace::decode(buf)?;
1003 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
1004 if buf.remaining() < track_name_len {
1005 return Err(CodecError::UnexpectedEnd);
1006 }
1007 let mut track_name = vec![0u8; track_name_len];
1008 buf.copy_to_slice(&mut track_name);
1009 let start_group = VarInt::decode(buf)?;
1010 let start_object = VarInt::decode(buf)?;
1011 let end_group_raw = VarInt::decode(buf)?.into_inner();
1012 let end_group = if end_group_raw == 0 {
1013 None
1014 } else {
1015 Some(VarInt::from_u64(end_group_raw - 1).unwrap())
1016 };
1017 let priority_raw = VarInt::decode(buf)?.into_inner();
1018 let priority =
1019 if priority_raw == 0 { None } else { Some((priority_raw - 1) as u8) };
1020 let parameters = KeyValuePair::decode_list(buf)?;
1021 Ok(ControlMessage::Fetch(Fetch {
1022 request_id,
1023 track_namespace,
1024 track_name,
1025 start_group,
1026 start_object,
1027 end_group,
1028 priority,
1029 parameters,
1030 }))
1031 }
1032 MessageType::FetchOk => {
1033 let request_id = VarInt::decode(buf)?;
1034 if buf.remaining() < 1 {
1035 return Err(CodecError::UnexpectedEnd);
1036 }
1037 let group_order =
1038 GroupOrder::from_u8(buf.get_u8()).ok_or(CodecError::InvalidField)?;
1039 let parameters = KeyValuePair::decode_list(buf)?;
1040 Ok(ControlMessage::FetchOk(FetchOk { request_id, group_order, parameters }))
1041 }
1042 MessageType::FetchError => {
1043 let request_id = VarInt::decode(buf)?;
1044 let error_code = VarInt::decode(buf)?;
1045 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1046 if buf.remaining() < reason_len {
1047 return Err(CodecError::UnexpectedEnd);
1048 }
1049 let mut reason_phrase = vec![0u8; reason_len];
1050 buf.copy_to_slice(&mut reason_phrase);
1051 Ok(ControlMessage::FetchError(FetchError { request_id, error_code, reason_phrase }))
1052 }
1053 MessageType::FetchCancel => {
1054 let request_id = VarInt::decode(buf)?;
1055 Ok(ControlMessage::FetchCancel(FetchCancel { request_id }))
1056 }
1057 MessageType::TrackStatus => {
1058 let request_id = VarInt::decode(buf)?;
1059 let track_namespace = TrackNamespace::decode(buf)?;
1060 let track_name_len = VarInt::decode(buf)?.into_inner() as usize;
1061 if buf.remaining() < track_name_len {
1062 return Err(CodecError::UnexpectedEnd);
1063 }
1064 let mut track_name = vec![0u8; track_name_len];
1065 buf.copy_to_slice(&mut track_name);
1066 let parameters = KeyValuePair::decode_list(buf)?;
1067 Ok(ControlMessage::TrackStatus(TrackStatusMsg {
1068 request_id,
1069 track_namespace,
1070 track_name,
1071 parameters,
1072 }))
1073 }
1074 MessageType::TrackStatusOk => {
1075 let request_id = VarInt::decode(buf)?;
1076 let status_code = VarInt::decode(buf)?;
1077 if buf.remaining() < 1 {
1078 return Err(CodecError::UnexpectedEnd);
1079 }
1080 let content_exists = buf.get_u8();
1081 let largest_location =
1082 if content_exists == 1 { Some(Location::decode(buf)?) } else { None };
1083 let parameters = KeyValuePair::decode_list(buf)?;
1084 Ok(ControlMessage::TrackStatusOk(TrackStatusOk {
1085 request_id,
1086 status_code,
1087 largest_location,
1088 parameters,
1089 }))
1090 }
1091 MessageType::TrackStatusError => {
1092 let request_id = VarInt::decode(buf)?;
1093 let error_code = VarInt::decode(buf)?;
1094 let reason_len = VarInt::decode(buf)?.into_inner() as usize;
1095 if buf.remaining() < reason_len {
1096 return Err(CodecError::UnexpectedEnd);
1097 }
1098 let mut reason_phrase = vec![0u8; reason_len];
1099 buf.copy_to_slice(&mut reason_phrase);
1100 Ok(ControlMessage::TrackStatusError(TrackStatusError {
1101 request_id,
1102 error_code,
1103 reason_phrase,
1104 }))
1105 }
1106 }
1107 }
1108
1109 pub fn message_type(&self) -> MessageType {
1111 match self {
1112 ControlMessage::ClientSetup(_) => MessageType::ClientSetup,
1113 ControlMessage::ServerSetup(_) => MessageType::ServerSetup,
1114 ControlMessage::GoAway(_) => MessageType::GoAway,
1115 ControlMessage::MaxRequestId(_) => MessageType::MaxRequestId,
1116 ControlMessage::RequestsBlocked(_) => MessageType::RequestsBlocked,
1117 ControlMessage::Subscribe(_) => MessageType::Subscribe,
1118 ControlMessage::SubscribeOk(_) => MessageType::SubscribeOk,
1119 ControlMessage::SubscribeError(_) => MessageType::SubscribeError,
1120 ControlMessage::SubscribeUpdate(_) => MessageType::SubscribeUpdate,
1121 ControlMessage::Unsubscribe(_) => MessageType::Unsubscribe,
1122 ControlMessage::Publish(_) => MessageType::Publish,
1123 ControlMessage::PublishOk(_) => MessageType::PublishOk,
1124 ControlMessage::PublishError(_) => MessageType::PublishError,
1125 ControlMessage::PublishDone(_) => MessageType::PublishDone,
1126 ControlMessage::PublishNamespace(_) => MessageType::PublishNamespace,
1127 ControlMessage::PublishNamespaceOk(_) => MessageType::PublishNamespaceOk,
1128 ControlMessage::PublishNamespaceError(_) => MessageType::PublishNamespaceError,
1129 ControlMessage::PublishNamespaceDone(_) => MessageType::PublishNamespaceDone,
1130 ControlMessage::PublishNamespaceCancel(_) => MessageType::PublishNamespaceCancel,
1131 ControlMessage::SubscribeNamespace(_) => MessageType::SubscribeNamespace,
1132 ControlMessage::SubscribeNamespaceOk(_) => MessageType::SubscribeNamespaceOk,
1133 ControlMessage::SubscribeNamespaceError(_) => MessageType::SubscribeNamespaceError,
1134 ControlMessage::UnsubscribeNamespace(_) => MessageType::UnsubscribeNamespace,
1135 ControlMessage::Fetch(_) => MessageType::Fetch,
1136 ControlMessage::FetchOk(_) => MessageType::FetchOk,
1137 ControlMessage::FetchError(_) => MessageType::FetchError,
1138 ControlMessage::FetchCancel(_) => MessageType::FetchCancel,
1139 ControlMessage::TrackStatus(_) => MessageType::TrackStatus,
1140 ControlMessage::TrackStatusOk(_) => MessageType::TrackStatusOk,
1141 ControlMessage::TrackStatusError(_) => MessageType::TrackStatusError,
1142 }
1143 }
1144}