Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 28 additions & 17 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,14 +239,12 @@ int encode_chunk(int desc, char *data, int datalen, struct encode_chunk_context
// do the true encoding according the backend used (isa-l, cauchy ....)
ret = ec->common.ops->encode(ec->desc.backend_desc, k_ref, m_ref, ctx->chunk_size);
if (ret < 0) {
fprintf(stderr, "error encode ret = %d\n", ret);
return -1;
}

// fill the headers with true len, fragment len ....
ret = finalize_fragments_after_encode(ec, ctx->k, ctx->m, ctx->chunk_size, tot_len_sum, k_ref, m_ref);
if (ret < 0) {
fprintf(stderr, "error encode ret = %d\n", ret);
return -1;
}
return 0;
Expand Down Expand Up @@ -285,7 +283,9 @@ int my_liberasurecode_encode_cleanup(int desc,
return 0;
}

// Prepare memory, allocating stuff. Suitable for "buffermatrix": no data fragments allocated.
// Prepare memory, allocating stuff.
// Suitable for "buffermatrix": no data fragments allocated.
// Will also init chunk_size and number_of_subgroup
void encode_chunk_buffermatrix_prepare(int desc,
char *data,
int datalen,
Expand All @@ -301,6 +301,7 @@ void encode_chunk_buffermatrix_prepare(int desc,

ctx->number_of_subgroup = number_of_subgroup;

// Note: last subgroup may be smaller than the others
ctx->chunk_size = piecesize;

ctx->k = k;
Expand All @@ -316,7 +317,8 @@ void encode_chunk_buffermatrix_prepare(int desc,

// Encode a chunk using a buffer matrix as an input
// Same as above with the twist that data is not copied and can be directly
static int encode_chunk_buffermatrix(int desc, char *data, int datalen, int nbFrags, struct encode_chunk_context *ctx, int nth)
static int encode_chunk_buffermatrix(int desc, char *data, int datalen,
int nbFrags, struct encode_chunk_context *ctx, int nth, size_t fraglen)
{
ec_backend_t ec = ctx->instance;
char *k_ref[ctx->k];
Expand All @@ -339,8 +341,10 @@ static int encode_chunk_buffermatrix(int desc, char *data, int datalen, int nbFr
k_ref[i] = ptr;

// Computes actual data in the fragment
int size = datalen - (nth * ctx->k + i) * ctx->chunk_size;
tot_len_sum += size > 0 ? (size > ctx->chunk_size ? ctx->chunk_size: size) : 0;
// If we are at the end of the data, we may have a smaller
// fragment than the others. fraglen will take of that.
int size = datalen - ((nth * ctx->k * ctx->chunk_size) + (i * fraglen));
tot_len_sum += size > 0 ? (size > fraglen ? fraglen : size) : 0;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there some cases where fraglent can be lower than 0 ?
otherwise, it could be simplified ?

}

// "coding" fragments. Those ones are allocated above
Expand All @@ -353,15 +357,14 @@ static int encode_chunk_buffermatrix(int desc, char *data, int datalen, int nbFr
}

// do the true encoding according the backend used (isa-l, cauchy ....)
ret = ec->common.ops->encode(ec->desc.backend_desc, k_ref, m_ref, ctx->chunk_size);
// chunksize is the size of fragmenets in the stripe

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// chunksize is the size of fragmenets in the stripe
// chunksize is the size of fragments in the stripe

ret = ec->common.ops->encode(ec->desc.backend_desc, k_ref, m_ref, fraglen);
if (ret < 0) {
fprintf(stderr, "error encode ret = %d\n", ret);
return -1;
}

ret = finalize_fragments_after_encode(ec, ctx->k, ctx->m, ctx->chunk_size, tot_len_sum, k_ref, m_ref);
ret = finalize_fragments_after_encode(ec, ctx->k, ctx->m, fraglen, tot_len_sum, k_ref, m_ref);
if (ret < 0) {
fprintf(stderr, "error encode ret = %d\n", ret);
return -1;
}
return 0;
Expand All @@ -372,7 +375,7 @@ int encode_chunk_buffermatrix_all(int desc, char *data, int datalen, int nbfrags
int i;

for (i = 0; i < max ; i++) {
int err = encode_chunk_buffermatrix(desc, data, datalen, nbfrags, ctx, i);
int err = encode_chunk_buffermatrix(desc, data, datalen, nbfrags, ctx, i, ctx->chunk_size);
if (err != 0) {
return err;
}
Expand Down Expand Up @@ -667,6 +670,8 @@ func (backend *Backend) EncodeMatrixWithBufferMatrix(bm *BufferMatrix, chunkSize

nbFrags := C.int(bm.SubGroups())

// This prepares the context for encoding
// It allocates the coding fragments (not the data fragments)
C.encode_chunk_buffermatrix_prepare(backend.libecDesc, pData, pDataLen,
cChunkSize, C.int(bm.FragLen()), nbFrags, &ctx)

Expand All @@ -675,8 +680,13 @@ func (backend *Backend) EncodeMatrixWithBufferMatrix(bm *BufferMatrix, chunkSize
wg.Add(int(ctx.number_of_subgroup))

for i := 0; i < int(ctx.number_of_subgroup); i++ {
go func(nth int) {
r := C.encode_chunk_buffermatrix(backend.libecDesc, pData, pDataLen, nbFrags, &ctx, C.int(nth))
func(nth int) {
fragLen := C.size_t(chunkSize)
if i == int(ctx.number_of_subgroup)-1 { // last subgroup has a different size
fragLen = C.size_t(bm.FragLenLastSubGroup())
}
r := C.encode_chunk_buffermatrix(backend.libecDesc, pData, pDataLen,
nbFrags, &ctx, C.int(nth), fragLen)
if r < 0 {
atomic.AddUint32(&errCounter, 1)
}
Expand Down Expand Up @@ -822,7 +832,7 @@ func (backend *Backend) DecodeMatrix(frags [][]byte, piecesize int) (*DecodeData
totLen := uint64(0)
wg.Add(numBlock)

for i := 0; i < numBlock; i++ {
for i := range numBlock {
// launch goroutines, providing them a subrange of the final buffer so it can be used
// in concurrency without need to lock it access
go func(blockNr int) {
Expand Down Expand Up @@ -881,14 +891,14 @@ func (backend *Backend) decodeMatrixSlow(frags [][]byte, piecesize int) (*Decode

var totLen int64

for i := 0; i < blockNr; i++ {
for i := range blockNr {
vect := make([][]byte, len(frags))
for j := 0; j < len(frags); j++ {
for j := range frags {
vect[j] = frags[j][i*cellSize : (i+1)*cellSize]
}
subdata, err := backend.Decode(vect)
if err != nil {
return nil, fmt.Errorf("error subdecoding %d cause =%v", i, err)
return nil, fmt.Errorf("error subdecoding %d cause %w", i, err)
}
copy(data[totLen:], subdata.Data)
totLen += int64(len(subdata.Data))
Expand Down Expand Up @@ -919,6 +929,7 @@ func (backend *Backend) GetRangeMatrix(start, end, chunksize, fragSize int) *Ran
linearizedDataLen := trueFragLen * backend.K

if start > linearizedDataLen || end > linearizedDataLen || start > end {
//fmt.Fprintf(os.Stderr, "start %d end %d linearizedDataLen %d\n", start, end, linearizedDataLen)
return nil
}

Expand Down
Loading