Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

### Changed
* Stop marking/killing fast-mode iproto fibers on safe-mode switch.
Storage ops are asserted by `yield_checks` in tests.
* Move switch to safe mode from `on_commit` trigger to `on_replace` trigger.
* vinyl spaces always work in safe mode.

## [1.7.0] - 26-12-25

### Added
Expand Down
32 changes: 3 additions & 29 deletions crud/common/call.lua
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
local errors = require('errors')
local log = require('log')

local call_cache = require('crud.common.call_cache')
local dev_checks = require('crud.common.dev_checks')
local yield_checks = require('crud.common.yield_checks')
local utils = require('crud.common.utils')
local sharding_utils = require('crud.common.sharding.utils')
local fiber = require('fiber')
local fiber_clock = fiber.clock
local const = require('crud.common.const')
local rebalance = require('crud.common.rebalance')
local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref')

local BaseIterator = require('crud.common.map_call_cases.base_iter')
Expand All @@ -18,38 +17,13 @@ local CallError = errors.new_class('CallError')

local CALL_FUNC_NAME = 'call_on_storage'
local CRUD_CALL_FUNC_NAME = utils.get_storage_call(CALL_FUNC_NAME)
local CRUD_CALL_FIBER_NAME = CRUD_CALL_FUNC_NAME .. '/fast'

local call = {}

local function call_on_storage_safe(run_as_user, func_name, ...)
return box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...)
local function call_on_storage(run_as_user, func_name, ...)
return yield_checks.guard(box.session.su, run_as_user, call_cache.func_name_to_func(func_name), ...)
end
Comment thread
vakhov marked this conversation as resolved.

local function call_on_storage_fast(run_as_user, func_name, ...)
fiber.name(CRUD_CALL_FIBER_NAME)
return box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...)
end

local call_on_storage = rebalance.safe_mode and call_on_storage_safe or call_on_storage_fast

rebalance.on_safe_mode_toggle(function(is_enabled)
if is_enabled then
call_on_storage = call_on_storage_safe

local fibers_killed = 0
for fb_id, fb in pairs(fiber.info()) do
if fb.name == CRUD_CALL_FIBER_NAME then
fiber.kill(fb_id)
fibers_killed = fibers_killed + 1
end
end
log.debug('Killed %d fibers with fast-mode crud requests.', fibers_killed)
else
call_on_storage = call_on_storage_fast
end
end)

call.storage_api = {[CALL_FUNC_NAME] = call_on_storage}

function call.get_vshard_call_name(mode, prefer_replica, balance)
Expand Down
31 changes: 12 additions & 19 deletions crud/common/rebalance.lua
Original file line number Diff line number Diff line change
Expand Up @@ -58,36 +58,29 @@ local function _safe_mode_disable()
end

