Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,040 changes: 1,009 additions & 1,031 deletions Cargo.lock

Large diffs are not rendered by default.

18 changes: 9 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ name = "zenith-builder-example"
path = "bin/builder.rs"

[dependencies]
init4-bin-base = { version = "0.18.0-rc.8", features = ["perms", "aws"] }
init4-bin-base = { version = "0.18.0-rc.10", features = ["perms", "aws", "pylon"] }

signet-constants = { version = "0.16.0-rc.8" }
signet-sim = { version = "0.16.0-rc.8" }
signet-tx-cache = { version = "0.16.0-rc.8" }
signet-types = { version = "0.16.0-rc.8" }
signet-zenith = { version = "0.16.0-rc.8" }
signet-block-processor = { git = "https://github.com/init4tech/node-components", tag = "v0.16.0-rc.7" }
signet-genesis = { git = "https://github.com/init4tech/node-components", tag = "v0.16.0-rc.7" }
signet-constants = { version = "0.16.0-rc.11" }
signet-sim = { version = "0.16.0-rc.11" }
signet-tx-cache = { version = "0.16.0-rc.11" }
signet-types = { version = "0.16.0-rc.11" }
signet-zenith = { version = "0.16.0-rc.11" }
signet-block-processor = { git = "https://github.com/init4tech/node-components", tag = "v0.16.0-rc.8" }
signet-genesis = { git = "https://github.com/init4tech/node-components", tag = "v0.16.0-rc.8" }

trevm = { version = "0.34.0", features = ["concurrent-db", "test-utils"] }

Expand All @@ -44,7 +44,7 @@ alloy = { version = "1.4.0", features = [
"provider-mev-api",
] }

reth-chainspec = { git = "https://github.com/paradigmxyz/reth", tag = "v1.10.2" }
reth-chainspec = { git = "https://github.com/paradigmxyz/reth", tag = "v1.11.0" }

