Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/dtx/dtx_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -1320,6 +1320,9 @@ dtx_leader_end(struct dtx_leader_handle *dlh, struct ds_cont_child *cont, int re
* The leader will trigger retry globally without abort 'prepared' ones.
*/
if (result < 0 && result != -DER_AGAIN && !dth->dth_solo) {
if (DAOS_FAIL_CHECK(DAOS_DTX_RESEND_NONLEADER))
goto out;

/* 1. Drop partial modification for distributed transaction.
* 2. Remove the pinned DTX entry.
*/
Expand Down
2 changes: 2 additions & 0 deletions src/include/daos/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,8 @@ enum {
#define DAOS_OBJ_SYNC_RETRY (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0x4c)
#define DAOS_OBJ_COLL_SPARSE (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0x4d)

#define DAOS_DTX_RESEND_NONLEADER (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0x4e)

#define DAOS_NVME_FAULTY (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0x50)
#define DAOS_NVME_WRITE_ERR (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0x51)
#define DAOS_NVME_READ_ERR (DAOS_FAIL_UNIT_TEST_GROUP_LOC | 0x52)
Expand Down
10 changes: 6 additions & 4 deletions src/object/cli_shard.c
Original file line number Diff line number Diff line change
Expand Up @@ -787,12 +787,14 @@ dc_rw_cb(tse_task_t *task, void *arg)
* If any failure happens inside Cart, let's reset failure to
* TIMEDOUT, so the upper layer can retry.
*/
D_ERROR(DF_UOID" (%s) RPC %d to %d/%d, flags %lx/%x, task %p failed, %s: "DF_RC"\n",
DP_UOID(orw->orw_oid), is_ec_obj ? "EC" : "non-EC", opc,
D_ERROR(DF_UOID
" (%s) RPC %p (%d) to %d/%d, flags %lx/%x, task %p failed, %s, TX " DF_DTI
": " DF_RC "\n",
DP_UOID(orw->orw_oid), is_ec_obj ? "EC" : "non-EC", rw_args->rpc, opc,
rw_args->rpc->cr_ep.ep_rank, rw_args->rpc->cr_ep.ep_tag,
(unsigned long)orw->orw_api_flags, orw->orw_flags, task,
orw->orw_bulks.ca_arrays != NULL ||
orw->orw_bulks.ca_count != 0 ? "DMA" : "non-DMA", DP_RC(ret));
orw->orw_bulks.ca_arrays || orw->orw_bulks.ca_count ? "DMA" : "non-DMA",
DP_DTI(&orw->orw_dti), DP_RC(ret));

D_GOTO(out, ret);
}
Expand Down
34 changes: 25 additions & 9 deletions src/object/srv_obj.c
Original file line number Diff line number Diff line change
Expand Up @@ -2706,6 +2706,11 @@ ds_obj_ec_agg_handler(crt_rpc_t *rpc)
obj_ioc_end(&ioc, rc);
}

enum obj_resend_status {
ORS_PREPARED = 1,
ORS_DONE = 2,
};

static int
obj_handle_resend(daos_handle_t coh, struct dtx_id *dti, daos_epoch_t *epoch, uint32_t *pm_ver,
uint32_t *flags, struct dtx_memberships *mbs, bool leader, bool dist)
Expand All @@ -2723,7 +2728,7 @@ obj_handle_resend(daos_handle_t coh, struct dtx_id *dti, daos_epoch_t *epoch, ui
switch (rc) {
case -DER_ALREADY:
/* Do nothing if 'committed' or 'committable'. */
D_GOTO(out, rc = 1);
D_GOTO(out, rc = ORS_DONE);
case 0:
/* For 'prepared' DTX, if pool map has been changed, then DTX membership maybe
* changed also. Let's refresh it if necessary.
Expand All @@ -2745,6 +2750,7 @@ obj_handle_resend(daos_handle_t coh, struct dtx_id *dti, daos_epoch_t *epoch, ui
*epoch = e;
}

rc = ORS_PREPARED;
break;
case -DER_MISMATCH:
if (dist)
Expand Down Expand Up @@ -2891,6 +2897,8 @@ ds_obj_tgt_update_handler(crt_rpc_t *rpc)
out:
if (dth != NULL)
rc = dtx_end(dth, ioc.ioc_coc, rc);
if (!(orw->orw_flags & ORF_RESEND) && DAOS_FAIL_CHECK(DAOS_DTX_RESEND_NONLEADER))
ioc.ioc_lost_reply = 1;
obj_rw_reply(rpc, rc, 0, true, &ioc);
D_FREE(mbs);
obj_ioc_end(&ioc, rc);
Expand Down Expand Up @@ -3149,8 +3157,10 @@ ds_obj_rw_handler(crt_rpc_t *rpc)
version = orw->orw_map_ver;
rc = obj_handle_resend(ioc.ioc_vos_coh, &orw->orw_dti, &orw->orw_epoch, &version,
&flags, mbs, true, false);
if (rc != 0)
D_GOTO(out, rc = (rc > 0 ? 0 : rc));
if (rc < 0)
goto out;
if (rc == ORS_DONE)
D_GOTO(out, rc = 0);
} else if (DAOS_FAIL_CHECK(DAOS_DTX_LOST_RPC_REQUEST)) {
ioc.ioc_lost_reply = 1;
D_GOTO(out, rc);
Expand Down Expand Up @@ -4054,8 +4064,10 @@ ds_obj_punch_handler(crt_rpc_t *rpc)
version = opi->opi_map_ver;
rc = obj_handle_resend(ioc.ioc_vos_coh, &opi->opi_dti, &opi->opi_epoch, &version,
&flags, mbs, true, false);
if (rc != 0)
D_GOTO(out, rc = (rc > 0 ? 0 : rc));
if (rc < 0)
goto out;
if (rc == ORS_DONE)
D_GOTO(out, rc = 0);
} else if (DAOS_FAIL_CHECK(DAOS_DTX_LOST_RPC_REQUEST) ||
DAOS_FAIL_CHECK(DAOS_DTX_LONG_TIME_RESEND)) {
goto cleanup;
Expand Down Expand Up @@ -5248,8 +5260,10 @@ ds_obj_dtx_leader(struct daos_cpd_args *dca)
rc = obj_handle_resend(dca->dca_ioc->ioc_vos_coh, &dcsh->dcsh_xid,
&dcsh->dcsh_epoch.oe_value, &oci->oci_map_ver, &flags,
dcsh->dcsh_mbs, true, true);
if (rc != 0)
D_GOTO(out, rc = (rc > 0 ? 0 : rc));
if (rc < 0)
goto out;
if (rc == ORS_DONE)
D_GOTO(out, rc = 0);
} else if (DAOS_FAIL_CHECK(DAOS_DTX_LOST_RPC_REQUEST)) {
D_GOTO(out, rc = 0);
}
Expand Down Expand Up @@ -5841,8 +5855,10 @@ ds_obj_coll_punch_handler(crt_rpc_t *rpc)
version = ocpi->ocpi_map_ver;
rc = obj_handle_resend(ioc.ioc_vos_coh, &ocpi->ocpi_xid, &ocpi->ocpi_epoch,
&version, &flags, odm->odm_mbs, leader, false);
if (rc != 0)
D_GOTO(out, rc = (rc > 0 ? 0 : rc));
if (rc < 0)
goto out;
if (rc == ORS_DONE)
D_GOTO(out, rc = 0);

dce->dce_ver = version;
}
Expand Down
44 changes: 43 additions & 1 deletion src/tests/suite/daos_base_tx.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/**
* (C) Copyright 2019-2024 Intel Corporation.
* (C) Copyright 2026 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -563,14 +564,15 @@ dtx_15(void **state)
static void
dtx_16(void **state)
{
FAULT_INJECTION_REQUIRED();
test_arg_t *arg = *state;
char *update_buf;
const char *dkey = dts_dtx_dkey;
const char *akey = dts_dtx_akey;
daos_obj_id_t oid;
struct ioreq req;

FAULT_INJECTION_REQUIRED();

print_message("Resend after DTX aggregation\n");

if (!test_runable(arg, dts_dtx_replica_cnt))
Expand Down Expand Up @@ -986,6 +988,42 @@ dtx_22(void **state)
par_barrier(PAR_COMM_WORLD);
}

static void
dtx_23(void **state)
{
test_arg_t *arg = *state;
char *update_buf;
const char *dkey = dts_dtx_dkey;
const char *akey = dts_dtx_akey;
daos_obj_id_t oid;
struct ioreq req;

FAULT_INJECTION_REQUIRED();

print_message("DTX23: Resend with lost reply from non-leader\n");

if (!test_runable(arg, dts_dtx_replica_cnt))
return;

D_ALLOC(update_buf, dts_dtx_iosize);
assert_non_null(update_buf);
dts_buf_render(update_buf, dts_dtx_iosize);

oid = daos_test_oid_gen(arg->coh, dts_dtx_class, 0, 0, arg->myrank);
ioreq_init(&req, arg->coh, oid, DAOS_IOD_SINGLE, arg);

dtx_set_fail_loc(arg, DAOS_DTX_RESEND_NONLEADER | DAOS_FAIL_ALWAYS);

insert_single(dkey, akey, 0, update_buf, dts_dtx_iosize, DAOS_TX_NONE, &req);

dtx_set_fail_loc(arg, 0);

dtx_check_replicas(dkey, akey, "update_succ", update_buf, dts_dtx_iosize, &req);

D_FREE(update_buf);
ioreq_fini(&req);
}

static int
dtx_base_rf0_setup(void **state)
{
Expand All @@ -1006,6 +1044,7 @@ dtx_base_rf1_setup(void **state)
return rc;
}

/* clang-format off */
static const struct CMUnitTest dtx_tests[] = {
{"DTX1: update/punch single value with DTX successfully",
dtx_1, NULL, test_case_teardown},
Expand Down Expand Up @@ -1051,7 +1090,10 @@ static const struct CMUnitTest dtx_tests[] = {
dtx_21, dtx_base_rf0_setup, rebuild_sub_teardown},
{"DTX22: iteration does not return aborted DTX",
dtx_22, NULL, test_case_teardown},
{"DTX23: Resend with lost reply from non-leader",
dtx_23, NULL, test_case_teardown},
};
/* clang-format on */

static int
dtx_test_setup(void **state)
Expand Down
Loading