Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions store/postgres/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,32 @@ pub(crate) async fn indexes_for_table(
Ok(results.into_iter().map(|i| i.def).collect())
}

pub(crate) async fn table_has_index(
conn: &mut AsyncPgConnection,
schema_name: &str,
index_name: &str,
) -> Result<bool, StoreError> {
#[derive(QueryableByName)]
#[allow(dead_code)]
struct Exists {
#[diesel(sql_type = diesel::sql_types::Integer)]
exists: i32,
}

let exists = sql_query(
"SELECT 1 AS exists FROM pg_indexes \
WHERE schemaname = $1 AND indexname = $2",
)
.bind::<Text, _>(schema_name)
.bind::<Text, _>(index_name)
.get_result::<Exists>(conn)
.await
.optional()
.map_err::<StoreError, _>(Into::into)?;

Ok(exists.is_some())
}

pub(crate) async fn drop_index(
conn: &mut AsyncPgConnection,
schema_name: &str,
Expand Down
85 changes: 75 additions & 10 deletions store/postgres/src/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,10 @@ mod data {
use std::iter::FromIterator;
use std::str::FromStr;

use std::time::Instant;

use crate::transaction_receipt::RawTransactionReceipt;
use crate::vid_batcher::AdaptiveBatchSize;

use super::JsonBlock;

Expand Down Expand Up @@ -1640,21 +1643,76 @@ mod data {
}
}

const CALL_CACHE_CONTRACT_ADDRESS_INDEX: &str = "call_cache_contract_address";

/// Ensure that an index on `contract_address` exists on the
/// call_cache table to speed up deletion queries. If the index does
/// not exist, create it concurrently.
///
/// A failed `create index concurrently` leaves behind an INVALID
/// index that is never used for queries but still occupies the name
/// (and shows up in `pg_indexes`, so `table_has_index` would report
/// it as present). On failure we therefore drop the leftover index
/// before propagating the error, so that a later run can retry the
/// creation cleanly.
async fn ensure_contract_address_index(
    &self,
    conn: &mut AsyncPgConnection,
    logger: &Logger,
) -> Result<(), Error> {
    // The schema name (for the catalog lookup) and the fully
    // qualified table name differ between shared and private storage.
    let (schema_name, table_qname) = match self {
        Storage::Shared => ("public", "public.eth_call_cache".to_string()),
        Storage::Private(Schema {
            name, call_cache, ..
        }) => (name.as_str(), call_cache.qname.clone()),
    };

    let has_index = crate::catalog::table_has_index(
        conn,
        schema_name,
        Self::CALL_CACHE_CONTRACT_ADDRESS_INDEX,
    )
    .await?;

    if !has_index {
        let start = Instant::now();
        info!(
            logger,
            "Creating index {} on {}.contract_address; \
             this may take a long time",
            Self::CALL_CACHE_CONTRACT_ADDRESS_INDEX,
            table_qname
        );
        // `concurrently` avoids blocking writers on the call cache,
        // but cannot run inside a transaction and, on failure, leaves
        // an INVALID index behind. Clean that up before returning the
        // error so retries are not blocked by the leftover name.
        let result = conn
            .batch_execute(&format!(
                "create index concurrently if not exists {} \
                 on {}(contract_address)",
                Self::CALL_CACHE_CONTRACT_ADDRESS_INDEX,
                table_qname
            ))
            .await;
        if let Err(e) = result {
            // Best effort: if the drop itself fails we still want to
            // surface the original creation error.
            conn.batch_execute(&format!(
                "drop index if exists {}.{}",
                schema_name,
                Self::CALL_CACHE_CONTRACT_ADDRESS_INDEX
            ))
            .await
            .ok();
            return Err(e.into());
        }
        let duration = start.elapsed();
        info!(
            logger,
            "Finished creating index {} on {}.contract_address in {:?}",
            Self::CALL_CACHE_CONTRACT_ADDRESS_INDEX,
            table_qname,
            duration
        );
    }

    Ok(())
}

pub async fn clear_stale_call_cache(
&self,
conn: &mut AsyncPgConnection,
logger: &Logger,
ttl_days: i32,
ttl_max_contracts: Option<i64>,
) -> Result<(), Error> {
self.ensure_contract_address_index(conn, logger).await?;

let mut total_calls: usize = 0;
let mut total_contracts: i64 = 0;
// We process contracts in batches to avoid loading too many entries into memory
// at once. Each contract can have many calls, so we also delete calls in batches.
// Note: The batch sizes were chosen based on experimentation. Potentially, they
// could be made configurable via ENV vars.
// We process contracts in batches to avoid loading too many
// entries into memory at once. Each contract can have many
// calls, so we delete calls in adaptive batches that
// self-tune based on query duration.
let contracts_batch_size: i64 = 2000;
let cache_batch_size: usize = 10000;
let mut batch_size = AdaptiveBatchSize::with_size(100);
Copy link
Copy Markdown
Member

@dimitrovmaksim dimitrovmaksim Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be an edge case (depending on shard/replica setup), but now that batch_size is unbounded, couldn't it grow to a point it creates very large WAL files per batch, which may cause replication lag on read replicas and affect query performance? (tbf 10k already probably had significant WAL/lag impact)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's a big deal — at least not if the adaptive batch size works as intended, since it will try to keep transactions under 3 minutes.


// Limits the number of contracts to process if ttl_max_contracts is set.
// Used also to adjust the final batch size, so we don't process more
Expand Down Expand Up @@ -1704,20 +1762,23 @@ mod data {
}

loop {
let current_size = batch_size.size;
let start = Instant::now();
let next_batch = cache::table
.select(cache::id)
.filter(cache::contract_address.eq_any(&stale_contracts))
.limit(cache_batch_size as i64)
.limit(current_size)
.get_results::<Vec<u8>>(conn)
.await?;
let deleted_count =
diesel::delete(cache::table.filter(cache::id.eq_any(&next_batch)))
.execute(conn)
.await?;
batch_size.adapt(start.elapsed());

total_calls += deleted_count;

if deleted_count < cache_batch_size {
if (deleted_count as i64) < current_size {
break;
}
}
Expand Down Expand Up @@ -1754,11 +1815,11 @@ mod data {
SELECT id
FROM {}
WHERE contract_address = ANY($1)
LIMIT {}
LIMIT $2
)
DELETE FROM {} USING targets
WHERE {}.id = targets.id",
call_cache.qname, cache_batch_size, call_cache.qname, call_cache.qname
call_cache.qname, call_cache.qname, call_cache.qname
);

let delete_meta_query = format!(
Expand Down Expand Up @@ -1806,14 +1867,18 @@ mod data {
}

loop {
let current_size = batch_size.size;
let start = Instant::now();
let deleted_count = sql_query(&delete_cache_query)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually an oversight on my part when implementing this (sorry about that 🙏🏻). It would be nice to add logs before/after the delete query to show progress.

.bind::<Array<Bytea>, _>(&stale_contracts)
.bind::<BigInt, _>(current_size)
.execute(conn)
.await?;
batch_size.adapt(start.elapsed());

total_calls += deleted_count;

if deleted_count < cache_batch_size {
if (deleted_count as i64) < current_size {
break;
}
}
Expand Down
7 changes: 7 additions & 0 deletions store/postgres/src/vid_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ pub(crate) struct AdaptiveBatchSize {
}

impl AdaptiveBatchSize {
/// Create an adaptive batch size that starts out at `size` and will
/// adjust itself toward the globally configured target batch
/// duration.
pub fn with_size(size: i64) -> Self {
    let target = ENV_VARS.store.batch_target_duration;
    Self { size, target }
}

pub fn new(table: &Table) -> Self {
let size = if table.columns.iter().any(|col| col.is_list()) {
INITIAL_BATCH_SIZE_LIST
Expand Down
Loading