Skip to content

Commit 21f2971

Browse files
committed
feat(block): migrate existing tracked rows when block algo is enabled
When cloudsync_set_column(..., 'algo', 'block') is called on a table that already has tracked rows, those rows are now immediately migrated into the blocks table. Previously pre-existing column values were silently ignored until the next UPDATE, leaving sync state incomplete. Migration uses a two-phase collect-then-write approach to avoid SQLite cursor invalidation, and INSERT OR IGNORE / ON CONFLICT DO NOTHING for idempotency. Bumps version to 1.0.13.
1 parent 6694c2e commit 21f2971

File tree

9 files changed

+428
-1
lines changed

9 files changed

+428
-1
lines changed

CHANGELOG.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,17 @@ All notable changes to this project will be documented in this file.
44

55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
66

7+
## [1.0.13] - 2026-04-14
8+
9+
### Fixed
10+
11+
- **Block-level LWW migration**: When `cloudsync_set_column(..., 'algo', 'block')` is called on a table that already has tracked rows, those rows are now immediately migrated into the blocks table. Previously, pre-existing column values were ignored until the next UPDATE, leaving sync state incomplete. The migration uses a two-phase collect-then-write approach to avoid SQLite cursor invalidation and `INSERT OR IGNORE` / `ON CONFLICT DO NOTHING` semantics for idempotency.
12+
13+
### Added
14+
15+
- Unit test `do_test_block_lww_existing_data` (Block LWW Existing Data) verifying block migration on `set_column`, idempotency of repeated `set_column` calls, and correct materialization after update.
16+
- PostgreSQL test `50_block_lww_existing_data.sql` with equivalent coverage for the PostgreSQL backend.
17+
718
## [1.0.12] - 2026-04-11
819

920
### Fixed

