Skip to main content

moqtap_client/draft18/
endpoint.rs

1#![allow(missing_docs)]
2//! Draft-18 MoQT endpoint.
3//!
4//! Major changes from draft-17:
5//!
6//! * `Required Request ID Delta` field removed from every request message.
7//! * `SubscribeNamespace` lost its `subscribe_options` field; namespace
8//!   subscriptions only produce NAMESPACE / NAMESPACE_DONE. The new
9//!   `SubscribeTracks` request type subscribes to PUBLISH messages and
10//!   carries the FORWARD parameter.
11//! * `PublishOk` collapsed into `RequestOk` (REQUEST_OK at type 0x07);
12//!   `RequestOk` gains a trailing Track Properties block.
13//! * `GoAway` gains an optional `request_id` (control stream only).
14//! * `RequestError` gains an optional Redirect structure (carried when
15//!   `error_code == REDIRECT`).
16
17use std::collections::HashMap;
18
19use crate::draft18::fetch::{FetchError, FetchStateMachine};
20use crate::draft18::namespace::{
21    NamespaceError, PublishNamespaceStateMachine, SubscribeNamespaceStateMachine,
22};
23use crate::draft18::publish::{PublishError as PublishFlowError, PublishStateMachine};
24use crate::draft18::session::request_id::{RequestIdAllocator, RequestIdError, Role};
25use crate::draft18::session::setup::{self, SetupError};
26use crate::draft18::session::state::{SessionError, SessionState, SessionStateMachine};
27use crate::draft18::subscription::{SubscriptionError, SubscriptionStateMachine};
28use crate::draft18::track_status::{TrackStatusError, TrackStatusStateMachine};
29use moqtap_codec::draft18::message::{
30    self, ControlMessage, Fetch, FetchPayload, FetchType, GoAway, Publish, PublishBlocked,
31    PublishDone, PublishNamespace, RequestError, RequestOk, RequestUpdate, Setup, Subscribe,
32    SubscribeNamespace, SubscribeOk, SubscribeTracks,
33};
34use moqtap_codec::kvp::KeyValuePair;
35use moqtap_codec::types::*;
36use moqtap_codec::varint::VarInt;
37
38/// Errors that can occur during endpoint operations.
39#[derive(Debug, thiserror::Error)]
40pub enum EndpointError {
41    #[error("session error: {0}")]
42    Session(#[from] SessionError),
43    #[error("request ID error: {0}")]
44    RequestId(#[from] RequestIdError),
45    #[error("subscription error: {0}")]
46    Subscription(#[from] SubscriptionError),
47    #[error("fetch error: {0}")]
48    Fetch(#[from] FetchError),
49    #[error("namespace error: {0}")]
50    Namespace(#[from] NamespaceError),
51    #[error("track status error: {0}")]
52    TrackStatus(#[from] TrackStatusError),
53    #[error("publish flow error: {0}")]
54    PublishFlow(#[from] PublishFlowError),
55    #[error("setup error: {0}")]
56    Setup(#[from] SetupError),
57    #[error("unknown request ID: {0}")]
58    UnknownRequest(u64),
59    #[error(
60        "response message received on control stream; d18 responses belong on bidi request streams"
61    )]
62    ResponseOnControlStream,
63    #[error("session not active")]
64    NotActive,
65    #[error("session is draining, no new requests allowed")]
66    Draining,
67}
68
69pub struct Endpoint {
70    role: Role,
71    session: SessionStateMachine,
72    request_ids: RequestIdAllocator,
73    subscriptions: HashMap<u64, SubscriptionStateMachine>,
74    fetches: HashMap<u64, FetchStateMachine>,
75    subscribe_namespaces: HashMap<u64, SubscribeNamespaceStateMachine>,
76    subscribe_tracks: HashMap<u64, SubscribeNamespaceStateMachine>,
77    publish_namespaces: HashMap<u64, PublishNamespaceStateMachine>,
78    track_statuses: HashMap<u64, TrackStatusStateMachine>,
79    publishes: HashMap<u64, PublishStateMachine>,
80    goaway_uri: Option<Vec<u8>>,
81}
82
83impl Endpoint {
84    pub fn new(role: Role) -> Self {
85        Self {
86            role,
87            session: SessionStateMachine::new(),
88            request_ids: RequestIdAllocator::new(role),
89            subscriptions: HashMap::new(),
90            fetches: HashMap::new(),
91            subscribe_namespaces: HashMap::new(),
92            subscribe_tracks: HashMap::new(),
93            publish_namespaces: HashMap::new(),
94            track_statuses: HashMap::new(),
95            publishes: HashMap::new(),
96            goaway_uri: None,
97        }
98    }
99
100    pub fn role(&self) -> Role {
101        self.role
102    }
103
104    pub fn session_state(&self) -> SessionState {
105        self.session.state()
106    }
107
108    pub fn goaway_uri(&self) -> Option<&[u8]> {
109        self.goaway_uri.as_deref()
110    }
111
112    pub fn active_subscription_count(&self) -> usize {
113        self.subscriptions.len()
114    }
115
116    pub fn active_fetch_count(&self) -> usize {
117        self.fetches.len()
118    }
119
120    pub fn active_subscribe_namespace_count(&self) -> usize {
121        self.subscribe_namespaces.len()
122    }
123
124    pub fn active_subscribe_tracks_count(&self) -> usize {
125        self.subscribe_tracks.len()
126    }
127
128    pub fn active_publish_namespace_count(&self) -> usize {
129        self.publish_namespaces.len()
130    }
131
132    pub fn active_track_status_count(&self) -> usize {
133        self.track_statuses.len()
134    }
135
136    pub fn active_publish_count(&self) -> usize {
137        self.publishes.len()
138    }
139
140    // -- Session lifecycle ------------------------------------------
141
142    pub fn connect(&mut self) -> Result<(), EndpointError> {
143        self.session.on_connect()?;
144        Ok(())
145    }
146
147    pub fn close(&mut self) -> Result<(), EndpointError> {
148        self.session.on_close()?;
149        Ok(())
150    }
151
152    // -- Unified SETUP ----------------------------------------------
153
154    /// Generate a SETUP message. Both client and server use the same message
155    /// type; only the role (and the order of send/receive) distinguishes them.
156    pub fn send_setup(
157        &mut self,
158        options: Vec<KeyValuePair>,
159    ) -> Result<ControlMessage, EndpointError> {
160        let msg = Setup { options };
161        setup::validate_setup(&msg)?;
162        Ok(ControlMessage::Setup(msg))
163    }
164
165    /// Process an incoming SETUP message. Transitions the session to Active.
166    pub fn receive_setup(&mut self, msg: &Setup) -> Result<(), EndpointError> {
167        setup::validate_setup(msg)?;
168        self.session.on_setup_complete()?;
169        Ok(())
170    }
171
172    // -- GoAway -----------------------------------------------------
173
174    pub fn receive_goaway(&mut self, msg: &GoAway) -> Result<(), EndpointError> {
175        self.session.on_goaway()?;
176        self.goaway_uri = Some(msg.new_session_uri.clone());
177        Ok(())
178    }
179
180    fn require_active_or_err(&self) -> Result<(), EndpointError> {
181        match self.session.state() {
182            SessionState::Active => Ok(()),
183            SessionState::Draining => Err(EndpointError::Draining),
184            _ => Err(EndpointError::NotActive),
185        }
186    }
187
188    // -- Subscribe flow ---------------------------------------------
189
190    pub fn subscribe(
191        &mut self,
192        track_namespace: TrackNamespace,
193        track_name: Vec<u8>,
194        parameters: Vec<KeyValuePair>,
195    ) -> Result<(VarInt, ControlMessage), EndpointError> {
196        self.require_active_or_err()?;
197        let req_id = self.request_ids.allocate()?;
198
199        let mut sm = SubscriptionStateMachine::new();
200        sm.on_subscribe_sent()?;
201        self.subscriptions.insert(req_id.into_inner(), sm);
202
203        let msg = ControlMessage::Subscribe(Subscribe {
204            request_id: req_id,
205            track_namespace,
206            track_name,
207            parameters,
208        });
209        Ok((req_id, msg))
210    }
211
212    /// Process an incoming SUBSCRIBE_OK. Draft-18: no request_id on wire; the
213    /// caller supplies the `request_id` of the bidi stream on which the
214    /// response arrived.
215    pub fn receive_subscribe_ok(
216        &mut self,
217        request_id: VarInt,
218        _msg: &SubscribeOk,
219    ) -> Result<(), EndpointError> {
220        let id = request_id.into_inner();
221        let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
222        sm.on_subscribe_ok()?;
223        Ok(())
224    }
225
226    pub fn receive_request_update(&mut self, msg: &RequestUpdate) -> Result<(), EndpointError> {
227        let id = msg.request_id.into_inner();
228        let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
229        sm.on_subscribe_update()?;
230        Ok(())
231    }
232
233    pub fn receive_publish_done(
234        &mut self,
235        request_id: VarInt,
236        _msg: &PublishDone,
237    ) -> Result<(), EndpointError> {
238        let id = request_id.into_inner();
239        let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
240        sm.on_publish_done()?;
241        Ok(())
242    }
243
244    // -- Fetch flow -------------------------------------------------
245
246    pub fn fetch(
247        &mut self,
248        track_namespace: TrackNamespace,
249        track_name: Vec<u8>,
250        start_group: VarInt,
251        start_object: VarInt,
252        end_group: VarInt,
253        end_object: VarInt,
254    ) -> Result<(VarInt, ControlMessage), EndpointError> {
255        self.require_active_or_err()?;
256        let req_id = self.request_ids.allocate()?;
257
258        let mut sm = FetchStateMachine::new();
259        sm.on_fetch_sent()?;
260        self.fetches.insert(req_id.into_inner(), sm);
261
262        let msg = ControlMessage::Fetch(Fetch {
263            request_id: req_id,
264            fetch_type: FetchType::Standalone,
265            fetch_payload: FetchPayload::Standalone {
266                track_namespace,
267                track_name,
268                start_group,
269                start_object,
270                end_group,
271                end_object,
272            },
273            parameters: vec![],
274        });
275        Ok((req_id, msg))
276    }
277
278    pub fn joining_fetch(
279        &mut self,
280        joining_request_id: VarInt,
281        joining_start: VarInt,
282    ) -> Result<(VarInt, ControlMessage), EndpointError> {
283        self.require_active_or_err()?;
284        let req_id = self.request_ids.allocate()?;
285
286        let mut sm = FetchStateMachine::new();
287        sm.on_fetch_sent()?;
288        self.fetches.insert(req_id.into_inner(), sm);
289
290        let msg = ControlMessage::Fetch(Fetch {
291            request_id: req_id,
292            fetch_type: FetchType::RelativeJoining,
293            fetch_payload: FetchPayload::Joining { joining_request_id, joining_start },
294            parameters: vec![],
295        });
296        Ok((req_id, msg))
297    }
298
299    pub fn receive_fetch_ok(
300        &mut self,
301        request_id: VarInt,
302        _msg: &message::FetchOk,
303    ) -> Result<(), EndpointError> {
304        let id = request_id.into_inner();
305        let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
306        sm.on_fetch_ok()?;
307        Ok(())
308    }
309
310    pub fn on_fetch_stream_fin(&mut self, request_id: VarInt) -> Result<(), EndpointError> {
311        let id = request_id.into_inner();
312        let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
313        sm.on_stream_fin()?;
314        Ok(())
315    }
316
317    pub fn on_fetch_stream_reset(&mut self, request_id: VarInt) -> Result<(), EndpointError> {
318        let id = request_id.into_inner();
319        let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
320        sm.on_stream_reset()?;
321        Ok(())
322    }
323
324    // -- Subscribe Namespace flow -----------------------------------
325
326    pub fn subscribe_namespace(
327        &mut self,
328        namespace_prefix: TrackNamespace,
329        parameters: Vec<KeyValuePair>,
330    ) -> Result<(VarInt, ControlMessage), EndpointError> {
331        self.require_active_or_err()?;
332        let req_id = self.request_ids.allocate()?;
333
334        let mut sm = SubscribeNamespaceStateMachine::new();
335        sm.on_subscribe_namespace_sent()?;
336        self.subscribe_namespaces.insert(req_id.into_inner(), sm);
337
338        let msg = ControlMessage::SubscribeNamespace(SubscribeNamespace {
339            request_id: req_id,
340            namespace_prefix,
341            parameters,
342        });
343        Ok((req_id, msg))
344    }
345
346    // -- Subscribe Tracks flow (new in draft-18) --------------------
347
348    pub fn subscribe_tracks(
349        &mut self,
350        namespace_prefix: TrackNamespace,
351        parameters: Vec<KeyValuePair>,
352    ) -> Result<(VarInt, ControlMessage), EndpointError> {
353        self.require_active_or_err()?;
354        let req_id = self.request_ids.allocate()?;
355
356        // Reuse the SubscribeNamespace state machine — the lifecycle is the
357        // same (request → ok/error → done) and adding a parallel state
358        // machine purely to disambiguate would be churn.
359        let mut sm = SubscribeNamespaceStateMachine::new();
360        sm.on_subscribe_namespace_sent()?;
361        self.subscribe_tracks.insert(req_id.into_inner(), sm);
362
363        let msg = ControlMessage::SubscribeTracks(SubscribeTracks {
364            request_id: req_id,
365            namespace_prefix,
366            parameters,
367        });
368        Ok((req_id, msg))
369    }
370
371    // -- Publish Namespace flow -------------------------------------
372
373    pub fn publish_namespace(
374        &mut self,
375        track_namespace: TrackNamespace,
376        parameters: Vec<KeyValuePair>,
377    ) -> Result<(VarInt, ControlMessage), EndpointError> {
378        self.require_active_or_err()?;
379        let req_id = self.request_ids.allocate()?;
380
381        let mut sm = PublishNamespaceStateMachine::new();
382        sm.on_publish_namespace_sent()?;
383        self.publish_namespaces.insert(req_id.into_inner(), sm);
384
385        let msg = ControlMessage::PublishNamespace(PublishNamespace {
386            request_id: req_id,
387            track_namespace,
388            parameters,
389        });
390        Ok((req_id, msg))
391    }
392
393    // -- Track Status flow ------------------------------------------
394
395    pub fn track_status(
396        &mut self,
397        track_namespace: TrackNamespace,
398        track_name: Vec<u8>,
399        parameters: Vec<KeyValuePair>,
400    ) -> Result<(VarInt, ControlMessage), EndpointError> {
401        self.require_active_or_err()?;
402        let req_id = self.request_ids.allocate()?;
403        let mut sm = TrackStatusStateMachine::new();
404        sm.on_track_status_sent()?;
405        self.track_statuses.insert(req_id.into_inner(), sm);
406
407        let msg = ControlMessage::TrackStatus(message::TrackStatus {
408            request_id: req_id,
409            track_namespace,
410            track_name,
411            parameters,
412        });
413        Ok((req_id, msg))
414    }
415
416    // -- Publish flow (publisher side) ------------------------------
417
418    pub fn publish(
419        &mut self,
420        track_namespace: TrackNamespace,
421        track_name: Vec<u8>,
422        track_alias: VarInt,
423        parameters: Vec<KeyValuePair>,
424        track_properties: Vec<KeyValuePair>,
425    ) -> Result<(VarInt, ControlMessage), EndpointError> {
426        self.require_active_or_err()?;
427        let req_id = self.request_ids.allocate()?;
428        let mut sm = PublishStateMachine::new();
429        sm.on_publish_sent()?;
430        self.publishes.insert(req_id.into_inner(), sm);
431
432        let msg = ControlMessage::Publish(Publish {
433            request_id: req_id,
434            track_namespace,
435            track_name,
436            track_alias,
437            parameters,
438            track_properties,
439        });
440        Ok((req_id, msg))
441    }
442
443    pub fn send_publish_done(
444        &mut self,
445        request_id: VarInt,
446        status_code: VarInt,
447        stream_count: VarInt,
448        reason_phrase: Vec<u8>,
449    ) -> Result<ControlMessage, EndpointError> {
450        let id = request_id.into_inner();
451        let sm = self.publishes.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
452        sm.on_publish_done_sent()?;
453        Ok(ControlMessage::PublishDone(PublishDone { status_code, stream_count, reason_phrase }))
454    }
455
456    // -- Consolidated responses (per-bidi-stream routing) -----------
457
458    /// Process an incoming REQUEST_OK on the bidi stream identified by
459    /// `request_id`. Draft-18: PUBLISH_OK is now a REQUEST_OK alias, so this
460    /// handler also resolves outstanding PUBLISH requests.
461    pub fn receive_request_ok(
462        &mut self,
463        request_id: VarInt,
464        _msg: &RequestOk,
465    ) -> Result<(), EndpointError> {
466        let id = request_id.into_inner();
467        if let Some(sm) = self.publishes.get_mut(&id) {
468            sm.on_publish_ok()?;
469            return Ok(());
470        }
471        if let Some(sm) = self.subscribe_namespaces.get_mut(&id) {
472            sm.on_subscribe_namespace_ok()?;
473            return Ok(());
474        }
475        if let Some(sm) = self.subscribe_tracks.get_mut(&id) {
476            sm.on_subscribe_namespace_ok()?;
477            return Ok(());
478        }
479        if let Some(sm) = self.publish_namespaces.get_mut(&id) {
480            sm.on_publish_namespace_ok()?;
481            return Ok(());
482        }
483        if let Some(sm) = self.track_statuses.get_mut(&id) {
484            sm.on_track_status_ok()?;
485            return Ok(());
486        }
487        Err(EndpointError::UnknownRequest(id))
488    }
489
490    /// Process an incoming REQUEST_ERROR on the bidi stream identified by
491    /// `request_id`.
492    pub fn receive_request_error(
493        &mut self,
494        request_id: VarInt,
495        _msg: &RequestError,
496    ) -> Result<(), EndpointError> {
497        let id = request_id.into_inner();
498        if let Some(sm) = self.subscriptions.get_mut(&id) {
499            sm.on_subscribe_error()?;
500            return Ok(());
501        }
502        if let Some(sm) = self.fetches.get_mut(&id) {
503            sm.on_fetch_error()?;
504            return Ok(());
505        }
506        if let Some(sm) = self.publishes.get_mut(&id) {
507            sm.on_publish_error()?;
508            return Ok(());
509        }
510        if let Some(sm) = self.subscribe_namespaces.get_mut(&id) {
511            sm.on_subscribe_namespace_error()?;
512            return Ok(());
513        }
514        if let Some(sm) = self.subscribe_tracks.get_mut(&id) {
515            sm.on_subscribe_namespace_error()?;
516            return Ok(());
517        }
518        if let Some(sm) = self.publish_namespaces.get_mut(&id) {
519            sm.on_publish_namespace_error()?;
520            return Ok(());
521        }
522        if let Some(sm) = self.track_statuses.get_mut(&id) {
523            sm.on_track_status_error()?;
524            return Ok(());
525        }
526        Err(EndpointError::UnknownRequest(id))
527    }
528
529    // -- PublishBlocked / Namespace announcements -------------------
530
531    pub fn receive_namespace(&mut self, _msg: &message::Namespace) -> Result<(), EndpointError> {
532        Ok(())
533    }
534
535    pub fn receive_namespace_done(
536        &mut self,
537        _msg: &message::NamespaceDone,
538    ) -> Result<(), EndpointError> {
539        Ok(())
540    }
541
542    pub fn receive_publish_blocked(&mut self, _msg: &PublishBlocked) -> Result<(), EndpointError> {
543        Ok(())
544    }
545
546    // -- Unified message dispatch -----------------------------------
547
548    pub fn receive_message(&mut self, msg: ControlMessage) -> Result<(), EndpointError> {
549        match msg {
550            ControlMessage::Setup(ref m) => self.receive_setup(m),
551            ControlMessage::GoAway(ref m) => self.receive_goaway(m),
552            ControlMessage::RequestUpdate(ref m) => self.receive_request_update(m),
553            ControlMessage::Namespace(ref m) => self.receive_namespace(m),
554            ControlMessage::NamespaceDone(ref m) => self.receive_namespace_done(m),
555            ControlMessage::PublishBlocked(ref m) => self.receive_publish_blocked(m),
556            ControlMessage::SubscribeOk(_)
557            | ControlMessage::PublishDone(_)
558            | ControlMessage::FetchOk(_)
559            | ControlMessage::RequestOk(_)
560            | ControlMessage::RequestError(_) => Err(EndpointError::ResponseOnControlStream),
561            _ => Ok(()),
562        }
563    }
564
565    /// Dispatch a response message that arrived on the bidi request stream
566    /// identified by `request_id`.
567    pub fn receive_response_on_stream(
568        &mut self,
569        request_id: VarInt,
570        msg: ControlMessage,
571    ) -> Result<(), EndpointError> {
572        match msg {
573            ControlMessage::SubscribeOk(ref m) => self.receive_subscribe_ok(request_id, m),
574            ControlMessage::PublishDone(ref m) => self.receive_publish_done(request_id, m),
575            ControlMessage::FetchOk(ref m) => self.receive_fetch_ok(request_id, m),
576            ControlMessage::RequestOk(ref m) => self.receive_request_ok(request_id, m),
577            ControlMessage::RequestError(ref m) => self.receive_request_error(request_id, m),
578            _ => Err(EndpointError::ResponseOnControlStream),
579        }
580    }
581}