local function create_settings_trigger()
schema.settings_space:on_replace(function()
box.on_commit(function(rows_iter)
schema.settings_space:on_replace(function(old, new, _, op)
if op == 'REPLACE' then
-- There may be multiple operations on safe mode status tuple in one transaction.
-- We will take only the last action.
-- 0 = do nothing, 1 = enable safe mode, -1 = disable safe mode
local safe_mode_action = 0
for _, old, new, sp in rows_iter() do
-- These checks must be changed to skip unknown keys when there will be more than one setting
-- in _crud_settings_local space.
-- But for now it is better to raise an error than to silently ignore them.
if sp ~= schema.settings_space.id then
goto continue
end
assert((old == nil or old.key == SAFE_MODE_STATUS) and (new.key == SAFE_MODE_STATUS))

if (not old or not old.value) and new.value then
safe_mode_action = 1
elseif old and old.value and not new.value then
safe_mode_action = -1
end

::continue::
-- These checks must be changed to skip unknown keys when there will be more than one setting
-- in _crud_settings_local space.
-- But for now it is better to raise an error than to silently ignore them.
assert((old == nil or old.key == SAFE_MODE_STATUS) and (new.key == SAFE_MODE_STATUS))

if (not old or not old.value) and new.value then
safe_mode_action = 1
elseif old and old.value and not new.value then
safe_mode_action = -1
end

if safe_mode_action == 1 then
_safe_mode_enable()
elseif safe_mode_action == -1 then
_safe_mode_disable()
end
end)
end
end)
end

Expand Down
3 changes: 3 additions & 0 deletions crud/common/schema.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ local ReloadSchemaError = errors.new_class('ReloadSchemaError', {capture_stack =

local const = require('crud.common.const')
local dev_checks = require('crud.common.dev_checks')
local yield_checks = require('crud.common.yield_checks')
local utils = require('crud.common.vshard_utils')

local schema = {}
Expand Down Expand Up @@ -216,6 +217,8 @@ end
-- `add_space_schema_hash` is true
function schema.wrap_func_result(space, func, opts, ...)
dev_checks('table', 'function', 'table')
yield_checks.check_no_yields()
Comment thread
a1div0 marked this conversation as resolved.

local result = {}

local ok, func_res = pcall(func, ...)
Expand Down
22 changes: 16 additions & 6 deletions crud/common/sharding/bucket_ref_unref.lua
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,23 @@ safe_methods = {
bucket_unrefrw_many = bucket_ref_unref._bucket_unrefrw_many,
}

local function make_fast_method(method_name)
return function(arg, space_engine)
if space_engine == 'vinyl' then
-- vinyl always works in safe mode
return safe_methods[method_name](arg)
end
return bucket_ref_unref._fast()
end
end

fast_methods = {
bucket_refrw = bucket_ref_unref._fast,
bucket_unrefrw = bucket_ref_unref._fast,
bucket_refro = bucket_ref_unref._fast,
bucket_unrefro = bucket_ref_unref._fast,
bucket_refrw_many = bucket_ref_unref._fast,
bucket_unrefrw_many = bucket_ref_unref._fast,
bucket_refrw = make_fast_method('bucket_refrw'),
bucket_unrefrw = make_fast_method('bucket_unrefrw'),
bucket_refro = make_fast_method('bucket_refro'),
bucket_unrefro = make_fast_method('bucket_unrefro'),
bucket_refrw_many = make_fast_method('bucket_refrw_many'),
bucket_unrefrw_many = make_fast_method('bucket_unrefrw_many')
}

local function set_methods(methods)
Expand Down
51 changes: 51 additions & 0 deletions crud/common/yield_checks.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
--- module used to check that fiber has no yields
--- during execution in tests
local fiber = require("fiber")

local registry = {}

local yield_checks = {
check_no_yields = function() end,
guard = function(f, ...)
return f(...)
end,
}

local function register_fiber()
local info = fiber.self():info()
assert(registry[info.fid] == nil, "fiber already registered, check register_fiber calls")
registry[info.fid] = info
end

local function check_no_yields()
local info_curr = fiber.self():info()
local info_prev = registry[info_curr.fid]
assert(info_prev ~= nil, "fiber is not registered")
assert(info_curr.csw == info_prev.csw, "yield happened during fiber execution")
end

local function unregister_fiber()
local fid = fiber.self():id()
assert(registry[fid] ~= nil, "fiber is not registered")
registry[fid] = nil
end

local function finish(ok, res_err, ...)
unregister_fiber()
if not ok then
error(res_err)
end
return res_err, ...
end

local function guard(f, ...)
register_fiber()
return finish(pcall(f, ...))
end

if os.getenv("TARANTOOL_CRUD_ENABLE_INTERNAL_CHECKS") == "ON" then
Comment thread
a1div0 marked this conversation as resolved.
Comment thread
ita-sammann marked this conversation as resolved.
Comment thread
ita-sammann marked this conversation as resolved.
yield_checks.check_no_yields = check_no_yields
yield_checks.guard = guard
end

return yield_checks
4 changes: 2 additions & 2 deletions crud/delete.lua
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ local function delete_on_storage(space_name, key, field_names, opts)
return nil, err
end

local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refrw(opts.bucket_id)
local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refrw(opts.bucket_id, space.engine)
if not ref_ok then
return nil, bucket_ref_err
end
Expand All @@ -58,7 +58,7 @@ local function delete_on_storage(space_name, key, field_names, opts)
fetch_latest_metadata = opts.fetch_latest_metadata,
}, space, key)

local unref_ok, err_unref = unref(opts.bucket_id)
local unref_ok, err_unref = unref(opts.bucket_id, space.engine)
if not unref_ok then
return nil, err_unref
end
Expand Down
4 changes: 2 additions & 2 deletions crud/get.lua
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ local function get_on_storage(space_name, key, field_names, opts)
return nil, err
end

local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refro(opts.bucket_id)
local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refro(opts.bucket_id, space.engine)
if not ref_ok then
return nil, bucket_ref_err
end
Expand All @@ -56,7 +56,7 @@ local function get_on_storage(space_name, key, field_names, opts)
fetch_latest_metadata = opts.fetch_latest_metadata,
}, space, key)

local unref_ok, err_unref = unref(opts.bucket_id)
local unref_ok, err_unref = unref(opts.bucket_id, space.engine)
if not unref_ok then
return nil, err_unref
end
Expand Down
4 changes: 2 additions & 2 deletions crud/insert.lua
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ local function insert_on_storage(space_name, tuple, opts)
end

local bucket_id = tuple[utils.get_bucket_id_fieldno(space)]
local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refrw(bucket_id)
local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refrw(bucket_id, space.engine)

if not ref_ok then
return nil, bucket_ref_err
Expand All @@ -60,7 +60,7 @@ local function insert_on_storage(space_name, tuple, opts)
fetch_latest_metadata = opts.fetch_latest_metadata,
}, space, tuple)

local unref_ok, err_unref = unref(bucket_id)
local unref_ok, err_unref = unref(bucket_id, space.engine)
if not unref_ok then
return nil, err_unref
end
Expand Down
12 changes: 6 additions & 6 deletions crud/insert_many.lua
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ local function insert_many_on_storage(space_name, tuples, opts)
bucket_ids[tuple[utils.get_bucket_id_fieldno(space)]] = true
end

