moq-lite/moq-relay: hop-based clustering#1322

Open
kixelated wants to merge 19 commits into main from hops-port

Conversation

@kixelated
Collaborator

Summary

Ports the hop-based clustering design from origin/dev (#1082 + #1152) onto main. Replaces the three-tier primary/secondary/combined origin and the cluster: bool token flag with a single OriginProducer per relay tagged with a stable OriginId, and carries the hop chain on every Broadcast so loops are refused and the shortest path wins.

moq-lite

  • New OriginId type — non-zero 62-bit varint, encoded as u64 on the wire.
  • Broadcast now carries hops: Vec<OriginId>; BroadcastProducer / BroadcastConsumer / BroadcastDynamic expose it via a pub info: Broadcast field plus a Deref<Target = Broadcast>. Backwards-compat: Broadcast::produce() still works and a new produce_with_info is added.
  • Lite04 Announce changes from Vec<u64> to Vec<OriginId>. Lite03 still sends count-only, decoded as UNKNOWN placeholders. MAX_HOPS tightened from 256 to 32.
  • OriginProducer gains id, with_id, id(); OriginConsumer tracks the origin id so the wire publisher can append it to outbound hops.
  • OriginProducer::publish_broadcast refuses broadcasts whose hop chain already contains our id (loop detection) and prefers the shortest-hop entry as active; backups promote to active by shortest-hop on close.
  • lite::publisher appends origin.origin_id() to the hop list before each outbound Announce::Active; lite::subscriber attaches the received hops to the new BroadcastProducer.
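The loop-refusal and shortest-hop rules above can be sketched with a toy model. This is illustrative only: the real `OriginProducer` tracks per-path entries, permissions, and notification state, and all names here are hypothetical; only the `contains` check and the hop-length comparison mirror the described behavior.

```rust
// Toy model of hop-based routing: refuse loops, prefer the shortest hop
// chain, and promote the next-shortest backup on close. The single-slot
// state and names are illustrative, not moq-lite's actual types.
type OriginId = u64;

struct Origin {
    id: OriginId,
    active: Option<Vec<OriginId>>, // hop chain of the active entry
    backup: Vec<Vec<OriginId>>,    // longer or older paths held in reserve
}

impl Origin {
    /// Returns false when the hop chain already contains our id (a loop).
    fn publish(&mut self, hops: Vec<OriginId>) -> bool {
        if hops.contains(&self.id) {
            return false; // this broadcast already passed through us
        }
        match self.active.take() {
            // Shorter-or-equal hop chain wins; on ties the newer entry wins.
            Some(active) if hops.len() <= active.len() => {
                self.active = Some(hops);
                self.backup.push(active);
            }
            Some(active) => {
                self.active = Some(active); // longer path: keep as a backup
                self.backup.push(hops);
            }
            None => self.active = Some(hops),
        }
        true
    }

    /// When the active entry closes, promote the shortest-hop backup.
    fn close_active(&mut self) {
        self.backup.sort_by_key(|h| h.len());
        self.active = if self.backup.is_empty() {
            None
        } else {
            Some(self.backup.remove(0))
        };
    }
}
```

The `<=` tie-break matches the adjusted `test_duplicate` expectation: on equal hop lengths, the newer broadcast wins.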

moq-relay

  • cluster.rs collapses to a single origin: OriginProducer initialised with_id from config. primary/secondary/combined are gone; the special internal/origins/* registration dance is gone.
  • ClusterConfig replaces --cluster-root/--cluster-node/--cluster-prefix with --cluster-connect (repeat or comma-separated) for a full mesh, plus an optional --cluster-origin-id (env MOQ_CLUSTER_ORIGIN_ID) for deterministic IDs in tests/logs.
  • auth.rs keeps main's HTTP/mTLS additions; only the claims.cluster-driven routing is dropped (#[allow(deprecated)] on the one call site that unavoidably reads it for back-compat). AuthToken.register is still used for mTLS SAN validation in connection.rs.
  • connection.rs, websocket.rs, main.rs updated for the single-origin API.
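Per the commit notes, each `--cluster-connect` peer is dialed and kept alive with exponential backoff. The delay policy can be isolated as a small pure function; this is a sketch under stated assumptions (the relay's real dial loop is async, and the function name is hypothetical):

```rust
use std::time::Duration;

/// Reconnect-delay policy for a cluster peer, as described in the commit
/// notes: reset to 1s after a successful session, double on failure, cap
/// at 5 minutes. A testable sketch, not the relay's actual implementation.
fn next_backoff(current: Duration, session_ok: bool) -> Duration {
    const MAX_BACKOFF: Duration = Duration::from_secs(300);
    if session_ok {
        Duration::from_secs(1)
    } else {
        (current * 2).min(MAX_BACKOFF)
    }
}
```

Keeping the policy pure makes the cap and reset behavior easy to unit-test without standing up connections.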

moq-token

  • Claims::cluster is now #[deprecated]. Existing signed tokens still parse; the flag no longer affects routing. Test modules and the basic example opt into #[allow(deprecated)] so -D warnings still passes.
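The deprecation pattern looks roughly like this, assuming a simplified `Claims` shape (the real struct has more fields plus serde derives): the field still exists so old tokens parse, and the few call sites that must read it opt in with `#[allow(deprecated)]`.

```rust
// Sketch of the back-compat pattern with a simplified Claims shape.
struct Claims {
    #[deprecated(note = "routing is hop-based; the relay ignores this flag")]
    cluster: bool,
}

// The rare call sites that must still construct or read the flag silence
// the lint locally, keeping `-D warnings` green everywhere else.
#[allow(deprecated)]
fn parse_legacy(cluster_flag: bool) -> Claims {
    Claims { cluster: cluster_flag }
}

#[allow(deprecated)]
fn legacy_cluster(claims: &Claims) -> bool {
    claims.cluster
}
```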

js/lite

  • Wire MAX_HOPS tightened to 32 to match Rust.
  • Publisher generates a random 53-bit non-zero originId (via crypto.getRandomValues) in its constructor and appends it to the hops list on every outbound Announce::Active. Ended announces still go out with empty hops (peer matches on path).
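The JS ID choice can be rendered in Rust terms as a sketch: mask random bits to 53 (the exact-integer range of a JS number) and retry on zero so the non-zero invariant shared with `OriginId` holds. The RNG is injected here for testability; the browser side fills bytes with `crypto.getRandomValues` instead, and this function is an illustration, not the shipped code.

```rust
// Mask-and-retry selection of a 53-bit non-zero id. The random source is
// a parameter so the retry logic is testable with a deterministic feed.
fn pick_origin_id(mut random_u64: impl FnMut() -> u64) -> u64 {
    const MASK_53: u64 = (1 << 53) - 1;
    loop {
        let id = random_u64() & MASK_53;
        if id != 0 {
            return id; // zero is reserved/invalid, draw again
        }
    }
}
```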

demo/relay

  • leaf0.toml / leaf1.toml / root.toml switched to the mesh connect = [...] format with pinned per-node origin_ids (1 / 10 / 11) for readable logs in dev.
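Inferred from the description, the per-node TOML presumably looks something like this. Key names, section name, and URLs are assumptions for illustration; the files in `demo/relay/` are authoritative.

```toml
# Hypothetical leaf config sketch, not copied from the repo.
[cluster]
# Full-mesh peers to dial (repeatable / comma-separated on the CLI).
connect = ["https://root.localhost:4443", "https://leaf1.localhost:4443"]
# Pinned OriginId so dev logs and loop-detection diagnostics are readable.
origin_id = 10
```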

Test plan

  • cargo test -p moq-lite -p moq-token -p moq-relay — all pass (257 + 100 + relay lib tests). Adjusted test_duplicate expectation: on equal hop lengths, the newer broadcast wins (was: always replace).
  • cargo check --workspace --all-targets — clean.
  • cargo clippy --workspace --all-targets -- -D warnings — clean.
  • bun run --filter='@moq/lite' check + bun biome check js/lite — clean.
  • Local smoke test against demo/relay/{root,leaf0,leaf1}.toml: publish from a client on leaf0, subscribe from a client on leaf1, confirm the broadcast arrives via the mesh and that a restart of one node triggers re-routing (not yet run).
  • Interop: browser @moq/lite publisher against the refactored relay — protocol is wire-compatible but the JS origin ID plumbing is new.

Notes

  • cargo-semver-checks will flag this as a breaking change on moq-lite and moq-relay. release-plz is expected to pick up the right version bumps automatically; no manual version edits in this PR.
  • Browser clients aren't full mesh peers — they only publish their own broadcasts and don't forward, so JS doesn't need the full publish_broadcast/loop-detection logic. A per-connection origin ID is enough for the relay to tag and deduplicate incoming announcements.

🤖 Generated with Claude Code

kixelated and others added 8 commits April 17, 2026 08:31
Introduce `OriginId` (non-zero 62-bit varint) as the hop identifier used
to track the path a broadcast has taken through a cluster. The wire format
changes `hops: Vec<u64>` to `Vec<OriginId>` and now actually carries per-hop
ids on Lite04+ (Lite03 still sends count-only, with UNKNOWN placeholders
decoded on receipt for forward compatibility).

The `Broadcast` info now carries `hops: Vec<OriginId>` end-to-end so the
publisher can append its `OriginProducer::id()` before re-announcing and the
subscriber can attach received hops to new `BroadcastProducer`s. This is
the foundation for hop-based loop detection and shortest-path preference
in subsequent commits.

This replaces the current behavior where hops were always emitted as an
empty Vec — the current clustering is effectively blind to loops.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Teach `OriginProducer::publish_broadcast` to refuse broadcasts whose
hop chain already contains our own `OriginId` — this is the loop
detection that was missing. When multiple paths to the same broadcast
arrive, the active one is now the one with the fewest hops; the rest
are kept as backups and on active close, the shortest-hop backup is
promoted in place.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The previous design split broadcasts across `primary`, `secondary`, and
`combined` origins and relied on special `internal/origins/*` registration
broadcasts plus a `cluster: bool` auth flag to prevent self-announce
loops. Loop prevention was brittle and the registration dance duplicated
state.

With hop-based routing now implemented end-to-end in moq-lite, the relay
can collapse all three tiers into a single `OriginProducer` tagged with
this node's `OriginId`. Local clients and remote cluster peers publish
into the same origin; loops are refused because `OriginProducer::publish_broadcast`
rejects any broadcast whose hop chain already contains our id, and the
shortest hop path wins when multiple peers announce the same broadcast.

Cluster topology now comes from `--cluster-connect` (repeat or comma-
separated) — every peer is dialed and kept alive with exponential backoff.
The old `--cluster-root`/`--cluster-node`/`--cluster-prefix` flags are
removed; `--cluster-origin-id` replaces them for deterministic IDs in tests.
Per-session `register` still exists on `AuthParams` for mTLS SAN
validation at accept time, but the cluster no longer looks at it.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The `cluster` flag on signed tokens previously steered the relay's
primary/secondary/combined origin routing. Hop-based routing makes this
unnecessary — loops are detected by `OriginId` rather than by marking
the token's holder as a cluster peer. The field is retained so existing
tokens still parse, but is now tagged `#[deprecated]`; call sites that
unavoidably touch it get `#[allow(deprecated)]` for cleanliness.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Reflect the relay's new single-origin hop-routed clustering in the
example TOML. `connect` now takes a list of peer URLs; `node` and
`prefix` are gone. Pin per-node `origin_id`s (1/10/11) so dev logs
and loop-detection diagnostics are easier to follow.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- `js/lite` announce wire drops MAX_HOPS from 256 to 32, matching the
  Rust bound so cross-language interop fails the same way under attack.
- `moq-lite` origin uses `.contains()` directly (clippy::manual_contains).
- `moq-token` test modules and the `basic` example get
  `#[allow(deprecated)]` so `-D warnings` passes while the deprecated
  `Claims::cluster` round-trip is still exercised.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
When two announcements land at the same path with equal hop lengths,
use the newer one as active (demote the previous to backup). The
stricter `<` broke `test_duplicate`'s expectation that sequential
publishes to the same path trigger reannouncements; `<=` restores that
behavior without defeating the "shorter hops wins" policy.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Generate a 53-bit non-zero origin ID in each `Publisher` constructor and
append it to the `hops` list on every outbound Announce (Active only —
Ended matches on path). This mirrors the Rust side: relays can now run
loop detection against browser publishers the same way they do against
other relays, and the ID is stable for the session so re-announces
don't look like different paths.

`crypto.getRandomValues` picks the bits so the distribution is clean
without relying on `Math.random`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@coderabbitai
Contributor

coderabbitai bot commented Apr 17, 2026

Walkthrough

The PR converts routing to hop-based OriginId chaining across implementations. It adds a typed OriginId and enforces MAX_HOPS = 32; Announce hop arrays now carry 62-bit IDs (bigint/OriginId), with encoding and decoding updated accordingly. Cluster config and control flow were refactored from root/leaf/registration to a shared OriginProducer with peer-list connect dialing, and node/registration handling was removed. Publishers embed origin ids into outbound announces; subscribers and model types carry hop metadata. Token Claims.cluster was deprecated, with tests and examples updated. Broadcast construction APIs were adjusted to Broadcast::new().produce().

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
  • Title check ✅ Passed — The title accurately summarizes the main change: implementing hop-based clustering to replace the three-tier origin model.
  • Description check ✅ Passed — The description comprehensively explains the changes across moq-lite, moq-relay, moq-token, js/lite, and demo/relay, detailing the new OriginId type, Broadcast hop tracking, and API changes.
  • Docstring Coverage ✅ Passed — Docstring coverage is 80.43%, which meets the 80.00% threshold.



Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
demo/relay/root.toml (1)

17-21: ⚠️ Potential issue | 🟡 Minor

Stale comment references deprecated cluster grant.

The comment still says mTLS clients are granted "publish/subscribe/cluster", but the cluster claim is deprecated in this PR in favor of hop-based routing. Consider updating the wording to reflect the new model (mesh peers, hop chains) to avoid confusing operators reading the example config.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@demo/relay/root.toml` around lines 17 - 21, Update the stale comment block
above tls.root to remove the deprecated "cluster" grant and describe the new
hop-based/mesh peer model: say that mTLS clients trusted by tls.root
(["ca.pem"]) are granted publish/subscribe and mesh-peer access used for
hop-based routing (instead of a `cluster` claim), and clarify that this is how
leaves join without a `cluster.token`; edit the three lines of explanatory text
preceding tls.root accordingly to reference mesh peers/hop chains rather than
"cluster".
rs/moq-lite/src/model/origin.rs (1)

415-439: ⚠️ Potential issue | 🟠 Major

Both publish_broadcast callers in lite/subscriber.rs and ietf/subscriber.rs discard the return value, creating a silent-failure bug when loop detection trips.

Loop detection adds a second reason for publish_broadcast to return false (in addition to permission denial). However, rs/moq-lite/src/lite/subscriber.rs:222-228 and rs/moq-lite/src/ietf/subscriber.rs:415-420 both ignore the return value. When a cluster peer re-advertises our own broadcast back to us and loop detection rejects it, these callers will:

  • Cache a BroadcastProducer entry in the subscriber's local state
  • Spawn a run_broadcast task against an origin that never registered the broadcast
  • Fail to propagate NotFound / cleanup to upstream consumers

Update both callers to handle the false return case—either by logging and aborting the spawned producer, or by not caching/spawning at all when publish_broadcast returns false.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/model/origin.rs` around lines 415 - 439, The callers in
lite/subscriber.rs and ietf/subscriber.rs must check publish_broadcast's boolean
result and avoid silently proceeding when it returns false; update the code that
creates/caches a BroadcastProducer and spawns run_broadcast so that after
calling publish_broadcast(...) you immediately return/log and do not insert the
BroadcastProducer into local state or spawn the run_broadcast task if
publish_broadcast returned false. Specifically, in the code paths that construct
a BroadcastProducer and call run_broadcast, gate those actions on
publish_broadcast(...) == true (or log + abort when false) so you don't cache an
unused BroadcastProducer or start a task against an origin that never registered
the broadcast.
🧹 Nitpick comments (5)
rs/moq-token-cli/src/main.rs (1)

62-64: Consider noting deprecation in the --cluster help text.

Per the PR, Claims.cluster is deprecated and the relay no longer uses it for routing. The flag is preserved for legacy token issuance, but the current help text ("Mark this token as a cluster node.") suggests it still has functional effect. A brief "(deprecated; ignored by relay)" note would better set expectations.

Proposed tweak
-		/// Mark this token as a cluster node.
+		/// Mark this token as a cluster node (deprecated; ignored by relay, retained for legacy tokens).
 		#[arg(long)]
 		cluster: bool,
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-token-cli/src/main.rs` around lines 62 - 64, Update the CLI help for
the cluster flag to indicate it's deprecated and ignored by the relay: change
the arg/doc for the cluster field (the struct field named cluster with
#[arg(long)] or its doc comment) to something like "Mark this token as a cluster
node (deprecated; ignored by relay)" so users know Claims.cluster is preserved
only for legacy issuance and has no routing effect in the relay.
rs/moq-lite/src/lite/subscriber.rs (1)

199-207: Minor: consider importing OriginId at the top.

crate::OriginId is used inline in the signature. Adding it to the existing use crate::{...} block would match the style of the other imports (Broadcast, OriginProducer, etc.).

♻️ Proposed import move
 use crate::{
-	AsPath, BandwidthProducer, Broadcast, BroadcastDynamic, Error, Frame, FrameProducer, Group, GroupProducer,
-	OriginProducer, Path, PathOwned, TrackProducer,
+	AsPath, BandwidthProducer, Broadcast, BroadcastDynamic, Error, Frame, FrameProducer, Group, GroupProducer,
+	OriginId, OriginProducer, Path, PathOwned, TrackProducer,
 	coding::{Reader, Stream},
 	lite,
 	model::BroadcastProducer,
 };
-		hops: Vec<crate::OriginId>,
+		hops: Vec<OriginId>,
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/lite/subscriber.rs` around lines 199 - 207, The signature of
start_announce uses crate::OriginId inline; import OriginId in the module's
existing use crate::{...} block instead to match module style. Update the
top-level imports to include OriginId alongside Broadcast, OriginProducer, etc.,
and then remove the crate:: prefix from the start_announce parameter type (leave
the function name start_announce and PathOwned, BroadcastProducer identifiers
unchanged).
rs/moq-lite/src/model/broadcast.rs (1)

72-89: Optional: pub info plus Deref<Target = Broadcast> provides two paths to the same data.

Callers can reach hops via either producer.info.hops or producer.hops (auto-deref). If info is public to allow mutation (e.g., publisher appending its origin_id), consider making the field private and adding a small accessor like info_mut() — that preserves the mutation path while keeping a single read path through Deref. Not blocking.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/model/broadcast.rs` around lines 72 - 89, BroadcastProducer
exposes its inner Broadcast both via a public field info and via
Deref<Target=Broadcast>, creating two different access paths; make info private
(remove pub) and add a minimal mutable accessor like info_mut(&mut self) -> &mut
Broadcast so callers can still mutate hops (e.g., append origin_id) while
preserving a single read path through Deref (update struct field visibility and
add the info_mut method on BroadcastProducer).
rs/moq-relay/src/cluster.rs (1)

95-123: No dedup on config.connect.

If the same peer appears twice in the config (typo, env + config file merge, etc.), you'll spin up two parallel dial loops to the same destination. Consider dedup'ing before spawning:

♻️ Suggested dedup
-        let mut tasks = tokio::task::JoinSet::new();
-        for peer in &self.config.connect {
+        let mut tasks = tokio::task::JoinSet::new();
+        let mut seen = std::collections::HashSet::new();
+        for peer in &self.config.connect {
+            if !seen.insert(peer.as_str()) {
+                tracing::warn!(%peer, "duplicate cluster peer; skipping");
+                continue;
+            }
             let this = self.clone();
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-relay/src/cluster.rs` around lines 95 - 123, The run method currently
spawns a tasks.spawn per entry in self.config.connect allowing duplicate peers
to create duplicate dial loops; before the for loop in run deduplicate the peer
list (e.g., collect self.config.connect into a HashSet or otherwise unique
iterator) and iterate over the unique peers so you only call tasks.spawn and
this.run_remote(&peer, token) once per distinct peer; update references around
the for peer in &self.config.connect loop and the spawned closure that calls
run_remote accordingly.
rs/moq-lite/src/lite/announce.rs (1)

81-98: Minor: MAX_HOPS check fires for versions that don't encode hops.

The hops.len() > MAX_HOPS guard runs before the match version, so Lite01/Lite02 (which drop hops on the floor) will still return EncodeError::TooMany. Harmless because any state with >32 hops is already a bug, but you could move the check under the arms that actually serialize to make the failure mode version-aware.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/lite/announce.rs` around lines 81 - 98, In encode_hops, the
MAX_HOPS guard runs even for versions that ignore hops (Lite01/Lite02); move the
hops.len() > MAX_HOPS check inside the match arms that actually serialize hops
(i.e., at least inside the arm that loops over and encodes each OriginId, the
default arm), and only perform any length-related encoding for versions that
write the length (e.g., keep the (hops.len() as u64).encode(...) in Lite03 if
desired but do not enforce MAX_HOPS there unless you intentionally want that
check for Lite03); update encode_hops accordingly so Version::Lite01 |
Version::Lite02 skip the check and serialization entirely while the arm that
encodes ids enforces MAX_HOPS before encoding.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@rs/moq-lite/src/model/origin_id.rs`:
- Around line 17-32: OriginId::UNKNOWN = 0 violates the documented non-zero
invariant and breaks round-trip decoding; either make UNKNOWN internal-only or
accept 0 on decode. Update the OriginId implementation and encoding/decoding
paths so they agree: either (A) change the public constant by making pub const
UNKNOWN private/internal (remove or mark non-public) and ensure no Encode path
emits 0 for real hops (adjust functions that produce Lite03 placeholders), or
(B) modify the TryFrom/Decode path to treat wire value 0 as OriginId::UNKNOWN
(map 0 → OriginId::UNKNOWN) while keeping the non-zero invariant for all other
values; also update documentation to state whether UNKNOWN is ever encoded and
for which versions (reference OriginId::UNKNOWN, OriginId::random, the
Encode/Decode or TryFrom implementations, and Lite03/Lite04 usage).

In `@rs/moq-lite/src/model/origin.rs`:
- Around line 146-158: The backup vector is growing with duplicate broadcast
entries; in the publish path (the method handling existing = &mut
self.broadcast, i.e., publish_broadcast logic) check for duplicates before
pushing: when replacing existing.active, only push the old into existing.backup
if !old.is_clone(&existing.active) and it is not equal to any entry already in
existing.backup; similarly when considering pushing broadcast into
existing.backup, skip the push if broadcast.is_clone(&existing.active) or it
matches any entry in existing.backup. Implement a small helper (e.g.,
broadcast_equals_in_list or use broadcast.is_clone comparisons) to dedupe both
the active->backup push and the backup.push(broadcast.clone()) path.

In `@rs/moq-relay/src/cluster.rs`:
- Around line 132-146: The loop currently resets backoff to 1s whenever
run_remote_once(&url).await returns Ok(()) even if the session immediately
closed; change this so short-lived successful sessions don't reset the backoff:
measure the session lifetime (e.g., record Instant::now() before/after
session.closed().await inside run_remote_once or return a Result with duration)
and in the loop only set backoff = 1s when the session duration >= a
MIN_SESSION_DURATION (or conversely treat short-lived Ok as Err to trigger
backoff doubling); update references to run_remote_once, backoff, max_backoff
and session.closed().await accordingly so rapid graceful closes increase backoff
instead of causing a 1s reconnect loop.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 94fa1cf2-e347-4d2f-a0a8-88ba74278c40

📥 Commits

Reviewing files that changed from the base of the PR and between a2b8a46 and d09ebc6.

📒 Files selected for processing (23)
  • demo/relay/leaf0.toml
  • demo/relay/leaf1.toml
  • demo/relay/root.toml
  • js/lite/src/lite/announce.ts
  • js/lite/src/lite/origin-id.ts
  • js/lite/src/lite/publisher.ts
  • rs/moq-lite/src/lite/announce.rs
  • rs/moq-lite/src/lite/publisher.rs
  • rs/moq-lite/src/lite/subscriber.rs
  • rs/moq-lite/src/model/broadcast.rs
  • rs/moq-lite/src/model/mod.rs
  • rs/moq-lite/src/model/origin.rs
  • rs/moq-lite/src/model/origin_id.rs
  • rs/moq-relay/src/auth.rs
  • rs/moq-relay/src/cluster.rs
  • rs/moq-relay/src/connection.rs
  • rs/moq-relay/src/main.rs
  • rs/moq-relay/src/websocket.rs
  • rs/moq-token-cli/src/main.rs
  • rs/moq-token/examples/basic.rs
  • rs/moq-token/src/claims.rs
  • rs/moq-token/src/key.rs
  • rs/moq-token/src/set.rs
💤 Files with no reviewable changes (2)
  • rs/moq-relay/src/websocket.rs
  • rs/moq-relay/src/connection.rs

Comment thread rs/moq-lite/src/model/origin.rs
Comment on lines 146 to +158
 } else if let Some(existing) = &mut self.broadcast {
-    // This node is a leaf with an existing broadcast.
-    let old = existing.active.clone();
-    existing.active = broadcast.clone();
-    existing.backup.push(old);
-
-    self.notify.lock().reannounce(full, broadcast);
+    // This node is a leaf with an existing broadcast. Prefer the shorter or equal hop path;
+    // on ties, the newer broadcast wins, since the previous one may be about to close.
+    if broadcast.info.hops.len() <= existing.active.info.hops.len() {
+        let old = existing.active.clone();
+        existing.active = broadcast.clone();
+        existing.backup.push(old);
+
+        self.notify.lock().reannounce(full, broadcast);
+    } else {
+        // Longer path: keep as a backup in case the active one drops.
+        existing.backup.push(broadcast.clone());
+    }
Contributor


⚠️ Potential issue | 🟡 Minor

Backup list grows unbounded on repeated publishes of the same path.

Every publish_broadcast for an existing path pushes into existing.backup without deduping against active/backup (even when broadcast.is_clone(&active)). In a full-mesh scenario the same broadcast can be re-published many times as peers re-announce; backup will accumulate duplicates until each source closes. remove will correctly drain them one-per-close, so this is a memory/latency concern rather than a correctness bug — but worth a dedup check here for mesh scale.

♻️ Suggested dedup
 } else if let Some(existing) = &mut self.broadcast {
+    if existing.active.is_clone(&broadcast) || existing.backup.iter().any(|b| b.is_clone(&broadcast)) {
+        return;
+    }
     if broadcast.info.hops.len() <= existing.active.info.hops.len() {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
 } else if let Some(existing) = &mut self.broadcast {
+    if existing.active.is_clone(&broadcast) || existing.backup.iter().any(|b| b.is_clone(&broadcast)) {
+        return;
+    }
     // This node is a leaf with an existing broadcast. Prefer the shorter or equal hop path;
     // on ties, the newer broadcast wins, since the previous one may be about to close.
     if broadcast.info.hops.len() <= existing.active.info.hops.len() {
         let old = existing.active.clone();
         existing.active = broadcast.clone();
         existing.backup.push(old);
         self.notify.lock().reannounce(full, broadcast);
     } else {
         // Longer path: keep as a backup in case the active one drops.
         existing.backup.push(broadcast.clone());
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/model/origin.rs` around lines 146 - 158, The backup vector is
growing with duplicate broadcast entries; in the publish path (the method
handling existing = &mut self.broadcast, i.e., publish_broadcast logic) check
for duplicates before pushing: when replacing existing.active, only push the old
into existing.backup if !old.is_clone(&existing.active) and it is not equal to
any entry already in existing.backup; similarly when considering pushing
broadcast into existing.backup, skip the push if
broadcast.is_clone(&existing.active) or it matches any entry in existing.backup.
Implement a small helper (e.g., broadcast_equals_in_list or use
broadcast.is_clone comparisons) to dedupe both the active->backup push and the
backup.push(broadcast.clone()) path.

Comment on lines +132 to 146
 	let mut backoff = tokio::time::Duration::from_secs(1);
+	let max_backoff = tokio::time::Duration::from_secs(300);
 
-	match res {
-		Ok(()) => backoff = 1,
+	loop {
+		match self.run_remote_once(&url).await {
+			Ok(()) => backoff = tokio::time::Duration::from_secs(1),
 			Err(err) => {
-				backoff *= 2;
-				tracing::error!(%err, "remote error");
+				tracing::warn!(%err, "cluster peer error; will retry");
+				backoff = (backoff * 2).min(max_backoff);
 			}
 		}
 
-		let timeout = tokio::time::Duration::from_secs(backoff);
-		if timeout > tokio::time::Duration::from_secs(300) {
-			// 5 minutes of backoff is enough, just give up.
-			anyhow::bail!("remote connection keep failing, giving up");
-		}
-
-		tokio::time::sleep(timeout).await;
+		tokio::time::sleep(backoff).await;
 	}
-
-	Ok(())
-}

⚠️ Potential issue | 🟡 Minor

Backoff resets to 1 s on clean close — risk of tight reconnect loop.

run_remote_once returns Ok(()) whenever session.closed().await completes successfully (peer gracefully closed). The loop then sleeps only 1 s before redialing. If a peer is, say, restarting rapidly, rejecting after handshake with a clean close, or behind a load balancer that churns connections, we'll hammer it at ~1 Hz forever while still logging at info level for each dial.

Consider treating "connected then closed" the same as a transient error for backoff purposes, or only resetting backoff after the session has been alive for some minimum duration.

♻️ Suggested adjustment
-        loop {
-            match self.run_remote_once(&url).await {
-                Ok(()) => backoff = tokio::time::Duration::from_secs(1),
-                Err(err) => {
-                    tracing::warn!(%err, "cluster peer error; will retry");
-                    backoff = (backoff * 2).min(max_backoff);
-                }
-            }
-
-            tokio::time::sleep(backoff).await;
-        }
+        loop {
+            let started = tokio::time::Instant::now();
+            match self.run_remote_once(&url).await {
+                Ok(()) if started.elapsed() >= tokio::time::Duration::from_secs(30) => {
+                    backoff = tokio::time::Duration::from_secs(1);
+                }
+                Ok(()) => {
+                    tracing::debug!("cluster peer closed quickly; backing off");
+                    backoff = (backoff * 2).min(max_backoff);
+                }
+                Err(err) => {
+                    tracing::warn!(%err, "cluster peer error; will retry");
+                    backoff = (backoff * 2).min(max_backoff);
+                }
+            }
+            tokio::time::sleep(backoff).await;
+        }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

let mut backoff = tokio::time::Duration::from_secs(1);
let max_backoff = tokio::time::Duration::from_secs(300);

loop {
	let started = tokio::time::Instant::now();
	match self.run_remote_once(&url).await {
		Ok(()) if started.elapsed() >= tokio::time::Duration::from_secs(30) => {
			backoff = tokio::time::Duration::from_secs(1);
		}
		Ok(()) => {
			tracing::debug!("cluster peer closed quickly; backing off");
			backoff = (backoff * 2).min(max_backoff);
		}
		Err(err) => {
			tracing::warn!(%err, "cluster peer error; will retry");
			backoff = (backoff * 2).min(max_backoff);
		}
	}

	tokio::time::sleep(backoff).await;
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-relay/src/cluster.rs` around lines 132 - 146, The loop currently
resets backoff to 1s whenever run_remote_once(&url).await returns Ok(()) even if
the session immediately closed; change this so short-lived successful sessions
don't reset the backoff: measure the session lifetime (e.g., record
Instant::now() before/after session.closed().await inside run_remote_once or
return a Result with duration) and in the loop only set backoff = 1s when the
session duration >= a MIN_SESSION_DURATION (or conversely treat short-lived Ok
as Err to trigger backoff doubling); update references to run_remote_once,
backoff, max_backoff and session.closed().await accordingly so rapid graceful
closes increase backoff instead of causing a 1s reconnect loop.

# Conflicts:
#	rs/moq-lite/src/model/origin.rs
@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
rs/moq-lite/src/model/origin.rs (1)

410-414: ⚠️ Potential issue | 🟡 Minor

Update the public docs for the new active/backup policy.

These docs still say the older broadcast remains active and FIFO promotion is used, but the implementation now replaces active on shorter/equal hop paths and promotes the shortest-hop backup.

📝 Suggested doc update
-	/// If there is already a broadcast with the same path, then the older broadcast remains active
-	/// and the new one is queued as a backup (no reannounce is triggered).
-	/// When the active broadcast closes, the oldest queued backup is promoted and reannounced.
+	/// If there is already a broadcast with the same path, the shortest-hop broadcast is active.
+	/// Shorter or equal-hop replacements trigger a reannounce; longer paths are queued as backups.
+	/// When the active broadcast closes, the shortest-hop queued backup is promoted and reannounced.
 	/// A queued backup that closes before it is promoted is silently dropped with no announcement.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/model/origin.rs` around lines 410 - 414, Update the public
doc comment that describes broadcast active/backup policy to match the new
behavior: replace the statement that "the older broadcast remains active" and
"FIFO promotion" with the rule that a new broadcast replaces the active one if
its hop path length is shorter or equal, and that when promoting a backup the
shortest-hop backup (not FIFO) is chosen; also retain that queued backups that
close before promotion are silently dropped. Locate and edit the broadcast
policy doc comment in origin.rs (the comment block describing broadcast
activation/queueing/promotion) and adjust wording to explicitly mention
"shorter-or-equal hop path replaces active" and "promote the shortest-hop
backup".
♻️ Duplicate comments (1)
rs/moq-lite/src/model/origin.rs (1)

146-158: ⚠️ Potential issue | 🟡 Minor

Deduplicate active and backup entries before queueing.

This still pushes duplicate clones into backup on repeated publishes of the same path/source, which can grow the backup list and create unnecessary reannounce churn in a mesh.

♻️ Suggested deduplication
 		} else if let Some(existing) = &mut self.broadcast {
+			if existing.active.is_clone(broadcast)
+				|| existing.backup.iter().any(|b| b.is_clone(broadcast))
+			{
+				return;
+			}
+
 			// This node is a leaf with an existing broadcast. Prefer the shorter or equal hop path;
 			// on ties, the newer broadcast wins, since the previous one may be about to close.
 			if broadcast.info.hops.len() <= existing.active.info.hops.len() {
 				let old = existing.active.clone();
 				existing.active = broadcast.clone();
-				existing.backup.push_back(old);
+				if !existing.backup.iter().any(|b| b.is_clone(&old)) {
+					existing.backup.push_back(old);
+				}
 
 				self.notify.lock().reannounce(full, broadcast);
 			} else {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/model/origin.rs` around lines 146 - 158, The backup queue can
accumulate duplicate entries when the same broadcast is published repeatedly;
before pushing into existing.backup (and before swapping into existing.active)
deduplicate by comparing the incoming broadcast with existing.active and entries
in existing.backup (e.g., compare broadcast.info.hops and broadcast.source/id or
implement PartialEq on the Broadcast info) and only push_back if no equivalent
entry already exists; update the branch in the block handling self.broadcast
(references: self.broadcast, existing.active, existing.backup,
broadcast.info.hops, self.notify.reannounce) to perform this check so duplicates
are avoided and unnecessary reannounce churn is prevented.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@rs/moq-lite/src/lite/publisher.rs`:
- Around line 197-203: The announce path can exceed the 32-hop encoder limit
when you push origin.origin_id() onto active.info.hops before encoding; update
the logic in the block handling Some(active) (where hops is cloned and pushed
and lite::Announce::Active is created and passed to stream.writer.encode) to
check the current hop count first (use active.info.hops.len() < 32) and skip
forwarding/encoding if the limit would be exceeded (to avoid
EncodeError::TooMany from encode_hops), or alternatively expose the announce
module's MAX_HOPS and compare against that constant before pushing the new hop.

---

Outside diff comments:
In `@rs/moq-lite/src/model/origin.rs`:
- Around line 410-414: Update the public doc comment that describes broadcast
active/backup policy to match the new behavior: replace the statement that "the
older broadcast remains active" and "FIFO promotion" with the rule that a new
broadcast replaces the active one if its hop path length is shorter or equal,
and that when promoting a backup the shortest-hop backup (not FIFO) is chosen;
also retain that queued backups that close before promotion are silently
dropped. Locate and edit the broadcast policy doc comment in origin.rs (the
comment block describing broadcast activation/queueing/promotion) and adjust
wording to explicitly mention "shorter-or-equal hop path replaces active" and
"promote the shortest-hop backup".

---

Duplicate comments:
In `@rs/moq-lite/src/model/origin.rs`:
- Around line 146-158: The backup queue can accumulate duplicate entries when
the same broadcast is published repeatedly; before pushing into existing.backup
(and before swapping into existing.active) deduplicate by comparing the incoming
broadcast with existing.active and entries in existing.backup (e.g., compare
broadcast.info.hops and broadcast.source/id or implement PartialEq on the
Broadcast info) and only push_back if no equivalent entry already exists; update
the branch in the block handling self.broadcast (references: self.broadcast,
existing.active, existing.backup, broadcast.info.hops, self.notify.reannounce)
to perform this check so duplicates are avoided and unnecessary reannounce churn
is prevented.
ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 4dc73415-5694-4ada-8bbf-57c18bdc8fc9

📥 Commits

Reviewing files that changed from the base of the PR and between d09ebc6 and 384e015.

📒 Files selected for processing (4)
  • js/lite/src/lite/publisher.ts
  • rs/moq-lite/src/lite/publisher.rs
  • rs/moq-lite/src/model/origin.rs
  • rs/moq-lite/src/model/origin_id.rs
🚧 Files skipped from review as they are similar to previous changes (2)
  • js/lite/src/lite/publisher.ts
  • rs/moq-lite/src/model/origin_id.rs

Comment on lines +197 to 203
 							if let Some(active) = active {
 								tracing::debug!(broadcast = %origin.absolute(&path), "announce");
-								let msg = lite::Announce::Active { suffix, hops: Vec::new() };
+								// Append our origin ID to the hops so the next relay can detect loops.
+								let mut hops = active.info.hops.clone();
+								hops.push(origin.origin_id());
+								let msg = lite::Announce::Active { suffix, hops };
 								stream.writer.encode(&msg).await?;

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Expect to find the Rust MAX_HOPS definition and any encode/decode validation for Announce hops.
rg -n -C3 'MAX_HOPS|Announce::Active|hops\.len\(\)|too many hops|hop' rs/moq-lite/src

Repository: moq-dev/moq

Length of output: 16083


Guard before appending past the hop limit.

Line 201 can turn an already-at-limit hop chain into an over-limit announce. The encoder validates that hop count must not exceed 32 (see encode_hops in announce.rs), and will reject any message exceeding this with EncodeError::TooMany, closing the announce stream. Skip forwarding before push to prevent this:

 							if let Some(active) = active {
 								tracing::debug!(broadcast = %origin.absolute(&path), "announce");
 								// Append our origin ID to the hops so the next relay can detect loops.
 								let mut hops = active.info.hops.clone();
+								if hops.len() >= 32 {
+									tracing::debug!(
+										broadcast = %origin.absolute(&path),
+										hops = hops.len(),
+										"skipping announce at max hop depth"
+									);
+									continue;
+								}
 								hops.push(origin.origin_id());
 								let msg = lite::Announce::Active { suffix, hops };

Note: The MAX_HOPS constant is private to the announce module, so the guard uses the numeric limit directly. Alternatively, export MAX_HOPS as public if you prefer to reference it by name.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/lite/publisher.rs` around lines 197 - 203, The announce path
can exceed the 32-hop encoder limit when you push origin.origin_id() onto
active.info.hops before encoding; update the logic in the block handling
Some(active) (where hops is cloned and pushed and lite::Announce::Active is
created and passed to stream.writer.encode) to check the current hop count first
(use active.info.hops.len() < 32) and skip forwarding/encoding if the limit
would be exceeded (to avoid EncodeError::TooMany from encode_hops), or
alternatively expose the announce module's MAX_HOPS and compare against that
constant before pushing the new hop.

kixelated and others added 9 commits April 17, 2026 18:52
Chrome's WebTransport serverCertificateHashes pinning rejects certs
with >14 day validity, so browsers fell back to system trust and saw
QUIC_TLS_CERTIFICATE_UNKNOWN. Regenerate certs on every cluster run
so the window never lapses during development.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The wire format carries OriginIds as u62 varints, but JS read them as
u53, silently losing precision on any value >= 2^53 from a Rust peer.
Switch OriginId and the hops list to bigint end-to-end so the full
62-bit space round-trips correctly.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Previously encode_hops returned EncodeError::TooMany when a broadcast's
hop chain exceeded MAX_HOPS, forcing every caller to worry about the
limit. Truncate to the most recent MAX_HOPS instead: the tail keeps our
own id and the nearest upstream origins, which are the entries that
matter for loop detection. Decode still rejects oversized chains, so
peers can't push us past the limit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The doc comment was merged in from the queue-as-backup strategy (#1319)
and doesn't reflect the hop-aware behavior already implemented on this
branch: a new broadcast replaces the active only when its hop path is
shorter or equal, and on promotion we pick the shortest-hop backup.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The cluster config exposed an optional explicit origin_id for stable
dev logs, but a random 62-bit id chosen at startup is enough — we log
it at INFO when the cluster initializes ("cluster initialized
origin_id=..."), so operators can still correlate hops and loop events
without a config knob. Drop the CLI flag, TOML field, and the unused
OriginProducer::with_id helper.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Drop the full-mesh wiring (leaf1 dialing both root and the other leaf
directly) so both leaves now only dial root. Forcing broadcasts to
route through the root exercises proper cluster forwarding instead of
letting the mesh shortcut, which mirrors the realistic case where
regional nodes only peer with their upstream.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The helper only exports randomOriginId, so the shorter name fits and
follows the file-per-concept convention used elsewhere in lite/.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Previously Broadcast::produce() created an empty producer while
produce_with_info(self) carried the hop metadata. Collapse the two:
Broadcast::produce now takes self, so the caller always decides what
metadata to attach. Use Broadcast::new().produce() when there's nothing
to carry forward (the common case in tests and examples).

This only reshapes the Broadcast surface; Broadcast was already
#[non_exhaustive] and carrying the hops field, so the semver boat has
already sailed for this branch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
rs/moq-lite/src/model/broadcast.rs (1)

532-573: ⚠️ Potential issue | 🟡 Minor

Pause Tokio time in this sleeping async test.

This test still waits on real time; add tokio::time::pause() at the start so the cleanup sleep advances instantly and deterministically.

🧪 Proposed fix
 	#[tokio::test]
 	async fn requested_unused() {
+		tokio::time::pause();
 		let mut broadcast = Broadcast::new().produce().dynamic();

As per coding guidelines, “In Rust async tests that sleep, call tokio::time::pause() at the start to simulate time instantly.”

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/model/broadcast.rs` around lines 532 - 573, The test
requested_unused() uses tokio::time::sleep and must pause Tokio time to make the
sleep advance deterministically; add a call to tokio::time::pause() at the start
of the async test (before creating Broadcast::new() / before any awaits) so the
later tokio::time::sleep(std::time::Duration::from_millis(1)).await completes
instantly; ensure you import/use tokio::time::pause() in the test scope.
rs/moq-lite/src/model/origin.rs (1)

811-826: ⚠️ Potential issue | 🟡 Minor

Pause Tokio time before sleeping in this async test.

test_duplicate_reverse uses tokio::time::sleep(...) but does not pause time, so it still depends on wall-clock timing.

🧪 Proposed fix
 	#[tokio::test]
 	async fn test_duplicate_reverse() {
+		tokio::time::pause();
+
 		let origin = Origin::produce();

As per coding guidelines, “In Rust async tests that sleep, call tokio::time::pause() at the start to simulate time instantly.”

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/model/origin.rs` around lines 811 - 826, The
test_duplicate_reverse async test must call tokio::time::pause() before any
tokio::time::sleep(...) to avoid reliance on wall-clock time; update the
test_duplicate_reverse function to invoke tokio::time::pause() at the very
beginning (before publishing/dropping broadcasts and before the
tokio::time::sleep call) so the subsequent sleep is driven by the paused Tokio
clock.
js/lite/src/lite/announce.ts (2)

32-39: ⚠️ Potential issue | 🟡 Minor

Reject oversized hop chains before encoding.

Decode enforces MAX_HOPS, but encode can still emit invalid announces that peers will reject.

Proposed fix
 			case Version.DRAFT_03:
+				if (this.hops.length > MAX_HOPS) {
+					throw new Error(`hop count ${this.hops.length} exceeds maximum ${MAX_HOPS}`);
+				}
 				await w.u53(this.hops.length);
 				break;
 			default:
 				// Lite04+: hop count + individual hop IDs
+				if (this.hops.length > MAX_HOPS) {
+					throw new Error(`hop count ${this.hops.length} exceeds maximum ${MAX_HOPS}`);
+				}
 				await w.u53(this.hops.length);
 				for (const id of this.hops) {
 					await w.u62(id);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/lite/src/lite/announce.ts` around lines 32 - 39, Before writing hop count
in announce encoding, validate this.hops.length against MAX_HOPS and reject
(throw an Error) if it exceeds the limit; add this check where announces are
encoded (the switch handling Version.DRAFT_03 and the default branch that calls
w.u53(this.hops.length) and iterates with w.u62(id)) so you never emit an
invalid oversize hop count to peers. Ensure the check runs before the
w.u53/write and before the loop over this.hops.

61-67: ⚠️ Potential issue | 🟠 Major

Reject zero OriginIds in Lite04+ hop chains.

OriginId is defined as non-zero, but this decoder accepts 0n from the wire for Lite04+ announces. Draft03 placeholders can stay 0n; Lite04+ real hop IDs should be rejected.

Proposed fix
 				const count = await r.u53();
 				if (count > MAX_HOPS) throw new Error(`hop count ${count} exceeds maximum ${MAX_HOPS}`);
 				hops = [];
 				for (let i = 0; i < count; i++) {
-					hops.push(await r.u62());
+					const id = await r.u62();
+					if (id === 0n) throw new Error("origin ID 0 is reserved");
+					hops.push(id);
 				}
 				break;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@js/lite/src/lite/announce.ts` around lines 61 - 67, When decoding Lite04+ hop
chains in lite/announce.ts (the block using r.u53(), r.u62(), MAX_HOPS and
hops), validate each hop ID returned by r.u62() is non-zero (!== 0n) and throw
an error if a zero OriginId is encountered (e.g. "OriginId must be non-zero" or
similar). Keep the existing MAX_HOPS check and loop, but after reading each hop
value enforce the non-zero check so Draft03 placeholder zeros are still allowed
elsewhere but Lite04+ hop IDs are rejected.
♻️ Duplicate comments (2)
rs/moq-lite/src/model/origin.rs (1)

146-158: ⚠️ Potential issue | 🟡 Minor

Deduplicate active/backup entries before enqueueing.

This still allows the same broadcast instance to be pushed into backup on repeated publishes, which can grow the queue and delay cleanup in mesh reannounce churn.

♻️ Proposed dedup
 		} else if let Some(existing) = &mut self.broadcast {
+			if existing.active.is_clone(broadcast)
+				|| existing.backup.iter().any(|backup| backup.is_clone(broadcast))
+			{
+				return;
+			}
+
 			// This node is a leaf with an existing broadcast. Prefer the shorter or equal hop path;
 			// on ties, the newer broadcast wins, since the previous one may be about to close.
 			if broadcast.info.hops.len() <= existing.active.info.hops.len() {
 				let old = existing.active.clone();
 				existing.active = broadcast.clone();
-				existing.backup.push_back(old);
+				if !existing.backup.iter().any(|backup| backup.is_clone(&old)) {
+					existing.backup.push_back(old);
+				}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/model/origin.rs` around lines 146 - 158, When updating
self.broadcast in the branch handling an existing broadcast, avoid enqueueing
duplicate broadcast entries into existing.backup: before pushing (either the old
active or the incoming broadcast) check that its unique identifier (e.g.
broadcast.info.id or another stable key) is not equal to existing.active.info.id
and not already present in existing.backup; only push when it's not a duplicate.
Apply this check both when replacing existing.active (the old value moved into
backup) and when saving a longer-path broadcast into existing.backup, ensuring
existing.backup does not accumulate repeated references to the same broadcast;
keep calls to self.notify.lock().reannounce(full, broadcast) unchanged.
rs/moq-relay/src/cluster.rs (1)

121-130: ⚠️ Potential issue | 🟡 Minor

Avoid resetting backoff after short clean closes.

Ok(()) from run_remote_once can mean the peer connected and immediately closed cleanly; resetting to 1s can create a persistent reconnect loop against a flapping peer.

♻️ Suggested adjustment
 		loop {
+			let started = tokio::time::Instant::now();
 			match self.run_remote_once(&url).await {
-				Ok(()) => backoff = tokio::time::Duration::from_secs(1),
+				Ok(()) if started.elapsed() >= tokio::time::Duration::from_secs(30) => {
+					backoff = tokio::time::Duration::from_secs(1);
+				}
+				Ok(()) => {
+					tracing::debug!("cluster peer closed quickly; backing off");
+					backoff = (backoff * 2).min(max_backoff);
+				}
 				Err(err) => {
 					tracing::warn!(%err, "cluster peer error; will retry");
 					backoff = (backoff * 2).min(max_backoff);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-relay/src/cluster.rs` around lines 121 - 130, run_remote_once
currently returns Ok(()) even for very short clean closes, and the loop resets
backoff to 1s on any Ok which causes tight reconnect loops; change the contract
so run_remote_once returns an outcome that distinguishes short clean closes vs a
long-lived successful connection (e.g. an enum like RemoteOutcome::ShortClose |
RemoteOutcome::Connected or include connection duration), then in the loop only
reset backoff (set backoff = Duration::from_secs(1)) when the outcome indicates
a sustained connection (Connected); for ShortClose treat it like an error branch
and grow backoff = (backoff * 2).min(max_backoff) so flapping peers back off
instead of immediate reconnects, updating references to run_remote_once,
backoff, and max_backoff accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@doc/rs/env/native.md`:
- Around line 72-74: The docs snippet using moq_lite::Broadcast::new().produce()
and origin.publish_broadcast("", broadcast.consume()) is ambiguous about
broadcast naming: update the text near that example to explicitly state that for
WebTransport (https://) the URL path scopes broadcasts (so empty name relies on
the path), but for raw QUIC schemes (moqt://, moql:// and iroh) there is no HTTP
path layer so you must include the desired path in the publish_broadcast(...)
name; add the suggested three-line paragraph clarifying WebTransport vs raw QUIC
behavior and an example note referencing the Broadcast::new().produce() /
origin.publish_broadcast usage.

In `@rs/moq-lite/src/lite/announce.rs`:
- Around line 37-46: The code casts the decoded hop count with u64::decode(r,
version)? as usize before checking against MAX_HOPS, which can truncate on
32-bit targets; instead, decode into a u64, compare that u64 to MAX_HOPS, then
only if within bounds cast to usize and proceed (affecting the branches that
create vec![OriginId::UNKNOWN; count] and the Lite04+ branch that reads that
many OriginId varints); update the code paths using u64::decode, MAX_HOPS, and
the subsequent cast to perform the u64 comparison first and then cast to usize.

In `@rs/moq-relay/src/cluster.rs`:
- Around line 20-58: Cluster currently always calls Origin::produce() and
ignores any configured stable origin id; add an optional origin id field to
ClusterConfig (e.g., pub origin_id: Option<String> or Option<OriginId> wired to
the CLI/env) and then in Cluster::new use that value to construct the Origin
instead of always calling Origin::produce() — i.e., if
config.origin_id.is_some() create the Origin from that id (via the appropriate
Origin::from_id/with_id/produce_from_id helper) otherwise fall back to
Origin::produce(); ensure you pass the resulting Origin into Cluster and keep
the tracing::info!(origin_id = %origin.id()) line.

---

Outside diff comments:
In `@js/lite/src/lite/announce.ts`:
- Around line 32-39: Before writing hop count in announce encoding, validate
this.hops.length against MAX_HOPS and reject (throw an Error) if it exceeds the
limit; add this check where announces are encoded (the switch handling
Version.DRAFT_03 and the default branch that calls w.u53(this.hops.length) and
iterates with w.u62(id)) so you never emit an invalid oversize hop count to
peers. Ensure the check runs before the w.u53/write and before the loop over
this.hops.
- Around line 61-67: When decoding Lite04+ hop chains in lite/announce.ts (the
block using r.u53(), r.u62(), MAX_HOPS and hops), validate each hop ID returned
by r.u62() is non-zero (!== 0n) and throw an error if a zero OriginId is
encountered (e.g. "OriginId must be non-zero" or similar). Keep the existing
MAX_HOPS check and loop, but after reading each hop value enforce the non-zero
check so Draft03 placeholder zeros are still allowed elsewhere but Lite04+ hop
IDs are rejected.

In `@rs/moq-lite/src/model/broadcast.rs`:
- Around line 532-573: The test requested_unused() uses tokio::time::sleep and
must pause Tokio time to make the sleep advance deterministically; add a call to
tokio::time::pause() at the start of the async test (before creating
Broadcast::new() / before any awaits) so the later
tokio::time::sleep(std::time::Duration::from_millis(1)).await completes
instantly; ensure you import/use tokio::time::pause() in the test scope.

In `@rs/moq-lite/src/model/origin.rs`:
- Around line 811-826: The test_duplicate_reverse async test must call
tokio::time::pause() before any tokio::time::sleep(...) to avoid reliance on
wall-clock time; update the test_duplicate_reverse function to invoke
tokio::time::pause() at the very beginning (before publishing/dropping
broadcasts and before the tokio::time::sleep call) so the subsequent sleep is
driven by the paused Tokio clock.

---

Duplicate comments:
In `@rs/moq-lite/src/model/origin.rs`:
- Around line 146-158: When updating self.broadcast in the branch handling an
existing broadcast, avoid enqueueing duplicate broadcast entries into
existing.backup: before pushing (either the old active or the incoming
broadcast) check that its unique identifier (e.g. broadcast.info.id or another
stable key) is not equal to existing.active.info.id and not already present in
existing.backup; only push when it's not a duplicate. Apply this check both when
replacing existing.active (the old value moved into backup) and when saving a
longer-path broadcast into existing.backup, ensuring existing.backup does not
accumulate repeated references to the same broadcast; keep calls to
self.notify.lock().reannounce(full, broadcast) unchanged.
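The dedup being requested can be sketched with stand-in types. `Entry` and `Slot` below are illustrative stubs for the real broadcast state, not moq-lite's API; the point is the membership check before each push into `backup`, combined with the shortest-hop preference described in the PR summary.

```rust
// Stand-in for a broadcast entry: a stable id plus its hop-chain length.
#[derive(Clone, PartialEq, Debug)]
struct Entry {
    id: u64,
    hops: usize,
}

// Stand-in for the active/backup state kept per broadcast name.
struct Slot {
    active: Entry,
    backup: Vec<Entry>,
}

impl Slot {
    fn offer(&mut self, incoming: Entry) {
        if incoming.hops < self.active.hops {
            // Shorter path wins: demote the old active, but never enqueue a
            // duplicate into backup.
            let old = std::mem::replace(&mut self.active, incoming);
            if old != self.active && !self.backup.contains(&old) {
                self.backup.push(old);
            }
        } else if incoming != self.active && !self.backup.contains(&incoming) {
            // Longer path: park it as a backup, again skipping duplicates.
            self.backup.push(incoming);
        }
    }
}

fn main() {
    let mut slot = Slot { active: Entry { id: 1, hops: 2 }, backup: vec![] };
    slot.offer(Entry { id: 2, hops: 3 });
    slot.offer(Entry { id: 2, hops: 3 }); // duplicate: ignored
    assert_eq!(slot.backup.len(), 1);
    slot.offer(Entry { id: 3, hops: 1 }); // shorter path: promoted to active
    assert_eq!(slot.active.id, 3);
    assert_eq!(slot.backup.len(), 2);
    println!("ok");
}
```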

In `@rs/moq-relay/src/cluster.rs`:
- Around line 121-130: run_remote_once currently returns Ok(()) even for very
short clean closes, and the loop resets backoff to 1s on any Ok which causes
tight reconnect loops; change the contract so run_remote_once returns an outcome
that distinguishes short clean closes vs a long-lived successful connection
(e.g. an enum like RemoteOutcome::ShortClose | RemoteOutcome::Connected or
include connection duration), then in the loop only reset backoff (set backoff =
Duration::from_secs(1)) when the outcome indicates a sustained connection
(Connected); for ShortClose treat it like an error branch and grow backoff =
(backoff * 2).min(max_backoff) so flapping peers back off instead of immediate
reconnects, updating references to run_remote_once, backoff, and max_backoff
accordingly.
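The reconnect policy described above can be sketched as a pure function. `RemoteOutcome` and `next_backoff` are illustrative names (the comment itself proposes the enum); only a sustained connection resets the backoff, while short clean closes grow it exactly like errors.

```rust
use std::time::Duration;

// Outcome reported by a single remote-connection attempt.
#[derive(PartialEq)]
enum RemoteOutcome {
    // The connection stayed up long enough to be considered healthy.
    Connected,
    // The connection closed cleanly but almost immediately (flapping peer).
    ShortClose,
}

fn next_backoff(outcome: &RemoteOutcome, backoff: Duration, max: Duration) -> Duration {
    match outcome {
        RemoteOutcome::Connected => Duration::from_secs(1), // healthy: reset
        RemoteOutcome::ShortClose => (backoff * 2).min(max), // flapping: grow
    }
}

fn main() {
    let max = Duration::from_secs(60);
    assert_eq!(
        next_backoff(&RemoteOutcome::ShortClose, Duration::from_secs(4), max),
        Duration::from_secs(8)
    );
    assert_eq!(
        next_backoff(&RemoteOutcome::ShortClose, Duration::from_secs(40), max),
        Duration::from_secs(60) // capped at max_backoff
    );
    assert_eq!(
        next_backoff(&RemoteOutcome::Connected, Duration::from_secs(32), max),
        Duration::from_secs(1)
    );
    println!("ok");
}
```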
ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 8810b0ef-5cc2-480b-841e-825bddefedb8

📥 Commits

Reviewing files that changed from the base of the PR and between 384e015 and dc89774.

📒 Files selected for processing (21)
  • demo/relay/justfile
  • demo/relay/leaf0.toml
  • demo/relay/leaf1.toml
  • demo/relay/root.toml
  • doc/rs/env/native.md
  • js/lite/src/lite/announce.ts
  • js/lite/src/lite/origin.ts
  • js/lite/src/lite/publisher.ts
  • rs/hang/examples/video.rs
  • rs/moq-clock/src/main.rs
  • rs/moq-gst/src/sink/imp.rs
  • rs/moq-lite/src/ietf/subscriber.rs
  • rs/moq-lite/src/lite/announce.rs
  • rs/moq-lite/src/lite/publisher.rs
  • rs/moq-lite/src/lite/subscriber.rs
  • rs/moq-lite/src/model/broadcast.rs
  • rs/moq-lite/src/model/origin.rs
  • rs/moq-mux/src/import/hls.rs
  • rs/moq-native/examples/chat.rs
  • rs/moq-relay/src/cluster.rs
  • rs/moq-relay/src/main.rs
✅ Files skipped from review due to trivial changes (6)
  • rs/moq-lite/src/ietf/subscriber.rs
  • rs/moq-gst/src/sink/imp.rs
  • demo/relay/root.toml
  • rs/moq-lite/src/lite/publisher.rs
  • js/lite/src/lite/origin.ts
  • rs/moq-mux/src/import/hls.rs
🚧 Files skipped from review as they are similar to previous changes (3)
  • demo/relay/leaf1.toml
  • demo/relay/leaf0.toml
  • rs/moq-relay/src/main.rs

Comment thread doc/rs/env/native.md
Comment on lines +72 to 74
```rust
let mut broadcast = moq_lite::Broadcast::new().produce();
// ... add catalog and tracks to the broadcast ...
origin.publish_broadcast("", broadcast.consume());
```

⚠️ Potential issue | 🟡 Minor

Clarify that empty broadcast names only rely on path scoping for WebTransport.

This snippet follows the new API, but the page also advertises `moqt://` and `moql://`; for raw QUIC, users need to include the expected prefix in the broadcast name instead of relying on the URL path.

📝 Suggested doc addition
```diff
 let mut broadcast = moq_lite::Broadcast::new().produce();
 // ... add catalog and tracks to the broadcast ...
 origin.publish_broadcast("", broadcast.consume());

+For https:// / WebTransport connections, the URL path scopes the broadcast.
+For raw QUIC schemes such as moqt:// or moql://, include the desired path
+in publish_broadcast(...) because there is no HTTP path layer to prepend it.
```


Based on learnings, “WebTransport connections automatically prepend the URL path … raw QUIC and iroh connections have no HTTP layer, so there's no way to send a path.”

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```suggestion
let mut broadcast = moq_lite::Broadcast::new().produce();
// ... add catalog and tracks to the broadcast ...
origin.publish_broadcast("", broadcast.consume());

For `https://` / WebTransport connections, the URL path scopes the broadcast.
For raw QUIC schemes such as `moqt://` or `moql://`, include the desired path
in `publish_broadcast(...)` because there is no HTTP path layer to prepend it.
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@doc/rs/env/native.md` around lines 72 - 74, The docs snippet using
moq_lite::Broadcast::new().produce() and origin.publish_broadcast("",
broadcast.consume()) is ambiguous about broadcast naming: update the text near
that example to explicitly state that for WebTransport (https://) the URL path
scopes broadcasts (so empty name relies on the path), but for raw QUIC schemes
(moqt://, moql:// and iroh) there is no HTTP path layer so you must include the
desired path in the publish_broadcast(...) name; add the suggested three-line
paragraph clarifying WebTransport vs raw QUIC behavior and an example note
referencing the Broadcast::new().produce() / origin.publish_broadcast usage.
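The naming rule the doc clarification spells out can be illustrated with a small helper. This is purely hypothetical (`full_broadcast_name` is not part of moq-lite): it just encodes the distinction that WebTransport's URL path scopes the broadcast while raw QUIC schemes must bake the path into the name.

```rust
// Illustrative helper: compute the name to pass to publish_broadcast(...)
// depending on the connection scheme. For https:// the HTTP layer prepends
// the URL path, so the bare name (possibly empty) suffices; for moqt:// and
// moql:// there is no HTTP layer, so the path must be part of the name.
fn full_broadcast_name(scheme: &str, url_path: &str, name: &str) -> String {
    match scheme {
        "https" => name.to_string(),
        "moqt" | "moql" => format!("{}/{}", url_path.trim_matches('/'), name)
            .trim_matches('/')
            .to_string(),
        other => panic!("unknown scheme: {other}"),
    }
}

fn main() {
    // WebTransport: empty name is fine, the path scopes it.
    assert_eq!(full_broadcast_name("https", "/room", ""), "");
    // Raw QUIC: the path must be included in the broadcast name.
    assert_eq!(full_broadcast_name("moqt", "/room", ""), "room");
    assert_eq!(full_broadcast_name("moql", "/room", "cam"), "room/cam");
    println!("ok");
}
```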

Comment on lines 37 to +46
```diff
 let count = u64::decode(r, version)? as usize;
-if count > 256 {
+if count > MAX_HOPS {
 	return Err(DecodeError::BoundsExceeded);
 }
-vec![0; count]
+vec![OriginId::UNKNOWN; count]
 }
 _ => {
 	// Lite04+: count followed by that many OriginId varints.
 	let count = u64::decode(r, version)? as usize;
-	if count > 256 {
+	if count > MAX_HOPS {
```

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Description: Look for 32-bit/WASM target support where u64 -> usize truncation matters.

rg -n -C3 'wasm32|target_arch\s*=\s*"wasm32"|cdylib|crate-type|wasm-bindgen'

Repository: moq-dev/moq

Length of output: 21312


🏁 Script executed:

# Check moq-lite crate configuration and MAX_HOPS constant
cat rs/moq-lite/Cargo.toml

Repository: moq-dev/moq

Length of output: 843


🏁 Script executed:

# Get more context around the code in question, including MAX_HOPS definition
head -n 100 rs/moq-lite/src/lite/announce.rs | tail -n 70

Repository: moq-dev/moq

Length of output: 2156


🏁 Script executed:

# Search for MAX_HOPS definition
rg -n "MAX_HOPS" rs/moq-lite/src/

Repository: moq-dev/moq

Length of output: 497


Check hop count before narrowing to usize.

u64::decode(...)? as usize can truncate on 32-bit targets before the MAX_HOPS check, allowing oversized counts to slip through. While moq-lite does not currently support 32-bit or WASM targets, this is defensive programming and aligns with safe casting practices. Compare as u64 first, then cast.

Proposed fix
```diff
-				let count = u64::decode(r, version)? as usize;
-				if count > MAX_HOPS {
+				let count = u64::decode(r, version)?;
+				if count > MAX_HOPS as u64 {
 					return Err(DecodeError::BoundsExceeded);
 				}
+				let count = count as usize;
 				vec![OriginId::UNKNOWN; count]
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/lite/announce.rs` around lines 37 - 46, The code casts the
decoded hop count with u64::decode(r, version)? as usize before checking against
MAX_HOPS, which can truncate on 32-bit targets; instead, decode into a u64,
compare that u64 to MAX_HOPS, then only if within bounds cast to usize and
proceed (affecting the branches that create vec![OriginId::UNKNOWN; count] and
the Lite04+ branch that reads that many OriginId varints); update the code paths
using u64::decode, MAX_HOPS, and the subsequent cast to perform the u64
comparison first and then cast to usize.
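The safe cast ordering is easy to demonstrate standalone. A minimal sketch (function name is illustrative): compare the decoded count while it is still a `u64`, and only narrow to `usize` once it is known to be in bounds, so a 32-bit target can never truncate past the limit.

```rust
const MAX_HOPS: usize = 32;

// Validate the decoded hop count before narrowing it to usize.
fn checked_count(decoded: u64) -> Result<usize, &'static str> {
    if decoded > MAX_HOPS as u64 {
        return Err("BoundsExceeded");
    }
    // Safe: MAX_HOPS (32) fits in usize on every supported target.
    Ok(decoded as usize)
}

fn main() {
    assert_eq!(checked_count(32), Ok(32));
    assert_eq!(checked_count(33), Err("BoundsExceeded"));
    // On a 32-bit target, `u64 as usize` here would have truncated to 0 and
    // slipped past a post-cast check; comparing first catches it.
    assert_eq!(checked_count(1u64 << 32), Err("BoundsExceeded"));
    println!("ok");
}
```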

Comment on lines 20 to +58
```diff
 pub struct ClusterConfig {
-	/// Connect to this hostname in order to discover other nodes.
+	/// Connect to one or more other cluster nodes. Accepts a comma-separated list on the CLI
+	/// or repeat the flag; in config files use a TOML array.
+	#[serde(alias = "connect")]
 	#[arg(
-		id = "cluster-root",
-		long = "cluster-root",
-		env = "MOQ_CLUSTER_ROOT",
-		alias = "cluster-connect"
+		id = "cluster-connect",
+		long = "cluster-connect",
+		env = "MOQ_CLUSTER_CONNECT",
+		value_delimiter = ','
 	)]
-	pub root: Option<String>,
+	#[serde_as(as = "serde_with::OneOrMany<_>")]
+	pub connect: Vec<String>,

 	/// Use the token in this file when connecting to other nodes.
 	#[arg(id = "cluster-token", long = "cluster-token", env = "MOQ_CLUSTER_TOKEN")]
 	pub token: Option<PathBuf>,
-
-	/// Our hostname which we advertise to other nodes.
-	///
-	// TODO Remove alias once we've migrated to the new name.
-	#[serde(alias = "advertise")]
-	#[arg(
-		id = "cluster-node",
-		long = "cluster-node",
-		env = "MOQ_CLUSTER_NODE",
-		alias = "cluster-advertise"
-	)]
-	pub node: Option<String>,
-
-	/// The prefix to use for cluster announcements.
-	/// Defaults to "internal/origins".
-	///
-	/// WARNING: This should not be accessible by users unless authentication is disabled (YOLO).
-	#[arg(
-		id = "cluster-prefix",
-		long = "cluster-prefix",
-		default_value = "internal/origins",
-		env = "MOQ_CLUSTER_PREFIX"
-	)]
-	pub prefix: String,
 }

-/// Manages broadcast origins across local and remote relay nodes.
+/// A relay cluster built around a single [`OriginProducer`].
 ///
-/// Broadcasts are split into three tiers: [`primary`](Self::primary)
-/// holds locally announced broadcasts, [`secondary`](Self::secondary)
-/// holds those learned from other cluster nodes, and
-/// [`combined`](Self::combined) merges both for serving to end users.
+/// Local sessions and remote cluster connections all publish into the same
+/// origin. Loop prevention and shortest-path preference come from the
+/// hop list carried on each broadcast (see [`moq_lite::Broadcast::hops`]).
 #[derive(Clone)]
 pub struct Cluster {
 	config: ClusterConfig,
 	client: moq_native::Client,

-	/// Broadcasts announced by local clients (users).
-	pub primary: OriginProducer,
-
-	/// Broadcasts announced by remote servers (cluster).
-	pub secondary: OriginProducer,
-
-	/// Broadcasts announced by local clients and remote servers.
-	pub combined: OriginProducer,
+	/// All broadcasts, local and remote. Downstream sessions read from here
+	/// (filtered by their auth token) and remote dials both read and write here.
+	pub origin: OriginProducer,
 }

 impl Cluster {
 	/// Creates a new cluster with the given configuration and QUIC client.
 	pub fn new(config: ClusterConfig, client: moq_native::Client) -> Self {
-		Cluster {
-			config,
-			client,
-			primary: Origin::produce(),
-			secondary: Origin::produce(),
-			combined: Origin::produce(),
-		}
+		let origin = Origin::produce();
+		tracing::info!(origin_id = %origin.id(), "cluster initialized");
+		Cluster { config, client, origin }
```

⚠️ Potential issue | 🟠 Major

Wire the configured stable origin ID into the cluster.

The config no longer exposes --cluster-origin-id, and Cluster::new always uses a random OriginId. That loses the stable per-relay identity required for predictable hop chains across restarts and for pinned dev configs.

🛠️ Suggested shape
```diff
-use moq_lite::{BroadcastConsumer, Origin, OriginConsumer, OriginProducer};
+use moq_lite::{BroadcastConsumer, Origin, OriginConsumer, OriginId, OriginProducer};
@@
 pub struct ClusterConfig {
@@
 	#[serde_as(as = "serde_with::OneOrMany<_>")]
 	pub connect: Vec<String>,
+
+	/// Stable origin ID for this relay. If omitted, a random ID is generated.
+	#[arg(id = "cluster-origin-id", long = "cluster-origin-id", env = "MOQ_CLUSTER_ORIGIN_ID")]
+	pub origin_id: Option<u64>,
@@
 	pub fn new(config: ClusterConfig, client: moq_native::Client) -> Self {
-		let origin = Origin::produce();
+		let origin = match config.origin_id {
+			Some(id) => OriginProducer::with_id(OriginId::try_from(id).expect("invalid cluster origin id")),
+			None => Origin::produce(),
+		};
 		tracing::info!(origin_id = %origin.id(), "cluster initialized");
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-relay/src/cluster.rs` around lines 20 - 58, Cluster currently always
calls Origin::produce() and ignores any configured stable origin id; add an
optional origin id field to ClusterConfig (e.g., pub origin_id: Option<String>
or Option<OriginId> wired to the CLI/env) and then in Cluster::new use that
value to construct the Origin instead of always calling Origin::produce() —
i.e., if config.origin_id.is_some() create the Origin from that id (via the
appropriate Origin::from_id/with_id/produce_from_id helper) otherwise fall back
to Origin::produce(); ensure you pass the resulting Origin into Cluster and keep
the tracing::info!(origin_id = %origin.id()) line.

Broadcast is an immutable value type like Track/Group/Frame, which are
all unmarked. Any future field addition implies a wire/behavior change
that warrants a major bump anyway, so the non_exhaustive attribute
only buys the narrow ability to add purely informational fields. Drop
it for consistency with the other model types.

With struct-literal construction allowed again, with_hops becomes
dead weight; its one caller in the lite subscriber now uses
Broadcast { hops }.produce() directly.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@rs/moq-lite/src/model/broadcast.rs`:
- Around line 11-14: Update the stale doc comment on the Broadcast type to
reflect the new API: remove or reword the phrase that says Create via
`Broadcast::produce` to obtain both `BroadcastProducer` and `BroadcastConsumer`
pair; instead state that `Broadcast::produce(self)` returns a
`BroadcastProducer` and that the corresponding `BroadcastConsumer` can be
obtained by calling `BroadcastProducer::consume()`, referencing
`Broadcast::produce`, `BroadcastProducer`, and `BroadcastProducer::consume` so
readers know the new call sequence.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 6ac9668f-244b-4928-a875-fe3172cae08f

📥 Commits

Reviewing files that changed from the base of the PR and between dc89774 and 19da7f3.

📒 Files selected for processing (2)
  • rs/moq-lite/src/lite/subscriber.rs
  • rs/moq-lite/src/model/broadcast.rs
✅ Files skipped from review due to trivial changes (1)
  • rs/moq-lite/src/lite/subscriber.rs

Comment on lines 11 to +14
```diff
 /// A collection of media tracks that can be published and subscribed to.
 ///
 /// Create via [`Broadcast::produce`] to obtain both [`BroadcastProducer`] and [`BroadcastConsumer`] pair.
-#[derive(Clone, Default)]
+#[derive(Clone, Debug, Default)]
```

⚠️ Potential issue | 🟡 Minor

Stale doc comment: produce no longer returns a pair.

Broadcast::produce(self) now returns only a BroadcastProducer; the consumer is obtained afterwards via BroadcastProducer::consume(). The wording "obtain both [BroadcastProducer] and [BroadcastConsumer] pair" is misleading under the new API.

📝 Proposed doc fix
```diff
 /// A collection of media tracks that can be published and subscribed to.
 ///
-/// Create via [`Broadcast::produce`] to obtain both [`BroadcastProducer`] and [`BroadcastConsumer`] pair.
+/// Construct a `Broadcast` (e.g. via [`Broadcast::new`] or a struct literal to set `hops`)
+/// and call [`Broadcast::produce`] to obtain a [`BroadcastProducer`]; a matching
+/// [`BroadcastConsumer`] can then be created via [`BroadcastProducer::consume`].
 #[derive(Clone, Debug, Default)]
 pub struct Broadcast {
```
 pub struct Broadcast {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
```diff
-/// A collection of media tracks that can be published and subscribed to.
-///
-/// Create via [`Broadcast::produce`] to obtain both [`BroadcastProducer`] and [`BroadcastConsumer`] pair.
-#[derive(Clone, Default)]
-#[derive(Clone, Debug, Default)]
+/// A collection of media tracks that can be published and subscribed to.
+///
+/// Construct a `Broadcast` (e.g. via [`Broadcast::new`] or a struct literal to set `hops`)
+/// and call [`Broadcast::produce`] to obtain a [`BroadcastProducer`]; a matching
+/// [`BroadcastConsumer`] can then be created via [`BroadcastProducer::consume`].
+#[derive(Clone, Debug, Default)]
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@rs/moq-lite/src/model/broadcast.rs` around lines 11 - 14, Update the stale
doc comment on the Broadcast type to reflect the new API: remove or reword the
phrase that says Create via `Broadcast::produce` to obtain both
`BroadcastProducer` and `BroadcastConsumer` pair; instead state that
`Broadcast::produce(self)` returns a `BroadcastProducer` and that the
corresponding `BroadcastConsumer` can be obtained by calling
`BroadcastProducer::consume()`, referencing `Broadcast::produce`,
`BroadcastProducer`, and `BroadcastProducer::consume` so readers know the new
call sequence.
