Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::module_host::{EventStatus, ModuleHost, ModuleInfo, NoSuchModule};
use super::scheduler::SchedulerStarter;
use super::v8::V8HeapMetrics;
use super::wasmtime::WasmtimeRuntime;
use super::wasmtime::{WasmMemoryBytesMetric, WasmtimeRuntime};
use super::{Scheduler, UpdateDatabaseResult};
use crate::client::{ClientActorId, ClientName};
use crate::config::{V8Config, WasmConfig};
Expand Down Expand Up @@ -1411,8 +1411,8 @@ where
let _ = DATA_SIZE_METRICS
.data_size_blob_store_bytes_used_by_blobs
.remove_label_values(db);
let _ = WORKER_METRICS.wasm_memory_bytes.remove_label_values(db);

WasmMemoryBytesMetric::remove_all_metric_label_values_for_database(db);
V8HeapMetrics::remove_all_metric_label_values_for_database(db);

let _ = WORKER_METRICS.v8_request_queue_length.remove_label_values(db);
Expand Down
7 changes: 1 addition & 6 deletions crates/core/src/host/module_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
module_host_context::ModuleCreationContext,
replica_context::ReplicaContext,
};
use spacetimedb_lib::{Identity, RawModuleDef};
use spacetimedb_lib::RawModuleDef;
use spacetimedb_schema::{def::ModuleDef, error::ValidationErrors};
use std::sync::Arc;

Expand Down Expand Up @@ -70,11 +70,6 @@ impl ModuleCommon {
self.info.clone()
}

/// Returns the identity of the database.
pub fn database_identity(&self) -> &Identity {
&self.info.database_identity
}