src/cloudsync.c

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2069,6 +2069,172 @@ int merge_insert (cloudsync_context *data, cloudsync_table_context *table, const
20692069

20702070
// MARK: - Block column setup -
20712071

2072+
// Migrate existing tracked rows to block format when block-level LWW is first enabled on a column.
2073+
// Scans the metadata table for alive rows with the plain col_name entry (not yet block entries),
2074+
// reads each row's current value from the base table, splits it into blocks, and inserts
2075+
// the block entries into both the blocks table and the metadata table.
2076+
// Uses INSERT OR IGNORE semantics so the operation is safe to call multiple times.
2077+
static int block_migrate_existing_rows (cloudsync_context *data, cloudsync_table_context *table, int col_idx) {
2078+
const char *col_name = table->col_name[col_idx];
2079+
if (!col_name || !table->meta_ref || !table->blocks_ref) return DBRES_OK;
2080+
2081+
const char *delim = table->col_delimiter[col_idx] ? table->col_delimiter[col_idx] : BLOCK_DEFAULT_DELIMITER;
2082+
int64_t db_version = cloudsync_dbversion_next(data, CLOUDSYNC_VALUE_NOTSET);
2083+
2084+
// Phase 1: collect all existing PKs that have an alive regular col_name entry
2085+
// AND do not yet have any entries in the blocks table for this column.
2086+
// The NOT IN filter makes this idempotent: rows that were already migrated
2087+
// (or had their blocks created via INSERT) are skipped on subsequent calls.
2088+
// We collect PKs before writing so that writes to the metadata table (Phase 2)
2089+
// do not perturb the read cursor on the same table.
2090+
char *like_pattern = block_build_colname(col_name, "%");
2091+
if (!like_pattern) return DBRES_NOMEM;
2092+
2093+
char *scan_sql = cloudsync_memory_mprintf(SQL_META_SCAN_COL_FOR_MIGRATION, table->meta_ref, table->blocks_ref);
2094+
if (!scan_sql) { cloudsync_memory_free(like_pattern); return DBRES_NOMEM; }
2095+
dbvm_t *scan_vm = NULL;
2096+
int rc = databasevm_prepare(data, scan_sql, &scan_vm, 0);
2097+
cloudsync_memory_free(scan_sql);
2098+
if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); return rc; }
2099+
2100+
rc = databasevm_bind_text(scan_vm, 1, col_name, -1);
2101+
if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_finalize(scan_vm); return rc; }
2102+
// Bind like_pattern as ?2 and keep it alive until after all scan steps complete,
2103+
// because databasevm_bind_text uses SQLITE_STATIC (no copy).
2104+
rc = databasevm_bind_text(scan_vm, 2, like_pattern, -1);
2105+
if (rc != DBRES_OK) { cloudsync_memory_free(like_pattern); databasevm_finalize(scan_vm); return rc; }
2106+
2107+
// Collect pk blobs into a dynamically-grown array of owned copies
2108+
void **pks = NULL;
2109+
size_t *pklens = NULL;
2110+
int pk_count = 0;
2111+
int pk_cap = 0;
2112+
2113+
while ((rc = databasevm_step(scan_vm)) == DBRES_ROW) {
2114+
size_t pklen = 0;
2115+
const void *pk = database_column_blob(scan_vm, 0, &pklen);
2116+
if (!pk || pklen == 0) continue;
2117+
2118+
if (pk_count >= pk_cap) {
2119+
int new_cap = pk_cap ? pk_cap * 2 : 8;
2120+
void **new_pks = (void **)cloudsync_memory_realloc(pks, (uint64_t)(new_cap * sizeof(void *)));
2121+
size_t *new_pklens = (size_t *)cloudsync_memory_realloc(pklens, (uint64_t)(new_cap * sizeof(size_t)));
2122+
if (!new_pks || !new_pklens) {
2123+
cloudsync_memory_free(new_pks ? new_pks : pks);
2124+
cloudsync_memory_free(new_pklens ? new_pklens : pklens);
2125+
databasevm_finalize(scan_vm);
2126+
return DBRES_NOMEM;
2127+
}
2128+
pks = new_pks;
2129+
pklens = new_pklens;
2130+
pk_cap = new_cap;
2131+
}
2132+
2133+
pks[pk_count] = cloudsync_memory_alloc((uint64_t)pklen);
2134+
if (!pks[pk_count]) { rc = DBRES_NOMEM; break; }
2135+
memcpy(pks[pk_count], pk, pklen);
2136+
pklens[pk_count] = pklen;
2137+
pk_count++;
2138+
}
2139+
2140+
databasevm_finalize(scan_vm);
2141+
cloudsync_memory_free(like_pattern); // safe to free after scan_vm is finalized
2142+
if (rc != DBRES_DONE && rc != DBRES_OK) {
2143+
for (int i = 0; i < pk_count; i++) cloudsync_memory_free(pks[i]);
2144+
cloudsync_memory_free(pks);
2145+
cloudsync_memory_free(pklens);
2146+
return rc;
2147+
}
2148+
2149+
if (pk_count == 0) {
2150+
cloudsync_memory_free(pks);
2151+
cloudsync_memory_free(pklens);
2152+
return DBRES_OK;
2153+
}
2154+
2155+
// Phase 2: for each collected PK, read the column value, split into blocks,
2156+
// and insert into the blocks table + metadata using INSERT OR IGNORE.
2157+
2158+
char *meta_sql = cloudsync_memory_mprintf(SQL_META_INSERT_BLOCK_IGNORE, table->meta_ref);
2159+
if (!meta_sql) { rc = DBRES_NOMEM; goto cleanup_pks; }
2160+
dbvm_t *meta_vm = NULL;
2161+
rc = databasevm_prepare(data, meta_sql, &meta_vm, 0);
2162+
cloudsync_memory_free(meta_sql);
2163+
if (rc != DBRES_OK) goto cleanup_pks;
2164+
2165+
char *blocks_sql = cloudsync_memory_mprintf(SQL_BLOCKS_INSERT_IGNORE, table->blocks_ref);
2166+
if (!blocks_sql) { databasevm_finalize(meta_vm); rc = DBRES_NOMEM; goto cleanup_pks; }
2167+
dbvm_t *blocks_vm = NULL;
2168+
rc = databasevm_prepare(data, blocks_sql, &blocks_vm, 0);
2169+
cloudsync_memory_free(blocks_sql);
2170+
if (rc != DBRES_OK) { databasevm_finalize(meta_vm); goto cleanup_pks; }
2171+
2172+
dbvm_t *val_vm = (dbvm_t *)table_column_lookup(table, col_name, false, NULL);
2173+
2174+
for (int p = 0; p < pk_count; p++) {
2175+
const void *pk = pks[p];
2176+
size_t pklen = pklens[p];
2177+
2178+
if (!val_vm) continue;
2179+
2180+
// Read current column value from the base table
2181+
int bind_rc = pk_decode_prikey((char *)pk, pklen, pk_decode_bind_callback, (void *)val_vm);
2182+
if (bind_rc < 0) { databasevm_reset(val_vm); continue; }
2183+
2184+
int step_rc = databasevm_step(val_vm);
2185+
const char *text = (step_rc == DBRES_ROW) ? database_column_text(val_vm, 0) : NULL;
2186+
// Make a copy of text before resetting val_vm, as the pointer is only valid until reset
2187+
char *text_copy = text ? cloudsync_string_dup(text) : NULL;
2188+
databasevm_reset(val_vm);
2189+
2190+
if (!text_copy) continue; // NULL column value: nothing to migrate
2191+
2192+
// Split text into blocks and store each one
2193+
block_list_t *blocks = block_split(text_copy, delim);
2194+
cloudsync_memory_free(text_copy);
2195+
if (!blocks) continue;
2196+
2197+
char **positions = block_initial_positions(blocks->count);
2198+
if (positions) {
2199+
for (int b = 0; b < blocks->count; b++) {
2200+
char *block_cn = block_build_colname(col_name, positions[b]);
2201+
if (block_cn) {
2202+
// Metadata entry (skip if this block position already exists)
2203+
databasevm_bind_blob(meta_vm, 1, pk, (int)pklen);
2204+
databasevm_bind_text(meta_vm, 2, block_cn, -1);
2205+
databasevm_bind_int(meta_vm, 3, 1); // col_version = 1 (alive)
2206+
databasevm_bind_int(meta_vm, 4, db_version);
2207+
databasevm_bind_int(meta_vm, 5, cloudsync_bumpseq(data));
2208+
databasevm_step(meta_vm);
2209+
databasevm_reset(meta_vm);
2210+
2211+
// Block value (skip if this block position already exists)
2212+
databasevm_bind_blob(blocks_vm, 1, pk, (int)pklen);
2213+
databasevm_bind_text(blocks_vm, 2, block_cn, -1);
2214+
databasevm_bind_text(blocks_vm, 3, blocks->entries[b].content, -1);
2215+
databasevm_step(blocks_vm);
2216+
databasevm_reset(blocks_vm);
2217+
2218+
cloudsync_memory_free(block_cn);
2219+
}
2220+
cloudsync_memory_free(positions[b]);
2221+
}
2222+
cloudsync_memory_free(positions);
2223+
}
2224+
block_list_free(blocks);
2225+
}
2226+
2227+
databasevm_finalize(meta_vm);
2228+
databasevm_finalize(blocks_vm);
2229+
rc = DBRES_OK;
2230+
2231+
cleanup_pks:
2232+
for (int i = 0; i < pk_count; i++) cloudsync_memory_free(pks[i]);
2233+
cloudsync_memory_free(pks);
2234+
cloudsync_memory_free(pklens);
2235+
return rc;
2236+
}
2237+
20722238
int cloudsync_setup_block_column (cloudsync_context *data, const char *table_name, const char *col_name, const char *delimiter, bool persist) {
20732239
cloudsync_table_context *table = table_lookup(data, table_name);
20742240
if (!table) return cloudsync_set_error(data, "cloudsync_setup_block_column: table not found", DBRES_ERROR);
@@ -2148,6 +2314,13 @@ int cloudsync_setup_block_column (cloudsync_context *data, const char *table_nam
21482314
rc = dbutils_table_settings_set_key_value(data, table_name, col_name, "delimiter", delimiter);
21492315
if (rc != DBRES_OK) return rc;
21502316
}
2317+
2318+
// Migrate any existing tracked rows: populate the blocks table and metadata with
2319+
// block entries derived from the current column value, so that subsequent UPDATE
2320+
// operations can diff against the real existing state instead of treating everything
2321+
// as new, and so this node participates correctly in LWW conflict resolution.
2322+
rc = block_migrate_existing_rows(data, table, col_idx);
2323+
if (rc != DBRES_OK) return rc;
21512324
}
21522325

