Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 2 additions & 34 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ members = [
"crates/table",
"crates/testing",
"crates/update",
"crates/vm",
"modules/benchmarks",
"modules/keynote-benchmarks",
"modules/perf-test",
Expand Down Expand Up @@ -143,7 +142,6 @@ spacetimedb-schema = { path = "crates/schema", version = "=2.0.5" }
spacetimedb-standalone = { path = "crates/standalone", version = "=2.0.5" }
spacetimedb-sql-parser = { path = "crates/sql-parser", version = "=2.0.5" }
spacetimedb-table = { path = "crates/table", version = "=2.0.5" }
spacetimedb-vm = { path = "crates/vm", version = "=2.0.5" }
spacetimedb-fs-utils = { path = "crates/fs-utils", version = "=2.0.5" }
spacetimedb-snapshot = { path = "crates/snapshot", version = "=2.0.5" }
spacetimedb-subscription = { path = "crates/subscription", version = "=2.0.5" }
Expand Down
102 changes: 5 additions & 97 deletions crates/bench/benches/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,19 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion};
use spacetimedb::client::consume_each_list::ConsumeEachBuffer;
use spacetimedb::db::relational_db::RelationalDB;
use spacetimedb::error::DBError;
use spacetimedb::host::module_host::DatabaseTableUpdate;
use spacetimedb::identity::AuthCtx;
use spacetimedb::sql::ast::SchemaViewer;
use spacetimedb::subscription::query::compile_read_only_queryset;
use spacetimedb::subscription::row_list_builder_pool::BsatnRowListBuilderPool;
use spacetimedb::subscription::subscription::ExecutionSet;
use spacetimedb::subscription::tx::DeltaTx;
use spacetimedb::subscription::{collect_table_update, TableUpdateType};
use spacetimedb_bench::database::BenchDatabase as _;
use spacetimedb_bench::spacetime_raw::SpacetimeRaw;
use spacetimedb_client_api_messages::websocket::v1::{BsatnFormat, Compression};
use spacetimedb_client_api_messages::websocket::v1::BsatnFormat;
use spacetimedb_datastore::execution_context::Workload;
use spacetimedb_execution::pipelined::PipelinedProject;
use spacetimedb_primitives::{col_list, TableId};
use spacetimedb_query::compile_subscription;
use spacetimedb_sats::{bsatn, product, AlgebraicType, AlgebraicValue, ProductValue};

use spacetimedb_schema::table_name::TableName;
use spacetimedb_sats::{bsatn, product, AlgebraicType, AlgebraicValue};
#[cfg(not(target_env = "msvc"))]
use tikv_jemallocator::Jemalloc;

Expand Down Expand Up @@ -52,15 +47,6 @@ fn create_table_footprint(db: &RelationalDB) -> Result<TableId, DBError> {
db.create_table_for_test("footprint", schema, indexes)
}

fn insert_op(table_id: TableId, table_name: &str, row: ProductValue) -> DatabaseTableUpdate {
DatabaseTableUpdate {
table_id,
table_name: TableName::for_test(table_name),
inserts: [row].into(),
deletes: [].into(),
}
}