/// Returns the energy monitor.
pub fn energy_monitor(&self) -> Arc<dyn EnergyMonitor> {
self.energy_monitor.clone()
Expand Down
37 changes: 6 additions & 31 deletions crates/core/src/host/v8/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ const REDUCER_ARGS_BUFFER_SIZE: usize = 4_096;
const JS_PROCEDURE_INSTANCE_QUEUE_CAPACITY: usize = 1;

#[derive(Copy, Clone)]
pub(crate) enum JsWorkerKind {
pub enum JsWorkerKind {
Main,
Procedure,
}
Expand Down Expand Up @@ -345,8 +345,6 @@ fn env_on_isolate_unwrap(isolate: &mut Isolate) -> &mut JsInstanceEnv {
struct JsInstanceEnv {
instance_env: InstanceEnv,
module_def: Option<Arc<ModuleDef>>,
/// Last used-heap sample captured by the worker's periodic heap checks.
cached_used_heap_size: usize,

/// The slab of `BufferIters` created for this instance.
iters: RowIters,
Expand All @@ -371,7 +369,6 @@ impl JsInstanceEnv {
Self {
instance_env,
module_def: None,
cached_used_heap_size: 0,
call_times: CallTimes::new(),
iters: <_>::default(),
chunk_pool: <_>::default(),
Expand Down Expand Up @@ -426,16 +423,6 @@ impl JsInstanceEnv {
}
}

/// Refresh the cached heap usage after an explicit V8 heap sample.
fn set_cached_used_heap_size(&mut self, bytes: usize) {
self.cached_used_heap_size = bytes;
}

/// Return the last heap sample without forcing a fresh V8 query.
fn cached_used_heap_size(&self) -> usize {
self.cached_used_heap_size
}

fn set_module_def(&mut self, module_def: Arc<ModuleDef>) {
self.module_def = Some(module_def);
}
Expand Down Expand Up @@ -1119,11 +1106,8 @@ fn adjust_gauge(gauge: &IntGauge, delta: i64) {
}
}

fn sample_heap_stats(scope: &mut PinScope<'_, '_>, metrics: &mut V8HeapMetrics) -> v8::HeapStatistics {
// Whenever we sample heap statistics, we cache them on the isolate so that
// the per-call execution stats can avoid querying them on each invocation.
fn record_heap_stats(scope: &mut PinScope<'_, '_>, metrics: &mut V8HeapMetrics) -> v8::HeapStatistics {
let stats = scope.get_heap_statistics();
env_on_isolate_unwrap(scope).set_cached_used_heap_size(stats.used_heap_size());
metrics.observe(&stats);
stats
}
Expand All @@ -1147,14 +1131,14 @@ fn should_retire_worker_for_heap(
metrics: &mut V8HeapMetrics,
config: V8HeapPolicyConfig,
) -> Option<(usize, usize)> {
let stats = sample_heap_stats(scope, metrics);
let stats = record_heap_stats(scope, metrics);
let (used, limit) = heap_usage(&stats);
if !heap_fraction_at_or_above(used, limit, config.heap_gc_trigger_fraction) {
return None;
}

scope.low_memory_notification();
let stats = sample_heap_stats(scope, metrics);
let stats = record_heap_stats(scope, metrics);
let (used, limit) = heap_usage(&stats);
if heap_fraction_at_or_above(used, limit, config.heap_retire_fraction) {
Some((used, limit))
Expand Down Expand Up @@ -1683,7 +1667,7 @@ where
.with_label_values(&info.database_identity),
initial_heap_limit: heap_policy.heap_limit_bytes,
};
let _initial_heap_stats = sample_heap_stats(inst.scope, &mut heap_metrics);
let _initial_heap_stats = record_heap_stats(inst.scope, &mut heap_metrics);

// Process requests to the worker.
//
Expand Down Expand Up @@ -1994,25 +1978,16 @@ where
// Derive energy stats.
let energy = energy_from_elapsed(budget, timings.total_duration);

// Reuse the last periodic heap sample instead of querying V8 on every call.
// We use this statistic for energy tracking, so eventual consistency is fine.
let memory_allocation = env.cached_used_heap_size();

if heap_limit_hit.get() > 1 {
let database_identity = *env.instance_env.database_identity();
tracing::warn!(
%database_identity,
used_heap_size = memory_allocation,
current_heap_limit = scope.get_heap_statistics().heap_size_limit(),
"Module hit heap limit multiple times in single call, even after doubling!",
)
}

let stats = ExecutionStats {
energy,
timings,
memory_allocation,
};
let stats = ExecutionStats { energy, timings };
ExecutionResult { stats, call_result }
})
}
Expand Down
30 changes: 9 additions & 21 deletions crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use anyhow::{anyhow, bail, ensure, Context};
use bytes::{Buf, Bytes};
use core::future::Future;
use core::time::Duration;
use prometheus::{Histogram, IntCounter, IntGauge};
use prometheus::{Histogram, IntCounter};
use spacetimedb_auth::identity::ConnectionAuthCtx;
use spacetimedb_client_api_messages::energy::EnergyQuanta;
use spacetimedb_datastore::db_metrics::DB_METRICS;
Expand Down Expand Up @@ -105,6 +105,12 @@ pub struct EnergyStats {
pub remaining: FunctionBudget,
}

impl Default for EnergyStats {
fn default() -> Self {
Self::ZERO
}
}

impl EnergyStats {
pub const ZERO: Self = Self {
budget: FunctionBudget::ZERO,
Expand Down Expand Up @@ -207,6 +213,7 @@ pub(crate) fn run_query_for_view(
Ok(rows)
}

#[derive(Default)]
pub struct ExecutionTimings {
pub total_duration: Duration,
pub wasm_instance_env_call_times: CallTimes,
Expand All @@ -226,10 +233,10 @@ impl ExecutionTimings {
/// The result that `__call_reducer__` produces during normal non-trap execution.
pub type ReducerResult = Result<Option<Bytes>, Box<str>>;

#[derive(Default)]
pub struct ExecutionStats {
pub energy: EnergyStats,
pub timings: ExecutionTimings,
pub memory_allocation: usize,
}

impl ExecutionStats {
Expand Down Expand Up @@ -580,8 +587,6 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
pub struct InstanceCommon {
info: Arc<ModuleInfo>,
energy_monitor: Arc<dyn EnergyMonitor>,
allocated_memory: usize,
metric_wasm_memory_bytes: IntGauge,
vm_metrics: AllVmMetrics,
}

Expand All @@ -594,11 +599,6 @@ impl InstanceCommon {
info: module.info(),
vm_metrics,
energy_monitor: module.energy_monitor(),
// Will be updated on the first reducer call.
allocated_memory: 0,
metric_wasm_memory_bytes: WORKER_METRICS
.wasm_memory_bytes
.with_label_values(module.database_identity()),
}
}

Expand Down Expand Up @@ -780,19 +780,12 @@ impl InstanceCommon {
let ProcedureExecuteResult {
stats:
ExecutionStats {
memory_allocation,
// TODO(procedure-energy): Do something with timing and energy.
..
},
call_result,
} = result;

// TODO(shub): deduplicate with reducer and view logic.
if self.allocated_memory != memory_allocation {
self.metric_wasm_memory_bytes.set(memory_allocation as i64);
self.allocated_memory = memory_allocation;
}

let trapped = call_result.is_err();

let result = match call_result {
Expand Down Expand Up @@ -1040,14 +1033,9 @@ impl InstanceCommon {
let energy_used = stats.energy.used();
let energy_quanta_used = energy_used.into();
let timings = &stats.timings;
let memory_allocation = stats.memory_allocation;

self.energy_monitor
.record_reducer(&energy_fingerprint, energy_quanta_used, timings.total_duration);
if self.allocated_memory != memory_allocation {
self.metric_wasm_memory_bytes.set(memory_allocation as i64);
self.allocated_memory = memory_allocation;
}

maybe_log_long_running_function(function_name, timings.total_duration);

Expand Down
1 change: 1 addition & 0 deletions crates/core/src/host/wasmtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use anyhow::Context;
use spacetimedb_paths::server::ServerDataDir;
use std::borrow::Cow;
use std::time::Duration;
pub(in crate::host) use wasm_instance_env::WasmMemoryBytesMetric;
use wasmtime::{self, Engine, Linker, StoreContext, StoreContextMut};
pub use wasmtime_module::{WasmtimeAsyncModule, WasmtimeInstance, WasmtimeModule};

Expand Down
62 changes: 61 additions & 1 deletion crates/core/src/host/wasmtime/wasm_instance_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ use crate::host::wasm_common::module_host_actor::{
use crate::host::wasm_common::{err_to_errno_and_log, RowIterIdx, RowIters, TimingSpan, TimingSpanIdx, TimingSpanSet};
use crate::host::AbiCall;
use crate::subscription::module_subscription_manager::TransactionOffset;
use crate::worker_metrics::WORKER_METRICS;
use anyhow::{anyhow, Context as _};
use prometheus::IntGauge;
use spacetimedb_data_structures::map::IntMap;
use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, MutTxId, ViewCallInfo};
use spacetimedb_lib::{bsatn, ConnectionId, Timestamp};
use spacetimedb_lib::{bsatn, ConnectionId, Identity, Timestamp};
use spacetimedb_primitives::errno::HOST_CALL_FAILURE;
use spacetimedb_primitives::{errno, ColId};
use spacetimedb_schema::def::ModuleDef;
Expand Down Expand Up @@ -133,6 +135,57 @@ pub(super) struct WasmInstanceEnv {
/// A pool of unused allocated chunks that can be reused.
// TODO(Centril): consider using this pool for `console_timer_start` and `bytes_sink_write`.
chunk_pool: ChunkPool,

linear_memory_size_metric: WasmMemoryBytesMetric,
}

pub(in crate::host) struct WasmMemoryBytesMetric {
wasm_memory_bytes: IntGauge,

/// Previous value observed by this intance.
///
/// In [`Self::observe`], we use this to compute a delta against the instance's new memory usage,
/// then increment/decrement the metric value by that delta.
/// We do this rather than `set`ting the metric value as multiple instances may coexist
/// and share the same metric label value.
/// This happens when a database has procedures and reducers running concurrently,
/// and may also happen during a module update, as there may be a period when
/// the new version has already been instantiated but the old version has not yet shut down.
last_observed: i64,
}

impl WasmMemoryBytesMetric {
fn new(database_identity: Identity) -> Self {
Self {
wasm_memory_bytes: WORKER_METRICS.wasm_memory_bytes.with_label_values(&database_identity),
last_observed: 0,
}
}

fn observe(&mut self, memory_usage: usize) {
let memory_usage = memory_usage as i64;

let delta = memory_usage - self.last_observed;

if delta > 0 {
self.wasm_memory_bytes.add(delta);
} else {
self.wasm_memory_bytes.sub(-delta);
}

self.last_observed = memory_usage;
}

pub(in crate::host) fn remove_all_metric_label_values_for_database(database_identity: &Identity) {
let _ = WORKER_METRICS.wasm_memory_bytes.remove_label_values(database_identity);
}
}

impl Drop for WasmMemoryBytesMetric {
fn drop(&mut self) {
// Clean up this instance's metric value by subtracting its part of the usage.
self.wasm_memory_bytes.sub(self.last_observed);
}
}

const STANDARD_BYTES_SINK: u32 = 1;
Expand All @@ -145,6 +198,7 @@ type RtResult<T> = anyhow::Result<T>;
impl WasmInstanceEnv {
/// Create a new `WasmEnstanceEnv` from the given `InstanceEnv`.
pub fn new(instance_env: InstanceEnv) -> Self {
let database_identity = *instance_env.database_identity();
Self {
instance_env,
module_def: None,
Expand All @@ -158,6 +212,7 @@ impl WasmInstanceEnv {
timing_spans: Default::default(),
call_times: CallTimes::new(),
chunk_pool: <_>::default(),
linear_memory_size_metric: WasmMemoryBytesMetric::new(database_identity),
}
}

Expand Down Expand Up @@ -216,6 +271,11 @@ impl WasmInstanceEnv {
self.call_view_anon = call_view_anon;
}

/// Record an observation in [`Self::linear_memory_size_metric`].
pub fn record_memory_size(&mut self, memory_size: usize) {
self.linear_memory_size_metric.observe(memory_size);
}

/// Returns a reference to the memory, assumed to be initialized.
pub fn get_mem(&self) -> Mem {
self.mem.expect("Initialized memory")
Expand Down
19 changes: 7 additions & 12 deletions crates/core/src/host/wasmtime/wasmtime_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance {

let Some(call_procedure) = self.call_procedure.as_ref() else {
let res = module_host_actor::ProcedureExecuteResult {
stats: zero_execution_stats(store),
stats: ExecutionStats::default(),
call_result: Err(anyhow::anyhow!(
"Module defines procedure {} but does not export `{}`",
op.name,
Expand Down Expand Up @@ -837,20 +837,15 @@ fn finish_opcall(store: &mut Store<WasmInstanceEnv>, initial_budget: FunctionBud
remaining,
};

let stats = ExecutionStats {
energy,
timings,
memory_allocation: get_memory_size(store),
};
record_memory_size(store);

let stats = ExecutionStats { energy, timings };
(stats, ret_bytes)
}

fn zero_execution_stats(store: &Store<WasmInstanceEnv>) -> ExecutionStats {
ExecutionStats {
energy: module_host_actor::EnergyStats::ZERO,
timings: module_host_actor::ExecutionTimings::zero(),
memory_allocation: get_memory_size(store),
}
fn record_memory_size(store: &mut Store<WasmInstanceEnv>) {
let memory_usage = get_memory_size(store);
store.data_mut().record_memory_size(memory_usage);
}

fn get_memory_size(store: &Store<WasmInstanceEnv>) -> usize {
Expand Down
Loading
Loading