From fe09c2203b5e428936ed2cb443d6567428ba0482 Mon Sep 17 00:00:00 2001 From: kbhat1 Date: Wed, 11 Jun 2025 16:44:31 -0400 Subject: [PATCH 1/3] Add User Defined Timestamps to Gorocksdb --- README.md | 2 +- column_family_ts.go | 30 +++++++++++++++++++ db_ts.go | 73 +++++++++++++++++++++++++++++++++++++++++++++ gorocksdb.c | 19 ++++++++++++ gorocksdb.h | 15 ++++++++++ 5 files changed, 138 insertions(+), 1 deletion(-) create mode 100644 column_family_ts.go create mode 100644 db_ts.go diff --git a/README.md b/README.md index b39aa2ba..d85dced0 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ You'll need to build [RocksDB](https://github.com/facebook/rocksdb) v5.16+ on yo After that, you can install gorocksdb using the following command: - CGO_CFLAGS="-I/path/to/rocksdb/include" \ + CGO_CFLAGS="-I/path/to/rocksdb/include -DUSE_USER_DEFINED_TIMESTAMP" \ CGO_LDFLAGS="-L/path/to/rocksdb -lrocksdb -lstdc++ -lm -lz -lbz2 -lsnappy -llz4 -lzstd" \ go get github.com/tecbot/gorocksdb diff --git a/column_family_ts.go b/column_family_ts.go new file mode 100644 index 00000000..f306fde9 --- /dev/null +++ b/column_family_ts.go @@ -0,0 +1,30 @@ +// column_family_ts.go +// +// Build only when RocksDB was compiled with USE_USER_DEFINED_TIMESTAMP=1. +//go:build rocksdb_user_timestamp +// +build rocksdb_user_timestamp + +package gorocksdb + +/* +#cgo CFLAGS: -DUSE_USER_DEFINED_TIMESTAMP +#include "rocksdb/c.h" +*/ +import "C" + +// EnableUserDefinedTimestamp turns the feature on/off for this column‑family +// *before* the CF is created. It maps straight to +// rocksdb_column_family_options_enable_user_defined_timestamp(). +func (opts *ColumnFamilyOptions) EnableUserDefinedTimestamp(enable bool) { + var flag C.uchar + if enable { + flag = 1 + } + C.rocksdb_column_family_options_enable_user_defined_timestamp(opts.c, flag) +} + +// SetTimestampSize sets the fixed length (in bytes) of the timestamp slice +// RocksDB will expect. Most users pick 8 for uint64 epoch microseconds. +func (opts *ColumnFamilyOptions) SetTimestampSize(size int) { + C.rocksdb_column_family_options_set_timestamp_size(opts.c, C.size_t(size)) +} diff --git a/db_ts.go b/db_ts.go new file mode 100644 index 00000000..4417b862 --- /dev/null +++ b/db_ts.go @@ -0,0 +1,73 @@ +//go:build rocksdb_user_timestamp +// +build rocksdb_user_timestamp + +package gorocksdb + +/* +#cgo CFLAGS: -DUSE_USER_DEFINED_TIMESTAMP +#include "rocksdb/c.h" +#include "gorocksdb.h" +*/ +import "C" +import ( + "unsafe" +) + +// PutCFWithTS stores the given key/value pair in the specified column family +// while attaching an explicit timestamp (ts). +// +// The timestamp slice is treated as opaque; its length must match the +// TimestampSize configured for the column family (e.g., 8 for uint64 epoch). +// The slice is not retained by RocksDB after the call returns. +func (db *DB) PutCFWithTS( + wo *WriteOptions, + cf *ColumnFamilyHandle, + key, ts, value []byte, +) error { + var cKey *C.char + if len(key) > 0 { + cKey = (*C.char)(unsafe.Pointer(&key[0])) + } + var cTs *C.char + if len(ts) > 0 { + cTs = (*C.char)(unsafe.Pointer(&ts[0])) + } + var cVal *C.char + if len(value) > 0 { + cVal = (*C.char)(unsafe.Pointer(&value[0])) + } + + cErr := C.gorocksdb_put_cf_with_ts( + db.c, wo.c, cf.c, + cKey, C.size_t(len(key)), + cTs, C.size_t(len(ts)), + cVal, C.size_t(len(value))) + return convertErr(cErr) +} + +// PutWithTS is the default‑CF convenience wrapper around PutCFWithTS. +func (db *DB) PutWithTS( + wo *WriteOptions, + key, ts, value []byte, +) error { + return db.PutCFWithTS(wo, db.defaultCF, key, ts, value) +} + +// IncreaseFullHistoryTsLow raises RocksDB’s low‑water mark for full‑history +// retention. After the call, any keys whose most recent timestamp is below +// the supplied value may be dropped during compaction. The timestamp must be +// monotonically increasing between calls per column family. +func (db *DB) IncreaseFullHistoryTsLow( + cf *ColumnFamilyHandle, + ts []byte, +) error { + var cTs *C.char + if len(ts) > 0 { + cTs = (*C.char)(unsafe.Pointer(&ts[0])) + } + + cErr := C.gorocksdb_increase_full_history_ts_low( + db.c, cf.c, + cTs, C.size_t(len(ts))) + return convertErr(cErr) +} diff --git a/gorocksdb.c b/gorocksdb.c index efebbe51..c39d6195 100644 --- a/gorocksdb.c +++ b/gorocksdb.c @@ -68,3 +68,22 @@ rocksdb_slicetransform_t* gorocksdb_slicetransform_create(uintptr_t idx) { (unsigned char (*)(void*, const char*, size_t))(gorocksdb_slicetransform_in_range), (const char* (*)(void*))(gorocksdb_slicetransform_name)); } + +/* gorocksdb.c */ +rocksdb_status_t* gorocksdb_put_cf_with_ts( + rocksdb_t* db, + const rocksdb_writeoptions_t* options, + rocksdb_column_family_handle_t* cf, + const char* key, size_t keylen, + const char* ts, size_t tslen, + const char* val, size_t vallen) { + return rocksdb_put_cf_with_ts(db, options, cf, key, keylen, ts, tslen, val, vallen); +} + +rocksdb_status_t* gorocksdb_increase_full_history_ts_low( + rocksdb_t* db, + rocksdb_column_family_handle_t* cf, + const char* ts, size_t tslen) { + return rocksdb_increase_full_history_ts_low(db, cf, ts, tslen); +} + diff --git a/gorocksdb.h b/gorocksdb.h index 4a9968f0..c02bb57c 100644 --- a/gorocksdb.h +++ b/gorocksdb.h @@ -28,3 +28,18 @@ extern void gorocksdb_mergeoperator_delete_value(void* state, const char* v, siz /* Slice Transform */ extern rocksdb_slicetransform_t* gorocksdb_slicetransform_create(uintptr_t idx); + +/* gorocksdb.h */ +extern rocksdb_status_t* gorocksdb_put_cf_with_ts( + rocksdb_t* db, + const rocksdb_writeoptions_t* options, + rocksdb_column_family_handle_t* cf, + const char* key, size_t keylen, + const char* ts, size_t tslen, + const char* val, size_t vallen); + +extern rocksdb_status_t* gorocksdb_increase_full_history_ts_low( + rocksdb_t* db, + rocksdb_column_family_handle_t* cf, + const char* ts, size_t tslen); + From f471fc96774b3581183b105a565200b3875b59ec Mon Sep 17 00:00:00 2001 From: kbhat1 Date: Wed, 11 Jun 2025 16:55:37 -0400 Subject: [PATCH 2/3] Add Delete --- column_family_ts.go | 3 -- db_ts.go | 69 ++++++++++++++++++++++----------------------- gorocksdb.c | 8 ++++++ gorocksdb.h | 6 ++++ 4 files changed, 47 insertions(+), 39 deletions(-) diff --git a/column_family_ts.go b/column_family_ts.go index f306fde9..202cee00 100644 --- a/column_family_ts.go +++ b/column_family_ts.go @@ -1,6 +1,3 @@ -// column_family_ts.go -// -// Build only when RocksDB was compiled with USE_USER_DEFINED_TIMESTAMP=1. //go:build rocksdb_user_timestamp // +build rocksdb_user_timestamp diff --git a/db_ts.go b/db_ts.go index 4417b862..fc1fc9a1 100644 --- a/db_ts.go +++ b/db_ts.go @@ -15,59 +15,56 @@ import ( // PutCFWithTS stores the given key/value pair in the specified column family // while attaching an explicit timestamp (ts). -// -// The timestamp slice is treated as opaque; its length must match the -// TimestampSize configured for the column family (e.g., 8 for uint64 epoch). -// The slice is not retained by RocksDB after the call returns. func (db *DB) PutCFWithTS( wo *WriteOptions, cf *ColumnFamilyHandle, key, ts, value []byte, ) error { - var cKey *C.char - if len(key) > 0 { - cKey = (*C.char)(unsafe.Pointer(&key[0])) - } - var cTs *C.char - if len(ts) > 0 { - cTs = (*C.char)(unsafe.Pointer(&ts[0])) - } - var cVal *C.char - if len(value) > 0 { - cVal = (*C.char)(unsafe.Pointer(&value[0])) - } - cErr := C.gorocksdb_put_cf_with_ts( db.c, wo.c, cf.c, - cKey, C.size_t(len(key)), - cTs, C.size_t(len(ts)), - cVal, C.size_t(len(value))) + byteSliceToChar(key), C.size_t(len(key)), + byteSliceToChar(ts), C.size_t(len(ts)), + byteSliceToChar(value), C.size_t(len(value))) return convertErr(cErr) } -// PutWithTS is the default‑CF convenience wrapper around PutCFWithTS. -func (db *DB) PutWithTS( - wo *WriteOptions, - key, ts, value []byte, -) error { +// PutWithTS is the default‑CF convenience wrapper. +func (db *DB) PutWithTS(wo *WriteOptions, key, ts, value []byte) error { return db.PutCFWithTS(wo, db.defaultCF, key, ts, value) } -// IncreaseFullHistoryTsLow raises RocksDB’s low‑water mark for full‑history -// retention. After the call, any keys whose most recent timestamp is below -// the supplied value may be dropped during compaction. The timestamp must be -// monotonically increasing between calls per column family. -func (db *DB) IncreaseFullHistoryTsLow( +// DeleteCFWithTS marks the given key as deleted with the supplied timestamp. +// The semantics mirror PutCFWithTS but create a tombstone instead of a value. +func (db *DB) DeleteCFWithTS( + wo *WriteOptions, cf *ColumnFamilyHandle, - ts []byte, + key, ts []byte, ) error { - var cTs *C.char - if len(ts) > 0 { - cTs = (*C.char)(unsafe.Pointer(&ts[0])) - } + cErr := C.gorocksdb_delete_cf_with_ts( + db.c, wo.c, cf.c, + byteSliceToChar(key), C.size_t(len(key)), + byteSliceToChar(ts), C.size_t(len(ts))) + return convertErr(cErr) +} + +// DeleteWithTS is the default‑CF convenience wrapper. +func (db *DB) DeleteWithTS(wo *WriteOptions, key, ts []byte) error { + return db.DeleteCFWithTS(wo, db.defaultCF, key, ts) +} +// IncreaseFullHistoryTsLow raises RocksDB’s low‑water mark for full‑history +// retention for the specified column family. +func (db *DB) IncreaseFullHistoryTsLow(cf *ColumnFamilyHandle, ts []byte) error { cErr := C.gorocksdb_increase_full_history_ts_low( db.c, cf.c, - cTs, C.size_t(len(ts))) + byteSliceToChar(ts), C.size_t(len(ts))) return convertErr(cErr) } + +// byteSliceToChar safely converts a nil‑able Go []byte to *C.char. +func byteSliceToChar(b []byte) *C.char { + if len(b) == 0 { + return nil + } + return (*C.char)(unsafe.Pointer(&b[0])) +} diff --git a/gorocksdb.c b/gorocksdb.c index c39d6195..0b416e22 100644 --- a/gorocksdb.c +++ b/gorocksdb.c @@ -87,3 +87,11 @@ rocksdb_status_t* gorocksdb_increase_full_history_ts_low( return rocksdb_increase_full_history_ts_low(db, cf, ts, tslen); } +rocksdb_status_t* gorocksdb_delete_cf_with_ts( + rocksdb_t* db, + const rocksdb_writeoptions_t* options, + rocksdb_column_family_handle_t* cf, + const char* key, size_t keylen, + const char* ts, size_t tslen) { + return rocksdb_delete_cf_with_ts(db, options, cf, key, keylen, ts, tslen); +} diff --git a/gorocksdb.h b/gorocksdb.h index c02bb57c..4cf8ac93 100644 --- a/gorocksdb.h +++ b/gorocksdb.h @@ -43,3 +43,9 @@ extern rocksdb_status_t* gorocksdb_increase_full_history_ts_low( rocksdb_column_family_handle_t* cf, const char* ts, size_t tslen); +extern rocksdb_status_t* gorocksdb_delete_cf_with_ts( + rocksdb_t*, const rocksdb_writeoptions_t*, + rocksdb_column_family_handle_t*, + const char* key, size_t keylen, + const char* ts, size_t tslen); + From 0a2666da0c422e601fbb1c607cdef021e80769cb Mon Sep 17 00:00:00 2001 From: kbhat1 Date: Wed, 11 Jun 2025 17:10:01 -0400 Subject: [PATCH 3/3] ConvertERr --- db_ts.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/db_ts.go b/db_ts.go index fc1fc9a1..099b3f54 100644 --- a/db_ts.go +++ b/db_ts.go @@ -10,6 +10,7 @@ package gorocksdb */ import "C" import ( + "errors" "unsafe" ) @@ -68,3 +69,17 @@ func byteSliceToChar(b []byte) *C.char { } return (*C.char)(unsafe.Pointer(&b[0])) } + +// convertErr frees a rocksdb_status_t* and converts it to Go error +func convertErr(st *C.rocksdb_status_t) error { + if st == nil { + return nil + } + defer C.rocksdb_status_destroy(st) + + if C.rocksdb_status_code(st) == C.rocksdb_status_code_t(0) { + return nil + } + msg := C.GoString(C.rocksdb_status_to_string(st)) + return errors.New(msg) +}