feat: replace BundlePoller polling with SSE streaming#269
feat: replace BundlePoller polling with SSE streaming#269Evalir wants to merge 6 commits intographite-base/269from
Conversation
|
Warning This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
This stack of pull requests is managed by Graphite. Learn more about stacking. |
b4f7193 to
8c472ae
Compare
8c472ae to
58c25a1
Compare
dylanlott
left a comment
There was a problem hiding this comment.
We should test these changes in Parmigiana asap before taking to mainnet.
| BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot) => { | ||
| trace!("Not our slot to subscribe to bundles"); | ||
| } | ||
| _ => warn!(%error, "Failed to open SSE bundle subscription"), |
There was a problem hiding this comment.
should add a metric for SSE failures in the subscribe loop to match the inc_tx_poll_errors() in the fetch and dispatch loop.
There was a problem hiding this comment.
[Claude Code]
Done in 13b25a4 — added inc_sse_subscribe_errors and called it from subscribe's inspect_err. Used a generic name (no bundle_ prefix) to match the existing inc_sse_reconnect_attempts pattern; tx.rs can adopt the same helper later without renaming.
| outbound: &mpsc::UnboundedSender<CachedBundle>, | ||
| backoff: &mut Duration, | ||
| ) -> SseStream { | ||
| tokio::select! { |
There was a problem hiding this comment.
should add a metric like inc_tx_sse_reconnects() as well
There was a problem hiding this comment.
[Claude Code]
Done in 13b25a4 — BundlePoller::reconnect now increments the existing inc_sse_reconnect_attempts counter once per attempt, matching how TxPoller::reconnect uses it. Also generalized the metric's description from "SSE transaction stream reconnect attempts" to "SSE stream reconnect attempts" since both pollers share it.
Fraser999
left a comment
There was a problem hiding this comment.
We could use actual integration tests for this stream and the tx stream (I think the existing one is more like a connectivity smoke test). Not blocking, but worth a follow-up ticket?
Addresses dylanlott + Fraser999 comments on the BundlePoller SSE
PR:
- Wrap task_future with #[instrument], adding url + block_number
fields to the parent span; record block_number on every env
change so tracing reflects current state. (dylan)
- Guard the task_future select loop with outbound.is_closed() so a
dropped consumer breaks the loop instead of letting reconnect
retry forever. (dylan)
- Subscribe now returns Option<SseStream>; reconnect loops
internally with backoff until it gets a real stream or the
outbound closes (returning Option). Kills the misleading double
warn ("Failed to open SSE bundle subscription" + "stream
ended") on subscribe failure. (Fraser/Claude)
- Add inc_sse_subscribe_errors metric, called from subscribe's
error path. Generic name (not bundle-specific) since the existing
inc_sse_reconnect_attempts is also generic. (dylan)
- Increment inc_sse_reconnect_attempts in BundlePoller::reconnect,
matching tx.rs. (dylan)
- Bump "Block env changed" log from trace to debug, matching the
agreed change on tx.rs. (Fraser)
- Generalize SSE_RECONNECT_ATTEMPTS_HELP from "transaction stream"
to just "SSE stream" since the metric is shared.
Defers (per plan): renaming "poll" metrics to "fetch" — that's a
cross-cutting metrics rename that should ride in its own PR.
…rument Addresses Fraser's review on PR #269: - Drop the #[instrument] attribute from BundlePoller::task_future. Long-lived tasks shouldn't keep a single span open forever, and the Span::current() lookup inside record_block_number was flaky depending on the runtime log level. - Drop the record_block_number helper. Per-block context is already carried by the BlockConstruction span attached to each SimEnv (see EnvTask, env.rs:294), which already populates sim.ru.number, sim.host.number, sim.slot, etc. — the field names Fraser pointed to. - In the env-change branch of the select loop, capture env.clone_span() and use .instrument() on the inline async block that runs the refetch. This mirrors how SubmitTask::task_future picks up SimResult::clone_span() per work item, and how CacheTask::task_future enters env.span() for its sync work.
Mirrors the TxPoller SSE change: subscribe to /bundles/feed via BuilderTxCache::subscribe_bundles for real-time delivery of new bundles, with an initial full_fetch on startup/block-env change and exponential backoff reconnect on error or stream end. Drops the 1s polling loop.
Mirrors PR #259's 9bc0ff4 split applied to BundlePoller. Restore check_bundle_cache as a private pure-fetch helper returning Result<Vec<CachedBundle>, _>, and rename full_fetch to fetch_and_forward — its name now matches what it does (fetch + forward to the outbound channel). Use let-else over the fetch result to drop a level of indentation.
Addresses dylanlott + Fraser999 comments on the BundlePoller SSE
PR:
- Wrap task_future with #[instrument], adding url + block_number
fields to the parent span; record block_number on every env
change so tracing reflects current state. (dylan)
- Guard the task_future select loop with outbound.is_closed() so a
dropped consumer breaks the loop instead of letting reconnect
retry forever. (dylan)
- Subscribe now returns Option<SseStream>; reconnect loops
internally with backoff until it gets a real stream or the
outbound closes (returning Option). Kills the misleading double
warn ("Failed to open SSE bundle subscription" + "stream
ended") on subscribe failure. (Fraser/Claude)
- Add inc_sse_subscribe_errors metric, called from subscribe's
error path. Generic name (not bundle-specific) since the existing
inc_sse_reconnect_attempts is also generic. (dylan)
- Increment inc_sse_reconnect_attempts in BundlePoller::reconnect,
matching tx.rs. (dylan)
- Bump "Block env changed" log from trace to debug, matching the
agreed change on tx.rs. (Fraser)
- Generalize SSE_RECONNECT_ATTEMPTS_HELP from "transaction stream"
to just "SSE stream" since the metric is shared.
Defers (per plan): renaming "poll" metrics to "fetch" — that's a
cross-cutting metrics rename that should ride in its own PR.
- Collapse the duplicated reconnect match arms in handle_sse_item into let-else, matching the project's terse Option/Result style. - Promote record_block_number from a free fn to a &self method, shrinking the call sites and using SimEnv::rollup_block_number() instead of inlining the rollup_env().number.to::<u64>() chain. - Drop a stray WHAT-comment on reconnect's loop tail.
…rument Addresses Fraser's review on PR #269: - Drop the #[instrument] attribute from BundlePoller::task_future. Long-lived tasks shouldn't keep a single span open forever, and the Span::current() lookup inside record_block_number was flaky depending on the runtime log level. - Drop the record_block_number helper. Per-block context is already carried by the BlockConstruction span attached to each SimEnv (see EnvTask, env.rs:294), which already populates sim.ru.number, sim.host.number, sim.slot, etc. — the field names Fraser pointed to. - In the env-change branch of the select loop, capture env.clone_span() and use .instrument() on the inline async block that runs the refetch. This mirrors how SubmitTask::task_future picks up SimResult::clone_span() per work item, and how CacheTask::task_future enters env.span() for its sync work.
9bc0ff4 to
8cbd998
Compare
1cebe78 to
fa91a05
Compare

Description
Mirrors #259 (tx-poller SSE) for bundles. Replaces the 1s polling loop in
BundlePollerwith an SSE subscription to/bundles/feedviaBuilderTxCache::subscribe_bundles(newly exposed by the SDK, matched by the tx-pool-webservice endpoint).The structure matches
TxPollerexactly:full_fetchpaginates throughstream_bundlesto seed the cache on startup.subscribeopens the SSE stream and yieldsCachedBundles in real time.NotOurSlotis logged at trace level (expected when the builder is not slot-permissioned) in both the full-fetch and subscribe paths — avoids spurious warn-level noise.The public
check_bundle_cache()wrapper is dropped; the integration test now usesBuilderTxCache::stream_bundlesdirectly, matching the style oftx_poller_test.rs.Related Issue
Stacked on #259.
Testing
make fmtpassesmake clippypassesmake testpassescargo doc --no-depspasses with-D warningscargo test --features test-utils --no-runbuilds all integration tests