Skip to main content

moqtap_client/
dispatch.rs

1//! Unified multi-draft entry-point types.
2//!
3//! This module is the facade downstream consumers (e.g. a CLI or desktop app)
4//! use to hold a MoQT connection without caring which draft was negotiated.
5//! It mirrors [`moqtap_codec::dispatch`]: one enum variant per enabled draft,
6//! gated on its feature flag.
7//!
8//! Three types live here:
9//!
10//! - `AnyConnection` — wraps a draft-specific `Connection`.
11//! - `AnyClientEvent` — wraps a draft-specific `ClientEvent`.
12//! - `AnyConnectionObserver` — a trait that receives `AnyClientEvent`s.
13//!   Attached to an `AnyConnection` via `AnyConnection::set_observer`,
14//!   which installs a per-draft adapter on the inner connection.
15//!
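//! Draft-specific protocol methods keep their per-draft signatures on the
//! inner `Connection` — match on the variant to reach them. For common cases
//! `AnyConnection` also offers simplified draft-agnostic helpers
//! (`unsubscribe`, `subscribe`, `fetch`), which return an error on drafts
//! they are not wired up for.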
19
20use std::sync::Arc;
21
22use moqtap_codec::kvp::KeyValuePair;
23use moqtap_codec::version::DraftVersion;
24
/// Generates the `AnyConnection` and `AnyClientEvent` enums plus the per-draft
/// observer adapter, with one variant per enabled draft feature.
///
/// Each entry has the form
/// `#[cfg(feature = "<feature>")] <Variant> => <module>,` where `<feature>`
/// gates everything generated for that draft, `<Variant>` names the enum
/// variant (as well as the matching `DraftVersion` variant and the adapter
/// module), and `<module>` is the crate module that provides the draft's
/// `connection`, `event`, and `observer` items.
macro_rules! dispatch_all {
    (
        $(
            #[cfg(feature = $feat:literal)]
            $variant:ident => $module:ident,
        )+
    ) => {
        /// A MoQT client connection of any enabled draft version.
        ///
        /// Wraps the draft-specific `Connection` type. Methods common to all
        /// drafts are forwarded; for draft-specific protocol calls, match on
        /// the variant.
        pub enum AnyConnection {
            $(
                #[cfg(feature = $feat)]
                // `$feat` already reads "draftNN", so it is not prefixed with
                // another "draft-" here.
                #[doc = concat!("A `", $feat, "` connection.")]
                $variant(crate::$module::connection::Connection),
            )+
        }

        impl AnyConnection {
            /// Returns the draft version this connection is using.
            // The wildcard arm exists only so the match still type-checks
            // when no draft feature is enabled; with any variant present it
            // is an unreachable pattern, hence the lint allowances.
            #[allow(unreachable_code)]
            pub fn draft(&self) -> DraftVersion {
                match self {
                    $(
                        #[cfg(feature = $feat)]
                        Self::$variant(_) => DraftVersion::$variant,
                    )+
                    #[allow(unreachable_patterns)]
                    _ => unreachable!("AnyConnection has no enabled variants"),
                }
            }

            /// Attach an observer. The observer is adapted into the
            /// draft-specific observer trait and installed on the inner
            /// connection; events are forwarded as [`AnyClientEvent`].
            ///
            /// Replaces any previously attached observer.
            // `observer` is unused when no draft feature is enabled.
            #[allow(unused_variables)]
            pub fn set_observer(&mut self, observer: Arc<dyn AnyConnectionObserver>) {
                match self {
                    $(
                        #[cfg(feature = $feat)]
                        Self::$variant(c) => {
                            c.set_observer(Box::new($variant::Adapter(observer)));
                        }
                    )+
                    #[allow(unreachable_patterns)]
                    _ => {}
                }
            }

            /// Remove any attached observer.
            pub fn clear_observer(&mut self) {
                match self {
                    $(
                        #[cfg(feature = $feat)]
                        Self::$variant(c) => c.clear_observer(),
                    )+
                    #[allow(unreachable_patterns)]
                    _ => {}
                }
            }

            /// Close the connection with the given application error code
            /// and reason.
            // `code` and `reason` are unused when no draft feature is enabled.
            #[allow(unused_variables)]
            pub fn close(&self, code: u32, reason: &[u8]) {
                match self {
                    $(
                        #[cfg(feature = $feat)]
                        Self::$variant(c) => c.close(code, reason),
                    )+
                    #[allow(unreachable_patterns)]
                    _ => {}
                }
            }
        }

        /// An event from a MoQT connection of any enabled draft version.
        ///
        /// Event shapes differ across drafts (e.g. draft-17's
        /// `SubgroupObjectReceived` carries header types, while earlier
        /// drafts carry decoded objects). Match on the variant to inspect
        /// the draft-specific event.
        #[non_exhaustive]
        #[derive(Debug, Clone)]
        pub enum AnyClientEvent {
            $(
                #[cfg(feature = $feat)]
                #[doc = concat!("A `", $feat, "` event.")]
                $variant(crate::$module::event::ClientEvent),
            )+
        }

        impl AnyClientEvent {
            /// Returns the draft version this event belongs to.
            // See `AnyConnection::draft` for why the wildcard arm and the
            // lint allowances are needed.
            #[allow(unreachable_code)]
            pub fn draft(&self) -> DraftVersion {
                match self {
                    $(
                        #[cfg(feature = $feat)]
                        Self::$variant(_) => DraftVersion::$variant,
                    )+
                    #[allow(unreachable_patterns)]
                    _ => unreachable!("AnyClientEvent has no enabled variants"),
                }
            }
        }

        // Per-draft adapter modules. Each holds an `Adapter` struct that
        // implements the draft's `ConnectionObserver` trait by forwarding to
        // an `AnyConnectionObserver`. The module is named after the variant
        // (CamelCase), hence `non_snake_case`.
        $(
            #[cfg(feature = $feat)]
            #[allow(non_snake_case)]
            mod $variant {
                use super::{AnyClientEvent, AnyConnectionObserver};
                use std::sync::Arc;

                pub(super) struct Adapter(pub(super) Arc<dyn AnyConnectionObserver>);

                impl crate::$module::observer::ConnectionObserver for Adapter {
                    // Borrowed events must be cloned to be wrapped in the
                    // owning `AnyClientEvent`.
                    fn on_event(&self, event: &crate::$module::event::ClientEvent) {
                        self.0.on_event(&AnyClientEvent::$variant(event.clone()));
                    }

                    // Owned events are wrapped directly, avoiding the clone.
                    fn on_event_owned(&self, event: crate::$module::event::ClientEvent) {
                        self.0.on_event(&AnyClientEvent::$variant(event));
                    }
                }
            }
        )+
    };
}
163
// One entry per supported draft: `<Variant> => <module>`, gated on the
// cargo feature of the same name as the module. The variant name doubles as
// the `DraftVersion` variant and the generated adapter module name.
dispatch_all! {
    #[cfg(feature = "draft07")]
    Draft07 => draft07,
    #[cfg(feature = "draft08")]
    Draft08 => draft08,
    #[cfg(feature = "draft09")]
    Draft09 => draft09,
    #[cfg(feature = "draft10")]
    Draft10 => draft10,
    #[cfg(feature = "draft11")]
    Draft11 => draft11,
    #[cfg(feature = "draft12")]
    Draft12 => draft12,
    #[cfg(feature = "draft13")]
    Draft13 => draft13,
    #[cfg(feature = "draft14")]
    Draft14 => draft14,
    #[cfg(feature = "draft15")]
    Draft15 => draft15,
    #[cfg(feature = "draft16")]
    Draft16 => draft16,
    #[cfg(feature = "draft17")]
    Draft17 => draft17,
    #[cfg(feature = "draft18")]
    Draft18 => draft18,
}
190
/// Draft-agnostic transport choice for [`AnyConnection::connect`].
///
/// `connect` converts this one-to-one into the selected draft's own
/// `TransportType`.
#[derive(Debug, Clone)]
pub enum AnyTransportType {
    /// Raw QUIC via quinn. The `addr` passed to `connect` should be `host:port`.
    Quic,
    /// WebTransport via wtransport. The `url` is the WebTransport endpoint.
    WebTransport {
        /// The WebTransport endpoint URL (e.g., `https://host:port/path`).
        url: String,
    },
}
202
/// Draft-agnostic client configuration. The exact per-draft `ClientConfig`
/// is constructed internally by [`AnyConnection::connect`] based on `draft`.
///
/// Fields that aren't meaningful for the selected draft are ignored:
/// `additional_versions` is not carried by drafts 15–18 (single-version
/// setup) and drafts 07–13 always offer their own draft first (their
/// `ClientConfig` has no `draft` field, so `draft` only selects the
/// implementation there).
#[derive(Debug, Clone)]
pub struct AnyClientConfig {
    /// Primary draft version for the connection.
    pub draft: DraftVersion,
    /// Additional draft versions to offer in CLIENT_SETUP.
    pub additional_versions: Vec<DraftVersion>,
    /// Transport type (QUIC or WebTransport).
    pub transport: AnyTransportType,
    /// Whether to skip TLS certificate verification (for testing).
    pub skip_cert_verification: bool,
    /// Custom CA certificates to trust (DER-encoded).
    pub ca_certs: Vec<Vec<u8>>,
    /// Setup parameters to include in CLIENT_SETUP (e.g., auth tokens).
    pub setup_parameters: Vec<KeyValuePair>,
}
224
/// Error returned by [`AnyConnection::connect`],
/// [`AnyConnection::recv_and_dispatch`], and the unified control-message
/// helpers on [`AnyConnection`]. Draft-specific errors are flattened
/// to strings so callers don't have to branch on draft to inspect errors.
///
/// The wrapped `String` is the error's `to_string()` output (or a message
/// produced by this module) and is what `Display` prints.
#[derive(Debug, thiserror::Error)]
#[error("{0}")]
pub struct AnyConnectionError(pub String);
231
232impl AnyConnection {
233    /// Connect to a MoQT server using the requested draft. Builds the
234    /// draft-specific `ClientConfig` from the provided [`AnyClientConfig`]
235    /// and dispatches to the appropriate `Connection::connect`.
236    pub async fn connect(addr: &str, config: AnyClientConfig) -> Result<Self, AnyConnectionError> {
237        match config.draft {
238            #[cfg(feature = "draft07")]
239            DraftVersion::Draft07 => {
240                use crate::draft07::connection::{ClientConfig, Connection, TransportType};
241                let transport = match config.transport {
242                    AnyTransportType::Quic => TransportType::Quic,
243                    AnyTransportType::WebTransport { url } => TransportType::WebTransport { url },
244                };
245                let inner = ClientConfig {
246                    additional_versions: config.additional_versions,
247                    transport,
248                    skip_cert_verification: config.skip_cert_verification,
249                    ca_certs: config.ca_certs,
250                    setup_parameters: config.setup_parameters,
251                };
252                let c = Connection::connect(addr, inner)
253                    .await
254                    .map_err(|e| AnyConnectionError(e.to_string()))?;
255                Ok(AnyConnection::Draft07(c))
256            }
257            #[cfg(feature = "draft08")]
258            DraftVersion::Draft08 => {
259                use crate::draft08::connection::{ClientConfig, Connection, TransportType};
260                let transport = match config.transport {
261                    AnyTransportType::Quic => TransportType::Quic,
262                    AnyTransportType::WebTransport { url } => TransportType::WebTransport { url },
263                };
264                let inner = ClientConfig {
265                    additional_versions: config.additional_versions,
266                    transport,
267                    skip_cert_verification: config.skip_cert_verification,
268                    ca_certs: config.ca_certs,
269                    setup_parameters: config.setup_parameters,
270                };
271                let c = Connection::connect(addr, inner)
272                    .await
273                    .map_err(|e| AnyConnectionError(e.to_string()))?;
274                Ok(AnyConnection::Draft08(c))
275            }
276            #[cfg(feature = "draft09")]
277            DraftVersion::Draft09 => {
278                use crate::draft09::connection::{ClientConfig, Connection, TransportType};
279                let transport = match config.transport {
280                    AnyTransportType::Quic => TransportType::Quic,
281                    AnyTransportType::WebTransport { url } => TransportType::WebTransport { url },
282                };
283                let inner = ClientConfig {
284                    additional_versions: config.additional_versions,
285                    transport,
286                    skip_cert_verification: config.skip_cert_verification,
287                    ca_certs: config.ca_certs,
288                    setup_parameters: config.setup_parameters,
289                };
290                let c = Connection::connect(addr, inner)
291                    .await
292                    .map_err(|e| AnyConnectionError(e.to_string()))?;
293                Ok(AnyConnection::Draft09(c))
294            }
295            #[cfg(feature = "draft10")]
296            DraftVersion::Draft10 => {
297                use crate::draft10::connection::{ClientConfig, Connection, TransportType};
298                let transport = match config.transport {
299                    AnyTransportType::Quic => TransportType::Quic,
300                    AnyTransportType::WebTransport { url } => TransportType::WebTransport { url },
301                };
302                let inner = ClientConfig {
303                    additional_versions: config.additional_versions,
304                    transport,
305                    skip_cert_verification: config.skip_cert_verification,
306                    ca_certs: config.ca_certs,
307                    setup_parameters: config.setup_parameters,
308                };
309                let c = Connection::connect(addr, inner)
310                    .await
311                    .map_err(|e| AnyConnectionError(e.to_string()))?;
312                Ok(AnyConnection::Draft10(c))
313            }
314            #[cfg(feature = "draft11")]
315            DraftVersion::Draft11 => {
316                use crate::draft11::connection::{ClientConfig, Connection, TransportType};
317                let transport = match config.transport {
318                    AnyTransportType::Quic => TransportType::Quic,
319                    AnyTransportType::WebTransport { url } => TransportType::WebTransport { url },
320                };
321                let inner = ClientConfig {
322                    additional_versions: config.additional_versions,
323                    transport,
324                    skip_cert_verification: config.skip_cert_verification,
325                    ca_certs: config.ca_certs,
326                    setup_parameters: config.setup_parameters,
327                };
328                let c = Connection::connect(addr, inner)
329                    .await
330                    .map_err(|e| AnyConnectionError(e.to_string()))?;
331                Ok(AnyConnection::Draft11(c))
332            }
333            #[cfg(feature = "draft12")]
334            DraftVersion::Draft12 => {
335                use crate::draft12::connection::{ClientConfig, Connection, TransportType};
336                let transport = match config.transport {
337                    AnyTransportType::Quic => TransportType::Quic,
338                    AnyTransportType::WebTransport { url } => TransportType::WebTransport { url },
339                };
340                let inner = ClientConfig {
341                    additional_versions: config.additional_versions,
342                    transport,
343                    skip_cert_verification: config.skip_cert_verification,
344                    ca_certs: config.ca_certs,
345                    setup_parameters: config.setup_parameters,
346                };
347                let c = Connection::connect(addr, inner)
348                    .await
349                    .map_err(|e| AnyConnectionError(e.to_string()))?;
350                Ok(AnyConnection::Draft12(c))
351            }
352            #[cfg(feature = "draft13")]
353            DraftVersion::Draft13 => {
354                use crate::draft13::connection::{ClientConfig, Connection, TransportType};
355                let transport = match config.transport {
356                    AnyTransportType::Quic => TransportType::Quic,
357                    AnyTransportType::WebTransport { url } => TransportType::WebTransport { url },
358                };
359                let inner = ClientConfig {
360                    additional_versions: config.additional_versions,
361                    transport,
362                    skip_cert_verification: config.skip_cert_verification,
363                    ca_certs: config.ca_certs,
364                    setup_parameters: config.setup_parameters,
365                };
366                let c = Connection::connect(addr, inner)
367                    .await
368                    .map_err(|e| AnyConnectionError(e.to_string()))?;
369                Ok(AnyConnection::Draft13(c))
370            }
371            #[cfg(feature = "draft14")]
372            DraftVersion::Draft14 => {
373                use crate::draft14::connection::{ClientConfig, Connection, TransportType};
374                let transport = match config.transport {
375                    AnyTransportType::Quic => TransportType::Quic,
376                    AnyTransportType::WebTransport { url } => TransportType::WebTransport { url },
377                };
378                let inner = ClientConfig {
379                    draft: config.draft,
380                    additional_versions: config.additional_versions,
381                    transport,
382                    skip_cert_verification: config.skip_cert_verification,
383                    ca_certs: config.ca_certs,
384                    setup_parameters: config.setup_parameters,
385                };
386                let c = Connection::connect(addr, inner)
387                    .await
388                    .map_err(|e| AnyConnectionError(e.to_string()))?;
389                Ok(AnyConnection::Draft14(c))
390            }
391            #[cfg(feature = "draft15")]
392            DraftVersion::Draft15 => {
393                use crate::draft15::connection::{ClientConfig, Connection, TransportType};
394                let transport = match config.transport {
395                    AnyTransportType::Quic => TransportType::Quic,
396                    AnyTransportType::WebTransport { url } => TransportType::WebTransport { url },
397                };
398                let inner = ClientConfig {
399                    draft: config.draft,
400                    transport,
401                    skip_cert_verification: config.skip_cert_verification,
402                    ca_certs: config.ca_certs,
403                    setup_parameters: config.setup_parameters,
404                };
405                let c = Connection::connect(addr, inner)
406                    .await
407                    .map_err(|e| AnyConnectionError(e.to_string()))?;
408                Ok(AnyConnection::Draft15(c))
409            }
410            #[cfg(feature = "draft16")]
411            DraftVersion::Draft16 => {
412                use crate::draft16::connection::{ClientConfig, Connection, TransportType};
413                let transport = match config.transport {
414                    AnyTransportType::Quic => TransportType::Quic,
415                    AnyTransportType::WebTransport { url } => TransportType::WebTransport { url },
416                };
417                let inner = ClientConfig {
418                    draft: config.draft,
419                    transport,
420                    skip_cert_verification: config.skip_cert_verification,
421                    ca_certs: config.ca_certs,
422                    setup_parameters: config.setup_parameters,
423                };
424                let c = Connection::connect(addr, inner)
425                    .await
426                    .map_err(|e| AnyConnectionError(e.to_string()))?;
427                Ok(AnyConnection::Draft16(c))
428            }
429            #[cfg(feature = "draft17")]
430            DraftVersion::Draft17 => {
431                use crate::draft17::connection::{ClientConfig, Connection, TransportType};
432                let transport = match config.transport {
433                    AnyTransportType::Quic => TransportType::Quic,
434                    AnyTransportType::WebTransport { url } => TransportType::WebTransport { url },
435                };
436                let inner = ClientConfig {
437                    draft: config.draft,
438                    transport,
439                    skip_cert_verification: config.skip_cert_verification,
440                    ca_certs: config.ca_certs,
441                    setup_parameters: config.setup_parameters,
442                };
443                let c = Connection::connect(addr, inner)
444                    .await
445                    .map_err(|e| AnyConnectionError(e.to_string()))?;
446                Ok(AnyConnection::Draft17(c))
447            }
448            #[cfg(feature = "draft18")]
449            DraftVersion::Draft18 => {
450                use crate::draft18::connection::{ClientConfig, Connection, TransportType};
451                let transport = match config.transport {
452                    AnyTransportType::Quic => TransportType::Quic,
453                    AnyTransportType::WebTransport { url } => TransportType::WebTransport { url },
454                };
455                let inner = ClientConfig {
456                    draft: config.draft,
457                    transport,
458                    skip_cert_verification: config.skip_cert_verification,
459                    ca_certs: config.ca_certs,
460                    setup_parameters: config.setup_parameters,
461                };
462                let c = Connection::connect(addr, inner)
463                    .await
464                    .map_err(|e| AnyConnectionError(e.to_string()))?;
465                Ok(AnyConnection::Draft18(c))
466            }
467            #[allow(unreachable_patterns)]
468            other => Err(AnyConnectionError(format!("draft {other:?} not enabled in this build",))),
469        }
470    }
471
472    /// Read and dispatch one control message on the active draft. Draft-specific
473    /// control-message return values are discarded because event delivery goes
474    /// through the attached observer; callers only care about success/failure.
475    pub async fn recv_and_dispatch(&mut self) -> Result<(), AnyConnectionError> {
476        match self {
477            #[cfg(feature = "draft07")]
478            Self::Draft07(c) => c
479                .recv_and_dispatch()
480                .await
481                .map(|_| ())
482                .map_err(|e| AnyConnectionError(e.to_string())),
483            #[cfg(feature = "draft08")]
484            Self::Draft08(c) => c
485                .recv_and_dispatch()
486                .await
487                .map(|_| ())
488                .map_err(|e| AnyConnectionError(e.to_string())),
489            #[cfg(feature = "draft09")]
490            Self::Draft09(c) => c
491                .recv_and_dispatch()
492                .await
493                .map(|_| ())
494                .map_err(|e| AnyConnectionError(e.to_string())),
495            #[cfg(feature = "draft10")]
496            Self::Draft10(c) => c
497                .recv_and_dispatch()
498                .await
499                .map(|_| ())
500                .map_err(|e| AnyConnectionError(e.to_string())),
501            #[cfg(feature = "draft11")]
502            Self::Draft11(c) => c
503                .recv_and_dispatch()
504                .await
505                .map(|_| ())
506                .map_err(|e| AnyConnectionError(e.to_string())),
507            #[cfg(feature = "draft12")]
508            Self::Draft12(c) => c
509                .recv_and_dispatch()
510                .await
511                .map(|_| ())
512                .map_err(|e| AnyConnectionError(e.to_string())),
513            #[cfg(feature = "draft13")]
514            Self::Draft13(c) => c
515                .recv_and_dispatch()
516                .await
517                .map(|_| ())
518                .map_err(|e| AnyConnectionError(e.to_string())),
519            #[cfg(feature = "draft14")]
520            Self::Draft14(c) => c
521                .recv_and_dispatch()
522                .await
523                .map(|_| ())
524                .map_err(|e| AnyConnectionError(e.to_string())),
525            #[cfg(feature = "draft15")]
526            Self::Draft15(c) => c
527                .recv_and_dispatch()
528                .await
529                .map(|_| ())
530                .map_err(|e| AnyConnectionError(e.to_string())),
531            #[cfg(feature = "draft16")]
532            Self::Draft16(c) => c
533                .recv_and_dispatch()
534                .await
535                .map(|_| ())
536                .map_err(|e| AnyConnectionError(e.to_string())),
537            #[cfg(feature = "draft17")]
538            Self::Draft17(c) => c
539                .recv_and_dispatch()
540                .await
541                .map(|_| ())
542                .map_err(|e| AnyConnectionError(e.to_string())),
543            #[cfg(feature = "draft18")]
544            Self::Draft18(c) => c
545                .recv_and_dispatch()
546                .await
547                .map(|_| ())
548                .map_err(|e| AnyConnectionError(e.to_string())),
549            #[allow(unreachable_patterns)]
550            _ => Err(AnyConnectionError("AnyConnection has no enabled variants".into())),
551        }
552    }
553
554    // ── Unified control-message helpers ──────────────────────────────────
555    //
556    // Draft-agnostic shorthands. Each dispatches to the active variant and
557    // defaults fields not expressible in the unified shape; drafts that
558    // lack the operation return an `AnyConnectionError`. Match on the
559    // variant directly when full per-draft control is needed.
560
561    /// Send an UNSUBSCRIBE for the given request ID. Identical across all drafts.
562    #[allow(unused_variables)]
563    pub async fn unsubscribe(
564        &mut self,
565        request_id: moqtap_codec::varint::VarInt,
566    ) -> Result<(), AnyConnectionError> {
567        match self {
568            #[cfg(feature = "draft07")]
569            Self::Draft07(c) => {
570                c.unsubscribe(request_id).await.map_err(|e| AnyConnectionError(e.to_string()))
571            }
572            #[cfg(feature = "draft08")]
573            Self::Draft08(c) => {
574                c.unsubscribe(request_id).await.map_err(|e| AnyConnectionError(e.to_string()))
575            }
576            #[cfg(feature = "draft09")]
577            Self::Draft09(c) => {
578                c.unsubscribe(request_id).await.map_err(|e| AnyConnectionError(e.to_string()))
579            }
580            #[cfg(feature = "draft10")]
581            Self::Draft10(c) => {
582                c.unsubscribe(request_id).await.map_err(|e| AnyConnectionError(e.to_string()))
583            }
584            #[cfg(feature = "draft11")]
585            Self::Draft11(c) => {
586                c.unsubscribe(request_id).await.map_err(|e| AnyConnectionError(e.to_string()))
587            }
588            #[cfg(feature = "draft12")]
589            Self::Draft12(c) => {
590                c.unsubscribe(request_id).await.map_err(|e| AnyConnectionError(e.to_string()))
591            }
592            #[cfg(feature = "draft13")]
593            Self::Draft13(c) => {
594                c.unsubscribe(request_id).await.map_err(|e| AnyConnectionError(e.to_string()))
595            }
596            #[cfg(feature = "draft14")]
597            Self::Draft14(c) => {
598                c.unsubscribe(request_id).await.map_err(|e| AnyConnectionError(e.to_string()))
599            }
600            #[cfg(feature = "draft15")]
601            Self::Draft15(c) => {
602                c.unsubscribe(request_id).await.map_err(|e| AnyConnectionError(e.to_string()))
603            }
604            #[cfg(feature = "draft16")]
605            Self::Draft16(c) => {
606                c.unsubscribe(request_id).await.map_err(|e| AnyConnectionError(e.to_string()))
607            }
608            #[allow(unreachable_patterns)]
609            other => Err(AnyConnectionError(format!(
610                "unsubscribe: not yet wired up for draft {:?} via AnyConnection",
611                other.draft()
612            ))),
613        }
614    }
615
616    /// Send a SUBSCRIBE with the given filter, priority, and group order.
617    /// Supported on drafts 12–17. Drafts 15–17 carry priority/order/filter via
618    /// parameters; this helper passes an empty parameter list, so those fields
619    /// default to protocol-defined values on those drafts.
620    #[allow(unused_variables)]
621    pub async fn subscribe(
622        &mut self,
623        namespace: moqtap_codec::types::TrackNamespace,
624        track_name: Vec<u8>,
625        subscriber_priority: u8,
626        group_order: moqtap_codec::types::GroupOrder,
627        filter_type: moqtap_codec::types::FilterType,
628    ) -> Result<moqtap_codec::varint::VarInt, AnyConnectionError> {
629        use moqtap_codec::varint::VarInt;
630        match self {
631            #[cfg(feature = "draft12")]
632            Self::Draft12(c) => {
633                let go = VarInt::from_u64(group_order as u64)
634                    .map_err(|e| AnyConnectionError(e.to_string()))?;
635                let ft = VarInt::from_u64(filter_type as u64)
636                    .map_err(|e| AnyConnectionError(e.to_string()))?;
637                c.subscribe(namespace, track_name, subscriber_priority, go, ft)
638                    .await
639                    .map_err(|e| AnyConnectionError(e.to_string()))
640            }
641            #[cfg(feature = "draft13")]
642            Self::Draft13(c) => {
643                let go = VarInt::from_u64(group_order as u64)
644                    .map_err(|e| AnyConnectionError(e.to_string()))?;
645                let ft = VarInt::from_u64(filter_type as u64)
646                    .map_err(|e| AnyConnectionError(e.to_string()))?;
647                c.subscribe(namespace, track_name, subscriber_priority, go, ft)
648                    .await
649                    .map_err(|e| AnyConnectionError(e.to_string()))
650            }
651            #[cfg(feature = "draft14")]
652            Self::Draft14(c) => c
653                .subscribe(namespace, track_name, subscriber_priority, group_order, filter_type)
654                .await
655                .map_err(|e| AnyConnectionError(e.to_string())),
656            #[cfg(feature = "draft15")]
657            Self::Draft15(c) => c
658                .subscribe(namespace, track_name, Vec::new())
659                .await
660                .map_err(|e| AnyConnectionError(e.to_string())),
661            #[cfg(feature = "draft16")]
662            Self::Draft16(c) => c
663                .subscribe(namespace, track_name, Vec::new())
664                .await
665                .map_err(|e| AnyConnectionError(e.to_string())),
666            #[cfg(feature = "draft17")]
667            Self::Draft17(c) => c
668                .subscribe(namespace, track_name, Vec::new())
669                .await
670                .map_err(|e| AnyConnectionError(e.to_string())),
671            #[cfg(feature = "draft18")]
672            Self::Draft18(c) => c
673                .subscribe(namespace, track_name, Vec::new())
674                .await
675                .map_err(|e| AnyConnectionError(e.to_string())),
676            #[allow(unreachable_patterns)]
677            other => Err(AnyConnectionError(format!(
678                "subscribe: not yet wired up for draft {:?} via AnyConnection",
679                other.draft()
680            ))),
681        }
682    }
683
    /// Send a standalone FETCH. Supported on drafts 14–18. On draft 14 the
    /// `end_group`/`end_object` are ignored (draft 14's wrapper only accepts
    /// a start location).
687    #[allow(unused_variables)]
688    pub async fn fetch(
689        &mut self,
690        namespace: moqtap_codec::types::TrackNamespace,
691        track_name: Vec<u8>,
692        start_group: moqtap_codec::varint::VarInt,
693        start_object: moqtap_codec::varint::VarInt,
694        end_group: moqtap_codec::varint::VarInt,
695        end_object: moqtap_codec::varint::VarInt,
696    ) -> Result<moqtap_codec::varint::VarInt, AnyConnectionError> {
697        match self {
698            #[cfg(feature = "draft14")]
699            Self::Draft14(c) => c
700                .fetch(namespace, track_name, start_group, start_object)
701                .await
702                .map_err(|e| AnyConnectionError(e.to_string())),
703            #[cfg(feature = "draft15")]
704            Self::Draft15(c) => c
705                .fetch(namespace, track_name, start_group, start_object, end_group, end_object)
706                .await
707                .map_err(|e| AnyConnectionError(e.to_string())),
708            #[cfg(feature = "draft16")]
709            Self::Draft16(c) => c
710                .fetch(namespace, track_name, start_group, start_object, end_group, end_object)
711                .await
712                .map_err(|e| AnyConnectionError(e.to_string())),
713            #[cfg(feature = "draft17")]
714            Self::Draft17(c) => c
715                .fetch(namespace, track_name, start_group, start_object, end_group, end_object)
716                .await
717                .map_err(|e| AnyConnectionError(e.to_string())),
718            #[cfg(feature = "draft18")]
719            Self::Draft18(c) => c
720                .fetch(namespace, track_name, start_group, start_object, end_group, end_object)
721                .await
722                .map_err(|e| AnyConnectionError(e.to_string())),
723            #[allow(unreachable_patterns)]
724            other => Err(AnyConnectionError(format!(
725                "fetch: not yet wired up for draft {:?} via AnyConnection",
726                other.draft()
727            ))),
728        }
729    }
730
731    /// Send a TRACK_STATUS query for the given track. Supported on drafts 14–17.
732    /// On drafts 15–17, passes an empty parameter list.
733    #[allow(unused_variables)]
734    pub async fn track_status(
735        &mut self,
736        namespace: moqtap_codec::types::TrackNamespace,
737        track_name: Vec<u8>,
738    ) -> Result<moqtap_codec::varint::VarInt, AnyConnectionError> {
739        match self {
740            #[cfg(feature = "draft14")]
741            Self::Draft14(c) => c
742                .track_status(namespace, track_name)
743                .await
744                .map_err(|e| AnyConnectionError(e.to_string())),
745            #[cfg(feature = "draft15")]
746            Self::Draft15(c) => c
747                .track_status(namespace, track_name, Vec::new())
748                .await
749                .map_err(|e| AnyConnectionError(e.to_string())),
750            #[cfg(feature = "draft16")]
751            Self::Draft16(c) => c
752                .track_status(namespace, track_name, Vec::new())
753                .await
754                .map_err(|e| AnyConnectionError(e.to_string())),
755            #[cfg(feature = "draft17")]
756            Self::Draft17(c) => c
757                .track_status(namespace, track_name, Vec::new())
758                .await
759                .map_err(|e| AnyConnectionError(e.to_string())),
760            #[cfg(feature = "draft18")]
761            Self::Draft18(c) => c
762                .track_status(namespace, track_name, Vec::new())
763                .await
764                .map_err(|e| AnyConnectionError(e.to_string())),
765            #[allow(unreachable_patterns)]
766            other => Err(AnyConnectionError(format!(
767                "track_status: not yet wired up for draft {:?} via AnyConnection",
768                other.draft()
769            ))),
770        }
771    }
772
773    /// Send a SUBSCRIBE_NAMESPACE (or SUBSCRIBE_ANNOUNCES on drafts 11–12).
774    /// Supported on drafts 11–17. Drafts 16–17 pass default subscribe options
775    /// and an empty parameter list.
776    #[allow(unused_variables)]
777    pub async fn subscribe_namespace(
778        &mut self,
779        namespace_prefix: moqtap_codec::types::TrackNamespace,
780    ) -> Result<moqtap_codec::varint::VarInt, AnyConnectionError> {
781        use moqtap_codec::varint::VarInt;
782        match self {
783            #[cfg(feature = "draft11")]
784            Self::Draft11(c) => c
785                .subscribe_announces(namespace_prefix)
786                .await
787                .map_err(|e| AnyConnectionError(e.to_string())),
788            #[cfg(feature = "draft12")]
789            Self::Draft12(c) => c
790                .subscribe_announces(namespace_prefix)
791                .await
792                .map_err(|e| AnyConnectionError(e.to_string())),
793            #[cfg(feature = "draft13")]
794            Self::Draft13(c) => c
795                .subscribe_namespace(namespace_prefix)
796                .await
797                .map_err(|e| AnyConnectionError(e.to_string())),
798            #[cfg(feature = "draft14")]
799            Self::Draft14(c) => c
800                .subscribe_namespace(namespace_prefix)
801                .await
802                .map_err(|e| AnyConnectionError(e.to_string())),
803            #[cfg(feature = "draft15")]
804            Self::Draft15(c) => c
805                .subscribe_namespace(namespace_prefix, Vec::new())
806                .await
807                .map_err(|e| AnyConnectionError(e.to_string())),
808            #[cfg(feature = "draft16")]
809            Self::Draft16(c) => {
810                let opts = VarInt::from_u64(0).expect("0 fits in VarInt");
811                c.subscribe_namespace(namespace_prefix, opts, Vec::new())
812                    .await
813                    .map_err(|e| AnyConnectionError(e.to_string()))
814            }
815            #[cfg(feature = "draft17")]
816            Self::Draft17(c) => {
817                let opts = VarInt::from_u64(0).expect("0 fits in VarInt");
818                c.subscribe_namespace(namespace_prefix, opts, Vec::new())
819                    .await
820                    .map_err(|e| AnyConnectionError(e.to_string()))
821            }
822            #[cfg(feature = "draft18")]
823            Self::Draft18(c) => c
824                .subscribe_namespace(namespace_prefix, Vec::new())
825                .await
826                .map_err(|e| AnyConnectionError(e.to_string())),
827            #[allow(unreachable_patterns)]
828            other => Err(AnyConnectionError(format!(
829                "subscribe_namespace: not yet wired up for draft {:?} via AnyConnection",
830                other.draft()
831            ))),
832        }
833    }
834
835    /// Send a SUBSCRIBE_UPDATE for an active subscription. Draft 14 only —
836    /// earlier/later drafts either don't expose a matching client wrapper or
837    /// use a different message shape.
838    #[allow(unused_variables)]
839    pub async fn subscribe_update(
840        &mut self,
841        subscription_request_id: moqtap_codec::varint::VarInt,
842        start_location: moqtap_codec::types::Location,
843        end_group: moqtap_codec::varint::VarInt,
844        subscriber_priority: u8,
845        forward: moqtap_codec::types::Forward,
846    ) -> Result<(), AnyConnectionError> {
847        match self {
848            #[cfg(feature = "draft14")]
849            Self::Draft14(c) => c
850                .subscribe_update(
851                    subscription_request_id,
852                    start_location,
853                    end_group,
854                    subscriber_priority,
855                    forward,
856                    Vec::new(),
857                )
858                .await
859                .map(|_| ())
860                .map_err(|e| AnyConnectionError(e.to_string())),
861            #[allow(unreachable_patterns)]
862            other => Err(AnyConnectionError(format!(
863                "subscribe_update: not yet wired up for draft {:?} via AnyConnection",
864                other.draft()
865            ))),
866        }
867    }
868}
869
870/// Trait for receiving events from an [`AnyConnection`].
871///
872/// Implementations must be `Send + Sync` because the adapter installed on
873/// the inner draft-specific connection may emit events from async tasks.
874/// `on_event` takes `&self` — implementations that need mutation should use
875/// interior mutability (e.g. `Mutex`, `mpsc::Sender`).
876///
877/// The per-draft adapter clones the draft-specific event into the matching
878/// [`AnyClientEvent`] variant before invoking `on_event`.
879pub trait AnyConnectionObserver: Send + Sync {
880    /// Called when a connection event occurs on any draft.
881    fn on_event(&self, event: &AnyClientEvent);
882}
883
884/// A no-op observer that discards all events.
885pub struct NoOpObserver;
886
887impl AnyConnectionObserver for NoOpObserver {
888    fn on_event(&self, _event: &AnyClientEvent) {}
889}