fn eval(c: &mut Criterion) {
let raw = SpacetimeRaw::build(false).unwrap();

Expand Down Expand Up @@ -115,16 +101,12 @@ fn eval(c: &mut Criterion) {
let footprint = AlgebraicValue::sum(1, AlgebraicValue::unit());
let owner = 6u64;

let new_lhs_row = product!(entity_id, owner, footprint);
let new_rhs_row = product!(entity_id, chunk_index, x, z, dimension);

let ins_lhs = insert_op(lhs, "footprint", new_lhs_row);
let ins_rhs = insert_op(rhs, "location", new_rhs_row);
let update = [&ins_lhs, &ins_rhs];
let _new_lhs_row = product!(entity_id, owner, footprint);
let _new_rhs_row = product!(entity_id, chunk_index, x, z, dimension);

let bsatn_rlb_pool = black_box(BsatnRowListBuilderPool::new());

// A benchmark runner for the new query engine
// A benchmark runner for the subscription engine.
let bench_query = |c: &mut Criterion, name, sql| {
c.bench_function(name, |b| {
let tx = raw.db.begin_tx(Workload::Subscribe);
Expand Down Expand Up @@ -154,20 +136,6 @@ fn eval(c: &mut Criterion) {
});
};

let bench_eval = |c: &mut Criterion, name, sql| {
c.bench_function(name, |b| {
let tx = raw.db.begin_tx(Workload::Update);
let query = compile_read_only_queryset(&raw.db, &AuthCtx::for_testing(), &tx, sql).unwrap();
let query: ExecutionSet = query.into();

b.iter(|| {
let updates =
black_box(query.eval::<BsatnFormat>(&raw.db, &tx, &bsatn_rlb_pool, None, Compression::None));
updates.consume_each_list(&mut |buffer| bsatn_rlb_pool.try_put(buffer));
})
});
};

// Join 1M rows on the left with 12K rows on the right.
// Note, this should use an index join so as not to read the entire footprint table.
let semijoin = format!(
Expand All @@ -183,66 +151,6 @@ fn eval(c: &mut Criterion) {
bench_query(c, "footprint-scan", "select * from footprint");
bench_query(c, "footprint-semijoin", &semijoin);
bench_query(c, "index-scan-multi", index_scan_multi);

// To profile this benchmark for 30s
// samply record -r 10000000 cargo bench --bench=subscription --profile=profiling -- full-scan --exact --profile-time=30
// Iterate 1M rows.
bench_eval(c, "full-scan", "select * from footprint");

// To profile this benchmark for 30s
// samply record -r 10000000 cargo bench --bench=subscription --profile=profiling -- full-join --exact --profile-time=30
// Join 1M rows on the left with 12K rows on the right.
// Note, this should use an index join so as not to read the entire footprint table.
let name = format!(
r#"
select footprint.*
from footprint join location on footprint.entity_id = location.entity_id
where location.chunk_index = {chunk_index}
"#
);
bench_eval(c, "full-join", &name);

// To profile this benchmark for 30s
// samply record -r 10000000 cargo bench --bench=subscription --profile=profiling -- incr-select --exact --profile-time=30
c.bench_function("incr-select", |b| {
// A passthru executed independently of the database.
let select_lhs = "select * from footprint";
let select_rhs = "select * from location";
let tx = &raw.db.begin_tx(Workload::Update);
let query_lhs = compile_read_only_queryset(&raw.db, &AuthCtx::for_testing(), tx, select_lhs).unwrap();
let query_rhs = compile_read_only_queryset(&raw.db, &AuthCtx::for_testing(), tx, select_rhs).unwrap();
let query = ExecutionSet::from_iter(query_lhs.into_iter().chain(query_rhs));
let tx = &tx.into();

b.iter(|| drop(black_box(query.eval_incr_for_test(&raw.db, tx, &update, None))))
});

// To profile this benchmark for 30s
// samply record -r 10000000 cargo bench --bench=subscription --profile=profiling -- incr-join --exact --profile-time=30
c.bench_function("incr-join", |b| {
// Not a passthru - requires reading of database state.
let join = format!(
"\
select footprint.* \
from footprint join location on footprint.entity_id = location.entity_id \
where location.chunk_index = {chunk_index}"
);
let tx = &raw.db.begin_tx(Workload::Update);
let query = compile_read_only_queryset(&raw.db, &AuthCtx::for_testing(), tx, &join).unwrap();
let query: ExecutionSet = query.into();
let tx = &tx.into();

b.iter(|| drop(black_box(query.eval_incr_for_test(&raw.db, tx, &update, None))));
});

// To profile this benchmark for 30s
// samply record -r 10000000 cargo bench --bench=subscription --profile=profiling -- query-indexes-multi --exact --profile-time=30
// Iterate 1M rows.
bench_eval(
c,
"query-indexes-multi",
"select * from location WHERE x = 0 AND z = 10000 AND dimension = 0",
);
}

criterion_group!(benches, eval);
Expand Down
6 changes: 1 addition & 5 deletions crates/client-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,7 @@ impl Host {
.await
.map_err(|e| {
log::warn!("{e}");
if let Some(auth_err) = e.get_auth_error() {
(StatusCode::UNAUTHORIZED, auth_err.to_string())
} else {
(StatusCode::BAD_REQUEST, e.to_string())
}
(StatusCode::BAD_REQUEST, e.to_string())
})?;

let total_duration = sql_start.elapsed();
Expand Down
2 changes: 0 additions & 2 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ spacetimedb-query.workspace = true
spacetimedb-sats = { workspace = true, features = ["serde"] }
spacetimedb-schema.workspace = true
spacetimedb-table.workspace = true
spacetimedb-vm.workspace = true
spacetimedb-snapshot.workspace = true
spacetimedb-subscription.workspace = true
spacetimedb-expr.workspace = true
Expand Down Expand Up @@ -156,7 +155,6 @@ spacetimedb-lib = { path = "../lib", features = ["proptest", "test"] }
spacetimedb-sats = { path = "../sats", features = ["proptest"] }
spacetimedb-commitlog = { path = "../commitlog", features = ["test"] }
spacetimedb-datastore = { path = "../datastore/", features = ["test"] }
spacetimedb-vm = { workspace = true, features = ["test"]}

criterion.workspace = true
# Also as dev-dependencies for use in _this_ crate's tests.
Expand Down
70 changes: 1 addition & 69 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ use spacetimedb_lib::ConnectionId;
use spacetimedb_lib::Identity;
use spacetimedb_paths::server::{ReplicaDir, SnapshotsPath};
use spacetimedb_primitives::*;
use spacetimedb_sats::algebraic_type::fmt::fmt_algebraic_type;
use spacetimedb_sats::memory_usage::MemoryUsage;
use spacetimedb_sats::raw_identifier::RawIdentifier;
use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductValue};
Expand All @@ -56,8 +55,6 @@ use spacetimedb_snapshot::{ReconstructedSnapshot, SnapshotError, SnapshotReposit
use spacetimedb_table::indexes::RowPointer;
use spacetimedb_table::page_pool::PagePool;
use spacetimedb_table::table::{RowRef, TableScanIter};
use spacetimedb_vm::errors::{ErrorType, ErrorVm};
use spacetimedb_vm::ops::parse;
use std::borrow::Cow;
use std::io;
use std::ops::{Bound, RangeBounds};
Expand Down Expand Up @@ -1511,32 +1508,6 @@ impl RelationalDB {
Ok(None)
}

/// Read the value of [ST_VARNAME_SLOW_QRY] from `st_var`
pub(crate) fn query_limit(&self, tx: &Tx) -> Result<Option<u64>, DBError> {
if let Some(StVarValue::U64(ms)) = self.read_var(tx, StVarName::SlowQryThreshold)? {
return Ok(Some(ms));
}
Ok(None)
}

/// Read the value of [ST_VARNAME_SLOW_SUB] from `st_var`
#[allow(dead_code)]
pub(crate) fn sub_limit(&self, tx: &Tx) -> Result<Option<u64>, DBError> {
if let Some(StVarValue::U64(ms)) = self.read_var(tx, StVarName::SlowSubThreshold)? {
return Ok(Some(ms));
}
Ok(None)
}

/// Read the value of [ST_VARNAME_SLOW_INC] from `st_var`
#[allow(dead_code)]
pub(crate) fn incr_limit(&self, tx: &Tx) -> Result<Option<u64>, DBError> {
if let Some(StVarValue::U64(ms)) = self.read_var(tx, StVarName::SlowIncThreshold)? {
return Ok(Some(ms));
}
Ok(None)
}

/// Read the value of a system variable from `st_var`
pub(crate) fn read_var(&self, tx: &Tx, name: StVarName) -> Result<Option<StVarValue>, DBError> {
if let Some(row_ref) = self
Expand All @@ -1548,31 +1519,6 @@ impl RelationalDB {
Ok(None)
}

/// Update the value of a system variable in `st_var`
pub(crate) fn write_var(&self, tx: &mut MutTx, name: StVarName, literal: &str) -> Result<(), DBError> {
let value = Self::parse_var(name, literal)?;
if let Some(row_ref) = self
.iter_by_col_eq_mut(tx, ST_VAR_ID, StVarFields::Name.col_id(), &name.into())?
.next()
{
self.delete(tx, ST_VAR_ID, [row_ref.pointer()]);
}
tx.insert_via_serialize_bsatn(ST_VAR_ID, &StVarRow { name, value })?;
Ok(())
}

/// Parse the literal representation of a system variable
fn parse_var(name: StVarName, literal: &str) -> Result<StVarValue, DBError> {
StVarValue::try_from_primitive(parse::parse(literal, &name.type_of())?).map_err(|v| {
ErrorVm::Type(ErrorType::Parse {
value: literal.to_string(),
ty: fmt_algebraic_type(&name.type_of()).to_string(),
err: format!("error parsing value: {v:?}"),
})
.into()
})
}

/// Write `rows` into a (sender) view's backing table.
///
/// # Process
Expand Down Expand Up @@ -2353,9 +2299,7 @@ mod tests {

use super::tests_utils::begin_mut_tx;
use super::*;
use crate::db::relational_db::tests_utils::{
begin_tx, insert, make_snapshot, with_auto_commit, with_read_only, TestDB,
};
use crate::db::relational_db::tests_utils::{begin_tx, insert, make_snapshot, TestDB};
use anyhow::bail;
use bytes::Bytes;
use commitlog::payload::txdata;
Expand Down Expand Up @@ -2465,18 +2409,6 @@ mod tests {
Ok(())
}

#[test]
fn test_system_variables() {
let db = TestDB::durable().expect("failed to create db");
let _ = with_auto_commit(&db, |tx| db.write_var(tx, StVarName::RowLimit, "5"));
assert_eq!(
5,
with_read_only(&db, |tx| db.row_limit(tx))
.expect("failed to read from st_var")
.expect("row_limit does not exist")
);
}

#[test]
fn test_open_twice() -> ResultTest<()> {
let stdb = TestDB::durable()?;
Expand Down
Loading
Loading