Improvements to call cache cleanup #6476
Changes from 2 commits
@@ -159,7 +159,10 @@ mod data {
     use std::iter::FromIterator;
     use std::str::FromStr;
+    use std::time::Instant;

     use crate::transaction_receipt::RawTransactionReceipt;
+    use crate::vid_batcher::AdaptiveBatchSize;

+    use super::JsonBlock;
@@ -1640,21 +1643,76 @@ mod data {
             }
         }

+        const CALL_CACHE_CONTRACT_ADDRESS_INDEX: &str = "call_cache_contract_address";
+
+        /// Ensure that an index on `contract_address` exists on the
+        /// call_cache table to speed up deletion queries. If the index does
+        /// not exist, create it concurrently.
+        async fn ensure_contract_address_index(
+            &self,
+            conn: &mut AsyncPgConnection,
+            logger: &Logger,
+        ) -> Result<(), Error> {
+            let (schema_name, table_qname) = match self {
+                Storage::Shared => ("public", "public.eth_call_cache".to_string()),
+                Storage::Private(Schema {
+                    name, call_cache, ..
+                }) => (name.as_str(), call_cache.qname.clone()),
+            };
+
+            let has_index = crate::catalog::table_has_index(
+                conn,
+                schema_name,
+                Self::CALL_CACHE_CONTRACT_ADDRESS_INDEX,
+            )
+            .await?;
+
+            if !has_index {
+                let start = Instant::now();
+                info!(
+                    logger,
+                    "Creating index {} on {}.contract_address; \
+                     this may take a long time",
+                    Self::CALL_CACHE_CONTRACT_ADDRESS_INDEX,
+                    table_qname
+                );
+                conn.batch_execute(&format!(
+                    "create index concurrently if not exists {} \
+                     on {}(contract_address)",
+                    Self::CALL_CACHE_CONTRACT_ADDRESS_INDEX,
+                    table_qname
+                ))
+                .await?;
+                let duration = start.elapsed();
+                info!(
+                    logger,
+                    "Finished creating index {} on {}.contract_address in {:?}",
+                    Self::CALL_CACHE_CONTRACT_ADDRESS_INDEX,
+                    table_qname,
+                    duration
+                );
+            }
+
+            Ok(())
+        }
+
         pub async fn clear_stale_call_cache(
             &self,
             conn: &mut AsyncPgConnection,
             logger: &Logger,
             ttl_days: i32,
             ttl_max_contracts: Option<i64>,
         ) -> Result<(), Error> {
+            self.ensure_contract_address_index(conn, logger).await?;
+
             let mut total_calls: usize = 0;
             let mut total_contracts: i64 = 0;
-            // We process contracts in batches to avoid loading too many entries into memory
-            // at once. Each contract can have many calls, so we also delete calls in batches.
-            // Note: The batch sizes were chosen based on experimentation. Potentially, they
-            // could be made configurable via ENV vars.
+            // We process contracts in batches to avoid loading too many
+            // entries into memory at once. Each contract can have many
+            // calls, so we delete calls in adaptive batches that
+            // self-tune based on query duration.
             let contracts_batch_size: i64 = 2000;
-            let cache_batch_size: usize = 10000;
+            let mut batch_size = AdaptiveBatchSize::with_size(100);
Member: It may be an edge case (depending on shard/replica setup), but now that batch_size is unbounded, couldn't it grow to a point where it creates very large WAL files per batch, which may cause replication lag on read replicas and affect query performance? (To be fair, 10k already probably had a significant WAL/lag impact.)

Author: I don't think it's a big deal, at least if the adaptive batch size works, since it will try to keep transactions to under 3 minutes.
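For context, a minimal sketch of how an adaptive batch size along these lines could self-tune. The real `AdaptiveBatchSize` in `crate::vid_batcher` may use a different target duration and growth policy; the constants and the explicit upper bound (one way to address the WAL concern above) are assumptions:

```rust
use std::time::Duration;

/// Sketch of an adaptive batch size: grow while batches stay fast,
/// back off once they exceed a target duration. The constants and the
/// cap are illustrative, not graph-node's actual values.
pub struct AdaptiveBatchSize {
    pub size: i64,
}

impl AdaptiveBatchSize {
    // Assumed per-batch target, well under the ~3 minute transaction
    // budget mentioned in the reply above.
    const TARGET: Duration = Duration::from_secs(30);
    // An explicit cap like this would bound WAL volume per batch.
    const MAX_SIZE: i64 = 100_000;

    pub fn with_size(size: i64) -> Self {
        AdaptiveBatchSize { size }
    }

    /// Adjust the batch size based on how long the last batch took.
    pub fn adapt(&mut self, elapsed: Duration) {
        if elapsed < Self::TARGET {
            // Under target: grow geometrically, up to the cap.
            self.size = (self.size * 2).min(Self::MAX_SIZE);
        } else {
            // Over target: shrink quickly.
            self.size = (self.size / 2).max(1);
        }
    }
}
```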
             // Limits the number of contracts to process if ttl_max_contracts is set.
             // Used also to adjust the final batch size, so we don't process more
@@ -1704,20 +1762,23 @@
             }

             loop {
+                let current_size = batch_size.size;
+                let start = Instant::now();
                 let next_batch = cache::table
                     .select(cache::id)
                     .filter(cache::contract_address.eq_any(&stale_contracts))
-                    .limit(cache_batch_size as i64)
+                    .limit(current_size)
                     .get_results::<Vec<u8>>(conn)
                     .await?;
                 let deleted_count =
                     diesel::delete(cache::table.filter(cache::id.eq_any(&next_batch)))
                         .execute(conn)
                         .await?;
+                batch_size.adapt(start.elapsed());

                 total_calls += deleted_count;

-                if deleted_count < cache_batch_size {
+                if (deleted_count as i64) < current_size {
                     break;
                 }
             }
@@ -1754,11 +1815,11 @@
                     SELECT id
                     FROM {}
                     WHERE contract_address = ANY($1)
-                    LIMIT {}
+                    LIMIT $2
                 )
                 DELETE FROM {} USING targets
                 WHERE {}.id = targets.id",
-                call_cache.qname, cache_batch_size, call_cache.qname, call_cache.qname
+                call_cache.qname, call_cache.qname, call_cache.qname
             );

             let delete_meta_query = format!(
@@ -1806,14 +1867,18 @@
             }

             loop {
+                let current_size = batch_size.size;
+                let start = Instant::now();
                 let deleted_count = sql_query(&delete_cache_query)
Member: This is actually an oversight on my part when implementing this (sorry about that 🙏🏻). It would be nice to add logs before/after the delete query to show progress. (A sketch of such logging follows this hunk.)
                     .bind::<Array<Bytea>, _>(&stale_contracts)
+                    .bind::<BigInt, _>(current_size)
                     .execute(conn)
                     .await?;
+                batch_size.adapt(start.elapsed());

                 total_calls += deleted_count;

-                if deleted_count < cache_batch_size {
+                if (deleted_count as i64) < current_size {
                     break;
                 }
             }
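Following the reviewer's suggestion above, a sketch of what progress logging around this delete loop could look like, reusing the variables from the hunk; the log messages themselves are assumptions, not the PR's final wording:

```rust
loop {
    let current_size = batch_size.size;
    let start = Instant::now();
    // Assumed log line before each batch so a long-running cleanup
    // shows forward progress.
    info!(
        logger,
        "Deleting up to {} stale call cache entries", current_size
    );
    let deleted_count = sql_query(&delete_cache_query)
        .bind::<Array<Bytea>, _>(&stale_contracts)
        .bind::<BigInt, _>(current_size)
        .execute(conn)
        .await?;
    batch_size.adapt(start.elapsed());
    total_calls += deleted_count;
    // Assumed log line after each batch with running totals.
    info!(
        logger,
        "Deleted {} stale calls in {:?} ({} total so far)",
        deleted_count,
        start.elapsed(),
        total_calls
    );
    if (deleted_count as i64) < current_size {
        break;
    }
}
```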
Member: Failure on `create index concurrently` may leave an invalid index that will not be used, but will still exist as a name on the table. We should probably match on the error here and execute `drop index if exists`.

Author: Addressed that.
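For reference, a minimal sketch of the suggested cleanup as it could slot into `ensure_contract_address_index`, assuming the failure surfaces as an error from `batch_execute` and that dropping the index by `schema.name` is acceptable; the PR's actual error handling may differ:

```rust
// Sketch: CREATE INDEX CONCURRENTLY can fail partway and leave an
// INVALID index behind under the chosen name, so drop it before
// propagating the original error.
if let Err(e) = conn
    .batch_execute(&format!(
        "create index concurrently if not exists {} \
         on {}(contract_address)",
        Self::CALL_CACHE_CONTRACT_ADDRESS_INDEX,
        table_qname
    ))
    .await
{
    // Best-effort cleanup; ignore errors from the drop itself and
    // report the original failure.
    let _ = conn
        .batch_execute(&format!(
            "drop index if exists {}.{}",
            schema_name,
            Self::CALL_CACHE_CONTRACT_ADDRESS_INDEX
        ))
        .await;
    return Err(e.into());
}
```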