1#![allow(missing_docs)]
2use std::collections::HashMap;
18
19use crate::draft18::fetch::{FetchError, FetchStateMachine};
20use crate::draft18::namespace::{
21 NamespaceError, PublishNamespaceStateMachine, SubscribeNamespaceStateMachine,
22};
23use crate::draft18::publish::{PublishError as PublishFlowError, PublishStateMachine};
24use crate::draft18::session::request_id::{RequestIdAllocator, RequestIdError, Role};
25use crate::draft18::session::setup::{self, SetupError};
26use crate::draft18::session::state::{SessionError, SessionState, SessionStateMachine};
27use crate::draft18::subscription::{SubscriptionError, SubscriptionStateMachine};
28use crate::draft18::track_status::{TrackStatusError, TrackStatusStateMachine};
29use moqtap_codec::draft18::message::{
30 self, ControlMessage, Fetch, FetchPayload, FetchType, GoAway, Publish, PublishBlocked,
31 PublishDone, PublishNamespace, RequestError, RequestOk, RequestUpdate, Setup, Subscribe,
32 SubscribeNamespace, SubscribeOk, SubscribeTracks,
33};
34use moqtap_codec::kvp::KeyValuePair;
35use moqtap_codec::types::*;
36use moqtap_codec::varint::VarInt;
37
38#[derive(Debug, thiserror::Error)]
40pub enum EndpointError {
41 #[error("session error: {0}")]
42 Session(#[from] SessionError),
43 #[error("request ID error: {0}")]
44 RequestId(#[from] RequestIdError),
45 #[error("subscription error: {0}")]
46 Subscription(#[from] SubscriptionError),
47 #[error("fetch error: {0}")]
48 Fetch(#[from] FetchError),
49 #[error("namespace error: {0}")]
50 Namespace(#[from] NamespaceError),
51 #[error("track status error: {0}")]
52 TrackStatus(#[from] TrackStatusError),
53 #[error("publish flow error: {0}")]
54 PublishFlow(#[from] PublishFlowError),
55 #[error("setup error: {0}")]
56 Setup(#[from] SetupError),
57 #[error("unknown request ID: {0}")]
58 UnknownRequest(u64),
59 #[error(
60 "response message received on control stream; d18 responses belong on bidi request streams"
61 )]
62 ResponseOnControlStream,
63 #[error("session not active")]
64 NotActive,
65 #[error("session is draining, no new requests allowed")]
66 Draining,
67}
68
69pub struct Endpoint {
70 role: Role,
71 session: SessionStateMachine,
72 request_ids: RequestIdAllocator,
73 subscriptions: HashMap<u64, SubscriptionStateMachine>,
74 fetches: HashMap<u64, FetchStateMachine>,
75 subscribe_namespaces: HashMap<u64, SubscribeNamespaceStateMachine>,
76 subscribe_tracks: HashMap<u64, SubscribeNamespaceStateMachine>,
77 publish_namespaces: HashMap<u64, PublishNamespaceStateMachine>,
78 track_statuses: HashMap<u64, TrackStatusStateMachine>,
79 publishes: HashMap<u64, PublishStateMachine>,
80 goaway_uri: Option<Vec<u8>>,
81}
82
83impl Endpoint {
84 pub fn new(role: Role) -> Self {
85 Self {
86 role,
87 session: SessionStateMachine::new(),
88 request_ids: RequestIdAllocator::new(role),
89 subscriptions: HashMap::new(),
90 fetches: HashMap::new(),
91 subscribe_namespaces: HashMap::new(),
92 subscribe_tracks: HashMap::new(),
93 publish_namespaces: HashMap::new(),
94 track_statuses: HashMap::new(),
95 publishes: HashMap::new(),
96 goaway_uri: None,
97 }
98 }
99
100 pub fn role(&self) -> Role {
101 self.role
102 }
103
104 pub fn session_state(&self) -> SessionState {
105 self.session.state()
106 }
107
108 pub fn goaway_uri(&self) -> Option<&[u8]> {
109 self.goaway_uri.as_deref()
110 }
111
112 pub fn active_subscription_count(&self) -> usize {
113 self.subscriptions.len()
114 }
115
116 pub fn active_fetch_count(&self) -> usize {
117 self.fetches.len()
118 }
119
120 pub fn active_subscribe_namespace_count(&self) -> usize {
121 self.subscribe_namespaces.len()
122 }
123
124 pub fn active_subscribe_tracks_count(&self) -> usize {
125 self.subscribe_tracks.len()
126 }
127
128 pub fn active_publish_namespace_count(&self) -> usize {
129 self.publish_namespaces.len()
130 }
131
132 pub fn active_track_status_count(&self) -> usize {
133 self.track_statuses.len()
134 }
135
136 pub fn active_publish_count(&self) -> usize {
137 self.publishes.len()
138 }
139
140 pub fn connect(&mut self) -> Result<(), EndpointError> {
143 self.session.on_connect()?;
144 Ok(())
145 }
146
147 pub fn close(&mut self) -> Result<(), EndpointError> {
148 self.session.on_close()?;
149 Ok(())
150 }
151
152 pub fn send_setup(
157 &mut self,
158 options: Vec<KeyValuePair>,
159 ) -> Result<ControlMessage, EndpointError> {
160 let msg = Setup { options };
161 setup::validate_setup(&msg)?;
162 Ok(ControlMessage::Setup(msg))
163 }
164
165 pub fn receive_setup(&mut self, msg: &Setup) -> Result<(), EndpointError> {
167 setup::validate_setup(msg)?;
168 self.session.on_setup_complete()?;
169 Ok(())
170 }
171
172 pub fn receive_goaway(&mut self, msg: &GoAway) -> Result<(), EndpointError> {
175 self.session.on_goaway()?;
176 self.goaway_uri = Some(msg.new_session_uri.clone());
177 Ok(())
178 }
179
180 fn require_active_or_err(&self) -> Result<(), EndpointError> {
181 match self.session.state() {
182 SessionState::Active => Ok(()),
183 SessionState::Draining => Err(EndpointError::Draining),
184 _ => Err(EndpointError::NotActive),
185 }
186 }
187
188 pub fn subscribe(
191 &mut self,
192 track_namespace: TrackNamespace,
193 track_name: Vec<u8>,
194 parameters: Vec<KeyValuePair>,
195 ) -> Result<(VarInt, ControlMessage), EndpointError> {
196 self.require_active_or_err()?;
197 let req_id = self.request_ids.allocate()?;
198
199 let mut sm = SubscriptionStateMachine::new();
200 sm.on_subscribe_sent()?;
201 self.subscriptions.insert(req_id.into_inner(), sm);
202
203 let msg = ControlMessage::Subscribe(Subscribe {
204 request_id: req_id,
205 track_namespace,
206 track_name,
207 parameters,
208 });
209 Ok((req_id, msg))
210 }
211
212 pub fn receive_subscribe_ok(
216 &mut self,
217 request_id: VarInt,
218 _msg: &SubscribeOk,
219 ) -> Result<(), EndpointError> {
220 let id = request_id.into_inner();
221 let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
222 sm.on_subscribe_ok()?;
223 Ok(())
224 }
225
226 pub fn receive_request_update(&mut self, msg: &RequestUpdate) -> Result<(), EndpointError> {
227 let id = msg.request_id.into_inner();
228 let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
229 sm.on_subscribe_update()?;
230 Ok(())
231 }
232
233 pub fn receive_publish_done(
234 &mut self,
235 request_id: VarInt,
236 _msg: &PublishDone,
237 ) -> Result<(), EndpointError> {
238 let id = request_id.into_inner();
239 let sm = self.subscriptions.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
240 sm.on_publish_done()?;
241 Ok(())
242 }
243
244 pub fn fetch(
247 &mut self,
248 track_namespace: TrackNamespace,
249 track_name: Vec<u8>,
250 start_group: VarInt,
251 start_object: VarInt,
252 end_group: VarInt,
253 end_object: VarInt,
254 ) -> Result<(VarInt, ControlMessage), EndpointError> {
255 self.require_active_or_err()?;
256 let req_id = self.request_ids.allocate()?;
257
258 let mut sm = FetchStateMachine::new();
259 sm.on_fetch_sent()?;
260 self.fetches.insert(req_id.into_inner(), sm);
261
262 let msg = ControlMessage::Fetch(Fetch {
263 request_id: req_id,
264 fetch_type: FetchType::Standalone,
265 fetch_payload: FetchPayload::Standalone {
266 track_namespace,
267 track_name,
268 start_group,
269 start_object,
270 end_group,
271 end_object,
272 },
273 parameters: vec![],
274 });
275 Ok((req_id, msg))
276 }
277
278 pub fn joining_fetch(
279 &mut self,
280 joining_request_id: VarInt,
281 joining_start: VarInt,
282 ) -> Result<(VarInt, ControlMessage), EndpointError> {
283 self.require_active_or_err()?;
284 let req_id = self.request_ids.allocate()?;
285
286 let mut sm = FetchStateMachine::new();
287 sm.on_fetch_sent()?;
288 self.fetches.insert(req_id.into_inner(), sm);
289
290 let msg = ControlMessage::Fetch(Fetch {
291 request_id: req_id,
292 fetch_type: FetchType::RelativeJoining,
293 fetch_payload: FetchPayload::Joining { joining_request_id, joining_start },
294 parameters: vec![],
295 });
296 Ok((req_id, msg))
297 }
298
299 pub fn receive_fetch_ok(
300 &mut self,
301 request_id: VarInt,
302 _msg: &message::FetchOk,
303 ) -> Result<(), EndpointError> {
304 let id = request_id.into_inner();
305 let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
306 sm.on_fetch_ok()?;
307 Ok(())
308 }
309
310 pub fn on_fetch_stream_fin(&mut self, request_id: VarInt) -> Result<(), EndpointError> {
311 let id = request_id.into_inner();
312 let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
313 sm.on_stream_fin()?;
314 Ok(())
315 }
316
317 pub fn on_fetch_stream_reset(&mut self, request_id: VarInt) -> Result<(), EndpointError> {
318 let id = request_id.into_inner();
319 let sm = self.fetches.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
320 sm.on_stream_reset()?;
321 Ok(())
322 }
323
324 pub fn subscribe_namespace(
327 &mut self,
328 namespace_prefix: TrackNamespace,
329 parameters: Vec<KeyValuePair>,
330 ) -> Result<(VarInt, ControlMessage), EndpointError> {
331 self.require_active_or_err()?;
332 let req_id = self.request_ids.allocate()?;
333
334 let mut sm = SubscribeNamespaceStateMachine::new();
335 sm.on_subscribe_namespace_sent()?;
336 self.subscribe_namespaces.insert(req_id.into_inner(), sm);
337
338 let msg = ControlMessage::SubscribeNamespace(SubscribeNamespace {
339 request_id: req_id,
340 namespace_prefix,
341 parameters,
342 });
343 Ok((req_id, msg))
344 }
345
346 pub fn subscribe_tracks(
349 &mut self,
350 namespace_prefix: TrackNamespace,
351 parameters: Vec<KeyValuePair>,
352 ) -> Result<(VarInt, ControlMessage), EndpointError> {
353 self.require_active_or_err()?;
354 let req_id = self.request_ids.allocate()?;
355
356 let mut sm = SubscribeNamespaceStateMachine::new();
360 sm.on_subscribe_namespace_sent()?;
361 self.subscribe_tracks.insert(req_id.into_inner(), sm);
362
363 let msg = ControlMessage::SubscribeTracks(SubscribeTracks {
364 request_id: req_id,
365 namespace_prefix,
366 parameters,
367 });
368 Ok((req_id, msg))
369 }
370
371 pub fn publish_namespace(
374 &mut self,
375 track_namespace: TrackNamespace,
376 parameters: Vec<KeyValuePair>,
377 ) -> Result<(VarInt, ControlMessage), EndpointError> {
378 self.require_active_or_err()?;
379 let req_id = self.request_ids.allocate()?;
380
381 let mut sm = PublishNamespaceStateMachine::new();
382 sm.on_publish_namespace_sent()?;
383 self.publish_namespaces.insert(req_id.into_inner(), sm);
384
385 let msg = ControlMessage::PublishNamespace(PublishNamespace {
386 request_id: req_id,
387 track_namespace,
388 parameters,
389 });
390 Ok((req_id, msg))
391 }
392
393 pub fn track_status(
396 &mut self,
397 track_namespace: TrackNamespace,
398 track_name: Vec<u8>,
399 parameters: Vec<KeyValuePair>,
400 ) -> Result<(VarInt, ControlMessage), EndpointError> {
401 self.require_active_or_err()?;
402 let req_id = self.request_ids.allocate()?;
403 let mut sm = TrackStatusStateMachine::new();
404 sm.on_track_status_sent()?;
405 self.track_statuses.insert(req_id.into_inner(), sm);
406
407 let msg = ControlMessage::TrackStatus(message::TrackStatus {
408 request_id: req_id,
409 track_namespace,
410 track_name,
411 parameters,
412 });
413 Ok((req_id, msg))
414 }
415
416 pub fn publish(
419 &mut self,
420 track_namespace: TrackNamespace,
421 track_name: Vec<u8>,
422 track_alias: VarInt,
423 parameters: Vec<KeyValuePair>,
424 track_properties: Vec<KeyValuePair>,
425 ) -> Result<(VarInt, ControlMessage), EndpointError> {
426 self.require_active_or_err()?;
427 let req_id = self.request_ids.allocate()?;
428 let mut sm = PublishStateMachine::new();
429 sm.on_publish_sent()?;
430 self.publishes.insert(req_id.into_inner(), sm);
431
432 let msg = ControlMessage::Publish(Publish {
433 request_id: req_id,
434 track_namespace,
435 track_name,
436 track_alias,
437 parameters,
438 track_properties,
439 });
440 Ok((req_id, msg))
441 }
442
443 pub fn send_publish_done(
444 &mut self,
445 request_id: VarInt,
446 status_code: VarInt,
447 stream_count: VarInt,
448 reason_phrase: Vec<u8>,
449 ) -> Result<ControlMessage, EndpointError> {
450 let id = request_id.into_inner();
451 let sm = self.publishes.get_mut(&id).ok_or(EndpointError::UnknownRequest(id))?;
452 sm.on_publish_done_sent()?;
453 Ok(ControlMessage::PublishDone(PublishDone { status_code, stream_count, reason_phrase }))
454 }
455
456 pub fn receive_request_ok(
462 &mut self,
463 request_id: VarInt,
464 _msg: &RequestOk,
465 ) -> Result<(), EndpointError> {
466 let id = request_id.into_inner();
467 if let Some(sm) = self.publishes.get_mut(&id) {
468 sm.on_publish_ok()?;
469 return Ok(());
470 }
471 if let Some(sm) = self.subscribe_namespaces.get_mut(&id) {
472 sm.on_subscribe_namespace_ok()?;
473 return Ok(());
474 }
475 if let Some(sm) = self.subscribe_tracks.get_mut(&id) {
476 sm.on_subscribe_namespace_ok()?;
477 return Ok(());
478 }
479 if let Some(sm) = self.publish_namespaces.get_mut(&id) {
480 sm.on_publish_namespace_ok()?;
481 return Ok(());
482 }
483 if let Some(sm) = self.track_statuses.get_mut(&id) {
484 sm.on_track_status_ok()?;
485 return Ok(());
486 }
487 Err(EndpointError::UnknownRequest(id))
488 }
489
490 pub fn receive_request_error(
493 &mut self,
494 request_id: VarInt,
495 _msg: &RequestError,
496 ) -> Result<(), EndpointError> {
497 let id = request_id.into_inner();
498 if let Some(sm) = self.subscriptions.get_mut(&id) {
499 sm.on_subscribe_error()?;
500 return Ok(());
501 }
502 if let Some(sm) = self.fetches.get_mut(&id) {
503 sm.on_fetch_error()?;
504 return Ok(());
505 }
506 if let Some(sm) = self.publishes.get_mut(&id) {
507 sm.on_publish_error()?;
508 return Ok(());
509 }
510 if let Some(sm) = self.subscribe_namespaces.get_mut(&id) {
511 sm.on_subscribe_namespace_error()?;
512 return Ok(());
513 }
514 if let Some(sm) = self.subscribe_tracks.get_mut(&id) {
515 sm.on_subscribe_namespace_error()?;
516 return Ok(());
517 }
518 if let Some(sm) = self.publish_namespaces.get_mut(&id) {
519 sm.on_publish_namespace_error()?;
520 return Ok(());
521 }
522 if let Some(sm) = self.track_statuses.get_mut(&id) {
523 sm.on_track_status_error()?;
524 return Ok(());
525 }
526 Err(EndpointError::UnknownRequest(id))
527 }
528
529 pub fn receive_namespace(&mut self, _msg: &message::Namespace) -> Result<(), EndpointError> {
532 Ok(())
533 }
534
535 pub fn receive_namespace_done(
536 &mut self,
537 _msg: &message::NamespaceDone,
538 ) -> Result<(), EndpointError> {
539 Ok(())
540 }
541
542 pub fn receive_publish_blocked(&mut self, _msg: &PublishBlocked) -> Result<(), EndpointError> {
543 Ok(())
544 }
545
546 pub fn receive_message(&mut self, msg: ControlMessage) -> Result<(), EndpointError> {
549 match msg {
550 ControlMessage::Setup(ref m) => self.receive_setup(m),
551 ControlMessage::GoAway(ref m) => self.receive_goaway(m),
552 ControlMessage::RequestUpdate(ref m) => self.receive_request_update(m),
553 ControlMessage::Namespace(ref m) => self.receive_namespace(m),
554 ControlMessage::NamespaceDone(ref m) => self.receive_namespace_done(m),
555 ControlMessage::PublishBlocked(ref m) => self.receive_publish_blocked(m),
556 ControlMessage::SubscribeOk(_)
557 | ControlMessage::PublishDone(_)
558 | ControlMessage::FetchOk(_)
559 | ControlMessage::RequestOk(_)
560 | ControlMessage::RequestError(_) => Err(EndpointError::ResponseOnControlStream),
561 _ => Ok(()),
562 }
563 }
564
565 pub fn receive_response_on_stream(
568 &mut self,
569 request_id: VarInt,
570 msg: ControlMessage,
571 ) -> Result<(), EndpointError> {
572 match msg {
573 ControlMessage::SubscribeOk(ref m) => self.receive_subscribe_ok(request_id, m),
574 ControlMessage::PublishDone(ref m) => self.receive_publish_done(request_id, m),
575 ControlMessage::FetchOk(ref m) => self.receive_fetch_ok(request_id, m),
576 ControlMessage::RequestOk(ref m) => self.receive_request_ok(request_id, m),
577 ControlMessage::RequestError(ref m) => self.receive_request_error(request_id, m),
578 _ => Err(EndpointError::ResponseOnControlStream),
579 }
580 }
581}