1use std::collections::HashMap;
2
3use crate::draft14::fetch::{FetchError, FetchStateMachine};
4use crate::draft14::namespace::{
5 NamespaceError, PublishNamespaceStateMachine, SubscribeNamespaceStateMachine,
6};
7use crate::draft14::publish::{PublishError as PublishFlowError, PublishStateMachine};
8use crate::draft14::session::request_id::{RequestIdAllocator, RequestIdError, Role};
9use crate::draft14::session::setup::{self, SetupError};
10use crate::draft14::session::state::{SessionError, SessionState, SessionStateMachine};
11use crate::draft14::subscription::{SubscriptionError, SubscriptionStateMachine};
12use crate::draft14::track_status::{TrackStatusError, TrackStatusStateMachine};
13use moqtap_codec::draft14::message::{
14 self, ClientSetup, ControlMessage, Fetch, FetchCancel, GoAway, MaxRequestId, PublishDone,
15 PublishNamespace, PublishNamespaceCancel, PublishNamespaceDone, PublishNamespaceError,
16 PublishNamespaceOk, RequestsBlocked, ServerSetup, Subscribe, SubscribeError,
17 SubscribeNamespace, SubscribeNamespaceError, SubscribeNamespaceOk, SubscribeOk,
18 SubscribeUpdate, Unsubscribe, UnsubscribeNamespace,
19};
20use moqtap_codec::kvp::{KeyValuePair, KvpValue};
21use moqtap_codec::types::*;
22use moqtap_codec::varint::VarInt;
23
24#[derive(Debug, thiserror::Error)]
26pub enum EndpointError {
27 #[error("session error: {0}")]
29 Session(#[from] SessionError),
30 #[error("request ID error: {0}")]
32 RequestId(#[from] RequestIdError),
33 #[error("subscription error: {0}")]
35 Subscription(#[from] SubscriptionError),
36 #[error("fetch error: {0}")]
38 Fetch(#[from] FetchError),
39 #[error("namespace error: {0}")]
41 Namespace(#[from] NamespaceError),
42 #[error("track status error: {0}")]
44 TrackStatus(#[from] TrackStatusError),
45 #[error("publish flow error: {0}")]
47 PublishFlow(#[from] PublishFlowError),
48 #[error("setup error: {0}")]
50 Setup(#[from] SetupError),
51 #[error("unknown request ID: {0}")]
53 UnknownRequest(u64),
54 #[error("session not active")]
56 NotActive,
57 #[error("session is draining, no new requests allowed")]
59 Draining,
60}
61
62pub struct Endpoint {
65 role: Role,
66 session: SessionStateMachine,
67 request_ids: RequestIdAllocator,
68 advertised_max_id: u64,
70 subscriptions: HashMap<u64, SubscriptionStateMachine>,
71 fetches: HashMap<u64, FetchStateMachine>,
72 subscribe_namespaces: HashMap<u64, SubscribeNamespaceStateMachine>,
73 publish_namespaces: HashMap<u64, PublishNamespaceStateMachine>,
74 track_statuses: HashMap<u64, TrackStatusStateMachine>,
75 publishes: HashMap<u64, PublishStateMachine>,
76 negotiated_version: Option<VarInt>,
77 offered_versions: Vec<VarInt>,
78 goaway_uri: Option<Vec<u8>>,
79}
80
81impl Endpoint {
82 pub fn new(role: Role) -> Self {
84 Self {
85 role,
86 session: SessionStateMachine::new(),
87 request_ids: RequestIdAllocator::new(role),
88 advertised_max_id: 0,
89 subscriptions: HashMap::new(),
90 fetches: HashMap::new(),
91 subscribe_namespaces: HashMap::new(),
92 publish_namespaces: HashMap::new(),
93 track_statuses: HashMap::new(),
94 publishes: HashMap::new(),
95 negotiated_version: None,
96 offered_versions: Vec::new(),
97 goaway_uri: None,
98 }
99 }
100
101 pub fn role(&self) -> Role {
105 self.role
106 }
107
108 pub fn session_state(&self) -> SessionState {
110 self.session.state()
111 }
112
113 pub fn negotiated_version(&self) -> Option<VarInt> {
115 self.negotiated_version
116 }
117
118 pub fn goaway_uri(&self) -> Option<&[u8]> {
120 self.goaway_uri.as_deref()
121 }
122
123 pub fn is_blocked(&self) -> bool {
125 self.request_ids.is_blocked()
126 }
127
128 pub fn active_subscription_count(&self) -> usize {
130 self.subscriptions.len()
131 }
132
133 pub fn active_fetch_count(&self) -> usize {
135 self.fetches.len()
136 }
137
138 pub fn active_subscribe_namespace_count(&self) -> usize {
140 self.subscribe_namespaces.len()
141 }
142
143 pub fn active_publish_namespace_count(&self) -> usize {
145 self.publish_namespaces.len()
146 }
147
148 pub fn active_track_status_count(&self) -> usize {
150 self.track_statuses.len()
151 }
152
153 pub fn active_publish_count(&self) -> usize {
155 self.publishes.len()
156 }
157
158 pub fn connect(&mut self) -> Result<(), EndpointError> {
162 self.session.on_connect()?;
163 Ok(())
164 }
165
166 pub fn close(&mut self) -> Result<(), EndpointError> {
168 self.session.on_close()?;
169 Ok(())
170 }
171
172 pub fn send_client_setup(
176 &mut self,
177 versions: Vec<VarInt>,
178 parameters: Vec<KeyValuePair>,
179 ) -> Result<ControlMessage, EndpointError> {
180 self.offered_versions = versions.clone();
181 let msg = ClientSetup { supported_versions: versions, parameters };
182 setup::validate_client_setup(&msg)?;
183 Ok(ControlMessage::ClientSetup(msg))
184 }
185
186 pub fn receive_server_setup(&mut self, msg: &ServerSetup) -> Result<(), EndpointError> {
190 setup::validate_server_setup(msg)?;
191 let version = setup::negotiate_version(&self.offered_versions, msg.selected_version)?;
192 self.negotiated_version = Some(version);
193 self.session.on_setup_complete()?;
194 for param in &msg.parameters {
196 if param.key == VarInt::from_u64(0x02).unwrap() {
197 if let KvpValue::Varint(v) = ¶m.value {
198 self.request_ids.update_max(v.into_inner())?;
199 }
200 }
201 }
202 Ok(())
203 }
204
205 pub fn receive_client_setup_and_respond(
209 &mut self,
210 client_setup: &ClientSetup,
211 selected_version: VarInt,
212 ) -> Result<ControlMessage, EndpointError> {
213 setup::validate_client_setup(client_setup)?;
214 let version = setup::negotiate_version(&client_setup.supported_versions, selected_version)?;
215 self.negotiated_version = Some(version);
216 self.session.on_setup_complete()?;
217 let msg = ServerSetup { selected_version: version, parameters: vec![] };
218 Ok(ControlMessage::ServerSetup(msg))
219 }
220
221 pub fn receive_max_request_id(&mut self, msg: &MaxRequestId) -> Result<(), EndpointError> {
225 self.request_ids.update_max(msg.request_id.into_inner())?;
226 Ok(())
227 }
228
229 pub fn send_max_request_id(&mut self, max_id: VarInt) -> Result<ControlMessage, EndpointError> {
232 let new_val = max_id.into_inner();
233 if new_val <= self.advertised_max_id && self.advertised_max_id > 0 {
234 return Err(EndpointError::RequestId(RequestIdError::Decreased(
235 self.advertised_max_id,
236 new_val,
237 )));
238 }
239 self.advertised_max_id = new_val;
240 Ok(ControlMessage::MaxRequestId(MaxRequestId { request_id: max_id }))
241 }
242
243 pub fn send_requests_blocked(&self) -> Result<ControlMessage, EndpointError> {
247 let max_id = self.request_ids.max_id();
248 Ok(ControlMessage::RequestsBlocked(RequestsBlocked {
249 maximum_request_id: VarInt::from_u64(max_id).unwrap(),
250 }))
251 }
252
253 pub fn receive_requests_blocked(&self, _msg: &RequestsBlocked) -> Result<(), EndpointError> {
257 Ok(())
260 }
261
262 pub fn receive_goaway(&mut self, msg: &GoAway) -> Result<(), EndpointError> {
266 self.session.on_goaway()?;
267 self.goaway_uri = Some(msg.new_session_uri.clone());
268 Ok(())
269 }
270
271 fn require_active_or_err(&self) -> Result<(), EndpointError> {
274 match self.session.state() {
275 SessionState::Active => Ok(()),
276 SessionState::Draining => Err(EndpointError::Draining),
277 _ => Err(EndpointError::NotActive),
278 }
279 }
280
281 pub fn subscribe(
284 &mut self,
285 track_namespace: TrackNamespace,
286 track_name: Vec<u8>,
287 subscriber_priority: u8,
288 group_order: GroupOrder,
289 filter_type: FilterType,
290 ) -> Result<(VarInt, ControlMessage), EndpointError> {
291 self.require_active_or_err()?;
292 let req_id = self.request_ids.allocate()?;
293
294 let mut sm = SubscriptionStateMachine::new();
295 sm.on_subscribe_sent()?;
296 self.subscriptions.insert(req_id.into_inner(), sm);
297
298 let msg = ControlMessage::Subscribe(Subscribe {
299 request_id: req_id,
300 track_namespace,
301 track_name,
302 subscriber_priority,
303 group_order,
304 forward: Forward::Forward,
305 filter_type,
306 start_location: None,
307 end_group: None,
308 parameters: vec![],
309 });
310 Ok((req_id, msg))
311 }
312
313 pub fn receive_subscribe_ok(&mut self, msg: &SubscribeOk) -> Result<(), EndpointError> {
315 let id = msg.request_id.into_inner();
316 let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
317 sm.on_subscribe_ok()?;
318 Ok(())
319 }
320
321 pub fn receive_subscribe_error(&mut self, msg: &SubscribeError) -> Result<(), EndpointError> {
323 let id = msg.request_id.into_inner();
324 let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
325 sm.on_subscribe_error()?;
326 Ok(())
327 }
328
329 pub fn unsubscribe(&mut self, request_id: VarInt) -> Result<ControlMessage, EndpointError> {
331 let id = request_id.into_inner();
332 let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
333 sm.on_unsubscribe()?;
334 Ok(ControlMessage::Unsubscribe(Unsubscribe { request_id }))
335 }
336
337 pub fn receive_subscribe_update(&mut self, msg: &SubscribeUpdate) -> Result<(), EndpointError> {
339 let id = msg.subscription_request_id.into_inner();
340 let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
341 sm.on_subscribe_update()?;
342 Ok(())
343 }
344
345 pub fn subscribe_update(
348 &mut self,
349 subscription_request_id: VarInt,
350 start_location: Location,
351 end_group: VarInt,
352 subscriber_priority: u8,
353 forward: Forward,
354 parameters: Vec<KeyValuePair>,
355 ) -> Result<(VarInt, ControlMessage), EndpointError> {
356 self.require_active_or_err()?;
357 let sub_id = subscription_request_id.into_inner();
358 let sm =
359 self.subscriptions.get_mut(&sub_id).ok_or(EndpointError::UnknownRequest(sub_id))?;
360 sm.on_subscribe_update()?;
361 let req_id = self.request_ids.allocate()?;
362 let msg = ControlMessage::SubscribeUpdate(SubscribeUpdate {
363 request_id: req_id,
364 subscription_request_id,
365 start_location,
366 end_group,
367 subscriber_priority,
368 forward,
369 parameters,
370 });
371 Ok((req_id, msg))
372 }
373
374 pub fn receive_publish_done(&mut self, msg: &PublishDone) -> Result<(), EndpointError> {
376 let id = msg.request_id.into_inner();
377 let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
378 sm.on_publish_done()?;
379 Ok(())
380 }
381
382 pub fn fetch(
386 &mut self,
387 track_namespace: TrackNamespace,
388 track_name: Vec<u8>,
389 start_group: VarInt,
390 start_object: VarInt,
391 ) -> Result<(VarInt, ControlMessage), EndpointError> {
392 self.require_active_or_err()?;
393 let req_id = self.request_ids.allocate()?;
394
395 let mut sm = FetchStateMachine::new();
396 sm.on_fetch_sent()?;
397 self.fetches.insert(req_id.into_inner(), sm);
398
399 let msg = ControlMessage::Fetch(Fetch {
400 request_id: req_id,
401 subscriber_priority: 128,
402 group_order: GroupOrder::Ascending,
403 fetch_type: message::FetchType::Standalone,
404 fetch_payload: message::FetchPayload::Standalone {
405 track_namespace,
406 track_name,
407 start_group,
408 start_object,
409 end_group: VarInt::from_u64(0).unwrap(),
410 end_object: VarInt::from_u64(0).unwrap(),
411 },
412 parameters: vec![],
413 });
414 Ok((req_id, msg))
415 }
416
417 pub fn receive_fetch_ok(&mut self, msg: &message::FetchOk) -> Result<(), EndpointError> {
419 let id = msg.request_id.into_inner();
420 let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
421 sm.on_fetch_ok()?;
422 Ok(())
423 }
424
425 pub fn receive_fetch_error(&mut self, msg: &message::FetchError) -> Result<(), EndpointError> {
427 let id = msg.request_id.into_inner();
428 let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
429 sm.on_fetch_error()?;
430 Ok(())
431 }
432
433 pub fn fetch_cancel(&mut self, request_id: VarInt) -> Result<ControlMessage, EndpointError> {
435 let id = request_id.into_inner();
436 let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
437 sm.on_fetch_cancel()?;
438 Ok(ControlMessage::FetchCancel(FetchCancel { request_id }))
439 }
440
441 pub fn on_fetch_stream_fin(&mut self, request_id: VarInt) -> Result<(), EndpointError> {
443 let id = request_id.into_inner();
444 let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
445 sm.on_stream_fin()?;
446 Ok(())
447 }
448
449 pub fn on_fetch_stream_reset(&mut self, request_id: VarInt) -> Result<(), EndpointError> {
451 let id = request_id.into_inner();
452 let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
453 sm.on_stream_reset()?;
454 Ok(())
455 }
456
457 pub fn subscribe_namespace(
461 &mut self,
462 track_namespace: TrackNamespace,
463 ) -> Result<(VarInt, ControlMessage), EndpointError> {
464 self.require_active_or_err()?;
465 let req_id = self.request_ids.allocate()?;
466
467 let mut sm = SubscribeNamespaceStateMachine::new();
468 sm.on_subscribe_namespace_sent()?;
469 self.subscribe_namespaces.insert(req_id.into_inner(), sm);
470
471 let msg = ControlMessage::SubscribeNamespace(SubscribeNamespace {
472 request_id: req_id,
473 track_namespace,
474 parameters: vec![],
475 });
476 Ok((req_id, msg))
477 }
478
479 pub fn receive_subscribe_namespace_ok(
481 &mut self,
482 msg: &SubscribeNamespaceOk,
483 ) -> Result<(), EndpointError> {
484 let id = msg.request_id.into_inner();
485 let sm = self.subscribe_namespaces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
486 sm.on_subscribe_namespace_ok()?;
487 Ok(())
488 }
489
490 pub fn receive_subscribe_namespace_error(
492 &mut self,
493 msg: &SubscribeNamespaceError,
494 ) -> Result<(), EndpointError> {
495 let id = msg.request_id.into_inner();
496 let sm = self.subscribe_namespaces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
497 sm.on_subscribe_namespace_error()?;
498 Ok(())
499 }
500
501 pub fn unsubscribe_namespace(
503 &mut self,
504 request_id: VarInt,
505 _track_namespace: TrackNamespace,
506 ) -> Result<ControlMessage, EndpointError> {
507 let id = request_id.into_inner();
508 let sm = self.subscribe_namespaces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
509 sm.on_unsubscribe_namespace()?;
510 let _ = request_id;
511 Ok(ControlMessage::UnsubscribeNamespace(UnsubscribeNamespace {
512 track_namespace_prefix: _track_namespace,
513 }))
514 }
515
516 pub fn publish_namespace(
520 &mut self,
521 track_namespace: TrackNamespace,
522 ) -> Result<(VarInt, ControlMessage), EndpointError> {
523 self.require_active_or_err()?;
524 let req_id = self.request_ids.allocate()?;
525
526 let mut sm = PublishNamespaceStateMachine::new();
527 sm.on_publish_namespace_sent()?;
528 self.publish_namespaces.insert(req_id.into_inner(), sm);
529
530 let msg = ControlMessage::PublishNamespace(PublishNamespace {
531 request_id: req_id,
532 track_namespace,
533 parameters: vec![],
534 });
535 Ok((req_id, msg))
536 }
537
538 pub fn receive_publish_namespace_ok(
540 &mut self,
541 msg: &PublishNamespaceOk,
542 ) -> Result<(), EndpointError> {
543 let id = msg.request_id.into_inner();
544 let sm = self.publish_namespaces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
545 sm.on_publish_namespace_ok()?;
546 Ok(())
547 }
548
549 pub fn receive_publish_namespace_error(
551 &mut self,
552 msg: &PublishNamespaceError,
553 ) -> Result<(), EndpointError> {
554 let id = msg.request_id.into_inner();
555 let sm = self.publish_namespaces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
556 sm.on_publish_namespace_error()?;
557 Ok(())
558 }
559
560 pub fn receive_publish_namespace_done(
568 &mut self,
569 _msg: &PublishNamespaceDone,
570 ) -> Result<(), EndpointError> {
571 for sm in self.publish_namespaces.values_mut() {
572 let _ = sm.on_publish_namespace_done();
575 }
576 Ok(())
577 }
578
579 pub fn publish_namespace_cancel(
581 &mut self,
582 request_id: VarInt,
583 reason_phrase: Vec<u8>,
584 ) -> Result<ControlMessage, EndpointError> {
585 let id = request_id.into_inner();
586 let sm = self.publish_namespaces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
587 sm.on_publish_namespace_cancel()?;
588 Ok(ControlMessage::PublishNamespaceCancel(PublishNamespaceCancel {
589 track_namespace: TrackNamespace(Vec::new()),
590 error_code: VarInt::from_u64(0).unwrap(),
591 reason_phrase,
592 }))
593 }
594
595 pub fn track_status(
599 &mut self,
600 track_namespace: TrackNamespace,
601 track_name: Vec<u8>,
602 ) -> Result<(VarInt, ControlMessage), EndpointError> {
603 self.require_active_or_err()?;
604 let req_id = self.request_ids.allocate()?;
605 let mut sm = TrackStatusStateMachine::new();
606 sm.on_track_status_sent()?;
607 self.track_statuses.insert(req_id.into_inner(), sm);
608 let msg = ControlMessage::TrackStatus(message::TrackStatus {
609 request_id: req_id,
610 track_namespace,
611 track_name,
612 subscriber_priority: 128,
613 group_order: GroupOrder::Ascending,
614 forward: Forward::Forward,
615 filter_type: FilterType::LargestObject,
616 start_location: None,
617 end_group: None,
618 parameters: vec![],
619 });
620 Ok((req_id, msg))
621 }
622
623 pub fn receive_track_status_ok(
625 &mut self,
626 msg: &message::TrackStatusOk,
627 ) -> Result<(), EndpointError> {
628 let id = msg.request_id.into_inner();
629 let sm = self.track_statuses.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
630 sm.on_track_status_ok()?;
631 Ok(())
632 }
633
634 pub fn receive_track_status_error(
636 &mut self,
637 msg: &message::TrackStatusError,
638 ) -> Result<(), EndpointError> {
639 let id = msg.request_id.into_inner();
640 let sm = self.track_statuses.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
641 sm.on_track_status_error()?;
642 Ok(())
643 }
644
645 pub fn publish(
649 &mut self,
650 track_namespace: TrackNamespace,
651 track_name: Vec<u8>,
652 forward: Forward,
653 ) -> Result<(VarInt, ControlMessage), EndpointError> {
654 self.require_active_or_err()?;
655 let req_id = self.request_ids.allocate()?;
656 let mut sm = PublishStateMachine::new();
657 sm.on_publish_sent()?;
658 self.publishes.insert(req_id.into_inner(), sm);
659 let msg = ControlMessage::Publish(message::Publish {
660 request_id: req_id,
661 track_namespace,
662 track_name,
663 track_alias: VarInt::from_u64(0).unwrap(),
664 group_order: GroupOrder::Ascending,
665 content_exists: ContentExists::NoLargestLocation,
666 largest_location: None,
667 forward,
668 parameters: vec![],
669 });
670 Ok((req_id, msg))
671 }
672
673 pub fn receive_publish_ok(&mut self, msg: &message::PublishOk) -> Result<(), EndpointError> {
675 let id = msg.request_id.into_inner();
676 let sm = self.publishes.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
677 sm.on_publish_ok()?;
678 Ok(())
679 }
680
681 pub fn send_publish_done(
683 &mut self,
684 request_id: VarInt,
685 status_code: VarInt,
686 reason_phrase: Vec<u8>,
687 ) -> Result<ControlMessage, EndpointError> {
688 let id = request_id.into_inner();
689 let sm = self.publishes.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
690 sm.on_publish_done_sent()?;
691 Ok(ControlMessage::PublishDone(PublishDone {
692 request_id,
693 status_code,
694 stream_count: VarInt::from_u64(0).unwrap(),
695 reason_phrase,
696 }))
697 }
698
699 pub fn send_publish_error(
704 &self,
705 request_id: VarInt,
706 error_code: VarInt,
707 reason_phrase: Vec<u8>,
708 ) -> Result<ControlMessage, EndpointError> {
709 Ok(ControlMessage::PublishError(message::PublishError {
710 request_id,
711 error_code,
712 reason_phrase,
713 }))
714 }
715
716 pub fn receive_publish_error(
720 &mut self,
721 msg: &message::PublishError,
722 ) -> Result<(), EndpointError> {
723 let id = msg.request_id.into_inner();
724 if let Some(sm) = self.publishes.get_mut(&id) {
726 sm.on_publish_error()?;
727 return Ok(());
728 }
729 if let Some(sm) = self.subscriptions.get_mut(&id) {
731 sm.on_subscribe_error()?;
732 }
733 Ok(())
734 }
735
736 pub fn receive_message(&mut self, msg: ControlMessage) -> Result<(), EndpointError> {
740 match msg {
741 ControlMessage::GoAway(ref m) => self.receive_goaway(m),
742 ControlMessage::MaxRequestId(ref m) => self.receive_max_request_id(m),
743 ControlMessage::RequestsBlocked(ref m) => self.receive_requests_blocked(m),
744 ControlMessage::SubscribeOk(ref m) => self.receive_subscribe_ok(m),
745 ControlMessage::SubscribeError(ref m) => self.receive_subscribe_error(m),
746 ControlMessage::SubscribeUpdate(ref m) => self.receive_subscribe_update(m),
747 ControlMessage::PublishDone(ref m) => self.receive_publish_done(m),
748 ControlMessage::PublishOk(ref m) => self.receive_publish_ok(m),
749 ControlMessage::PublishError(ref m) => self.receive_publish_error(m),
750 ControlMessage::FetchOk(ref m) => self.receive_fetch_ok(m),
751 ControlMessage::FetchError(ref m) => self.receive_fetch_error(m),
752 ControlMessage::SubscribeNamespaceOk(ref m) => self.receive_subscribe_namespace_ok(m),
753 ControlMessage::SubscribeNamespaceError(ref m) => {
754 self.receive_subscribe_namespace_error(m)
755 }
756 ControlMessage::PublishNamespaceOk(ref m) => self.receive_publish_namespace_ok(m),
757 ControlMessage::PublishNamespaceError(ref m) => self.receive_publish_namespace_error(m),
758 ControlMessage::PublishNamespaceDone(ref m) => self.receive_publish_namespace_done(m),
759 ControlMessage::TrackStatusOk(ref m) => self.receive_track_status_ok(m),
760 ControlMessage::TrackStatusError(ref m) => self.receive_track_status_error(m),
761 _ => Ok(()),
762 }
763 }
764}