axum = "0.7.5"
eyre = "0.6.12"
Expand Down
14 changes: 13 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use alloy::{
};
use eyre::Result;
use init4_bin_base::{
perms::{Authenticator, OAuthConfig, SharedToken},
perms::{Authenticator, OAuthConfig, SharedToken, pylon},
utils::{
calc::SlotCalculator,
from_env::FromEnv,
Expand All @@ -25,6 +25,9 @@ use signet_zenith::Zenith;
use std::borrow::Cow;
use tokio::join;

/// Pylon client type for blob sidecar submission.
pub type PylonClient = pylon::PylonClient;

/// Type alias for the provider used to simulate against rollup state.
pub type RuProvider = RootProvider<Ethereum>;

Expand Down Expand Up @@ -177,6 +180,10 @@ pub struct BuilderConfig {

/// The signet system constants.
pub constants: SignetSystemConstants,

/// URL for the Pylon blob server API.
#[from_env(var = "PYLON_URL", desc = "URL for the Pylon blob server API")]
pub pylon_url: url::Url,
}

impl BuilderConfig {
Expand Down Expand Up @@ -309,6 +316,11 @@ impl BuilderConfig {
((gas_limit as u128 * (self.max_host_gas_coefficient.unwrap_or(80) as u128)) / 100u128)
as u64
}

/// Connect to the Pylon blob server.
pub fn connect_pylon(&self) -> PylonClient {
PylonClient::new(self.pylon_url.clone(), self.oauth_token())
}
}

#[cfg(test)]
Expand Down
19 changes: 17 additions & 2 deletions src/tasks/block/sim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
config::{BuilderConfig, HostProvider, RuProvider},
tasks::env::SimEnv,
};
use alloy::consensus::Header;
use alloy::{consensus::Header, eips::Encodable2718, primitives::Bytes};
use init4_bin_base::{
deps::metrics::{counter, histogram},
utils::calc::SlotCalculator,
Expand Down Expand Up @@ -62,6 +62,21 @@ impl SimResult {
pub fn clone_span(&self) -> Span {
self.sim_env.clone_span()
}

/// Constructs the MEV bundle body from host transactions and the submission transaction.
///
/// Combines all host transactions from the rollup block with the prepared rollup block
/// submission transaction, wrapping each as a non-revertible bundle item.
///
/// The rollup block transaction is always included and placed last in the bundle.
pub fn build_bundle_body(&self, block_tx_bytes: Bytes) -> Vec<Bytes> {
self.block
.host_transactions()
.iter()
.map(|tx| tx.encoded_2718().into())
.chain(std::iter::once(block_tx_bytes))
.collect()
}
}

/// A task that builds blocks based on incoming [`SimEnv`]s and a simulation
Expand Down Expand Up @@ -132,7 +147,7 @@ impl SimulatorTask {
let block_build = BlockBuild::new(
rollup_env,
host_env,
finish_by,
finish_by.into(),
concurrency_limit,
sim_items,
self.config.rollup_block_gas_limit,
Expand Down
23 changes: 12 additions & 11 deletions src/tasks/cache/bundle.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Bundler service responsible for fetching bundles and sending them to the simulator.
use crate::config::BuilderConfig;
use init4_bin_base::perms::tx_cache::BuilderTxCache;
use signet_tx_cache::{TxCacheError, types::TxCacheBundle};
use init4_bin_base::perms::tx_cache::{BuilderTxCache, BuilderTxCacheError};
use signet_tx_cache::{TxCacheError, types::CachedBundle};
use tokio::{
sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
task::JoinHandle,
Expand Down Expand Up @@ -51,26 +51,27 @@ impl BundlePoller {
}

/// Checks the bundle cache for new bundles.
pub async fn check_bundle_cache(&self) -> Result<Vec<TxCacheBundle>, TxCacheError> {
pub async fn check_bundle_cache(&self) -> Result<Vec<CachedBundle>, BuilderTxCacheError> {
let res = self.tx_cache.get_bundles(None).await;

match res {
Ok(bundles) => {
Ok(resp) => {
let bundles = resp.into_inner();
trace!(count = ?bundles.bundles.len(), "found bundles");
Ok(bundles.bundles)
}
Err(TxCacheError::NotOurSlot) => {
trace!("Not our slot to fetch bundles");
Err(TxCacheError::NotOurSlot)
}
Err(err) => {
error!(?err, "Failed to fetch bundles from tx-cache");
if matches!(&err, BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot)) {
trace!("Not our slot to fetch bundles");
} else {
error!(?err, "Failed to fetch bundles from tx-cache");
}
Err(err)
}
}
}

async fn task_future(self, outbound: UnboundedSender<TxCacheBundle>) {
async fn task_future(self, outbound: UnboundedSender<CachedBundle>) {
loop {
let span = trace_span!("BundlePoller::loop", url = %self.config.tx_pool_url);

Expand Down Expand Up @@ -100,7 +101,7 @@ impl BundlePoller {
}

/// Spawns a task that sends bundles it finds to its channel sender.
pub fn spawn(self) -> (UnboundedReceiver<TxCacheBundle>, JoinHandle<()>) {
pub fn spawn(self) -> (UnboundedReceiver<CachedBundle>, JoinHandle<()>) {
let (outbound, inbound) = unbounded_channel();

let jh = tokio::spawn(self.task_future(outbound));
Expand Down
6 changes: 3 additions & 3 deletions src/tasks/cache/task.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::tasks::env::SimEnv;
use alloy::consensus::{TxEnvelope, transaction::SignerRecoverable};
use signet_sim::SimCache;
use signet_tx_cache::types::TxCacheBundle;
use signet_tx_cache::types::CachedBundle;
use tokio::{
sync::{mpsc, watch},
task::JoinHandle,
Expand All @@ -18,7 +18,7 @@ pub struct CacheTask {
/// The channel to receive the block environment.
envs: watch::Receiver<Option<SimEnv>>,
/// The channel to receive the transaction bundles.
bundles: mpsc::UnboundedReceiver<TxCacheBundle>,
bundles: mpsc::UnboundedReceiver<CachedBundle>,
/// The channel to receive the transactions.
txns: mpsc::UnboundedReceiver<TxEnvelope>,
}
Expand All @@ -27,7 +27,7 @@ impl CacheTask {
/// Create a new cache task with the given cache and channels.
pub const fn new(
env: watch::Receiver<Option<SimEnv>>,
bundles: mpsc::UnboundedReceiver<TxCacheBundle>,
bundles: mpsc::UnboundedReceiver<CachedBundle>,
txns: mpsc::UnboundedReceiver<TxEnvelope>,
) -> Self {
Self { envs: env, bundles, txns }
Expand Down
114 changes: 54 additions & 60 deletions src/tasks/submit/flashbots.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
//! Flashbots Task receives simulated blocks from an upstream channel and
//! submits them to the Flashbots relay as bundles.
use crate::{
config::{BuilderConfig, FlashbotsProvider, HostProvider, ZenithInstance},
config::{BuilderConfig, FlashbotsProvider, HostProvider, PylonClient, ZenithInstance},
quincey::Quincey,
tasks::{block::sim::SimResult, submit::SubmitPrep},
};
use alloy::{
consensus::TxEnvelope,
eips::Encodable2718,
primitives::{Bytes, TxHash},
providers::ext::MevApi,
consensus::TxEnvelope, eips::Encodable2718, primitives::TxHash, providers::ext::MevApi,
rpc::types::mev::EthSendBundle,
};
use init4_bin_base::{deps::metrics::counter, utils::signer::LocalOrAws};
Expand All @@ -33,6 +30,8 @@ pub struct FlashbotsTask {
signer: LocalOrAws,
/// Channel for sending hashes of outbound transactions.
outbound: mpsc::UnboundedSender<TxHash>,
/// Pylon client for blob sidecar submission.
pylon: PylonClient,
}

impl FlashbotsTask {
Expand All @@ -49,8 +48,9 @@ impl FlashbotsTask {
)?;

let zenith = config.connect_zenith(host_provider);
let pylon = config.connect_pylon();

Ok(Self { config, quincey, zenith, flashbots, signer: builder_key, outbound })
Ok(Self { config, quincey, zenith, flashbots, signer: builder_key, outbound, pylon })
}

/// Prepares a MEV bundle from a simulation result.
Expand Down Expand Up @@ -82,7 +82,7 @@ impl FlashbotsTask {
let tx_bytes = block_tx.encoded_2718().into();

// Build the bundle body with the block_tx bytes as the last transaction in the bundle.
let txs = self.build_bundle_body(sim_result, tx_bytes);
let txs = sim_result.build_bundle_body(tx_bytes);

// Create the MEV bundle (valid only in the specific host block)
Ok(EthSendBundle {
Expand All @@ -91,7 +91,6 @@ impl FlashbotsTask {
..Default::default()
})
}

/// Prepares and signs the submission transaction for the rollup block.
///
/// Creates a `SubmitPrep` instance to build the transaction, then fills
Expand Down Expand Up @@ -131,26 +130,6 @@ impl FlashbotsTask {
}
}

/// Constructs the MEV bundle body from host transactions and the submission transaction.
///
/// Combines all host transactions from the rollup block with the prepared rollup block
/// submission transaction, wrapping each as a non-revertible bundle item.
///
/// The rollup block transaction is placed last in the bundle.
fn build_bundle_body(
&self,
sim_result: &SimResult,
tx_bytes: alloy::primitives::Bytes,
) -> Vec<Bytes> {
sim_result
.block
.host_transactions()
.iter()
.map(|tx| tx.encoded_2718().into())
.chain(std::iter::once(tx_bytes))
.collect()
}

/// Main task loop that processes simulation results and submits bundles to Flashbots.
///
/// Receives `SimResult`s from the inbound channel, prepares MEV bundles, and submits
Expand Down Expand Up @@ -180,7 +159,6 @@ impl FlashbotsTask {

// Prepare a MEV bundle with the configured call type from the sim result
let result = self.prepare(&sim_result).instrument(span.clone()).await;

let bundle = match result {
Ok(bundle) => bundle,
Err(error) => {
Expand All @@ -190,52 +168,68 @@ impl FlashbotsTask {
}
};

// Due to the way the bundle is built, the block transaction is the last transaction in the bundle, and will always exist.
// We'll use this to forward the tx to pylon, which will preload the sidecar.
let block_tx = bundle.txs.last().unwrap().clone();

// Make a child span to cover submission, or use the current span
// if debug is not enabled.
let _guard = span.enter();
let submit_span = debug_span!("flashbots.submit",).or_current();

// Send the bundle to Flashbots, instrumenting the send future so
// all events inside the async send are attributed to the submit
// span.
// span. If Flashbots accepts it, submit the envelope to Pylon.
let flashbots = self.flashbots().to_owned();
let signer = self.signer.clone();
let pylon = self.pylon.clone();

tokio::spawn(
async move {
let response =
flashbots.send_bundle(bundle).with_auth(signer.clone()).into_future().await;

// Check if we met the submission deadline
let met_deadline = Instant::now() <= deadline;

match (response, met_deadline) {
(Ok(resp), true) => {
counter!("signet.builder.flashbots.bundles_submitted").increment(1);
counter!("signet.builder.flashbots.deadline_met").increment(1);
info!(
hash = resp.as_ref().map(|r| r.bundle_hash.to_string()),
"Submitted MEV bundle to Flashbots within deadline"
);
}
(Ok(resp), false) => {
counter!("signet.builder.flashbots.bundles_submitted").increment(1);
counter!("signet.builder.flashbots.deadline_missed").increment(1);
warn!(
hash = resp.as_ref().map(|r| r.bundle_hash.to_string()),
"Submitted MEV bundle to Flashbots AFTER deadline - submission may be too late"
);
}
(Err(err), true) => {
counter!("signet.builder.flashbots.submission_failures").increment(1);
error!(%err, "MEV bundle submission failed - error returned");
}
(Err(err), false) => {
let resp = match flashbots
.send_bundle(bundle)
.with_auth(signer.clone())
.into_future()
.await
{
Ok(resp) => resp,
Err(err) => {
counter!("signet.builder.flashbots.submission_failures").increment(1);
counter!("signet.builder.flashbots.deadline_missed").increment(1);
error!(%err, "MEV bundle submission failed AFTER deadline - error returned");
if Instant::now() > deadline {
counter!("signet.builder.flashbots.deadline_missed").increment(1);
error!(%err, "MEV bundle submission failed AFTER deadline - error returned");
} else {
error!(%err, "MEV bundle submission failed - error returned");
}
return;
}
};

// Check if we met the submission deadline
counter!("signet.builder.flashbots.bundles_submitted").increment(1);
if Instant::now() > deadline {
counter!("signet.builder.flashbots.deadline_missed").increment(1);
warn!(
?resp,
"Submitted MEV bundle to Flashbots AFTER deadline - submission may be too late"
);
return;
}

counter!("signet.builder.flashbots.deadline_met").increment(1);
info!(
hash = resp.as_ref().map(|r| r.bundle_hash.to_string()),
"Submitted MEV bundle to Flashbots within deadline"
);

if let Err(err) = pylon.post_blob_tx(block_tx).await {
counter!("signet.builder.pylon.submission_failures").increment(1);
error!(%err, "pylon submission failed");
return;
}

counter!("signet.builder.pylon.sidecars_submitted").increment(1);
debug!("posted sidecar to pylon");
}
.instrument(submit_span.clone()),
);
Expand Down
1 change: 1 addition & 0 deletions src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub fn setup_test_config() -> &'static BuilderConfig {
submit_deadline_buffer: 500,
max_host_gas_coefficient: Some(80),
constants: SignetSystemConstants::parmigiana(),
pylon_url: "http://localhost:8081".parse().unwrap(),
}
})
}
Expand Down