Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 0 additions & 15 deletions rivetkit-typescript/packages/rivetkit-native/wrapper.d.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { JsNativeDatabase, JsKvEntry, JsKvListOptions } from "./index";

Check failure on line 1 in rivetkit-typescript/packages/rivetkit-native/wrapper.d.ts

View workflow job for this annotation

GitHub Actions / RivetKit / Quality Check

format

Formatter would have printed the following content:

export type { JsNativeDatabase, JsKvEntry, JsKvListOptions };

Expand Down Expand Up @@ -128,19 +128,4 @@
actorId: string,
preloadedEntries?: readonly [Uint8Array, Uint8Array][] | null,
): Promise<JsNativeDatabase>;

export interface NativeRawDatabase {
execute: <TRow extends Record<string, unknown> = Record<string, unknown>>(
query: string,
...args: unknown[]
) => Promise<TRow[]>;
close: () => Promise<void>;
}

export declare function openRawDatabaseFromEnvoy(
handle: EnvoyHandle,
actorId: string,
preloadedEntries?: readonly [Uint8Array, Uint8Array][] | null,
): Promise<NativeRawDatabase>;

export declare const utils: {};
177 changes: 0 additions & 177 deletions rivetkit-typescript/packages/rivetkit-native/wrapper.js
Original file line number Diff line number Diff line change
Expand Up @@ -194,182 +194,6 @@ function decodePreloadedKv(preloadedKv) {
};
}

function isPlainObject(value) {
return (
!!value &&
typeof value === "object" &&
!Array.isArray(value) &&
Object.getPrototypeOf(value) === Object.prototype
);
}

function toNativeBinding(value) {
if (value === null || value === undefined) {
return { kind: "null" };
}
if (typeof value === "bigint") {
return { kind: "int", intValue: Number(value) };
}
if (typeof value === "number") {
return Number.isInteger(value)
? { kind: "int", intValue: value }
: { kind: "float", floatValue: value };
}
if (typeof value === "string") {
return { kind: "text", textValue: value };
}
if (value instanceof ArrayBuffer) {
return { kind: "blob", blobValue: Buffer.from(value) };
}
if (ArrayBuffer.isView(value)) {
return {
kind: "blob",
blobValue: Buffer.from(value.buffer, value.byteOffset, value.byteLength),
};
}

throw new Error(`unsupported sqlite binding type: ${typeof value}`);
}

function extractNamedSqliteParameters(sql) {
return [...sql.matchAll(/([:@$][A-Za-z_][A-Za-z0-9_]*)/g)].map(
(match) => match[1],
);
}

function getNamedSqliteBinding(bindings, name) {
if (name in bindings) {
return bindings[name];
}

const bareName = name.slice(1);
if (bareName in bindings) {
return bindings[bareName];
}

for (const prefix of [":", "@", "$"]) {
const candidate = `${prefix}${bareName}`;
if (candidate in bindings) {
return bindings[candidate];
}
}

return undefined;
}

function normalizeBindings(sql, args) {
if (!args || args.length === 0) {
return [];
}

if (
args.length === 1 &&
isPlainObject(args[0]) &&
!(args[0] instanceof Uint8Array)
) {
const names = extractNamedSqliteParameters(sql);
if (names.length === 0) {
throw new Error(
"native sqlite object bindings require named placeholders in the SQL statement",
);
}
return names.map((name) => {
const value = getNamedSqliteBinding(args[0], name);
if (value === undefined) {
throw new Error(`missing bind parameter: ${name}`);
}
return toNativeBinding(value);
});
}

return args.map(toNativeBinding);
}

function mapRows(rows, columns) {
return rows.map((row) => {
const rowObject = {};
for (let i = 0; i < columns.length; i++) {
rowObject[columns[i]] = row[i];
}
return rowObject;
});
}

function wrapNativeStorageError(nativeDb, error) {
const lastKvError =
typeof nativeDb.takeLastKvError === "function"
? nativeDb.takeLastKvError()
: null;
if (!lastKvError) {
throw error;
}
throw new Error(
`Database query failed because the underlying storage is no longer available (${lastKvError}). This usually means the actor is stopping. Use c.abortSignal to cancel long-running work before the actor shuts down.`,
{ cause: error },
);
}

