Skip to main content

moqtap_client/draft14/
endpoint.rs

1use std::collections::HashMap;
2
3use crate::draft14::fetch::{FetchError, FetchStateMachine};
4use crate::draft14::namespace::{
5    NamespaceError, PublishNamespaceStateMachine, SubscribeNamespaceStateMachine,
6};
7use crate::draft14::publish::{PublishError as PublishFlowError, PublishStateMachine};
8use crate::draft14::session::request_id::{RequestIdAllocator, RequestIdError, Role};
9use crate::draft14::session::setup::{self, SetupError};
10use crate::draft14::session::state::{SessionError, SessionState, SessionStateMachine};
11use crate::draft14::subscription::{SubscriptionError, SubscriptionStateMachine};
12use crate::draft14::track_status::{TrackStatusError, TrackStatusStateMachine};
13use moqtap_codec::draft14::message::{
14    self, ClientSetup, ControlMessage, Fetch, FetchCancel, GoAway, MaxRequestId, PublishDone,
15    PublishNamespace, PublishNamespaceCancel, PublishNamespaceDone, PublishNamespaceError,
16    PublishNamespaceOk, RequestsBlocked, ServerSetup, Subscribe, SubscribeError,
17    SubscribeNamespace, SubscribeNamespaceError, SubscribeNamespaceOk, SubscribeOk,
18    SubscribeUpdate, Unsubscribe, UnsubscribeNamespace,
19};
20use moqtap_codec::kvp::{KeyValuePair, KvpValue};
21use moqtap_codec::types::*;
22use moqtap_codec::varint::VarInt;
23
24/// Errors that can occur during endpoint operations.
25#[derive(Debug, thiserror::Error)]
26pub enum EndpointError {
27    /// A session-level state machine error.
28    #[error("session error: {0}")]
29    Session(#[from] SessionError),
30    /// A request ID allocation or validation error.
31    #[error("request ID error: {0}")]
32    RequestId(#[from] RequestIdError),
33    /// A subscription state machine error.
34    #[error("subscription error: {0}")]
35    Subscription(#[from] SubscriptionError),
36    /// A fetch state machine error.
37    #[error("fetch error: {0}")]
38    Fetch(#[from] FetchError),
39    /// A namespace state machine error.
40    #[error("namespace error: {0}")]
41    Namespace(#[from] NamespaceError),
42    /// A track status state machine error.
43    #[error("track status error: {0}")]
44    TrackStatus(#[from] TrackStatusError),
45    /// A publish flow state machine error.
46    #[error("publish flow error: {0}")]
47    PublishFlow(#[from] PublishFlowError),
48    /// A setup negotiation error.
49    #[error("setup error: {0}")]
50    Setup(#[from] SetupError),
51    /// The request ID does not match any known state machine.
52    #[error("unknown request ID: {0}")]
53    UnknownRequest(u64),
54    /// The session is not in the Active state.
55    #[error("session not active")]
56    NotActive,
57    /// The session is draining and cannot accept new requests.
58    #[error("session is draining, no new requests allowed")]
59    Draining,
60}
61
62/// Unified MoQT endpoint wrapping session lifecycle, request ID allocation,
63/// and all per-request state machines (subscriptions, fetches, namespaces).
64pub struct Endpoint {
65    role: Role,
66    session: SessionStateMachine,
67    request_ids: RequestIdAllocator,
68    /// Tracks the MAX_REQUEST_ID we have advertised to the peer (for monotonic enforcement).
69    advertised_max_id: u64,
70    subscriptions: HashMap<u64, SubscriptionStateMachine>,
71    fetches: HashMap<u64, FetchStateMachine>,
72    subscribe_namespaces: HashMap<u64, SubscribeNamespaceStateMachine>,
73    publish_namespaces: HashMap<u64, PublishNamespaceStateMachine>,
74    track_statuses: HashMap<u64, TrackStatusStateMachine>,
75    publishes: HashMap<u64, PublishStateMachine>,
76    negotiated_version: Option<VarInt>,
77    offered_versions: Vec<VarInt>,
78    goaway_uri: Option<Vec<u8>>,
79}
80
81impl Endpoint {
82    /// Create a new endpoint with the given role.
83    pub fn new(role: Role) -> Self {
84        Self {
85            role,
86            session: SessionStateMachine::new(),
87            request_ids: RequestIdAllocator::new(role),
88            advertised_max_id: 0,
89            subscriptions: HashMap::new(),
90            fetches: HashMap::new(),
91            subscribe_namespaces: HashMap::new(),
92            publish_namespaces: HashMap::new(),
93            track_statuses: HashMap::new(),
94            publishes: HashMap::new(),
95            negotiated_version: None,
96            offered_versions: Vec::new(),
97            goaway_uri: None,
98        }
99    }
100
101    // ── Accessors ──────────────────────────────────────────────
102
103    /// Returns the role (client or server) of this endpoint.
104    pub fn role(&self) -> Role {
105        self.role
106    }
107
108    /// Returns the current session state.
109    pub fn session_state(&self) -> SessionState {
110        self.session.state()
111    }
112
113    /// Returns the negotiated MoQT version, if setup is complete.
114    pub fn negotiated_version(&self) -> Option<VarInt> {
115        self.negotiated_version
116    }
117
118    /// Returns the URI from a received GOAWAY message, if any.
119    pub fn goaway_uri(&self) -> Option<&[u8]> {
120        self.goaway_uri.as_deref()
121    }
122
123    /// Returns whether this endpoint is blocked on request ID allocation.
124    pub fn is_blocked(&self) -> bool {
125        self.request_ids.is_blocked()
126    }
127
128    /// Returns the number of active subscription state machines.
129    pub fn active_subscription_count(&self) -> usize {
130        self.subscriptions.len()
131    }
132
133    /// Returns the number of active fetch state machines.
134    pub fn active_fetch_count(&self) -> usize {
135        self.fetches.len()
136    }
137
138    /// Returns the number of active subscribe-namespace state machines.
139    pub fn active_subscribe_namespace_count(&self) -> usize {
140        self.subscribe_namespaces.len()
141    }
142
143    /// Returns the number of active publish-namespace state machines.
144    pub fn active_publish_namespace_count(&self) -> usize {
145        self.publish_namespaces.len()
146    }
147
148    /// Returns the number of active track status state machines.
149    pub fn active_track_status_count(&self) -> usize {
150        self.track_statuses.len()
151    }
152
153    /// Returns the number of active publish state machines.
154    pub fn active_publish_count(&self) -> usize {
155        self.publishes.len()
156    }
157
158    // ── Session lifecycle ──────────────────────────────────────
159
160    /// Transition from Connecting to SetupExchange.
161    pub fn connect(&mut self) -> Result<(), EndpointError> {
162        self.session.on_connect()?;
163        Ok(())
164    }
165
166    /// Close the session (Active or Draining -> Closed).
167    pub fn close(&mut self) -> Result<(), EndpointError> {
168        self.session.on_close()?;
169        Ok(())
170    }
171
172    // ── Client setup ───────────────────────────────────────────
173
174    /// Generate a CLIENT_SETUP message (client-side).
175    pub fn send_client_setup(
176        &mut self,
177        versions: Vec<VarInt>,
178        parameters: Vec<KeyValuePair>,
179    ) -> Result<ControlMessage, EndpointError> {
180        self.offered_versions = versions.clone();
181        let msg = ClientSetup { supported_versions: versions, parameters };
182        setup::validate_client_setup(&msg)?;
183        Ok(ControlMessage::ClientSetup(msg))
184    }
185
186    /// Process a SERVER_SETUP message (client-side). Transitions to Active.
187    /// If the server includes a MAX_REQUEST_ID parameter (key 0x02), the
188    /// request ID allocator is initialized with that value.
189    pub fn receive_server_setup(&mut self, msg: &ServerSetup) -> Result<(), EndpointError> {
190        setup::validate_server_setup(msg)?;
191        let version = setup::negotiate_version(&self.offered_versions, msg.selected_version)?;
192        self.negotiated_version = Some(version);
193        self.session.on_setup_complete()?;
194        // Extract MAX_REQUEST_ID (key 0x02) from setup parameters if present
195        for param in &msg.parameters {
196            if param.key == VarInt::from_u64(0x02).unwrap() {
197                if let KvpValue::Varint(v) = &param.value {
198                    self.request_ids.update_max(v.into_inner())?;
199                }
200            }
201        }
202        Ok(())
203    }
204
205    // ── Server setup ───────────────────────────────────────────
206
207    /// Process CLIENT_SETUP and generate SERVER_SETUP (server-side).
208    pub fn receive_client_setup_and_respond(
209        &mut self,
210        client_setup: &ClientSetup,
211        selected_version: VarInt,
212    ) -> Result<ControlMessage, EndpointError> {
213        setup::validate_client_setup(client_setup)?;
214        let version = setup::negotiate_version(&client_setup.supported_versions, selected_version)?;
215        self.negotiated_version = Some(version);
216        self.session.on_setup_complete()?;
217        let msg = ServerSetup { selected_version: version, parameters: vec![] };
218        Ok(ControlMessage::ServerSetup(msg))
219    }
220
221    // ── MAX_REQUEST_ID ─────────────────────────────────────────
222
223    /// Process an incoming MAX_REQUEST_ID message.
224    pub fn receive_max_request_id(&mut self, msg: &MaxRequestId) -> Result<(), EndpointError> {
225        self.request_ids.update_max(msg.request_id.into_inner())?;
226        Ok(())
227    }
228
229    /// Generate a MAX_REQUEST_ID message (typically server-side).
230    /// The value must strictly increase over previous sends.
231    pub fn send_max_request_id(&mut self, max_id: VarInt) -> Result<ControlMessage, EndpointError> {
232        let new_val = max_id.into_inner();
233        if new_val <= self.advertised_max_id && self.advertised_max_id > 0 {
234            return Err(EndpointError::RequestId(RequestIdError::Decreased(
235                self.advertised_max_id,
236                new_val,
237            )));
238        }
239        self.advertised_max_id = new_val;
240        Ok(ControlMessage::MaxRequestId(MaxRequestId { request_id: max_id }))
241    }
242
243    /// Generate a REQUESTS_BLOCKED message indicating that this endpoint
244    /// wants to create a new request but is blocked by the current
245    /// MAX_REQUEST_ID. Per draft-14 §6.3.2.
246    pub fn send_requests_blocked(&self) -> Result<ControlMessage, EndpointError> {
247        let max_id = self.request_ids.max_id();
248        Ok(ControlMessage::RequestsBlocked(RequestsBlocked {
249            maximum_request_id: VarInt::from_u64(max_id).unwrap(),
250        }))
251    }
252
253    /// Process an incoming REQUESTS_BLOCKED message from the peer.
254    /// This signals that the peer wants to issue new requests but is
255    /// limited by the MAX_REQUEST_ID we advertised.
256    pub fn receive_requests_blocked(&self, _msg: &RequestsBlocked) -> Result<(), EndpointError> {
257        // The peer is telling us they're blocked. This is informational;
258        // the application layer should decide whether to increase MAX_REQUEST_ID.
259        Ok(())
260    }
261
262    // ── GoAway ─────────────────────────────────────────────────
263
264    /// Process an incoming GOAWAY message. Transitions to Draining.
265    pub fn receive_goaway(&mut self, msg: &GoAway) -> Result<(), EndpointError> {
266        self.session.on_goaway()?;
267        self.goaway_uri = Some(msg.new_session_uri.clone());
268        Ok(())
269    }
270
271    // ── Subscribe flow ─────────────────────────────────────────
272
273    fn require_active_or_err(&self) -> Result<(), EndpointError> {
274        match self.session.state() {
275            SessionState::Active => Ok(()),
276            SessionState::Draining => Err(EndpointError::Draining),
277            _ => Err(EndpointError::NotActive),
278        }
279    }
280
281    /// Send a SUBSCRIBE message. Allocates a request ID and creates a
282    /// subscription state machine.
283    pub fn subscribe(
284        &mut self,
285        track_namespace: TrackNamespace,
286        track_name: Vec<u8>,
287        subscriber_priority: u8,
288        group_order: GroupOrder,
289        filter_type: FilterType,
290    ) -> Result<(VarInt, ControlMessage), EndpointError> {
291        self.require_active_or_err()?;
292        let req_id = self.request_ids.allocate()?;
293
294        let mut sm = SubscriptionStateMachine::new();
295        sm.on_subscribe_sent()?;
296        self.subscriptions.insert(req_id.into_inner(), sm);
297
298        let msg = ControlMessage::Subscribe(Subscribe {
299            request_id: req_id,
300            track_namespace,
301            track_name,
302            subscriber_priority,
303            group_order,
304            forward: Forward::Forward,
305            filter_type,
306            start_location: None,
307            end_group: None,
308            parameters: vec![],
309        });
310        Ok((req_id, msg))
311    }
312
313    /// Process an incoming SUBSCRIBE_OK.
314    pub fn receive_subscribe_ok(&mut self, msg: &SubscribeOk) -> Result<(), EndpointError> {
315        let id = msg.request_id.into_inner();
316        let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
317        sm.on_subscribe_ok()?;
318        Ok(())
319    }
320
321    /// Process an incoming SUBSCRIBE_ERROR.
322    pub fn receive_subscribe_error(&mut self, msg: &SubscribeError) -> Result<(), EndpointError> {
323        let id = msg.request_id.into_inner();
324        let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
325        sm.on_subscribe_error()?;
326        Ok(())
327    }
328
329    /// Send an UNSUBSCRIBE message for an active subscription.
330    pub fn unsubscribe(&mut self, request_id: VarInt) -> Result<ControlMessage, EndpointError> {
331        let id = request_id.into_inner();
332        let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
333        sm.on_unsubscribe()?;
334        Ok(ControlMessage::Unsubscribe(Unsubscribe { request_id }))
335    }
336
337    /// Process an incoming SUBSCRIBE_UPDATE.
338    pub fn receive_subscribe_update(&mut self, msg: &SubscribeUpdate) -> Result<(), EndpointError> {
339        let id = msg.subscription_request_id.into_inner();
340        let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
341        sm.on_subscribe_update()?;
342        Ok(())
343    }
344
345    /// Send a SUBSCRIBE_UPDATE for an active subscription. Allocates a fresh
346    /// request ID for the update message and returns it alongside the message.
347    pub fn subscribe_update(
348        &mut self,
349        subscription_request_id: VarInt,
350        start_location: Location,
351        end_group: VarInt,
352        subscriber_priority: u8,
353        forward: Forward,
354        parameters: Vec<KeyValuePair>,
355    ) -> Result<(VarInt, ControlMessage), EndpointError> {
356        self.require_active_or_err()?;
357        let sub_id = subscription_request_id.into_inner();
358        let sm =
359            self.subscriptions.get_mut(&sub_id).ok_or(EndpointError::UnknownRequest(sub_id))?;
360        sm.on_subscribe_update()?;
361        let req_id = self.request_ids.allocate()?;
362        let msg = ControlMessage::SubscribeUpdate(SubscribeUpdate {
363            request_id: req_id,
364            subscription_request_id,
365            start_location,
366            end_group,
367            subscriber_priority,
368            forward,
369            parameters,
370        });
371        Ok((req_id, msg))
372    }
373
374    /// Process an incoming PUBLISH_DONE (subscriber side — publisher finished).
375    pub fn receive_publish_done(&mut self, msg: &PublishDone) -> Result<(), EndpointError> {
376        let id = msg.request_id.into_inner();
377        let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
378        sm.on_publish_done()?;
379        Ok(())
380    }
381
382    // ── Fetch flow ─────────────────────────────────────────────
383
384    /// Send a FETCH message. Allocates a request ID and creates a fetch state machine.
385    pub fn fetch(
386        &mut self,
387        track_namespace: TrackNamespace,
388        track_name: Vec<u8>,
389        start_group: VarInt,
390        start_object: VarInt,
391    ) -> Result<(VarInt, ControlMessage), EndpointError> {
392        self.require_active_or_err()?;
393        let req_id = self.request_ids.allocate()?;
394
395        let mut sm = FetchStateMachine::new();
396        sm.on_fetch_sent()?;
397        self.fetches.insert(req_id.into_inner(), sm);
398
399        let msg = ControlMessage::Fetch(Fetch {
400            request_id: req_id,
401            subscriber_priority: 128,
402            group_order: GroupOrder::Ascending,
403            fetch_type: message::FetchType::Standalone,
404            fetch_payload: message::FetchPayload::Standalone {
405                track_namespace,
406                track_name,
407                start_group,
408                start_object,
409                end_group: VarInt::from_u64(0).unwrap(),
410                end_object: VarInt::from_u64(0).unwrap(),
411            },
412            parameters: vec![],
413        });
414        Ok((req_id, msg))
415    }
416
417    /// Process an incoming FETCH_OK.
418    pub fn receive_fetch_ok(&mut self, msg: &message::FetchOk) -> Result<(), EndpointError> {
419        let id = msg.request_id.into_inner();
420        let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
421        sm.on_fetch_ok()?;
422        Ok(())
423    }
424
425    /// Process an incoming FETCH_ERROR.
426    pub fn receive_fetch_error(&mut self, msg: &message::FetchError) -> Result<(), EndpointError> {
427        let id = msg.request_id.into_inner();
428        let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
429        sm.on_fetch_error()?;
430        Ok(())
431    }
432
433    /// Send a FETCH_CANCEL message.
434    pub fn fetch_cancel(&mut self, request_id: VarInt) -> Result<ControlMessage, EndpointError> {
435        let id = request_id.into_inner();
436        let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
437        sm.on_fetch_cancel()?;
438        Ok(ControlMessage::FetchCancel(FetchCancel { request_id }))
439    }
440
441    /// Notify that a fetch data stream received FIN.
442    pub fn on_fetch_stream_fin(&mut self, request_id: VarInt) -> Result<(), EndpointError> {
443        let id = request_id.into_inner();
444        let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
445        sm.on_stream_fin()?;
446        Ok(())
447    }
448
449    /// Notify that a fetch data stream was reset.
450    pub fn on_fetch_stream_reset(&mut self, request_id: VarInt) -> Result<(), EndpointError> {
451        let id = request_id.into_inner();
452        let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
453        sm.on_stream_reset()?;
454        Ok(())
455    }
456
457    // ── Subscribe Namespace flow ───────────────────────────────
458
459    /// Send a SUBSCRIBE_NAMESPACE message.
460    pub fn subscribe_namespace(
461        &mut self,
462        track_namespace: TrackNamespace,
463    ) -> Result<(VarInt, ControlMessage), EndpointError> {
464        self.require_active_or_err()?;
465        let req_id = self.request_ids.allocate()?;
466
467        let mut sm = SubscribeNamespaceStateMachine::new();
468        sm.on_subscribe_namespace_sent()?;
469        self.subscribe_namespaces.insert(req_id.into_inner(), sm);
470
471        let msg = ControlMessage::SubscribeNamespace(SubscribeNamespace {
472            request_id: req_id,
473            track_namespace,
474            parameters: vec![],
475        });
476        Ok((req_id, msg))
477    }
478
479    /// Process an incoming SUBSCRIBE_NAMESPACE_OK.
480    pub fn receive_subscribe_namespace_ok(
481        &mut self,
482        msg: &SubscribeNamespaceOk,
483    ) -> Result<(), EndpointError> {
484        let id = msg.request_id.into_inner();
485        let sm = self.subscribe_namespaces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
486        sm.on_subscribe_namespace_ok()?;
487        Ok(())
488    }
489
490    /// Process an incoming SUBSCRIBE_NAMESPACE_ERROR.
491    pub fn receive_subscribe_namespace_error(
492        &mut self,
493        msg: &SubscribeNamespaceError,
494    ) -> Result<(), EndpointError> {
495        let id = msg.request_id.into_inner();
496        let sm = self.subscribe_namespaces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
497        sm.on_subscribe_namespace_error()?;
498        Ok(())
499    }
500
501    /// Send an UNSUBSCRIBE_NAMESPACE message.
502    pub fn unsubscribe_namespace(
503        &mut self,
504        request_id: VarInt,
505        _track_namespace: TrackNamespace,
506    ) -> Result<ControlMessage, EndpointError> {
507        let id = request_id.into_inner();
508        let sm = self.subscribe_namespaces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
509        sm.on_unsubscribe_namespace()?;
510        let _ = request_id;
511        Ok(ControlMessage::UnsubscribeNamespace(UnsubscribeNamespace {
512            track_namespace_prefix: _track_namespace,
513        }))
514    }
515
516    // ── Publish Namespace flow ─────────────────────────────────
517
518    /// Send a PUBLISH_NAMESPACE message.
519    pub fn publish_namespace(
520        &mut self,
521        track_namespace: TrackNamespace,
522    ) -> Result<(VarInt, ControlMessage), EndpointError> {
523        self.require_active_or_err()?;
524        let req_id = self.request_ids.allocate()?;
525
526        let mut sm = PublishNamespaceStateMachine::new();
527        sm.on_publish_namespace_sent()?;
528        self.publish_namespaces.insert(req_id.into_inner(), sm);
529
530        let msg = ControlMessage::PublishNamespace(PublishNamespace {
531            request_id: req_id,
532            track_namespace,
533            parameters: vec![],
534        });
535        Ok((req_id, msg))
536    }
537
538    /// Process an incoming PUBLISH_NAMESPACE_OK.
539    pub fn receive_publish_namespace_ok(
540        &mut self,
541        msg: &PublishNamespaceOk,
542    ) -> Result<(), EndpointError> {
543        let id = msg.request_id.into_inner();
544        let sm = self.publish_namespaces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
545        sm.on_publish_namespace_ok()?;
546        Ok(())
547    }
548
549    /// Process an incoming PUBLISH_NAMESPACE_ERROR.
550    pub fn receive_publish_namespace_error(
551        &mut self,
552        msg: &PublishNamespaceError,
553    ) -> Result<(), EndpointError> {
554        let id = msg.request_id.into_inner();
555        let sm = self.publish_namespaces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
556        sm.on_publish_namespace_error()?;
557        Ok(())
558    }
559
560    /// Process an incoming PUBLISH_NAMESPACE_DONE.
561    ///
562    /// Draft-14 PUBLISH_NAMESPACE_DONE is keyed by `track_namespace`
563    /// rather than a request ID, so the endpoint advances every
564    /// currently tracked publish-namespace state machine. Callers that
565    /// need per-namespace tracking should match on `msg.track_namespace`
566    /// directly.
567    pub fn receive_publish_namespace_done(
568        &mut self,
569        _msg: &PublishNamespaceDone,
570    ) -> Result<(), EndpointError> {
571        for sm in self.publish_namespaces.values_mut() {
572            // Best-effort: ignore state-machine errors for machines
573            // that are already past the Done transition.
574            let _ = sm.on_publish_namespace_done();
575        }
576        Ok(())
577    }
578
579    /// Send a PUBLISH_NAMESPACE_CANCEL message.
580    pub fn publish_namespace_cancel(
581        &mut self,
582        request_id: VarInt,
583        reason_phrase: Vec<u8>,
584    ) -> Result<ControlMessage, EndpointError> {
585        let id = request_id.into_inner();
586        let sm = self.publish_namespaces.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
587        sm.on_publish_namespace_cancel()?;
588        Ok(ControlMessage::PublishNamespaceCancel(PublishNamespaceCancel {
589            track_namespace: TrackNamespace(Vec::new()),
590            error_code: VarInt::from_u64(0).unwrap(),
591            reason_phrase,
592        }))
593    }
594
595    // ── Track Status flow ────────────────────────────────────
596
597    /// Send a TRACK_STATUS message. Allocates a request ID.
598    pub fn track_status(
599        &mut self,
600        track_namespace: TrackNamespace,
601        track_name: Vec<u8>,
602    ) -> Result<(VarInt, ControlMessage), EndpointError> {
603        self.require_active_or_err()?;
604        let req_id = self.request_ids.allocate()?;
605        let mut sm = TrackStatusStateMachine::new();
606        sm.on_track_status_sent()?;
607        self.track_statuses.insert(req_id.into_inner(), sm);
608        let msg = ControlMessage::TrackStatus(message::TrackStatus {
609            request_id: req_id,
610            track_namespace,
611            track_name,
612            subscriber_priority: 128,
613            group_order: GroupOrder::Ascending,
614            forward: Forward::Forward,
615            filter_type: FilterType::LargestObject,
616            start_location: None,
617            end_group: None,
618            parameters: vec![],
619        });
620        Ok((req_id, msg))
621    }
622
623    /// Process an incoming TRACK_STATUS_OK.
624    pub fn receive_track_status_ok(
625        &mut self,
626        msg: &message::TrackStatusOk,
627    ) -> Result<(), EndpointError> {
628        let id = msg.request_id.into_inner();
629        let sm = self.track_statuses.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
630        sm.on_track_status_ok()?;
631        Ok(())
632    }
633
634    /// Process an incoming TRACK_STATUS_ERROR.
635    pub fn receive_track_status_error(
636        &mut self,
637        msg: &message::TrackStatusError,
638    ) -> Result<(), EndpointError> {
639        let id = msg.request_id.into_inner();
640        let sm = self.track_statuses.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
641        sm.on_track_status_error()?;
642        Ok(())
643    }
644
645    // ── Publish flow (publisher side) ─────────────────────────
646
647    /// Send a PUBLISH message (publisher side). Allocates a request ID.
648    pub fn publish(
649        &mut self,
650        track_namespace: TrackNamespace,
651        track_name: Vec<u8>,
652        forward: Forward,
653    ) -> Result<(VarInt, ControlMessage), EndpointError> {
654        self.require_active_or_err()?;
655        let req_id = self.request_ids.allocate()?;
656        let mut sm = PublishStateMachine::new();
657        sm.on_publish_sent()?;
658        self.publishes.insert(req_id.into_inner(), sm);
659        let msg = ControlMessage::Publish(message::Publish {
660            request_id: req_id,
661            track_namespace,
662            track_name,
663            track_alias: VarInt::from_u64(0).unwrap(),
664            group_order: GroupOrder::Ascending,
665            content_exists: ContentExists::NoLargestLocation,
666            largest_location: None,
667            forward,
668            parameters: vec![],
669        });
670        Ok((req_id, msg))
671    }
672
673    /// Process an incoming PUBLISH_OK (publisher side).
674    pub fn receive_publish_ok(&mut self, msg: &message::PublishOk) -> Result<(), EndpointError> {
675        let id = msg.request_id.into_inner();
676        let sm = self.publishes.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
677        sm.on_publish_ok()?;
678        Ok(())
679    }
680
681    /// Send a PUBLISH_DONE message (publisher finishing).
682    pub fn send_publish_done(
683        &mut self,
684        request_id: VarInt,
685        status_code: VarInt,
686        reason_phrase: Vec<u8>,
687    ) -> Result<ControlMessage, EndpointError> {
688        let id = request_id.into_inner();
689        let sm = self.publishes.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
690        sm.on_publish_done_sent()?;
691        Ok(ControlMessage::PublishDone(PublishDone {
692            request_id,
693            status_code,
694            stream_count: VarInt::from_u64(0).unwrap(),
695            reason_phrase,
696        }))
697    }
698
699    // ── Publish error ─────────────────────────────────────────
700
701    /// Generate a PUBLISH_ERROR message (draft-14 §6.5.3).
702    /// Used by the server/relay to reject a publish request.
703    pub fn send_publish_error(
704        &self,
705        request_id: VarInt,
706        error_code: VarInt,
707        reason_phrase: Vec<u8>,
708    ) -> Result<ControlMessage, EndpointError> {
709        Ok(ControlMessage::PublishError(message::PublishError {
710            request_id,
711            error_code,
712            reason_phrase,
713        }))
714    }
715
716    /// Process an incoming PUBLISH_ERROR (draft-14 §6.5.3).
717    /// Checks publisher-side publishes first, then subscriber-side subscriptions,
718    /// then silently ignores unknown IDs.
719    pub fn receive_publish_error(
720        &mut self,
721        msg: &message::PublishError,
722    ) -> Result<(), EndpointError> {
723        let id = msg.request_id.into_inner();
724        // Check publisher-side publishes first
725        if let Some(sm) = self.publishes.get_mut(&id) {
726            sm.on_publish_error()?;
727            return Ok(());
728        }
729        // Then check subscriber-side subscriptions
730        if let Some(sm) = self.subscriptions.get_mut(&id) {
731            sm.on_subscribe_error()?;
732        }
733        Ok(())
734    }
735
736    // ── Unified message dispatch ───────────────────────────────
737
738    /// Dispatch an incoming control message to the appropriate handler.
739    pub fn receive_message(&mut self, msg: ControlMessage) -> Result<(), EndpointError> {
740        match msg {
741            ControlMessage::GoAway(ref m) => self.receive_goaway(m),
742            ControlMessage::MaxRequestId(ref m) => self.receive_max_request_id(m),
743            ControlMessage::RequestsBlocked(ref m) => self.receive_requests_blocked(m),
744            ControlMessage::SubscribeOk(ref m) => self.receive_subscribe_ok(m),
745            ControlMessage::SubscribeError(ref m) => self.receive_subscribe_error(m),
746            ControlMessage::SubscribeUpdate(ref m) => self.receive_subscribe_update(m),
747            ControlMessage::PublishDone(ref m) => self.receive_publish_done(m),
748            ControlMessage::PublishOk(ref m) => self.receive_publish_ok(m),
749            ControlMessage::PublishError(ref m) => self.receive_publish_error(m),
750            ControlMessage::FetchOk(ref m) => self.receive_fetch_ok(m),
751            ControlMessage::FetchError(ref m) => self.receive_fetch_error(m),
752            ControlMessage::SubscribeNamespaceOk(ref m) => self.receive_subscribe_namespace_ok(m),
753            ControlMessage::SubscribeNamespaceError(ref m) => {
754                self.receive_subscribe_namespace_error(m)
755            }
756            ControlMessage::PublishNamespaceOk(ref m) => self.receive_publish_namespace_ok(m),
757            ControlMessage::PublishNamespaceError(ref m) => self.receive_publish_namespace_error(m),
758            ControlMessage::PublishNamespaceDone(ref m) => self.receive_publish_namespace_done(m),
759            ControlMessage::TrackStatusOk(ref m) => self.receive_track_status_ok(m),
760            ControlMessage::TrackStatusError(ref m) => self.receive_track_status_error(m),
761            _ => Ok(()),
762        }
763    }
764}