local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refrw_many(bucket_ids)
local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refrw_many(bucket_ids, space.engine)
if not ref_ok then
return nil, bucket_ref_err
end
Expand Down Expand Up @@ -100,7 +100,7 @@ local function insert_many_on_storage(space_name, tuples, opts)
end

if opts.rollback_on_error == true then
local unref_ok, bucket_unref_err = unref(bucket_ids)
local unref_ok, bucket_unref_err = unref(bucket_ids, space.engine)
box.rollback()
if not unref_ok then
return nil, bucket_unref_err
Expand All @@ -113,7 +113,7 @@ local function insert_many_on_storage(space_name, tuples, opts)
return nil, errs, replica_schema_version
end

local unref_ok, bucket_unref_err = unref(bucket_ids)
local unref_ok, bucket_unref_err = unref(bucket_ids, space.engine)
box.commit()
if not unref_ok then
return nil, bucket_unref_err
Expand All @@ -128,7 +128,7 @@ local function insert_many_on_storage(space_name, tuples, opts)

if next(errs) ~= nil then
if opts.rollback_on_error == true then
local unref_ok, bucket_unref_err = unref(bucket_ids)
local unref_ok, bucket_unref_err = unref(bucket_ids, space.engine)
box.rollback()
if not unref_ok then
return nil, bucket_unref_err
Expand All @@ -141,7 +141,7 @@ local function insert_many_on_storage(space_name, tuples, opts)
return nil, errs, replica_schema_version
end

local unref_ok, bucket_unref_err = unref(bucket_ids)
local unref_ok, bucket_unref_err = unref(bucket_ids, space.engine)
box.commit()
if not unref_ok then
return nil, bucket_unref_err
Expand All @@ -150,7 +150,7 @@ local function insert_many_on_storage(space_name, tuples, opts)
return inserted_tuples, errs, replica_schema_version
end

local unref_ok, bucket_unref_err = unref(bucket_ids)
local unref_ok, bucket_unref_err = unref(bucket_ids, space.engine)
box.commit()
if not unref_ok then
return nil, bucket_unref_err
Expand Down
4 changes: 2 additions & 2 deletions crud/replace.lua
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ local function replace_on_storage(space_name, tuple, opts)
end

local bucket_id = tuple[utils.get_bucket_id_fieldno(space)]
local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refrw(bucket_id)
local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refrw(bucket_id, space.engine)
if not ref_ok then
return nil, bucket_ref_err
end
Expand All @@ -59,7 +59,7 @@ local function replace_on_storage(space_name, tuple, opts)
fetch_latest_metadata = opts.fetch_latest_metadata,
}, space, tuple)

local unref_ok, err_unref = unref(bucket_id)
local unref_ok, err_unref = unref(bucket_id, space.engine)
if not unref_ok then
return nil, err_unref
end
Expand Down
12 changes: 6 additions & 6 deletions crud/replace_many.lua
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ local function replace_many_on_storage(space_name, tuples, opts)
bucket_ids[tuple[utils.get_bucket_id_fieldno(space)]] = true
end

local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refrw_many(bucket_ids)
local ref_ok, bucket_ref_err, unref = bucket_ref_unref.bucket_refrw_many(bucket_ids, space.engine)
if not ref_ok then
return nil, bucket_ref_err
end
Expand Down Expand Up @@ -103,7 +103,7 @@ local function replace_many_on_storage(space_name, tuples, opts)
end

if opts.rollback_on_error == true then
local unref_ok, bucket_unref_err = unref(bucket_ids)
local unref_ok, bucket_unref_err = unref(bucket_ids, space.engine)
box.rollback()
if not unref_ok then
return nil, bucket_unref_err
Expand All @@ -115,7 +115,7 @@ local function replace_many_on_storage(space_name, tuples, opts)
return nil, errs, replica_schema_version
end

local unref_ok, bucket_unref_err = unref(bucket_ids)
local unref_ok, bucket_unref_err = unref(bucket_ids, space.engine)
box.commit()
if not unref_ok then
return nil, bucket_unref_err
Expand All @@ -129,7 +129,7 @@ local function replace_many_on_storage(space_name, tuples, opts)

if next(errs) ~= nil then
if opts.rollback_on_error == true then
local unref_ok, bucket_unref_err = unref(bucket_ids)
local unref_ok, bucket_unref_err = unref(bucket_ids, space.engine)
box.rollback()
if not unref_ok then
return nil, bucket_unref_err
Expand All @@ -142,7 +142,7 @@ local function replace_many_on_storage(space_name, tuples, opts)
return nil, errs, replica_schema_version
end

local unref_ok, bucket_unref_err = unref(bucket_ids)
local unref_ok, bucket_unref_err = unref(bucket_ids, space.engine)
box.commit()
if not unref_ok then
return nil, bucket_unref_err
Expand All @@ -151,7 +151,7 @@ local function replace_many_on_storage(space_name, tuples, opts)
return inserted_tuples, errs, replica_schema_version
end

local unref_ok, bucket_unref_err = unref(bucket_ids)
local unref_ok, bucket_unref_err = unref(bucket_ids, space.engine)
box.commit()
if not unref_ok then
return nil, bucket_unref_err
Expand Down
Loading
Loading