From cf3b9dff7f49669a45fd7628d76d8ee66dc78080 Mon Sep 17 00:00:00 2001 From: "gd.zhou" Date: Sat, 23 May 2026 04:34:58 +0800 Subject: [PATCH 1/4] [feature] add gtid gap log --- src/Makefile | 4 ++-- src/config.c | 1 + src/server.c | 1 + src/server.h | 3 +++ src/xredis_adaptation_version.c | 1 + src/xredis_gtid_gap_log.c | 1 + tests/test_helper.tcl | 1 + 7 files changed, 10 insertions(+), 2 deletions(-) create mode 120000 src/xredis_adaptation_version.c create mode 120000 src/xredis_gtid_gap_log.c diff --git a/src/Makefile b/src/Makefile index 970374fe1b8..e53e3fbc3e9 100644 --- a/src/Makefile +++ b/src/Makefile @@ -221,7 +221,7 @@ ifdef OPENSSL_PREFIX endif # Include paths to dependencies -FINAL_CFLAGS+= -I../deps/hiredis -I../deps/linenoise -I../deps/lua/src -I../deps/hdr_histogram -I../deps/xredis-gtid/include +FINAL_CFLAGS+= -I../deps/hiredis -I../deps/linenoise -I../deps/lua/src -I../deps/hdr_histogram -I../deps/xredis-gtid/include ifdef SWAP FINAL_CFLAGS+= -I../deps/tdigest -I../deps/rocksdb/include endif @@ -322,7 +322,7 @@ endif REDIS_SERVER_NAME=redis-server$(PROG_SUFFIX) REDIS_SENTINEL_NAME=redis-sentinel$(PROG_SUFFIX) REDIS_SWAP_OBJ=ctrip_swap.o ctrip_swap_adlist.o ctrip_lru_cache.o ctrip_swap_async.o ctrip_swap_batch.o ctrip_swap_cmd.o ctrip_swap_data.o ctrip_swap_debug.o ctrip_swap_evict.o ctrip_swap_exec.o ctrip_swap_expire.o ctrip_swap_hash.o ctrip_swap_set.o ctrip_swap_list.o ctrip_swap_iter.o ctrip_swap_zset.o ctrip_swap_meta.o ctrip_swap_object.o ctrip_swap_rdb.o ctrip_swap_repl.o ctrip_swap_rio.o ctrip_swap_rocks.o ctrip_swap_stat.o ctrip_swap_sync.o ctrip_swap_thread.o ctrip_swap_util.o ctrip_swap_lock.o ctrip_swap_string.o ctrip_swap_bitmap.o ctrip_swap_compact.o ctrip_swap_slowlog.o ctrip_swap_blocked.o ctrip_cuckoo_hash.o ctrip_cuckoo_filter.o ctrip_swap_filter.o ctrip_absent_cache.o ctrip_swap_load.o ctrip_swap_dirty.o ctrip_swap_persist.o ctrip_roaring_bitmap.o ctrip_swap_rordb.o ctrip_wtdigest.o ctrip_swap_server.o -REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o gopher.o tracking.o connection.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o ctrip.o xredis_gtid.o xredis_gtid_aof.o xredis_gtid_repl.o xredis_gtid_rs.o xredis_gtid_rdb.o ctrip_heartbeat.o +REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o gopher.o tracking.o connection.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o ctrip.o xredis_gtid.o xredis_gtid_aof.o xredis_gtid_repl.o xredis_gtid_rs.o xredis_gtid_rdb.o xredis_gtid_gap_log.o xredis_adaptation_version.o ctrip_heartbeat.o ifdef SWAP REDIS_SERVER_OBJ+= $(REDIS_SWAP_OBJ) diff --git a/src/config.c b/src/config.c index 489a2cfbdfd..4cd79e15641 100644 --- a/src/config.c +++ b/src/config.c @@ -3190,6 +3190,7 @@ standardConfig configs[] = { /* ctrip configs */ createBoolConfig("gtid-enabled", NULL, MODIFIABLE_CONFIG, server.gtid_enabled, 0, NULL, updateGtidEnabled), + createBoolConfig("gtid-gaplog-enabled", NULL, MODIFIABLE_CONFIG, server.gtid_gaplog_enabled, 1, NULL, NULL), #ifdef USE_OPENSSL createIntConfig("tls-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.tls_port, 0, INTEGER_CONFIG, NULL, updateTLSPort), /* TCP port. */ diff --git a/src/server.c b/src/server.c index 203d68a09bd..ca50543d8e0 100644 --- a/src/server.c +++ b/src/server.c @@ -3521,6 +3521,7 @@ void initServer(void) { server.gtid_lost = gtidSetNew(); xsyncUuidInterestedInit(); gtidInitialInfoInit(server.gtid_initial); + server.gtid_gap_log = createGtidGapLog(); server.gtid_xsync_fullresync_indicator = 0; server.gtid_executed_cmd_count = 0; server.gtid_ignored_cmd_count = 0; diff --git a/src/server.h b/src/server.h index 3c74272f8d6..bb65c998923 100644 --- a/src/server.h +++ b/src/server.h @@ -1741,6 +1741,9 @@ struct redisServer { long long gtid_ignored_cmd_count; long long gtid_executed_cmd_count; long long gtid_sync_stat[GTID_SYNC_TYPES]; + int gtid_gaplog_enabled; + gtidGapLog* gtid_gap_log; + /* importing mode */ mstime_t importing_end_time; /* in milliseconds */ int importing_expire_enabled; diff --git a/src/xredis_adaptation_version.c b/src/xredis_adaptation_version.c new file mode 120000 index 00000000000..324fe6944cd --- /dev/null +++ b/src/xredis_adaptation_version.c @@ -0,0 +1 @@ +../deps/xredis-gtid/xredis/xredis_adaptation_version.c \ No newline at end of file diff --git a/src/xredis_gtid_gap_log.c b/src/xredis_gtid_gap_log.c new file mode 120000 index 00000000000..d9bf28f3f7b --- /dev/null +++ b/src/xredis_gtid_gap_log.c @@ -0,0 +1 @@ +../deps/xredis-gtid/xredis/xredis_gtid_gap_log.c \ No newline at end of file diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index dd20be9a60f..09721f21980 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -18,6 +18,7 @@ set ::gtid_tests { gtid/replication-psync gtid/sync gtid/xsync + gtid/gaplog } set ::all_tests { From 0f137be7d72e463363b1372c4e46bb618d2e7b79 Mon Sep 17 00:00:00 2001 From: "gd.zhou" Date: Mon, 25 May 2026 15:59:29 +0800 Subject: [PATCH 2/4] [feature] redisCommand add parse key/subkey function (only gtid) --- src/Makefile | 6 +- src/server.c | 3 + src/server.h | 5 + src/xredis_cmdparse.c | 266 ++++++++++++++++++++++++++++++++++++++++ src/xredis_commands.def | 1 + tests/test_helper.tcl | 2 + 6 files changed, 281 insertions(+), 2 deletions(-) create mode 100644 src/xredis_cmdparse.c create mode 120000 src/xredis_commands.def diff --git a/src/Makefile b/src/Makefile index e53e3fbc3e9..946c60deaeb 100644 --- a/src/Makefile +++ b/src/Makefile @@ -221,7 +221,7 @@ ifdef OPENSSL_PREFIX endif # Include paths to dependencies -FINAL_CFLAGS+= -I../deps/hiredis -I../deps/linenoise -I../deps/lua/src -I../deps/hdr_histogram -I../deps/xredis-gtid/include +FINAL_CFLAGS+= -I../deps/hiredis -I../deps/linenoise -I../deps/lua/src -I../deps/hdr_histogram -I../deps/xredis-gtid/include -I../deps/xredis-gtid/xredis ifdef SWAP FINAL_CFLAGS+= -I../deps/tdigest -I../deps/rocksdb/include endif @@ -302,6 +302,8 @@ ifdef SWAP FINAL_CFLAGS+=-DENABLE_SWAP=1 endif +FINAL_CFLAGS+=-DENABLE_CMDPARSE=1 + REDIS_CC=$(QUIET_CC)$(CC) $(FINAL_CFLAGS) REDIS_LD=$(QUIET_LINK)$(CC) $(FINAL_LDFLAGS) REDIS_INSTALL=$(QUIET_INSTALL)$(INSTALL) @@ -322,7 +324,7 @@ endif REDIS_SERVER_NAME=redis-server$(PROG_SUFFIX) REDIS_SENTINEL_NAME=redis-sentinel$(PROG_SUFFIX) REDIS_SWAP_OBJ=ctrip_swap.o ctrip_swap_adlist.o ctrip_lru_cache.o ctrip_swap_async.o ctrip_swap_batch.o ctrip_swap_cmd.o ctrip_swap_data.o ctrip_swap_debug.o ctrip_swap_evict.o ctrip_swap_exec.o ctrip_swap_expire.o ctrip_swap_hash.o ctrip_swap_set.o ctrip_swap_list.o ctrip_swap_iter.o ctrip_swap_zset.o ctrip_swap_meta.o ctrip_swap_object.o ctrip_swap_rdb.o ctrip_swap_repl.o ctrip_swap_rio.o ctrip_swap_rocks.o ctrip_swap_stat.o ctrip_swap_sync.o ctrip_swap_thread.o ctrip_swap_util.o ctrip_swap_lock.o ctrip_swap_string.o ctrip_swap_bitmap.o ctrip_swap_compact.o ctrip_swap_slowlog.o ctrip_swap_blocked.o ctrip_cuckoo_hash.o ctrip_cuckoo_filter.o ctrip_swap_filter.o ctrip_absent_cache.o ctrip_swap_load.o ctrip_swap_dirty.o ctrip_swap_persist.o ctrip_roaring_bitmap.o ctrip_swap_rordb.o ctrip_wtdigest.o ctrip_swap_server.o -REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o gopher.o tracking.o connection.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o ctrip.o xredis_gtid.o xredis_gtid_aof.o xredis_gtid_repl.o xredis_gtid_rs.o xredis_gtid_rdb.o xredis_gtid_gap_log.o xredis_adaptation_version.o ctrip_heartbeat.o +REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o gopher.o tracking.o connection.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o ctrip.o xredis_gtid.o xredis_gtid_aof.o xredis_gtid_repl.o xredis_gtid_rs.o xredis_gtid_rdb.o xredis_gtid_gap_log.o xredis_adaptation_version.o ctrip_heartbeat.o xredis_cmdparse.o ifdef SWAP REDIS_SERVER_OBJ+= $(REDIS_SWAP_OBJ) diff --git a/src/server.c b/src/server.c index ca50543d8e0..27bad55a938 100644 --- a/src/server.c +++ b/src/server.c @@ -3521,6 +3521,9 @@ void initServer(void) { server.gtid_lost = gtidSetNew(); xsyncUuidInterestedInit(); gtidInitialInfoInit(server.gtid_initial); +#ifdef ENABLE_CMDPARSE + cmdParseBindToCommands(); +#endif server.gtid_gap_log = createGtidGapLog(); server.gtid_xsync_fullresync_indicator = 0; server.gtid_executed_cmd_count = 0; diff --git a/src/server.h b/src/server.h index bb65c998923..d2ee2435d3b 100644 --- a/src/server.h +++ b/src/server.h @@ -1798,6 +1798,11 @@ struct redisCommand { ACLs. A connection is able to execute a given command if the user associated to the connection has this command bit set in the bitmap of allowed commands. */ +#ifdef ENABLE_CMDPARSE + /* 供 swap 和 gtid 共用的命令 key 解析接口(放在末尾以避免破坏命令表的位置初始化) */ + int (*cmdparse_count)(robj **argv, int argc); + void (*cmdparse_parse)(int dbid, struct redisCommand *cmd, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key); +#endif }; struct redisError { diff --git a/src/xredis_cmdparse.c b/src/xredis_cmdparse.c new file mode 100644 index 00000000000..b4475a444c2 --- /dev/null +++ b/src/xredis_cmdparse.c @@ -0,0 +1,266 @@ +#include "xredis_cmdparse.h" +#include "server.h" + +/* ================================================================ + * 辅助函数:sds 转大写(server.commands 的 key 为大写) + * ================================================================ */ +static sds sdsdupupper(sds s) { + sds result = sdsdup(s); + for (size_t i = 0; i < sdslen(result); i++) { + result[i] = toupper((unsigned char)result[i]); + } + return result; +} + +/* ================================================================ + * 各命令的 count / parse 实现 + * ================================================================ */ + +/* --- del / unlink:多个 key --- */ +static int cmdCountDel(robj **argv, int argc) { + UNUSED(argv); + return argc > 1 ? argc - 1 : 0; +} +static void cmdParseDel(int dbid, struct redisCommand *cmd, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key) { + UNUSED(dbid); + for (int i = 1; i < argc; i++) { + on_key(ctx, OBJ_UNKNOWN, i, 0, 0, 0, NULL); + } +} + +/* --- 单个 key,无 subkeys(通过命令名推断类型)--- */ +static int cmdCountSingleKey(robj **argv, int argc) { + UNUSED(argv); UNUSED(argc); + return 1; +} +static int cmdKeyTypeFromCommand(struct redisCommand *cmd) { + if (cmd->flags & CMD_CATEGORY_STRING) return OBJ_STRING; + if (cmd->flags & CMD_CATEGORY_LIST) return OBJ_LIST; + if (cmd->flags & CMD_CATEGORY_HASH) return OBJ_HASH; + if (cmd->flags & CMD_CATEGORY_SET) return OBJ_SET; + if (cmd->flags & CMD_CATEGORY_SORTEDSET) return OBJ_ZSET; + if (cmd->flags & CMD_CATEGORY_BITMAP) return OBJ_STRING; + return OBJ_UNKNOWN; +} +static void cmdParseSingleKey(int dbid, struct redisCommand *cmd, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key) { + UNUSED(dbid); UNUSED(argc); + on_key(ctx, cmdKeyTypeFromCommand(cmd), 1, 0, 0, 0, NULL); +} + +/* --- mset / msetnx:多个 key,间隔出现 --- */ +static int cmdCountMset(robj **argv, int argc) { + UNUSED(argv); + return argc > 1 ? (argc - 1) / 2 : 0; +} +static void cmdParseMset(int dbid, struct redisCommand *cmd, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key) { + UNUSED(dbid); UNUSED(argv); + for (int i = 1; i < argc; i += 2) { + on_key(ctx, OBJ_STRING, i, 0, 0, 0, NULL); + } +} + +/* --- hset / hmset:单个 key + subkeys(field), 步长为2, 从argv[2]开始 --- */ +static void cmdParseHset(int dbid, struct redisCommand *cmd, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key) { + UNUSED(dbid); + int subkeys_count = (argc - 2) / 2; + on_key(ctx, OBJ_HASH, 1, subkeys_count, 2, 2, NULL); +} + +/* --- hdel:单个 key + 多个 subkeys, 步长为1, 从argv[2]开始 --- */ +static void cmdParseHdel(int dbid, struct redisCommand *cmd, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key) { + UNUSED(dbid); + int subkeys_count = argc - 2; + on_key(ctx, OBJ_HASH, 1, subkeys_count, 2, 1, NULL); +} + +/* --- hsetnx / hincrby / hincrbyfloat:单个 key + 1 个 subkey --- */ +static void cmdParseHsetnx(int dbid, struct redisCommand *cmd, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key) { + UNUSED(dbid); + if (argc >= 3) { + on_key(ctx, OBJ_HASH, 1, 1, 2, 1, NULL); + } else { + on_key(ctx, OBJ_HASH, 1, 0, 0, 0, NULL); + } +} + +/* --- sadd / srem / spop:单个 key + 多个 subkeys, 步长为1, 从argv[2]开始 --- */ +static void cmdParseSadd(int dbid, struct redisCommand *cmd, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key) { + UNUSED(dbid); + int subkeys_count = argc - 2; + on_key(ctx, OBJ_SET, 1, subkeys_count, 2, 1, NULL); +} + +/* --- smove:2 个 key,各带 1 个 subkey(member=argv[3])--- */ +static int cmdCountRename(robj **argv, int argc) { + UNUSED(argv); + return argc >= 3 ? 2 : (argc >= 2 ? 1 : 0); +} +static void cmdParseSmove(int dbid, struct redisCommand *cmd, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key) { + UNUSED(dbid); UNUSED(argv); + if (argc >= 4) { + on_key(ctx, OBJ_SET, 1, 1, 3, 1, NULL); + on_key(ctx, OBJ_SET, 2, 1, 3, 1, NULL); + } else if (argc >= 3) { + on_key(ctx, OBJ_SET, 1, 0, 0, 0, NULL); + on_key(ctx, OBJ_SET, 2, 0, 0, 0, NULL); + } else if (argc >= 2) { + on_key(ctx, OBJ_SET, 1, 0, 0, 0, NULL); + } +} + +/* --- zadd:单个 key + member subkeys, 步长为2 --- */ +static void cmdParseZadd(int dbid, struct redisCommand *cmd, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key) { + UNUSED(dbid); + int i = 2; + while (i < argc) { + sds arg = (sds)argv[i]->ptr; + if (!strcasecmp(arg, "nx") || !strcasecmp(arg, "xx") || + !strcasecmp(arg, "ch") || !strcasecmp(arg, "incr") || + !strcasecmp(arg, "gt") || !strcasecmp(arg, "lt")) { + i++; + } else { + break; + } + } + int subkeys_count = (argc - i) / 2; + on_key(ctx, OBJ_ZSET, 1, subkeys_count, i + 1, 2, NULL); +} + +/* --- zrem:单个 key + 多个 subkeys, 步长为1, 从argv[2]开始 --- */ +static void cmdParseZrem(int dbid, struct redisCommand *cmd, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key) { + UNUSED(dbid); + int subkeys_count = argc - 2; + on_key(ctx, OBJ_ZSET, 1, subkeys_count, 2, 1, NULL); +} + +/* --- zincrby:单个 key + 1 个 subkey --- */ +static void cmdParseZincrby(int dbid, struct redisCommand *cmd, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key) { + UNUSED(dbid); + if (argc >= 4) { + on_key(ctx, OBJ_ZSET, 1, 1, 3, 1, NULL); + } else { + on_key(ctx, OBJ_ZSET, 1, 0, 0, 0, NULL); + } +} + +/* --- rename / renamenx:2 个 key --- */ +static void cmdParseRename(int dbid, struct redisCommand *cmd, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key) { + UNUSED(dbid); + if (argc >= 3) { + on_key(ctx, OBJ_UNKNOWN, 1, 0, 0, 0, NULL); + on_key(ctx, OBJ_UNKNOWN, 2, 0, 0, 0, NULL); + } else if (argc >= 2) { + on_key(ctx, OBJ_UNKNOWN, 1, 0, 0, 0, NULL); + } +} + +/* --- bitop:argv[1]=操作符, argv[2+]=key(参考 server 表 firstkey=2)--- */ +static int cmdCountBitOp(robj **argv, int argc) { + UNUSED(argv); + return argc > 2 ? argc - 2 : 0; +} +static void cmdParseBitOp(int dbid, struct redisCommand *cmd, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key) { + UNUSED(dbid); UNUSED(argv); + for (int i = 2; i < argc; i++) { + on_key(ctx, OBJ_STRING, i, 0, 0, 0, NULL); + } +} + +/* --- zunionstore / zinterstore / zdiffstore:dest + numkeys 个 key --- */ +static int cmdCountZstore(robj **argv, int argc) { + UNUSED(argv); + if (argc < 4) return argc >= 2 ? 1 : 0; + long long numkeys = 0; + if (getLongLongFromObject(argv[2], &numkeys) != C_OK || numkeys < 1) { + return 1; /* dest only */ + } + return 1 + (int)numkeys; +} +static void cmdParseZstore(int dbid, struct redisCommand *cmd, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key) { + UNUSED(dbid); UNUSED(argv); + if (argc < 2) return; + on_key(ctx, OBJ_ZSET, 1, 0, 0, 0, NULL); /* dest */ + if (argc < 4) return; + long long numkeys = 0; + if (getLongLongFromObject(argv[2], &numkeys) != C_OK || numkeys < 1) return; + int max_keys = argc - 3; + if (numkeys > max_keys) numkeys = max_keys; + for (long long i = 0; i < numkeys; i++) { + on_key(ctx, OBJ_ZSET, 3 + (int)i, 0, 0, 0, NULL); + } +} + +/* ================================================================ + * 引入由 generate_cmdparse_commands.py 自动生成的命令注册表 + * ================================================================ */ +#include "xredis_commands.def" + +/* ================================================================ + * 公共接口:通过 server.commands 查找 + * ================================================================ */ + +/* 计算命令中包含的 key 条目数(通过 lookupCommand 从 server.commands 取) */ +int cmdParseCountKeys(robj **argv, int argc) { + if (argc < 1) return 0; + sds cmd_upper = sdsdupupper((sds)argv[0]->ptr); + struct redisCommand *cmd = lookupCommand(cmd_upper); + sdsfree(cmd_upper); + if (cmd != NULL && cmd->cmdparse_count != NULL) { + return cmd->cmdparse_count(argv, argc); + } + /* unknown command fallback */ + return argc >= 2 ? 1 : 0; +} + +/* 解析命令,通过回调通知每个 key 的位置描述(通过 lookupCommand 从 server.commands 取) */ +void cmdParseKeys(int dbid, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key) { + if (argc < 1) return; + sds cmd_upper = sdsdupupper((sds)argv[0]->ptr); + struct redisCommand *cmd = lookupCommand(cmd_upper); + sdsfree(cmd_upper); + if (cmd != NULL && cmd->cmdparse_parse != NULL) { + cmd->cmdparse_parse(dbid, cmd, argv, argc, ctx, on_key); + } else if (argc >= 2) { + /* unknown command fallback */ + serverLog(LL_WARNING, "Unknown command '%s' for key propagation", (sds)argv[0]->ptr); + // on_key(ctx, OBJ_UNKNOWN, 1, 0, 0, 0, NULL); + serverPanic("unknown command fallback"); + } +} + +/* 绑定 cmdparse 函数到 server.commands 中的所有命令 */ +void cmdParseBindToCommands(void) { + int i; + for (i = 0; cmd_parse_commands[i].name != NULL; i++) { + sds name = sdsnew(cmd_parse_commands[i].name); + for (size_t j = 0; j < sdslen(name); j++) { + name[j] = toupper((unsigned char)name[j]); + } + struct redisCommand *cmd = lookupCommand(name); + sdsfree(name); + if (cmd != NULL) { + cmd->cmdparse_count = cmd_parse_commands[i].count; + cmd->cmdparse_parse = cmd_parse_commands[i].parse; + } + } +} + +/* 通过命令名查找 count 函数(从 server.commands 取) */ +int (*cmdParseGetCountFunc(const char *cmd_name))(robj **argv, int argc) { + if (cmd_name == NULL) return NULL; + struct redisCommand *cmd = lookupCommandByCString(cmd_name); + if (cmd != NULL) { + return cmd->cmdparse_count; + } + return NULL; +} + +/* 通过命令名查找 parse 函数(从 server.commands 取) */ +void (*cmdParseGetParseFunc(const char *cmd_name))(int dbid, struct redisCommand *cmd, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key) { + if (cmd_name == NULL) return NULL; + struct redisCommand *cmd = lookupCommandByCString(cmd_name); + if (cmd != NULL) { + return cmd->cmdparse_parse; + } + return NULL; +} diff --git a/src/xredis_commands.def b/src/xredis_commands.def new file mode 120000 index 00000000000..3853a467905 --- /dev/null +++ b/src/xredis_commands.def @@ -0,0 +1 @@ +../deps/xredis-gtid/xredis/xredis_commands.def \ No newline at end of file diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 09721f21980..e7a4a6585df 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -19,6 +19,8 @@ set ::gtid_tests { gtid/sync gtid/xsync gtid/gaplog + gtid/gaplog_commands + gtid/gaplog_write_commands } set ::all_tests { From 92f75263706c40e2b95b3fe23d4a28bb891df259 Mon Sep 17 00:00:00 2001 From: "gd.zhou" Date: Mon, 25 May 2026 21:16:02 +0800 Subject: [PATCH 3/4] [feature] swap change to parse key/subkey --- deps/xredis-gtid | 2 +- src/ctrip_swap.h | 33 +--- src/ctrip_swap_cmd.c | 348 ++++++++++++++++++++++-------------------- src/server.c | 2 +- src/server.h | 2 - src/xredis_cmdparse.c | 267 +------------------------------- 6 files changed, 186 insertions(+), 468 deletions(-) mode change 100644 => 120000 src/xredis_cmdparse.c diff --git a/deps/xredis-gtid b/deps/xredis-gtid index e642bd25ac3..7d8aa82656f 160000 --- a/deps/xredis-gtid +++ b/deps/xredis-gtid @@ -1 +1 @@ -Subproject commit e642bd25ac3a282b04472834dcb2928b8cbbcf47 +Subproject commit 7d8aa82656f7ebdbd39a2b84d9a1b7456290c1a9 diff --git a/src/ctrip_swap.h b/src/ctrip_swap.h index e720b74f101..feb467e5bf2 100644 --- a/src/ctrip_swap.h +++ b/src/ctrip_swap.h @@ -244,22 +244,6 @@ int getKeyRequestsMetaScan(int dbid, struct redisCommand *cmd, robj **argv, int int getKeyRequestsSort(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); -#define getKeyRequestsHsetnx getKeyRequestsHset -#define getKeyRequestsHget getKeyRequestsHmget -#define getKeyRequestsHdel getKeyRequestsHmget -#define getKeyRequestsHstrlen getKeyRequestsHmget -#define getKeyRequestsHincrby getKeyRequestsHget -#define getKeyRequestsHincrbyfloat getKeyRequestsHmget -#define getKeyRequestsHexists getKeyRequestsHmget -int getKeyRequestsHset(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); -int getKeyRequestsHmget(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); -int getKeyRequestsHlen(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); - -#define getKeyRequestsSadd getKeyRequestSmembers -#define getKeyRequestsSrem getKeyRequestSmembers -#define getKeyRequestsSdiffstore getKeyRequestsSinterstore -#define getKeyRequestsSunionstore getKeyRequestsSinterstore -int getKeyRequestSmembers(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); int getKeyRequestSmove(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); int getKeyRequestsSinterstore(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); @@ -278,18 +262,14 @@ int getKeyRequestsLindex(int dbid, struct redisCommand *cmd, robj **argv, int ar int getKeyRequestsLrange(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); int getKeyRequestsLtrim(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); -int getKeyRequestsZAdd(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); -int getKeyRequestsZScore(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); -int getKeyRequestsZMScore(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); -int getKeyRequestsZincrby(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); int getKeyRequestsZrange(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); int getKeyRequestsZrangestore(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); -int getKeyRequestsSinterstore(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); int getKeyRequestsZpopMin(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); int getKeyRequestsZpopMax(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); +int getKeyRequestsBzpopMin(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); +int getKeyRequestsBzpopMax(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); int getKeyRequestsZrangeByScore(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); int getKeyRequestsZrevrangeByScore(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); -int getKeyRequestsZremRangeByScore1(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); #define getKeyRequestsZremRangeByScore getKeyRequestsZrangeByScore int getKeyRequestsZrevrangeByLex(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); int getKeyRequestsZrangeByLex(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); @@ -298,16 +278,10 @@ int getKeyRequestsZlexCount(int dbid, struct redisCommand *cmd, robj **argv, int #define getKeyRequestsSdiffstore getKeyRequestsSinterstore #define getKeyRequestsSunionstore getKeyRequestsSinterstore -#define getKeyRequestsZrem getKeyRequestsZScore -int getKeyRequestsGeoAdd(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); int getKeyRequestsGeoRadius(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); -int getKeyRequestsGeoHash(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); -int getKeyRequestsGeoDist(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); -int getKeyRequestsGeoSearch(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); int getKeyRequestsGeoSearchStore(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); #define getKeyRequestsGeoRadiusByMember getKeyRequestsGeoRadius -#define getKeyRequestsGeoPos getKeyRequestsGeoHash int getKeyRequestsGtid(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); @@ -317,9 +291,6 @@ int getKeyRequestsGetbit(int dbid, struct redisCommand *cmd, robj **argv, int ar int getKeyRequestsBitcount(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); int getKeyRequestsBitpos(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); int getKeyRequestsBitop(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); -int getKeyRequestsBitField(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); - -int getKeyRequestsMemory(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); int getKeyRequestsMemory(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result); diff --git a/src/ctrip_swap_cmd.c b/src/ctrip_swap_cmd.c index 6d6453a6b23..fd84d8c10dc 100644 --- a/src/ctrip_swap_cmd.c +++ b/src/ctrip_swap_cmd.c @@ -29,6 +29,7 @@ #include "ctrip_swap.h" #include #include "slowlog.h" +#include "xredis_cmdparse.h" struct redisCommand redisCommandTable[SWAP_CMD_COUNT] = { {"module",moduleCommand,-2, @@ -95,11 +96,11 @@ struct redisCommand redisCommandTable[SWAP_CMD_COUNT] = { {"bitfield",bitfieldCommand,-2, "write use-memory @bitmap @swap_bitmap", - 0,NULL,getKeyRequestsBitField,SWAP_IN,0,1,1,1,0,0,0}, + 0,NULL,NULL,SWAP_IN,0,1,1,1,0,0,0}, {"bitfield_ro",bitfieldroCommand,-2, "read-only fast @bitmap @swap_bitmap", - 0,NULL,getKeyRequestsBitField,SWAP_IN,0,1,1,1,0,0,0}, + 0,NULL,NULL,SWAP_IN,0,1,1,1,0,0,0}, {"setrange",setrangeCommand,4, "write use-memory @string @swap_string", @@ -207,11 +208,11 @@ struct redisCommand redisCommandTable[SWAP_CMD_COUNT] = { {"sadd",saddCommand,-3, "write use-memory fast @set @swap_set", - 0,NULL,getKeyRequestsSadd,SWAP_IN,0,1,1,1,0,0,0}, + 0,NULL,NULL,SWAP_IN,0,1,1,1,0,0,0}, {"srem",sremCommand,-3, "write fast @set @swap_set", - 0,NULL,getKeyRequestsSrem,SWAP_IN,SWAP_IN_DEL,1,1,1,0,0,0}, + 0,NULL,NULL,SWAP_IN,SWAP_IN_DEL,1,1,1,0,0,0}, /* smove cmd intention flags set by getKeyRequestSmove */ {"smove",smoveCommand,4, @@ -220,11 +221,11 @@ struct redisCommand redisCommandTable[SWAP_CMD_COUNT] = { {"sismember",sismemberCommand,3, "read-only fast @set @swap_set", - 0,NULL,getKeyRequestSmembers,SWAP_IN,0,1,1,1,0,0,0}, + 0,NULL,NULL,SWAP_IN,0,1,1,1,0,0,0}, {"smismember",smismemberCommand,-3, "read-only fast @set @swap_set", - 0,NULL,getKeyRequestSmembers,SWAP_IN,0,1,1,1,0,0,0}, + 0,NULL,NULL,SWAP_IN,0,1,1,1,0,0,0}, {"scard",swap_scardCommand,2, "read-only fast @set @swap_set", @@ -272,15 +273,15 @@ struct redisCommand redisCommandTable[SWAP_CMD_COUNT] = { /* (zset type) write command flag should be SWAP_IN_DEL, Because the index (score_cf data) needs to be deleted */ {"zadd",zaddCommand,-4, "write use-memory fast @sortedset @swap_zset", - 0,NULL,getKeyRequestsZAdd,SWAP_IN,SWAP_IN_DEL,1,1,1,0,0,0}, + 0,NULL,NULL,SWAP_IN,SWAP_IN_DEL,1,1,1,0,0,0}, {"zincrby",zincrbyCommand,4, "write use-memory fast @sortedset @swap_zset", - 0,NULL,getKeyRequestsZincrby,SWAP_IN,SWAP_IN_DEL,1,1,1,0,0,0}, + 0,NULL,NULL,SWAP_IN,SWAP_IN_DEL,1,1,1,0,0,0}, {"zrem",zremCommand,-3, "write fast @sortedset @swap_zset", - 0,NULL,getKeyRequestsZrem,SWAP_IN,SWAP_IN_DEL,1,1,1,0,0,0}, + 0,NULL,NULL,SWAP_IN,SWAP_IN_DEL,1,1,1,0,0,0}, {"zremrangebyscore",zremrangebyscoreCommand,4, "write @sortedset @swap_zset", @@ -360,11 +361,11 @@ struct redisCommand redisCommandTable[SWAP_CMD_COUNT] = { {"zscore",zscoreCommand,3, "read-only fast @sortedset @swap_zset", - 0,NULL,getKeyRequestsZScore,SWAP_IN,0,1,1,1,0,0,0}, + 0,NULL,NULL,SWAP_IN,0,1,1,1,0,0,0}, {"zmscore",zmscoreCommand,-3, "read-only fast @sortedset @swap_zset", - 0,NULL,getKeyRequestsZMScore,SWAP_IN,0,1,1,1,0,0,0}, + 0,NULL,NULL,SWAP_IN,0,1,1,1,0,0,0}, {"zrank",zrankCommand,3, "read-only fast @sortedset @swap_zset", @@ -380,19 +381,19 @@ struct redisCommand redisCommandTable[SWAP_CMD_COUNT] = { {"zpopmin",zpopminCommand,-2, "write fast @sortedset @swap_zset", - 0,NULL,NULL,SWAP_IN,SWAP_IN_DEL,1,1,1,0,0,0}, + 0,NULL,getKeyRequestsZpopMin,SWAP_IN,SWAP_IN_DEL,1,1,1,0,0,0}, {"zpopmax",zpopmaxCommand,-2, "write fast @sortedset @swap_zset", - 0,NULL,NULL,SWAP_IN,SWAP_IN_DEL,1,1,1,0,0,0}, + 0,NULL,getKeyRequestsZpopMax,SWAP_IN,SWAP_IN_DEL,1,1,1,0,0,0}, {"bzpopmin",bzpopminCommand,-3, "write no-script fast @sortedset @blocking @swap_zset", - 0,NULL,getKeyRequestsZpopMin,SWAP_IN,SWAP_IN_DEL,1,-2,1,0,0,0}, + 0,NULL,getKeyRequestsBzpopMin,SWAP_IN,SWAP_IN_DEL,1,-2,1,0,0,0}, {"bzpopmax",bzpopmaxCommand,-3, "write no-script fast @sortedset @blocking @swap_zset", - 0,NULL,getKeyRequestsZpopMax,SWAP_IN,SWAP_IN_DEL,1,-2,1,0,0,0}, + 0,NULL,getKeyRequestsBzpopMax,SWAP_IN,SWAP_IN_DEL,1,-2,1,0,0,0}, {"zrandmember",zrandmemberCommand,-2, "read-only random @sortedset @swap_zset", @@ -400,35 +401,35 @@ struct redisCommand redisCommandTable[SWAP_CMD_COUNT] = { {"hset",hsetCommand,-4, "write use-memory fast @hash @swap_hash", - 0,NULL,getKeyRequestsHset,SWAP_IN,0,1,1,1,0,0,0}, + 0,NULL,NULL,SWAP_IN,0,1,1,1,0,0,0}, {"hsetnx",hsetnxCommand,4, "write use-memory fast @hash @swap_hash", - 0,NULL,getKeyRequestsHsetnx,SWAP_IN,0,1,1,1,0,0,0}, + 0,NULL,NULL,SWAP_IN,0,1,1,1,0,0,0}, {"hget",hgetCommand,3, "read-only fast @hash @swap_hash", - 0,NULL,getKeyRequestsHget,SWAP_IN,0,1,1,1,0,0,0}, + 0,NULL,NULL,SWAP_IN,0,1,1,1,0,0,0}, {"hmset",hsetCommand,-4, "write use-memory fast @hash @swap_hash", - 0,NULL,getKeyRequestsHset,SWAP_IN,0,1,1,1,0,0,0}, + 0,NULL,NULL,SWAP_IN,0,1,1,1,0,0,0}, {"hmget",hmgetCommand,-3, "read-only fast @hash @swap_hash", - 0,NULL,getKeyRequestsHmget,SWAP_IN,0,1,1,1,0,0,0}, + 0,NULL,NULL,SWAP_IN,0,1,1,1,0,0,0}, {"hincrby",hincrbyCommand,4, "write use-memory fast @hash @swap_hash", - 0,NULL,getKeyRequestsHincrby,SWAP_IN,0,1,1,1,0,0,0}, + 0,NULL,NULL,SWAP_IN,0,1,1,1,0,0,0}, {"hincrbyfloat",hincrbyfloatCommand,4, "write use-memory fast @hash @swap_hash", - 0,NULL,getKeyRequestsHincrbyfloat,SWAP_IN,0,1,1,1,0,0,0}, + 0,NULL,NULL,SWAP_IN,0,1,1,1,0,0,0}, {"hdel",hdelCommand,-3, "write fast @hash @swap_hash", - 0,NULL,getKeyRequestsHdel,SWAP_IN,SWAP_IN_DEL,1,1,1,0,0,0}, + 0,NULL,NULL,SWAP_IN,SWAP_IN_DEL,1,1,1,0,0,0}, {"hlen",hlenCommand,2, "read-only fast @hash @swap_hash", @@ -436,7 +437,7 @@ struct redisCommand redisCommandTable[SWAP_CMD_COUNT] = { {"hstrlen",hstrlenCommand,3, "read-only fast @hash @swap_hash", - 0,NULL,getKeyRequestsHstrlen,SWAP_IN,0,1,1,1,0,0,0}, + 0,NULL,NULL,SWAP_IN,0,1,1,1,0,0,0}, {"hkeys",hkeysCommand,2, "read-only to-sort @hash @swap_hash", @@ -452,7 +453,7 @@ struct redisCommand redisCommandTable[SWAP_CMD_COUNT] = { {"hexists",hexistsCommand,3, "read-only fast @hash @swap_hash", - 0,NULL,getKeyRequestsHexists,SWAP_IN,0,1,1,1,0,0,0}, + 0,NULL,NULL,SWAP_IN,0,1,1,1,0,0,0}, {"hrandfield",hrandfieldCommand,-2, "read-only random @hash @swap_hash", @@ -803,7 +804,7 @@ struct redisCommand redisCommandTable[SWAP_CMD_COUNT] = { {"geoadd",geoaddCommand,-5, "write use-memory @geo @swap_zset", - 0,NULL,getKeyRequestsGeoAdd,SWAP_IN,SWAP_IN_DEL,1,1,1,0,0,0}, + 0,NULL,NULL,SWAP_IN,SWAP_IN_DEL,1,1,1,0,0,0}, /* GEORADIUS has store options that may write. */ {"georadius",georadiusCommand,-6, @@ -824,15 +825,15 @@ struct redisCommand redisCommandTable[SWAP_CMD_COUNT] = { {"geohash",geohashCommand,-2, "read-only @geo @swap_zset", - 0,NULL,getKeyRequestsGeoHash,SWAP_IN,0,1,1,1,0,0,0}, + 0,NULL,NULL,SWAP_IN,0,1,1,1,0,0,0}, {"geopos",geoposCommand,-2, "read-only @geo @swap_zset", - 0,NULL,getKeyRequestsGeoPos,SWAP_IN,0,1,1,1,0,0,0}, + 0,NULL,NULL,SWAP_IN,0,1,1,1,0,0,0}, {"geodist",geodistCommand,-4, "read-only @geo @swap_zset", - 0,NULL,getKeyRequestsGeoDist,SWAP_IN,0,1,1,1,0,0,0}, + 0,NULL,NULL,SWAP_IN,0,1,1,1,0,0,0}, {"geosearch",geosearchCommand,-7, "read-only @geo @swap_zset", @@ -1354,32 +1355,67 @@ void getKeyRequestsFreeResult(getKeyRequestsResult *result) { } } +/* ====== swapOnKey bridge: cmdparse → getKeyRequestsResult ====== */ + +typedef struct { + getKeyRequestsResult *result; +} swapCmdParseCtx; + + +/* cmdparse key/subkey => getKeyRequestsResult */ +static void swapOnKey(void *ctx, int dbid, struct redisCommand* cmd, robj** argv, int argc, int key_arg_idx, + int subkeys_count, int subkeys_start, + int subkeys_step, const int *subkey_arg_idxs, + const cmdParseKeyExtra *extra) { + swapCmdParseCtx *sctx = (swapCmdParseCtx *)ctx; + getKeyRequestsResult *result = sctx->result; + int intention = cmd->intention; + int intention_flags = cmd->intention_flags; + uint64_t cmd_flags = cmd->flags; + + + if (subkeys_count > 0) { + robj *key = argv[key_arg_idx]; + incrRefCount(key); + robj **subkeys = zmalloc(subkeys_count * sizeof(robj *)); + for (int i = 0; i < subkeys_count; i++) { + int idx = subkey_arg_idxs ? subkey_arg_idxs[i] + : subkeys_start + i * subkeys_step; + robj *subkey = argv[idx]; + incrRefCount(subkey); + subkeys[i] = subkey; + } + getKeyRequestsAppendSubkeyResult(result, REQUEST_LEVEL_KEY, key, + subkeys_count, subkeys, + intention, intention_flags, cmd_flags, dbid); + return; + } + + /* only key */ + robj *key = argv[key_arg_idx]; + incrRefCount(key); + getKeyRequestsAppendSubkeyResult(result, REQUEST_LEVEL_KEY, key, + 0, NULL, intention, intention_flags, cmd_flags, dbid); + +} + /* NOTE that result.{key,subkeys} are ONLY REFS to client argv (since client * outlives getKeysResult if no swap action happend. key, subkey will be * copied (using incrRefCount) when async swap acutally proceed. */ static int _getSingleCmdKeyRequests(int dbid, struct redisCommand* cmd, robj** argv, int argc, getKeyRequestsResult *result) { - if (cmd->getkeyrequests_proc == NULL) { - int i, numkeys; - getKeysResult keys = GETKEYS_RESULT_INIT; - /* whole key swaping, swaps defined by command arity. */ - numkeys = getKeysFromCommand(cmd,argv,argc,&keys); - getKeyRequestsPrepareResult(result,result->num+numkeys); - for (i = 0; i < numkeys; i++) { - robj *key = argv[keys.keys[i]]; - - incrRefCount(key); - getKeyRequestsAppendSubkeyResult(result,REQUEST_LEVEL_KEY,key,0,NULL, - cmd->intention,cmd->intention_flags,cmd->flags, dbid); - } - getKeysFreeResult(&keys); - return 0; + if (cmd->getkeyrequests_proc != NULL) { + return cmd->getkeyrequests_proc(dbid,cmd,argv,argc,result); } else if (cmd->flags & CMD_MODULE) { /* TODO support module */ + return 0; } else { - return cmd->getkeyrequests_proc(dbid,cmd,argv,argc,result); + swapCmdParseCtx ctx = { + .result = result, + }; + cmdParseKeys(dbid, cmd, argv, argc, &ctx, swapOnKey); + return 0; } - return 0; } static void getSingleCmdKeyRequests(client *c, getKeyRequestsResult *result) { @@ -1599,56 +1635,6 @@ int getKeyRequestsZdiffstore(int dbid, struct redisCommand *cmd, robj **argv, in return getKeyRequestsZunionInterDiffGeneric(dbid, cmd, argv, argc, result, SET_OP_DIFF); } -#define GETKEYS_RESULT_SUBKEYS_INIT_LEN 8 -#define GETKEYS_RESULT_SUBKEYS_LINER_LEN 1024 - -int getKeyRequestsSingleKeyWithSubkeys(int dbid, struct redisCommand *cmd, robj **argv, - int argc, struct getKeyRequestsResult *result, - int key_index, int first_subkey, int last_subkey, int subkey_step) { - int i, num = 0, capacity = GETKEYS_RESULT_SUBKEYS_INIT_LEN; - robj *key, **subkeys = NULL; - UNUSED(cmd); - - subkeys = zmalloc(capacity*sizeof(robj*)); - getKeyRequestsPrepareResult(result,result->num+1); - - key = argv[key_index]; - incrRefCount(key); - - if (last_subkey < 0) last_subkey += argc; - for (i = first_subkey; i <= last_subkey; i += subkey_step) { - robj *subkey = argv[i]; - if (num >= capacity) { - if (capacity < GETKEYS_RESULT_SUBKEYS_LINER_LEN) - capacity *= 2; - else - capacity += GETKEYS_RESULT_SUBKEYS_LINER_LEN; - - subkeys = zrealloc(subkeys, capacity*sizeof(robj*)); - } - incrRefCount(subkey); - subkeys[num++] = subkey; - } - getKeyRequestsAppendSubkeyResult(result,REQUEST_LEVEL_KEY,key,num,subkeys, - cmd->intention,cmd->intention_flags,cmd->flags, dbid); - - return 0; -} - -int getKeyRequestsHset(int dbid,struct redisCommand *cmd, robj **argv, int argc, - struct getKeyRequestsResult *result) { - return getKeyRequestsSingleKeyWithSubkeys(dbid,cmd,argv,argc,result,1,2,-1,2); -} - -int getKeyRequestsHmget(int dbid, struct redisCommand *cmd, robj **argv, int argc, - struct getKeyRequestsResult *result) { - return getKeyRequestsSingleKeyWithSubkeys(dbid,cmd,argv,argc,result,1,2,-1,1); -} - -int getKeyRequestSmembers(int dbid, struct redisCommand *cmd, robj **argv, int argc, - struct getKeyRequestsResult *result) { - return getKeyRequestsSingleKeyWithSubkeys(dbid,cmd,argv,argc,result,1,2,-1,1); -} int getKeyRequestSmove(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result) { @@ -1903,57 +1889,113 @@ int getKeyRequestsLtrim(int dbid, struct redisCommand *cmd, robj **argv, result,1,2,3,1/*num_ranges*/,(long)start,(long)stop,(int)1/*reverse*/); return 0; } -/** zset **/ -int getKeyRequestsZAdd(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result) { - int first_score = 2; - while(first_score < argc) { - char *opt = argv[first_score]->ptr; - if ( - strcasecmp(opt,"nx") != 0 && - strcasecmp(opt,"xx") != 0 && - strcasecmp(opt,"ch") != 0 && - strcasecmp(opt,"incr") != 0 && - strcasecmp(opt,"gt") != 0 && - strcasecmp(opt,"lt") != 0 - ) { - break; - } - first_score++; + +int getKeyRequestsBzpopMin(int dbid, struct redisCommand *cmd, robj **argv, + int argc, struct getKeyRequestsResult *result) { + + for (int i = 1; i < argc - 1; i++) { + robj *key = argv[i]; + incrRefCount(key); + + /* bzpopmin one summber */ + zrangespec *spec = zmalloc(sizeof(zrangespec)); + spec->min = -INFINITY; + spec->max = +INFINITY; + spec->minex = 0; + spec->maxex = 0; + + getKeyRequestsAppendScoreResult(result, REQUEST_LEVEL_KEY, key, + 0, /* reverse=0: min score first */ + spec, + 1, /* limit=1 */ + cmd->intention, + cmd->intention_flags, + cmd->flags, dbid); } - return getKeyRequestsSingleKeyWithSubkeys(dbid, cmd, argv, argc, result, 1, first_score + 1, -1, 2); + return C_OK; } -int getKeyRequestsZScore(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result) { - return getKeyRequestsSingleKeyWithSubkeys(dbid, cmd,argv,argc,result,1,2,-1,1); -} +int getKeyRequestsBzpopMax(int dbid, struct redisCommand *cmd, robj **argv, + int argc, struct getKeyRequestsResult *result) { + for (int i = 1; i < argc - 1; i++) { + robj *key = argv[i]; + incrRefCount(key); -int getKeyRequestsZincrby(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result) { - return getKeyRequestsSingleKeyWithSubkeys(dbid, cmd, argv, argc, result, 1, 3, -1, 2); + zrangespec *spec = zmalloc(sizeof(zrangespec)); + spec->min = -INFINITY; + spec->max = +INFINITY; + spec->minex = 0; + spec->maxex = 0; + + getKeyRequestsAppendScoreResult(result, REQUEST_LEVEL_KEY, key, + 1, /* reverse=1: max score first */ + spec, + 1, /* limit=1 */ + cmd->intention, + cmd->intention_flags, + cmd->flags, dbid); + } + return C_OK; } -int getKeyRequestsZMScore(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result) { - return getKeyRequestsSingleKeyWithSubkeys(dbid, cmd, argv, argc, result, 1, 2, -1, 1); -} +int getKeyRequestsZpopMin(int dbid, struct redisCommand *cmd, robj **argv, + int argc, struct getKeyRequestsResult *result) { + long count = 1; -#define ZMIN -1 -#define ZMAX 1 -int getKeyRequestsZpopGeneric(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result, int flags) { - UNUSED(cmd), UNUSED(flags); - getKeyRequestsPrepareResult(result,result->num+ argc - 2); - for(int i = 1; i < argc - 1; i++) { - incrRefCount(argv[i]); - getKeyRequestsAppendSubkeyResult(result, REQUEST_LEVEL_KEY, argv[i], 0, NULL, cmd->intention, - cmd->intention_flags, cmd->flags, dbid); + if (argc >= 3) { + long long value; + if (getLongLongFromObject(argv[2], &value) == C_OK && value > 0) { + count = value; + } } + + robj *key = argv[1]; + incrRefCount(key); + + zrangespec *spec = zmalloc(sizeof(zrangespec)); + spec->min = -INFINITY; + spec->max = +INFINITY; + spec->minex = 0; + spec->maxex = 0; + + getKeyRequestsAppendScoreResult(result, REQUEST_LEVEL_KEY, key, + 0, /* reverse=0:,min score first */ + spec, + count, /* limit */ + cmd->intention, + cmd->intention_flags, + cmd->flags, dbid); return C_OK; } -int getKeyRequestsZpopMin(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result) { - return getKeyRequestsZpopGeneric(dbid, cmd, argv, argc, result, ZMIN); -} +int getKeyRequestsZpopMax(int dbid, struct redisCommand *cmd, robj **argv, + int argc, struct getKeyRequestsResult *result) { + long count = 1; + + if (argc >= 3) { + long long value; + if (getLongLongFromObject(argv[2], &value) == C_OK && value > 0) { + count = value; + } + } -int getKeyRequestsZpopMax(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result) { - return getKeyRequestsZpopGeneric(dbid, cmd, argv, argc, result, ZMAX); + robj *key = argv[1]; + incrRefCount(key); + + zrangespec *spec = zmalloc(sizeof(zrangespec)); + spec->min = -INFINITY; + spec->max = +INFINITY; + spec->minex = 0; + spec->maxex = 0; + + getKeyRequestsAppendScoreResult(result, REQUEST_LEVEL_KEY, key, + 1, /* reverse=1: max score first */ + spec, + count, + cmd->intention, + cmd->intention_flags, + cmd->flags, dbid); + return C_OK; } int getKeyRequestsZrangestore(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result) { @@ -2070,30 +2112,6 @@ int getKeyRequestsZlexCount(int dbid, struct redisCommand *cmd, robj **argv, int int getKeyRequestsZremRangeByLex(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result) { return getKeyRequestsZrangeGeneric(dbid, cmd, argv, argc, result, ZRANGE_LEX, ZRANGE_DIRECTION_FORWARD); } -/** geo **/ -int getKeyRequestsGeoAdd(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result) { - int first_score = 2; - while(first_score < argc) { - char *opt = argv[first_score]->ptr; - if ( - strcasecmp(opt,"nx") != 0 && - strcasecmp(opt,"xx") != 0 && - strcasecmp(opt,"ch") != 0 - ) { - break; - } - first_score++; - } - return getKeyRequestsSingleKeyWithSubkeys(dbid, cmd, argv, argc, result, 1, first_score + 2, -1, 3); -} - -int getKeyRequestsGeoDist(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result) { - return getKeyRequestsSingleKeyWithSubkeys(dbid, cmd, argv, argc, result, 1, 2, -2, 1); -} - -int getKeyRequestsGeoHash(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result) { - return getKeyRequestsSingleKeyWithSubkeys(dbid, cmd, argv, argc, result, 1, 2, -1, 1); -} int getKeyRequestsGeoRadius(int dbid, struct redisCommand *cmd, robj **argv, int argc, struct getKeyRequestsResult *result) { int storekeyIndex = -1; @@ -2247,14 +2265,6 @@ int getKeyRequestsBitop(int dbid, struct redisCommand *cmd, robj **argv, return getKeyRequestsOneDestKeyMultiSrcKeys(dbid, cmd, argv, argc, result, 2, 3, -1); } -int getKeyRequestsBitField(int dbid, struct redisCommand *cmd, robj **argv, - int argc, struct getKeyRequestsResult *result) { - - UNUSED(argc); - getKeyRequestsSingleKey(result,argv[1],cmd->intention,cmd->intention_flags,cmd->flags,dbid); - return 0; -} - #define GET_KEYREQUESTS_MEMORY_MUL 4 @@ -2340,6 +2350,10 @@ int swapCmdTest(int argc, char *argv[], int accurate) { TEST("cmd: init") { initServerConfig(); ACLInit(); + #ifdef ENABLE_CMDPARSE + cmdParseBindToCommands(); + #endif + server.hz = 10; c = createClient(NULL); initTestRedisDb(); diff --git a/src/server.c b/src/server.c index 27bad55a938..54a4922daf4 100644 --- a/src/server.c +++ b/src/server.c @@ -3524,7 +3524,7 @@ void initServer(void) { #ifdef ENABLE_CMDPARSE cmdParseBindToCommands(); #endif - server.gtid_gap_log = createGtidGapLog(); + server.gtid_gap_log = gtidGapLogNew(); server.gtid_xsync_fullresync_indicator = 0; server.gtid_executed_cmd_count = 0; server.gtid_ignored_cmd_count = 0; diff --git a/src/server.h b/src/server.h index d2ee2435d3b..3d42a0ab899 100644 --- a/src/server.h +++ b/src/server.h @@ -1799,8 +1799,6 @@ struct redisCommand { the user associated to the connection has this command bit set in the bitmap of allowed commands. */ #ifdef ENABLE_CMDPARSE - /* 供 swap 和 gtid 共用的命令 key 解析接口(放在末尾以避免破坏命令表的位置初始化) */ - int (*cmdparse_count)(robj **argv, int argc); void (*cmdparse_parse)(int dbid, struct redisCommand *cmd, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key); #endif }; diff --git a/src/xredis_cmdparse.c b/src/xredis_cmdparse.c deleted file mode 100644 index b4475a444c2..00000000000 --- a/src/xredis_cmdparse.c +++ /dev/null @@ -1,266 +0,0 @@ -#include "xredis_cmdparse.h" -#include "server.h" - -/* ================================================================ - * 辅助函数:sds 转大写(server.commands 的 key 为大写) - * ================================================================ */ -static sds sdsdupupper(sds s) { - sds result = sdsdup(s); - for (size_t i = 0; i < sdslen(result); i++) { - result[i] = toupper((unsigned char)result[i]); - } - return result; -} - -/* ================================================================ - * 各命令的 count / parse 实现 - * ================================================================ */ - -/* --- del / unlink:多个 key --- */ -static int cmdCountDel(robj **argv, int argc) { - UNUSED(argv); - return argc > 1 ? argc - 1 : 0; -} -static void cmdParseDel(int dbid, struct redisCommand *cmd, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key) { - UNUSED(dbid); - for (int i = 1; i < argc; i++) { - on_key(ctx, OBJ_UNKNOWN, i, 0, 0, 0, NULL); - } -} - -/* --- 单个 key,无 subkeys(通过命令名推断类型)--- */ -static int cmdCountSingleKey(robj **argv, int argc) { - UNUSED(argv); UNUSED(argc); - return 1; -} -static int cmdKeyTypeFromCommand(struct redisCommand *cmd) { - if (cmd->flags & CMD_CATEGORY_STRING) return OBJ_STRING; - if (cmd->flags & CMD_CATEGORY_LIST) return OBJ_LIST; - if (cmd->flags & CMD_CATEGORY_HASH) return OBJ_HASH; - if (cmd->flags & CMD_CATEGORY_SET) return OBJ_SET; - if (cmd->flags & CMD_CATEGORY_SORTEDSET) return OBJ_ZSET; - if (cmd->flags & CMD_CATEGORY_BITMAP) return OBJ_STRING; - return OBJ_UNKNOWN; -} -static void cmdParseSingleKey(int dbid, struct redisCommand *cmd, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key) { - UNUSED(dbid); UNUSED(argc); - on_key(ctx, cmdKeyTypeFromCommand(cmd), 1, 0, 0, 0, NULL); -} - -/* --- mset / msetnx:多个 key,间隔出现 --- */ -static int cmdCountMset(robj **argv, int argc) { - UNUSED(argv); - return argc > 1 ? (argc - 1) / 2 : 0; -} -static void cmdParseMset(int dbid, struct redisCommand *cmd, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key) { - UNUSED(dbid); UNUSED(argv); - for (int i = 1; i < argc; i += 2) { - on_key(ctx, OBJ_STRING, i, 0, 0, 0, NULL); - } -} - -/* --- hset / hmset:单个 key + subkeys(field), 步长为2, 从argv[2]开始 --- */ -static void cmdParseHset(int dbid, struct redisCommand *cmd, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key) { - UNUSED(dbid); - int subkeys_count = (argc - 2) / 2; - on_key(ctx, OBJ_HASH, 1, subkeys_count, 2, 2, NULL); -} - -/* --- hdel:单个 key + 多个 subkeys, 步长为1, 从argv[2]开始 --- */ -static void cmdParseHdel(int dbid, struct redisCommand *cmd, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key) { - UNUSED(dbid); - int subkeys_count = argc - 2; - on_key(ctx, OBJ_HASH, 1, subkeys_count, 2, 1, NULL); -} - -/* --- hsetnx / hincrby / hincrbyfloat:单个 key + 1 个 subkey --- */ -static void cmdParseHsetnx(int dbid, struct redisCommand *cmd, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key) { - UNUSED(dbid); - if (argc >= 3) { - on_key(ctx, OBJ_HASH, 1, 1, 2, 1, NULL); - } else { - on_key(ctx, OBJ_HASH, 1, 0, 0, 0, NULL); - } -} - -/* --- sadd / srem / spop:单个 key + 多个 subkeys, 步长为1, 从argv[2]开始 --- */ -static void cmdParseSadd(int dbid, struct redisCommand *cmd, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key) { - UNUSED(dbid); - int subkeys_count = argc - 2; - on_key(ctx, OBJ_SET, 1, subkeys_count, 2, 1, NULL); -} - -/* --- smove:2 个 key,各带 1 个 subkey(member=argv[3])--- */ -static int cmdCountRename(robj **argv, int argc) { - UNUSED(argv); - return argc >= 3 ? 2 : (argc >= 2 ? 1 : 0); -} -static void cmdParseSmove(int dbid, struct redisCommand *cmd, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key) { - UNUSED(dbid); UNUSED(argv); - if (argc >= 4) { - on_key(ctx, OBJ_SET, 1, 1, 3, 1, NULL); - on_key(ctx, OBJ_SET, 2, 1, 3, 1, NULL); - } else if (argc >= 3) { - on_key(ctx, OBJ_SET, 1, 0, 0, 0, NULL); - on_key(ctx, OBJ_SET, 2, 0, 0, 0, NULL); - } else if (argc >= 2) { - on_key(ctx, OBJ_SET, 1, 0, 0, 0, NULL); - } -} - -/* --- zadd:单个 key + member subkeys, 步长为2 --- */ -static void cmdParseZadd(int dbid, struct redisCommand *cmd, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key) { - UNUSED(dbid); - int i = 2; - while (i < argc) { - sds arg = (sds)argv[i]->ptr; - if (!strcasecmp(arg, "nx") || !strcasecmp(arg, "xx") || - !strcasecmp(arg, "ch") || !strcasecmp(arg, "incr") || - !strcasecmp(arg, "gt") || !strcasecmp(arg, "lt")) { - i++; - } else { - break; - } - } - int subkeys_count = (argc - i) / 2; - on_key(ctx, OBJ_ZSET, 1, subkeys_count, i + 1, 2, NULL); -} - -/* --- zrem:单个 key + 多个 subkeys, 步长为1, 从argv[2]开始 --- */ -static void cmdParseZrem(int dbid, struct redisCommand *cmd, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key) { - UNUSED(dbid); - int subkeys_count = argc - 2; - on_key(ctx, OBJ_ZSET, 1, subkeys_count, 2, 1, NULL); -} - -/* --- zincrby:单个 key + 1 个 subkey --- */ -static void cmdParseZincrby(int dbid, struct redisCommand *cmd, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key) { - UNUSED(dbid); - if (argc >= 4) { - on_key(ctx, OBJ_ZSET, 1, 1, 3, 1, NULL); - } else { - on_key(ctx, OBJ_ZSET, 1, 0, 0, 0, NULL); - } -} - -/* --- rename / renamenx:2 个 key --- */ -static void cmdParseRename(int dbid, struct redisCommand *cmd, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key) { - UNUSED(dbid); - if (argc >= 3) { - on_key(ctx, OBJ_UNKNOWN, 1, 0, 0, 0, NULL); - on_key(ctx, OBJ_UNKNOWN, 2, 0, 0, 0, NULL); - } else if (argc >= 2) { - on_key(ctx, OBJ_UNKNOWN, 1, 0, 0, 0, NULL); - } -} - -/* --- bitop:argv[1]=操作符, argv[2+]=key(参考 server 表 firstkey=2)--- */ -static int cmdCountBitOp(robj **argv, int argc) { - UNUSED(argv); - return argc > 2 ? argc - 2 : 0; -} -static void cmdParseBitOp(int dbid, struct redisCommand *cmd, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key) { - UNUSED(dbid); UNUSED(argv); - for (int i = 2; i < argc; i++) { - on_key(ctx, OBJ_STRING, i, 0, 0, 0, NULL); - } -} - -/* --- zunionstore / zinterstore / zdiffstore:dest + numkeys 个 key --- */ -static int cmdCountZstore(robj **argv, int argc) { - UNUSED(argv); - if (argc < 4) return argc >= 2 ? 1 : 0; - long long numkeys = 0; - if (getLongLongFromObject(argv[2], &numkeys) != C_OK || numkeys < 1) { - return 1; /* dest only */ - } - return 1 + (int)numkeys; -} -static void cmdParseZstore(int dbid, struct redisCommand *cmd, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key) { - UNUSED(dbid); UNUSED(argv); - if (argc < 2) return; - on_key(ctx, OBJ_ZSET, 1, 0, 0, 0, NULL); /* dest */ - if (argc < 4) return; - long long numkeys = 0; - if (getLongLongFromObject(argv[2], &numkeys) != C_OK || numkeys < 1) return; - int max_keys = argc - 3; - if (numkeys > max_keys) numkeys = max_keys; - for (long long i = 0; i < numkeys; i++) { - on_key(ctx, OBJ_ZSET, 3 + (int)i, 0, 0, 0, NULL); - } -} - -/* ================================================================ - * 引入由 generate_cmdparse_commands.py 自动生成的命令注册表 - * ================================================================ */ -#include "xredis_commands.def" - -/* ================================================================ - * 公共接口:通过 server.commands 查找 - * ================================================================ */ - -/* 计算命令中包含的 key 条目数(通过 lookupCommand 从 server.commands 取) */ -int cmdParseCountKeys(robj **argv, int argc) { - if (argc < 1) return 0; - sds cmd_upper = sdsdupupper((sds)argv[0]->ptr); - struct redisCommand *cmd = lookupCommand(cmd_upper); - sdsfree(cmd_upper); - if (cmd != NULL && cmd->cmdparse_count != NULL) { - return cmd->cmdparse_count(argv, argc); - } - /* unknown command fallback */ - return argc >= 2 ? 1 : 0; -} - -/* 解析命令,通过回调通知每个 key 的位置描述(通过 lookupCommand 从 server.commands 取) */ -void cmdParseKeys(int dbid, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key) { - if (argc < 1) return; - sds cmd_upper = sdsdupupper((sds)argv[0]->ptr); - struct redisCommand *cmd = lookupCommand(cmd_upper); - sdsfree(cmd_upper); - if (cmd != NULL && cmd->cmdparse_parse != NULL) { - cmd->cmdparse_parse(dbid, cmd, argv, argc, ctx, on_key); - } else if (argc >= 2) { - /* unknown command fallback */ - serverLog(LL_WARNING, "Unknown command '%s' for key propagation", (sds)argv[0]->ptr); - // on_key(ctx, OBJ_UNKNOWN, 1, 0, 0, 0, NULL); - serverPanic("unknown command fallback"); - } -} - -/* 绑定 cmdparse 函数到 server.commands 中的所有命令 */ -void cmdParseBindToCommands(void) { - int i; - for (i = 0; cmd_parse_commands[i].name != NULL; i++) { - sds name = sdsnew(cmd_parse_commands[i].name); - for (size_t j = 0; j < sdslen(name); j++) { - name[j] = toupper((unsigned char)name[j]); - } - struct redisCommand *cmd = lookupCommand(name); - sdsfree(name); - if (cmd != NULL) { - cmd->cmdparse_count = cmd_parse_commands[i].count; - cmd->cmdparse_parse = cmd_parse_commands[i].parse; - } - } -} - -/* 通过命令名查找 count 函数(从 server.commands 取) */ -int (*cmdParseGetCountFunc(const char *cmd_name))(robj **argv, int argc) { - if (cmd_name == NULL) return NULL; - struct redisCommand *cmd = lookupCommandByCString(cmd_name); - if (cmd != NULL) { - return cmd->cmdparse_count; - } - return NULL; -} - -/* 通过命令名查找 parse 函数(从 server.commands 取) */ -void (*cmdParseGetParseFunc(const char *cmd_name))(int dbid, struct redisCommand *cmd, robj **argv, int argc, void *ctx, cmdParseOnKeyFn on_key) { - if (cmd_name == NULL) return NULL; - struct redisCommand *cmd = lookupCommandByCString(cmd_name); - if (cmd != NULL) { - return cmd->cmdparse_parse; - } - return NULL; -} diff --git a/src/xredis_cmdparse.c b/src/xredis_cmdparse.c new file mode 120000 index 00000000000..65d69ab2695 --- /dev/null +++ b/src/xredis_cmdparse.c @@ -0,0 +1 @@ +../deps/xredis-gtid/xredis/xredis_cmdparse.c \ No newline at end of file From 1e0c37bffbfe02925d38ac667abc89f6be5f393a Mon Sep 17 00:00:00 2001 From: "gd.zhou" Date: Thu, 4 Jun 2026 13:35:42 +0800 Subject: [PATCH 4/4] [gtid] submodule: add readBacklogIterator (Init/Deinit/SeekTo/ParseNext) and refactor saveGapLogFromGtidSet - readBacklogIterator stack-allocated with embedded mock client - querybuf reuse across gno iterations (no backlog re-read) - parseGtidCommand/parseMultiCommand take (argv, argc) and readBacklogIterator* respectively - saveGapLogFromGtidSet uses the iterator - parseCmdFromBacklog, cleanMockClient, resetMockClient removed - fix pre-existing typo: redisComamnd -> redisCommand in gtidOnKey --- deps/xredis-gtid | 2 +- src/server.c | 2 +- src/server.h | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/deps/xredis-gtid b/deps/xredis-gtid index 7d8aa82656f..13bda7808ab 160000 --- a/deps/xredis-gtid +++ b/deps/xredis-gtid @@ -1 +1 @@ -Subproject commit 7d8aa82656f7ebdbd39a2b84d9a1b7456290c1a9 +Subproject commit 13bda7808ab66e978458b134e8470c613401b714 diff --git a/src/server.c b/src/server.c index 54a4922daf4..95c61553504 100644 --- a/src/server.c +++ b/src/server.c @@ -3524,7 +3524,7 @@ void initServer(void) { #ifdef ENABLE_CMDPARSE cmdParseBindToCommands(); #endif - server.gtid_gap_log = gtidGapLogNew(); + server.gtid_gap_log = gtidGaplogNew(); server.gtid_xsync_fullresync_indicator = 0; server.gtid_executed_cmd_count = 0; server.gtid_ignored_cmd_count = 0; diff --git a/src/server.h b/src/server.h index 3d42a0ab899..ac64770ecf6 100644 --- a/src/server.h +++ b/src/server.h @@ -1742,7 +1742,7 @@ struct redisServer { long long gtid_executed_cmd_count; long long gtid_sync_stat[GTID_SYNC_TYPES]; int gtid_gaplog_enabled; - gtidGapLog* gtid_gap_log; + gtidGaplog* gtid_gap_log; /* importing mode */ mstime_t importing_end_time; /* in milliseconds */