21532326
return DBRES_OK;

src/cloudsync.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
extern "C" {
1919
#endif
2020

21-
#define CLOUDSYNC_VERSION "1.0.12"
21+
#define CLOUDSYNC_VERSION "1.0.13"
2222
#define CLOUDSYNC_MAX_TABLENAME_LEN 512
2323

2424
#define CLOUDSYNC_VALUE_NOTSET -1

src/postgresql/sql_postgresql.c

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -437,3 +437,15 @@ const char * const SQL_BLOCKS_LIST_ALIVE =
437437
"WHERE b.pk = $1 AND b.col_name LIKE $2 "
438438
"AND m.pk = $3 AND m.col_name LIKE $4 AND m.col_version %% 2 = 1 "
439439
"ORDER BY b.col_name COLLATE \"C\"";
440+
441+
// Insert one block value ($1 = pk blob, $2 = block-qualified col_name, $3 = content).
// Existing (pk, col_name) rows are left untouched so re-running the migration is idempotent.
// %s is substituted with the per-table blocks-table name via cloudsync_memory_mprintf.
const char * const SQL_BLOCKS_INSERT_IGNORE =
    "INSERT INTO %s (pk, col_name, col_value) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING";

// Find rows eligible for block migration: alive metadata entries (col_version odd)
// for the plain column name ($1) that have no block rows yet in the blocks table
// ($2 = LIKE pattern matching block-qualified col_names). First %s = metadata table,
// second %s = blocks table. %% escapes the modulo operator for mprintf.
const char * const SQL_META_SCAN_COL_FOR_MIGRATION =
    "SELECT DISTINCT m.pk FROM %s m "
    "WHERE m.col_name = $1 AND m.col_version %% 2 = 1 "
    "AND NOT EXISTS (SELECT 1 FROM %s b WHERE b.pk = m.pk AND b.col_name LIKE $2)";

// Insert a per-block metadata entry (site_id 0 = local origin); duplicates are ignored.
// %s is substituted with the per-table metadata-table name.
const char * const SQL_META_INSERT_BLOCK_IGNORE =
    "INSERT INTO %s (pk, col_name, col_version, db_version, seq, site_id) "
    "VALUES ($1, $2, $3, $4, $5, 0) ON CONFLICT DO NOTHING";

src/sql.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,5 +74,8 @@ extern const char * const SQL_BLOCKS_UPSERT;
7474
extern const char * const SQL_BLOCKS_SELECT;
7575
extern const char * const SQL_BLOCKS_DELETE;
7676
extern const char * const SQL_BLOCKS_LIST_ALIVE;
77+
// Block-migration statements; backend-specific definitions live in
// sqlite/sql_sqlite.c and postgresql/sql_postgresql.c.
extern const char * const SQL_BLOCKS_INSERT_IGNORE;
extern const char * const SQL_META_SCAN_COL_FOR_MIGRATION;
extern const char * const SQL_META_INSERT_BLOCK_IGNORE;
7780

7881
#endif

src/sqlite/sql_sqlite.c

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,3 +304,15 @@ const char * const SQL_BLOCKS_LIST_ALIVE =
304304
"WHERE b.pk = ?1 AND b.col_name LIKE ?2 "
305305
"AND m.pk = ?3 AND m.col_name LIKE ?4 AND m.col_version %% 2 = 1 "
306306
"ORDER BY b.col_name";
307+
308+
// Insert one block value (?1 = pk blob, ?2 = block-qualified col_name, ?3 = content).
// INSERT OR IGNORE leaves existing (pk, col_name) rows untouched so re-running the
// migration is idempotent. %s is substituted with the per-table blocks-table name.
const char * const SQL_BLOCKS_INSERT_IGNORE =
    "INSERT OR IGNORE INTO %s (pk, col_name, col_value) VALUES (?1, ?2, ?3)";

// Find rows eligible for block migration: alive metadata entries (col_version odd)
// for the plain column name (?1) that have no block rows yet in the blocks table
// (?2 = LIKE pattern matching block-qualified col_names). First %s = metadata table,
// second %s = blocks table. %% escapes the modulo operator for mprintf.
const char * const SQL_META_SCAN_COL_FOR_MIGRATION =
    "SELECT DISTINCT m.pk FROM %s m "
    "WHERE m.col_name = ?1 AND m.col_version %% 2 = 1 "
    "AND NOT EXISTS (SELECT 1 FROM %s b WHERE b.pk = m.pk AND b.col_name LIKE ?2)";

// Insert a per-block metadata entry (site_id 0 = local origin); duplicates are ignored.
// %s is substituted with the per-table metadata-table name.
const char * const SQL_META_INSERT_BLOCK_IGNORE =
    "INSERT OR IGNORE INTO %s (pk, col_name, col_version, db_version, seq, site_id) "
    "VALUES (?1, ?2, ?3, ?4, ?5, 0)";
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
-- 'Block-level LWW: migration of existing tracked rows when algo=block is enabled'
-- Mirrors the SQLite unit test: Block LWW Existing Data

\set testid '50'
\ir helper_test_init.sql

\connect postgres
\ir helper_psql_conn_setup.sql

DROP DATABASE IF EXISTS cloudsync_block_existing_a;
CREATE DATABASE cloudsync_block_existing_a;

\connect cloudsync_block_existing_a
\ir helper_psql_conn_setup.sql

CREATE EXTENSION IF NOT EXISTS cloudsync;

-- Create a table and init cloudsync WITHOUT block algo first
DROP TABLE IF EXISTS docs;
CREATE TABLE docs (id TEXT PRIMARY KEY NOT NULL, body TEXT);
SELECT cloudsync_init('docs', 'CLS', 1) AS _init \gset

-- Insert rows BEFORE enabling block algorithm (they will be tracked as regular CLS rows)
INSERT INTO docs (id, body) VALUES ('d1', E'Line1\nLine2\nLine3');
INSERT INTO docs (id, body) VALUES ('d2', E'Alpha\nBeta');

-- Now enable block algo on the column that already has data
SELECT cloudsync_set_column('docs', 'body', 'algo', 'block') AS _sc \gset

-- Test 1: Blocks table should have 5 entries (3 for d1, 2 for d2) immediately after set_column
SELECT count(*) AS block_count FROM docs_cloudsync_blocks \gset
SELECT (:block_count::int = 5) AS block_count_ok \gset
\if :block_count_ok
\echo [PASS] (:testid) Migration: 5 block entries after set_column on existing data
\else
\echo [FAIL] (:testid) Migration: expected 5 block entries, got :block_count
SELECT (:fail::int + 1) AS fail \gset
\endif

-- Test 2: Metadata should have 5 alive block entries
-- chr(31) is presumably the separator used in block-qualified col_names
-- (ASCII unit separator) -- verify against BLOCK column-name building in C.
SELECT count(*) AS meta_count FROM docs_cloudsync
WHERE col_name LIKE 'body' || chr(31) || '%' AND col_version % 2 = 1 \gset
SELECT (:meta_count::int = 5) AS meta_count_ok \gset
\if :meta_count_ok
\echo [PASS] (:testid) Migration: 5 alive block metadata entries
\else
\echo [FAIL] (:testid) Migration: expected 5 alive metadata entries, got :meta_count
SELECT (:fail::int + 1) AS fail \gset
\endif

-- Test 3: Calling set_column again should be idempotent (count stays at 5)
SELECT cloudsync_set_column('docs', 'body', 'algo', 'block') AS _sc2 \gset

SELECT count(*) AS block_count2 FROM docs_cloudsync_blocks \gset
SELECT (:block_count2::int = 5) AS idempotent_ok \gset
\if :idempotent_ok
\echo [PASS] (:testid) Migration: idempotent (still 5 blocks after second set_column)
\else
\echo [FAIL] (:testid) Migration: idempotency broken, got :block_count2 blocks (expected 5)
SELECT (:fail::int + 1) AS fail \gset
\endif

-- Test 4: UPDATE on d1 should still work correctly after migration
UPDATE docs SET body = E'Line1\nLine2-edited\nLine3' WHERE id = 'd1';

SELECT count(*) AS block_count3 FROM docs_cloudsync_blocks \gset
SELECT (:block_count3::int = 5) AS update_count_ok \gset
\if :update_count_ok
\echo [PASS] (:testid) Migration: 5 blocks after UPDATE (d1 edited in-place)
\else
\echo [FAIL] (:testid) Migration: expected 5 blocks after update, got :block_count3
SELECT (:fail::int + 1) AS fail \gset
\endif

-- Test 5: Materialized value should reflect the update
SELECT cloudsync_text_materialize('docs', 'body', 'd1') AS _mat \gset

SELECT (body = E'Line1\nLine2-edited\nLine3') AS mat_ok FROM docs WHERE id = 'd1' \gset
\if :mat_ok
\echo [PASS] (:testid) Migration: materialized value correct after update
\else
\echo [FAIL] (:testid) Migration: materialized value mismatch
SELECT (:fail::int + 1) AS fail \gset
\endif

-- Cleanup
\ir helper_test_cleanup.sql
\if :should_cleanup
DROP DATABASE IF EXISTS cloudsync_block_existing_a;
\else
\echo [INFO] !!!!!
\endif

test/postgresql/full_test.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
\ir 47_row_filter_advanced.sql
5858
\ir 48_row_filter_multi_table.sql
5959
\ir 49_row_filter_prefill.sql
60+
\ir 50_block_lww_existing_data.sql
6061

6162
-- 'Test summary'
6263
\echo '\nTest summary:'

0 commit comments

Comments
 (0)