async function openRawDatabaseFromEnvoy(handle, actorId, preloadedEntries) {
const nativeDb = await openDatabaseFromEnvoy(
handle,
actorId,
preloadedEntries,
);
let closed = false;

const ensureOpen = () => {
if (closed) {
throw new Error("database is closed");
}
};

return {
execute: async (query, ...args) => {
ensureOpen();

if (args.length > 0) {
const bindings = normalizeBindings(query, args);
const token = query.trimStart().slice(0, 16).toUpperCase();
const returnsRows =
token.startsWith("SELECT") ||
token.startsWith("PRAGMA") ||
token.startsWith("WITH") ||
/\bRETURNING\b/i.test(query);

if (returnsRows) {
try {
const result = await nativeDb.query(query, bindings);
return mapRows(result.rows, result.columns);
} catch (error) {
wrapNativeStorageError(nativeDb, error);
}
}

try {
await nativeDb.run(query, bindings);
} catch (error) {
wrapNativeStorageError(nativeDb, error);
}
return [];
}

try {
const result = await nativeDb.exec(query);
return mapRows(result.rows, result.columns);
} catch (error) {
wrapNativeStorageError(nativeDb, error);
}
},
close: async () => {
if (closed) {
return;
}
closed = true;
await nativeDb.close();
},
};
}

