Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions src/utils/block_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! Host chain block watcher that subscribes to new blocks and tracks the
//! current host block number.
//! Rollup chain block watcher that subscribes to new blocks and tracks the
//! current rollup block number.

use alloy::{
network::Ethereum,
Expand All @@ -12,33 +12,33 @@ use tokio::{
};
use tracing::{debug, error, trace};

/// Host chain block watcher that subscribes to new blocks and broadcasts
/// Rollup chain block watcher that subscribes to new blocks and broadcasts
/// updates via a watch channel.
#[derive(Debug)]
pub struct BlockWatcher {
/// Watch channel responsible for broadcasting block number updates.
block_number: watch::Sender<u64>,

/// Host chain provider.
host_provider: RootProvider<Ethereum>,
/// Rollup chain provider.
rollup_provider: RootProvider<Ethereum>,
}

impl BlockWatcher {
/// Creates a new [`BlockWatcher`] with the given provider and initial
/// block number.
pub fn new(host_provider: RootProvider<Ethereum>, initial: u64) -> Self {
pub fn new(rollup_provider: RootProvider<Ethereum>, initial: u64) -> Self {
Self {
block_number: watch::channel(initial).0,
host_provider,
rollup_provider,
}
}

/// Creates a new [`BlockWatcher`], fetching the current block number first.
pub async fn with_current_block(
host_provider: RootProvider<Ethereum>,
rollup_provider: RootProvider<Ethereum>,
) -> Result<Self, TransportError> {
let block_number = host_provider.get_block_number().await?;
Ok(Self::new(host_provider, block_number))
let block_number = rollup_provider.get_block_number().await?;
Ok(Self::new(rollup_provider, block_number))
}

/// Subscribe to block number updates.
Expand All @@ -52,22 +52,22 @@ impl BlockWatcher {
}

async fn task_future(self) {
let mut sub = match self.host_provider.subscribe_blocks().await {
let mut sub = match self.rollup_provider.subscribe_blocks().await {
Ok(sub) => sub,
Err(error) => {
error!(%error);
return;
}
};

debug!("subscribed to host chain blocks");
debug!("subscribed to rollup chain blocks");

loop {
match sub.recv().await {
Ok(header) => {
let block_number = header.number;
self.block_number.send_replace(block_number);
trace!(block_number, "updated host block number");
trace!(block_number, "updated rollup block number");
}
Err(RecvError::Lagged(missed)) => {
debug!(%missed, "block subscription lagged");
Expand Down