Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,15 @@ impl<S: SubgraphStore, AC: amp::Client> SubgraphInstanceManager<S, AC> {
// when to stop processing them.
// - Offchain data sources might require processing beyond the end block of
// onchain data sources, so the subgraph needs to continue.
let max_end_block: Option<BlockNumber> = if data_sources.len() == end_blocks.len() {
//
// Note: we explicitly check each data source rather than comparing lengths, because
// `end_blocks` is a BTreeSet and deduplicates equal values, which would cause the
// length comparison to fail when multiple data sources share the same `end_block`.
let max_end_block: Option<BlockNumber> = if data_sources.iter().all(|d| {
d.as_onchain()
.and_then(|d: &C::DataSource| d.end_block())
.is_some()
}) {
end_blocks.iter().max().cloned()
} else {
None
Expand Down
9 changes: 8 additions & 1 deletion core/src/subgraph/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,11 +555,18 @@ where
.observe(block.trigger_count() as f64);
}

// Check if we should skip this block (optimization for blocks without triggers)
// Check if we should skip this block (optimization for blocks without triggers).
// Do not skip if max_end_block has been reached — fall through to process_block so the
// block pointer is persisted and the existing max_end_block check in handle_action fires.
let max_end_block_reached = self
.inputs
.max_end_block
.is_some_and(|max| block_ptr.number >= max);
if block.trigger_count() == 0
&& self.state.skip_ptr_updates_timer.elapsed() <= SKIP_PTR_UPDATES_THRESHOLD
&& !self.inputs.store.is_deployment_synced()
&& !close_to_chain_head(&block_ptr, &self.inputs.chain.chain_head_ptr().await?, 1000)
&& !max_end_block_reached
{
// Skip this block and continue with the same stream
return Ok(RunnerState::AwaitingBlock { block_stream });
Expand Down
Loading