/**
* Route callback envelopes from the native addon to EnvoyConfig callbacks.
*/
Expand Down Expand Up @@ -635,4 +459,3 @@ function handleEvent(event, config, wrappedHandle) {
module.exports.startEnvoy = startEnvoy;
module.exports.startEnvoySync = startEnvoySync;
module.exports.openDatabaseFromEnvoy = openDatabaseFromEnvoy;
module.exports.openRawDatabaseFromEnvoy = openRawDatabaseFromEnvoy;
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { createActorRouter } from "../../src/actor/router";
import { routeWebSocket } from "../../src/actor/router-websocket-endpoints";
import { HEADER_CONN_PARAMS } from "../../src/common/actor-router-consts";
import { InlineWebSocketAdapter } from "../../src/common/inline-websocket-adapter";
import type { ISqliteVfs } from "@rivetkit/sqlite-wasm";
import type { NativeDatabaseProvider, SqliteDatabase } from "../../src/db/config";
import {
DYNAMIC_BOOTSTRAP_CONFIG_GLOBAL_KEY,
DYNAMIC_HOST_BRIDGE_GLOBAL_KEYS,
Expand All @@ -34,8 +34,6 @@ import {
} from "../../src/dynamic/runtime-bridge";
import { RegistryConfigSchema } from "../../src/registry/config";

const { SqliteVfsPool } = require("@rivetkit/sqlite-wasm") as typeof import("@rivetkit/sqlite-wasm");

interface IsolateReferenceLike {
applySyncPromise(
receiver: unknown,
Expand All @@ -60,6 +58,10 @@ interface DynamicHostBridge {
kvDeleteRange: IsolateReferenceLike;
kvListPrefix: IsolateReferenceLike;
kvListRange: IsolateReferenceLike;
dbExec: IsolateReferenceLike;
dbQuery: IsolateReferenceLike;
dbRun: IsolateReferenceLike;
dbClose: IsolateReferenceLike;
setAlarm: IsolateReferenceLike;
clientCall: IsolateReferenceLike;
ackHibernatableWebSocketMessage: IsolateReferenceLike;
Expand Down Expand Up @@ -108,7 +110,7 @@ interface DynamicActorDriver {
},
): Promise<Array<[Uint8Array, Uint8Array]>>;
setAlarm(actor: { id: string }, timestamp: number): Promise<void>;
createSqliteVfs(actorId: string): Promise<ISqliteVfs>;
getNativeDatabaseProvider(): NativeDatabaseProvider;
startSleep(actorId: string): void;
ackHibernatableWebSocketMessage(
gatewayId: ArrayBuffer,
Expand Down Expand Up @@ -282,12 +284,7 @@ const webSocketSessions = new Map<
}
>();
const CLIENT_ACCESSOR_METHODS = new Set(["get", "getOrCreate", "getForId", "create"]);
let sqliteVfsPoolPromise:
| Promise<{
acquire(actorId: string): Promise<ISqliteVfs>;
shutdown(): Promise<void>;
}>
| undefined;
const nativeDatabaseCache = new Map<string, SqliteDatabase>();

type DynamicActorRouter = ReturnType<typeof createActorRouter>;

Expand Down Expand Up @@ -371,6 +368,10 @@ function readHostBridge(): DynamicHostBridge {
kvListRange: getRequiredHostRef(
DYNAMIC_HOST_BRIDGE_GLOBAL_KEYS.kvListRange,
),
dbExec: getRequiredHostRef(DYNAMIC_HOST_BRIDGE_GLOBAL_KEYS.dbExec),
dbQuery: getRequiredHostRef(DYNAMIC_HOST_BRIDGE_GLOBAL_KEYS.dbQuery),
dbRun: getRequiredHostRef(DYNAMIC_HOST_BRIDGE_GLOBAL_KEYS.dbRun),
dbClose: getRequiredHostRef(DYNAMIC_HOST_BRIDGE_GLOBAL_KEYS.dbClose),
setAlarm: getRequiredHostRef(DYNAMIC_HOST_BRIDGE_GLOBAL_KEYS.setAlarm),
clientCall: getRequiredHostRef(DYNAMIC_HOST_BRIDGE_GLOBAL_KEYS.clientCall),
ackHibernatableWebSocketMessage: getRequiredHostRef(
Expand Down Expand Up @@ -452,23 +453,6 @@ async function getRuntimeState(): Promise<DynamicRuntimeState> {
return await runtimeStatePromise;
}

async function loadSqliteVfsPool(): Promise<{
acquire(actorId: string): Promise<ISqliteVfs>;
shutdown(): Promise<void>;
}> {
if (!sqliteVfsPoolPromise) {
sqliteVfsPoolPromise = Promise.resolve().then(
() =>
new SqliteVfsPool({
actorsPerInstance: 50,
idleDestroyMs: 30_000,
}),
);
}

return await sqliteVfsPoolPromise;
}

function dynamicHostLog(level: "debug" | "warn", message: string): void {
if (!hostBridge.log) {
return;
Expand Down Expand Up @@ -512,6 +496,45 @@ function bridgeCallSync<T>(ref: IsolateReferenceLike, args: unknown[]): T {
}) as T;
}

function createNativeDatabaseBridge(actorIdValue: string): SqliteDatabase {
return {
async exec(
sql: string,
callback?: (row: unknown[], columns: string[]) => void,
): Promise<void> {
const result = await bridgeCall<{
columns: string[];
rows: unknown[][];
}>(hostBridge.dbExec, [actorIdValue, sql]);
if (!callback) {
return;
}
for (const row of result.rows) {
callback(row, result.columns);
}
},
async run(
sql: string,
params?: unknown[] | Record<string, unknown>,
): Promise<void> {
await bridgeCall(hostBridge.dbRun, [actorIdValue, sql, params]);
},
async query(
sql: string,
params?: unknown[] | Record<string, unknown>,
): Promise<{ rows: unknown[][]; columns: string[] }> {
return await bridgeCall(hostBridge.dbQuery, [actorIdValue, sql, params]);
},
async close(): Promise<void> {
try {
await bridgeCall(hostBridge.dbClose, [actorIdValue]);
} finally {
nativeDatabaseCache.delete(actorIdValue);
}
},
};
}

function toArrayBuffer(input: Uint8Array | ArrayBuffer): ArrayBuffer {
if (input instanceof ArrayBuffer) {
return input;
Expand Down Expand Up @@ -851,9 +874,18 @@ const actorDriver: DynamicActorDriver = {
async setAlarm(actor, timestamp: number): Promise<void> {
await bridgeCall(hostBridge.setAlarm, [actor.id, timestamp]);
},
async createSqliteVfs(actorIdValue: string): Promise<ISqliteVfs> {
const pool = await loadSqliteVfsPool();
return await pool.acquire(actorIdValue);
getNativeDatabaseProvider(): NativeDatabaseProvider {
return {
open: async (actorIdValue: string): Promise<SqliteDatabase> => {
const existing = nativeDatabaseCache.get(actorIdValue);
if (existing) {
return existing;
}
const database = createNativeDatabaseBridge(actorIdValue);
nativeDatabaseCache.set(actorIdValue, database);
return database;
},
};
},
startSleep(requestActorId: string): void {
bridgeCallSync(hostBridge.startSleep, [requestActorId]);
Expand Down Expand Up @@ -1306,10 +1338,13 @@ async function dynamicDisposeEnvelope(): Promise<boolean> {
}
webSocketSessions.clear();
runtimeStopMode = undefined;
if (sqliteVfsPoolPromise) {
const sqliteVfsPool = await sqliteVfsPoolPromise;
await sqliteVfsPool.shutdown();
sqliteVfsPoolPromise = undefined;
for (const [actorId, database] of nativeDatabaseCache.entries()) {
try {
await database.close();
} catch {
// noop
}
nativeDatabaseCache.delete(actorId);
}
return true;
}
Expand Down
2 changes: 0 additions & 2 deletions rivetkit-typescript/packages/rivetkit/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,6 @@
"@rivetkit/engine-runner": "workspace:*",
"@rivetkit/fast-json-patch": "^3.1.2",
"@rivetkit/on-change": "^6.0.2-rc.1",
"@rivetkit/sqlite": "^0.1.1",
"@rivetkit/sqlite-wasm": "workspace:*",
"@rivetkit/traces": "workspace:*",
"@rivetkit/virtual-websocket": "workspace:*",
"@rivetkit/workflow-engine": "workspace:*",
Expand Down
Loading
Loading