diff --git a/backend.c b/backend.c
new file mode 100644
index 0000000..4846940
--- /dev/null
+++ b/backend.c
@@ -0,0 +1,408 @@
+#include
+#include
+#include
+#include
+#include
+#include "backend.h"
+
+// shims to make working with frag arrays easier
+char **makeStrArray(int n) { return calloc(n, sizeof(char *)); }
+
+void freeStrArray(char **arr) { free(arr); }
+
+// shims because the fragment headers use misaligned fields
+uint64_t getOrigDataSize(struct fragment_header_s *header) {
+  return header->meta.orig_data_size;
+}
+uint32_t getBackendVersion(struct fragment_header_s *header) {
+  return header->meta.backend_version;
+}
+ec_backend_id_t getBackendID(struct fragment_header_s *header) {
+  return header->meta.backend_id;
+}
+uint32_t getECVersion(struct fragment_header_s *header) {
+  return header->libec_version;
+}
+int getHeaderSize() { return sizeof(struct fragment_header_s); }
+
+// linearize is used when all the data fragments are available. Instead of
+// doing a true decoding, the fragment payloads are simply copied one after
+// the other into a buffer. This is mainly a copy of the liberasurecode
+// fragment_to_string function, except that no additional allocation is done.
+//
+// /!\ This function does not perform any header checksum validation.
+// If fragments must be validated, see 'check_matrix_fragment'.
+//
+// 'k' is the number of data fragments used for the encoding.
+// 'in' is an array of all data frags, in their index order.
+// 'inlen' is the array size.
+// 'dest' is an already allocated buffer where data will be linearized.
+// 'destlen' is the buffer size, and hence the maximum number of bytes
+// linearized.
+// 'outlen' receives the number of bytes really linearized in dest (always
+// lower than or equal to destlen).
+//
+// Returns dest if nothing went wrong, else NULL: missing argument, no
+// fragment at all, negative size or index read from a fragment, sizes that
+// differ between fragments, index not below 'k', or fragments not sorted.
+char *linearize(int k, char **in, int inlen, char *dest, uint64_t destlen,
+                uint64_t *outlen) {
+  int i;
+  int orig_data_size = -1;
+
+  if (in == NULL || dest == NULL || outlen == NULL) {
+    return NULL;
+  }
+
+  // The following perform small correctness checks before linearizing
+  // the buffer
+  int previous_idx = -1;
+  for (i = 0; i < inlen; i++) {
+    if (in[i] == NULL) {
+      return NULL;
+    }
+    int index = get_fragment_idx(in[i]);
+
+    if (orig_data_size < 0) {
+      orig_data_size = get_orig_data_size(in[i]);
+    } else if (get_orig_data_size(in[i]) != orig_data_size) {
+      return NULL;
+    }
+
+    // A negative index can never designate one of the k data fragments.
+    if (index < 0 || index >= k) {
+      return NULL;
+    }
+
+    // Checks that the fragments are sorted.
+    if (previous_idx > index) {
+      return NULL;
+    }
+    previous_idx = index;
+  }
+
+  // Without any fragment, or without a usable size, there is nothing to
+  // linearize. Stopping here also keeps the -1 sentinel from being
+  // converted to a huge unsigned value below.
+  if (orig_data_size < 0) {
+    return NULL;
+  }
+
+  // compute the number of bytes needed for the output
+  uint64_t tocopy = (uint64_t)orig_data_size;
+  if (destlen < tocopy) {
+    tocopy = destlen;
+  }
+  *outlen = tocopy;
+
+  // copy in an ordered way all bytes of fragments in the buffer
+  uint64_t string_off = 0;
+  for (i = 0; i < inlen && tocopy > 0; i++) {
+    char *f = get_data_ptr_from_fragment(in[i]);
+    int fsize = get_fragment_payload_size(in[i]);
+    // A missing payload or a negative size would turn the memcpy below
+    // into an out-of-bounds access.
+    if (f == NULL || fsize < 0) {
+      return NULL;
+    }
+    uint64_t psize = tocopy > (uint64_t)fsize ? (uint64_t)fsize : tocopy;
+    memcpy(dest + string_off, f, psize);
+    tocopy -= psize;
+    string_off += psize;
+  }
+  return dest;
+}
+
+// check_matrix_fragment walks a matrix fragment cell by cell and validates
+// each fragment header with is_invalid_fragment_header.
+//
+// 'frag' is the fragment buffer: consecutive cells, each one starting with
+// a fragment header and 'piecesize' + header size bytes apart.
+// 'frag_len' is the size of 'frag' in bytes.
+// 'piecesize' is the payload size of a cell.
+//
+// Returns true when every header found is valid (an empty fragment is
+// trivially valid). Returns false on an invalid header, on unusable
+// arguments, or when the buffer ends in the middle of a header.
+bool check_matrix_fragment(char *frag, int frag_len, int piecesize) {
+  if (frag_len < 0 || piecesize < 0 || (frag == NULL && frag_len > 0)) {
+    return false;
+  }
+
+  const size_t header_size = (size_t)getHeaderSize();
+  const size_t total = (size_t)frag_len;
+  const size_t stride = header_size + (size_t)piecesize;
+  size_t offset = 0;
+
+  while (offset < total) {
+    // Never hand a truncated header to the validation routine: it would
+    // read past the end of the buffer.
+    if (total - offset < header_size) {
+      return false;
+    }
+    if (is_invalid_fragment_header((fragment_header_t *)&frag[offset])) {
+      return false;
+    }
+    offset += stride;
+  }
+
+  return true;
+}
+
+// Returns a zero-filled buffer of 'len' bytes aligned on 16 bytes, or NULL
+// when the allocation fails. Release it with dealloc_data.
+static inline void *alloc_data(size_t len) {
+  void *buf = NULL;
+  if (posix_memalign(&buf, 16, len) != 0) {
+    return NULL;
+  }
+  memset(buf, 0, len);
+  return buf;
+}
+
+// Releases a buffer obtained from alloc_data. 'len' is not used.
+static inline void dealloc_data(void *pt, size_t len) { free(pt); }
+
+// Instead of encoding K big blocks of data, the data is divided in small
+// blocks of 'piecesize' bytes that are encoded subgroup by subgroup (see
+// encode_chunk). This function fills 'ctx' and allocates the K data and the
+// M coding fragment buffers.
+// 'desc' : liberasurecode handle
+// 'data' : the whole data to encode (not read here)
+// 'datalen' : the size of 'data' in bytes
+// 'piecesize' : the size of the little blocks used for encoding
+// 'ctx' : receives the backend instance, k, m, the buffers and their sizes
+//
+// On failure (unknown 'desc', non-positive 'piecesize' or k, negative
+// 'datalen' or m, size overflow or allocation error) nothing stays
+// allocated, 'datas' and 'codings' are NULL and 'number_of_subgroup' is 0,
+// so that encode_chunk rejects every subgroup.
+void encode_chunk_prepare(int desc, char *data, int datalen, int piecesize,
+                          struct encode_chunk_context *ctx) {
+  int i;
+
+  if (ctx == NULL) {
+    return;
+  }
+
+  // Start from a state that my_liberasurecode_encode_cleanup accepts.
+  ctx->datas = NULL;
+  ctx->codings = NULL;
+  ctx->number_of_subgroup = 0;
+  ctx->chunk_size = 0;
+  ctx->frags_len = 0;
+  ctx->k = 0;
+  ctx->m = 0;
+
+  ctx->instance = liberasurecode_backend_instance_get_by_desc(desc);
+  if (ctx->instance == NULL || piecesize <= 0 || datalen < 0) {
+    return;
+  }
+
+  const int k = ctx->instance->args.uargs.k;
+  const int m = ctx->instance->args.uargs.m;
+  if (k <= 0 || m < 0) {
+    return;
+  }
+  ctx->k = k;
+  ctx->m = m;
+
+  // here we compute the number of (k) subgroup of 'piecesize' bytes we can
+  // create, rounded up: the last subgroup may be only partially filled
+  const size_t block_size = (size_t)piecesize * (size_t)k;
+  size_t subgroups = (size_t)datalen / block_size;
+  if (subgroups * block_size != (size_t)datalen) {
+    subgroups++;
+  }
+
+  const size_t frags_len =
+      (sizeof(fragment_header_t) + (size_t)piecesize) * subgroups;
+  // 'frags_len' is stored in an unsigned int: refuse sizes that do not fit.
+  if ((size_t)(unsigned int)frags_len != frags_len) {
+    return;
+  }
+
+  char **datas = calloc(k, sizeof(char *));
+  char **codings = calloc(m, sizeof(char *));
+  bool ok = datas != NULL && (codings != NULL || m == 0);
+
+  // A zero-sized allocation may legitimately yield NULL.
+  for (i = 0; ok && i < k; ++i) {
+    datas[i] = alloc_data(frags_len);
+    ok = datas[i] != NULL || frags_len == 0;
+  }
+
+  for (i = 0; ok && i < m; ++i) {
+    codings[i] = alloc_data(frags_len);
+    ok = codings[i] != NULL || frags_len == 0;
+  }
+
+  if (!ok) {
+    // calloc zero-filled both arrays, so the slots that were never
+    // allocated are NULL and can be released like the others.
+    for (i = 0; datas != NULL && i < k; ++i) {
+      dealloc_data(datas[i], frags_len);
+    }
+    for (i = 0; codings != NULL && i < m; ++i) {
+      dealloc_data(codings[i], frags_len);
+    }
+    free(datas);
+    free(codings);
+    return;
+  }
+
+  ctx->number_of_subgroup = (unsigned int)subgroups;
+  // Note: last chunk may be smaller than piecesize
+  ctx->chunk_size = piecesize;
+  ctx->datas = datas;
+  ctx->codings = codings;
+  ctx->frags_len = (unsigned int)frags_len;
+}
+
+// return real size of fragment header size
+size_t get_fragment_header_size() { return sizeof(fragment_header_t); }
+
+int encode_chunk(int desc, char *data, int datalen,
+                 struct encode_chunk_context *ctx, int nth);
+
+// Encodes the subgroups 0 .. max-1 one after the other with encode_chunk.
+// Stops at the first failure and returns its error code, 0 otherwise.
+int encode_chunk_all(int desc, char *data, int datalen,
+                     struct encode_chunk_context *ctx, int max) {
+  int i;
+  for (i = 0; i < max; i++) {
+    int err = encode_chunk(desc, data, datalen, ctx, i);
+    if (err != 0) {
+      return err;
+    }
+  }
+  return 0;
+}
+
+// encode_chunk will encode a subset of the fragments data.
+// It has to be considered that all the datas will not be divided in K blocks,
+// but instead, they will be divided in N sub-blocks of K*chunksize bytes
+// [-------------------data---------------------]
+// {s1a | s1b | s1c | s1d}{s2a | s2b | s2c | s2d}
+// fragment1 => [header1a|s1a|header2a|s2a]
+// fragment2 => [header1b|s1b|header2b|s2b]
+// fragment3 => [header1c|s1c|header2c|s2c]
+// fragment4 => [header1d|s1d|header2d|s2d]
+// This mapping is more efficient for range reads (when only a small subset
+// of the data is needed), especially when a whole fragment is missing.
+//
+// 'desc' : liberasurecode handle (not used here, the instance comes from ctx)
+// 'data' : the whole data to encode
+// 'datalen' : the size of 'data' in bytes
+// 'ctx' : context whose instance, buffers and sizes are read here
+// 'nth' : index of the subgroup to encode, in [0, ctx->number_of_subgroup)
+//
+// Returns 0 on success, -1 on invalid arguments or when the backend fails.
+int encode_chunk(int desc, char *data, int datalen,
+                 struct encode_chunk_context *ctx, int nth) {
+  // Validate everything before sizing the arrays below or touching a buffer.
+  if (ctx == NULL || ctx->instance == NULL || ctx->datas == NULL ||
+      ctx->k <= 0 || ctx->m < 0 || (ctx->m > 0 && ctx->codings == NULL)) {
+    return -1;
+  }
+  if (nth < 0 || (unsigned int)nth >= ctx->number_of_subgroup) {
+    return -1;
+  }
+  if (data == NULL && datalen > 0) {
+    return -1;
+  }
+
+  ec_backend_t ec = ctx->instance;
+  char *k_ref[ctx->k];
+  char *m_ref[ctx->m + 1]; // spare slot: a zero-length array is undefined
+
+  const size_t chunk_size = ctx->chunk_size;
+  const size_t cell_offset =
+      (size_t)nth * (sizeof(fragment_header_t) + chunk_size);
+  const size_t total = datalen > 0 ? (size_t)datalen : 0;
+  // Offsets are used instead of a moving pointer: in the last subgroup some
+  // cells start past the end of 'data', and a pointer there must not even
+  // be formed.
+  size_t offset = (size_t)ctx->k * (size_t)nth * chunk_size;
+  int i, ret;
+
+  // Do the mapping as described above
+  int tot_len_sum = 0;
+  for (i = 0; i < ctx->k; i++) {
+    if (ctx->datas[i] == NULL) {
+      return -1;
+    }
+    fragment_header_t *hdr =
+        (fragment_header_t *)(ctx->datas[i] + cell_offset);
+    hdr->magic = LIBERASURECODE_FRAG_HEADER_MAGIC;
+    char *payload = (char *)(hdr + 1);
+    if (offset < total) {
+      size_t len_to_copy = total - offset;
+      if (len_to_copy > chunk_size) {
+        len_to_copy = chunk_size;
+      }
+      tot_len_sum += (int)len_to_copy;
+      memcpy(payload, data + offset, len_to_copy);
+    }
+    offset += chunk_size;
+    k_ref[i] = payload;
+  }
+
+  for (i = 0; i < ctx->m; i++) {
+    if (ctx->codings[i] == NULL) {
+      return -1;
+    }
+    fragment_header_t *hdr =
+        (fragment_header_t *)(ctx->codings[i] + cell_offset);
+    hdr->magic = LIBERASURECODE_FRAG_HEADER_MAGIC;
+    m_ref[i] = (char *)(hdr + 1);
+  }
+
+  // do the true encoding according the backend used (isa-l, cauchy ....)
+  ret = ec->common.ops->encode(ec->desc.backend_desc, k_ref, m_ref,
+                               ctx->chunk_size);
+  if (ret < 0) {
+    return -1;
+  }
+
+  // fill the headers with true len, fragment len ....
+  ret = finalize_fragments_after_encode(ec, ctx->k, ctx->m, ctx->chunk_size,
+                                        tot_len_sum, k_ref, m_ref);
+  if (ret < 0) {
+    return -1;
+  }
+  return 0;
+}
+
+// Releases what encode_chunk_prepare allocated: the K data buffers, the M
+// coding buffers and the two arrays holding them. K and M are read from the
+// backend instance designated by 'desc'; 'len' is only forwarded to
+// dealloc_data. NULL arrays are skipped.
+// Returns 0, or -EBACKENDNOTAVAIL when 'desc' designates no instance (in
+// which case nothing is released).
+int my_liberasurecode_encode_cleanup(int desc, size_t len, char **encoded_data,
+                                     char **encoded_parity) {
+  int i, k, m;
+
+  ec_backend_t instance = liberasurecode_backend_instance_get_by_desc(desc);
+  if (NULL == instance) {
+    return -EBACKENDNOTAVAIL;
+  }
+
+  k = instance->args.uargs.k;
+  m = instance->args.uargs.m;
+
+  if (encoded_data) {
+    for (i = 0; i < k; i++) {
+      dealloc_data(encoded_data[i], len);
+    }
+
+    free(encoded_data);
+  }
+
+  if (encoded_parity) {
+    for (i = 0; i < m; i++) {
+      dealloc_data(encoded_parity[i], len);
+    }
+    free(encoded_parity);
+  }
+
+  return 0;
+}
+
+// Prepare memory, allocating stuff.
+// Suitable for "buffermatrix": no data fragments allocated.
+// Will also init chunk_size and number_of_subgroup.
+// 'frags_len' is the size of each coding buffer allocated here.
+// On failure (unknown 'desc' or allocation error) 'codings' is left NULL
+// and nothing stays allocated; encode_chunk_buffermatrix then returns -1.
+void encode_chunk_buffermatrix_prepare(int desc, char *data, int datalen,
+                                       int piecesize, int frags_len,
+                                       int number_of_subgroup,
+                                       struct encode_chunk_context *ctx) {
+  int i;
+
+  if (ctx == NULL) {
+    return;
+  }
+
+  ctx->codings = NULL;
+  ctx->number_of_subgroup = number_of_subgroup;
+  // Note: last subgroup may be smaller than the others
+  ctx->chunk_size = piecesize;
+  ctx->frags_len = frags_len;
+  ctx->k = 0;
+  ctx->m = 0;
+
+  ctx->instance = liberasurecode_backend_instance_get_by_desc(desc);
+  if (ctx->instance == NULL) {
+    return;
+  }
+  ctx->k = ctx->instance->args.uargs.k;
+  ctx->m = ctx->instance->args.uargs.m;
+
+  char **codings = calloc(ctx->m, sizeof(char *));
+  if (codings == NULL) {
+    return;
+  }
+
+  for (i = 0; i < ctx->m; ++i) {
+    codings[i] = alloc_data(ctx->frags_len);
+    // A zero-sized allocation may legitimately yield NULL.
+    if (codings[i] == NULL && ctx->frags_len != 0) {
+      // Roll back what was allocated so far.
+      while (i-- > 0) {
+        dealloc_data(codings[i], ctx->frags_len);
+      }
+      free(codings);
+      return;
+    }
+  }
+  ctx->codings = codings;
+}
+
+// Encode a chunk using a buffer matrix as an input.
+// Same as encode_chunk, with the twist that the data is not copied: 'data'
+// is the buffer matrix itself and is used directly. The cell of data
+// fragment i for subgroup 'nth' is expected at
+//   data + (nth + nbFrags * i) * (header size + ctx->chunk_size)
+// and its header magic is written in place.
+// 'datalen' is the amount of real data held by the matrix.
+// 'fraglen' is the payload size handed to the backend for this subgroup; it
+// may be smaller than ctx->chunk_size for the last subgroup.
+// Returns 0 on success, -1 on invalid arguments or when the backend fails.
+int encode_chunk_buffermatrix(int desc,
+                              char *data,
+                              int datalen,
+                              int nbFrags,
+                              struct encode_chunk_context *ctx,
+                              int nth,
+                              size_t fraglen) {
+  // Validate everything before sizing the arrays below or touching a buffer.
+  if (ctx == NULL || ctx->instance == NULL || data == NULL || nbFrags < 0 ||
+      ctx->k <= 0 || ctx->m < 0 || (ctx->m > 0 && ctx->codings == NULL)) {
+    return -1;
+  }
+  if (nth < 0 || (unsigned int)nth >= ctx->number_of_subgroup) {
+    return -1;
+  }
+
+  ec_backend_t ec = ctx->instance;
+  char *k_ref[ctx->k];
+  char *m_ref[ctx->m + 1]; // spare slot: a zero-length array is undefined
+  const size_t one_cell_size = sizeof(fragment_header_t) + ctx->chunk_size;
+  // Amount of real data located before the first cell of this subgroup.
+  const long long group_start =
+      (long long)nth * ctx->k * (long long)ctx->chunk_size;
+  int i, ret;
+  int tot_len_sum = 0;
+
+  // Create the array of "data" fragments
+  // No copy, just prepare the header
+  for (i = 0; i < ctx->k; i++) {
+    char *cell =
+        data + ((size_t)nth + (size_t)nbFrags * (size_t)i) * one_cell_size;
+    fragment_header_t *hdr = (fragment_header_t *)cell;
+    hdr->magic = LIBERASURECODE_FRAG_HEADER_MAGIC;
+    k_ref[i] = (char *)(hdr + 1);
+
+    // Computes actual data in the fragment
+    // If we are at the end of the data, we may have a smaller
+    // fragment than the others. fraglen will take care of that.
+    // Signed arithmetic on purpose: past the end of the data the
+    // remaining size is negative and the cell counts for nothing.
+    long long remaining =
+        (long long)datalen - (group_start + (long long)i * (long long)fraglen);
+    if (remaining > (long long)fraglen) {
+      remaining = (long long)fraglen;
+    }
+    if (remaining > 0) {
+      tot_len_sum += (int)remaining;
+    }
+  }
+
+  // "coding" fragments. Those ones are allocated by
+  // encode_chunk_buffermatrix_prepare
+  for (i = 0; i < ctx->m; i++) {
+    if (ctx->codings[i] == NULL) {
+      return -1;
+    }
+    fragment_header_t *hdr =
+        (fragment_header_t *)(ctx->codings[i] + (size_t)nth * one_cell_size);
+    hdr->magic = LIBERASURECODE_FRAG_HEADER_MAGIC;
+    m_ref[i] = (char *)(hdr + 1);
+  }
+
+  // do the true encoding according the backend used (isa-l, cauchy ....)
+  ret = ec->common.ops->encode(ec->desc.backend_desc, k_ref, m_ref,
+                               fraglen);
+  if (ret < 0) {
+    return -1;
+  }
+
+  // fill the headers with true len, fragment len ....
+  ret = finalize_fragments_after_encode(ec, ctx->k, ctx->m, fraglen,
+                                        tot_len_sum, k_ref, m_ref);
+  if (ret < 0) {
+    return -1;
+  }
+  return 0;
+}
+
+// Helper function to compute everything in one go: encodes the subgroups
+// 0 .. max-1, each with a payload size of ctx->chunk_size. Stops at the
+// first failure and returns its error code, 0 otherwise.
+int encode_chunk_buffermatrix_all(int desc, char *data, int datalen,
+                                  int nbfrags, struct encode_chunk_context *ctx,
+                                  int max) {
+  int i;
+
+  for (i = 0; i < max; i++) {
+    int err = encode_chunk_buffermatrix(desc, data, datalen, nbfrags, ctx, i,
+                                        ctx->chunk_size);
+    if (err != 0) {
+      return err;
+    }
+  }
+  return 0;
+}
+
+// Releases the M coding buffers and the array holding them. M is read from
+// the backend instance designated by 'desc'; 'len' is only forwarded to
+// dealloc_data. 'encoded_data' is not touched by this function.
+// Returns 0, or -EBACKENDNOTAVAIL when 'desc' designates no instance (in
+// which case nothing is released).
+int my_liberasurecode_encode_buffermatrix_cleanup(int desc, size_t len,
+                                                  char **encoded_data,
+                                                  char **encoded_parity) {
+  ec_backend_t instance = liberasurecode_backend_instance_get_by_desc(desc);
+  if (NULL == instance) {
+    return -EBACKENDNOTAVAIL;
+  }
+
+  const int m = instance->args.uargs.m;
+
+  if (encoded_parity) {
+    int i;
+    for (i = 0; i < m; i++) {
+      dealloc_data(encoded_parity[i], len);
+    }
+  }
+  free(encoded_parity);
+
+  return 0;
+}
diff --git a/backend.go b/backend.go
index 6e6cdee..979045d 100644
--- a/backend.go
+++ b/backend.go
@@ -3,413 +3,7 @@ package erasurecode
 /*
 #cgo pkg-config: erasurecode-1
 #include
-#include
-#include
-#include
-
-// shims to make working with frag arrays easier
-char ** makeStrArray(int n) { return calloc(n, sizeof (char *)); }
-void freeStrArray(char ** arr) { free(arr); }
-
-// shims because the fragment headers use misaligned fields
-uint64_t getOrigDataSize(struct fragment_header_s *header) { return header->meta.orig_data_size; }
-uint32_t
getBackendVersion(struct fragment_header_s *header) { return header->meta.backend_version; } -ec_backend_id_t getBackendID(struct fragment_header_s *header) { return header->meta.backend_id; } -uint32_t getECVersion(struct fragment_header_s *header) { return header->libec_version; } -int getHeaderSize() { return sizeof(struct fragment_header_s); } - -// linearize is used when we have all data fragment. Instead of doing a true decoding, we just -// reassemble all the fragment linearized in a buffer. This is mainly a copy of liberasurecode -// fragment_to_string function, except that we won't do any addionnal allocation -// -// /!\ This function does not perform any header checksum validation. -// If fragments must be validated checks 'check_matrix_fragment' -// -// 'k' the number of data fragment used for the encoding. -// 'in' is an array of all data frags, in their index order. -// 'inlen' is the array size -// 'dest' is an already allocated buffer where data will be linearized -// 'destlen' is the buffer size, and hence, the maximum number of bytes linearized -// 'outlen' is a pointer containing the number of bytes really linearized in dest (always lower or equal to destlen) -// it returns dest if nothing went wrong, else null -char* linearize(int k, char **in, int inlen, char *dest, uint64_t destlen, uint64_t *outlen) { - int i; - int curr_idx = 0; - int orig_data_size = -1; - - if (dest == NULL || outlen == NULL) { - return NULL; - } - - // The following perform small correctness checks before linearizing - // the buffer - int previous_idx = -1; - for (i = 0; i < inlen; i++) { - int index = get_fragment_idx(in[i]); - - if (orig_data_size < 0) { - orig_data_size = get_orig_data_size(in[i]); - } else if(get_orig_data_size(in[i]) != orig_data_size) { - return NULL; - } - - if (index >= k) { - return NULL; - } - - // Checks that the fragments are sorted. 
- if (previous_idx > index) { - return NULL; - } - previous_idx = index; - } - - // compute the number of bytes needed for the output - int tocopy = orig_data_size; - int string_off = 0; - *outlen = orig_data_size; - if(destlen < orig_data_size) { - *outlen = destlen; - tocopy = destlen; - } - - // copy in an ordered way all bytes of fragments in the buffer - for (i = 0; i < inlen && tocopy > 0; i++) { - char *f = get_data_ptr_from_fragment(in[i]); - int fsize = get_fragment_payload_size(in[i]); - int psize = tocopy > fsize ? fsize : tocopy; - memcpy(dest + string_off, f, psize); - tocopy -= psize; - string_off += psize; - } - return dest; -} - -bool check_matrix_fragment(char *frag, int frag_len, int piecesize) { - size_t offset = 0; - - bool aligned = (frag_len % (piecesize + getHeaderSize())) == 0; - if (!aligned) { - return false; - } - - while (offset < frag_len) { - if (is_invalid_fragment_header((fragment_header_t*)&frag[offset])) { - return false; - } - offset += piecesize + getHeaderSize(); - } - - return true; -} - - -struct encode_chunk_context { - ec_backend_t instance; // backend instance - char **datas; // the K datas - char **codings; // the M codings - unsigned int number_of_subgroup; // number of subchunk in each K part - unsigned int chunk_size; // datasize of each subchunk - unsigned int frags_len; // allocating size of each K+M objects - int blocksize; // k-bounds of data - int k; - int m; -}; - -static inline void *alloc_data(size_t len) { - void *buf; - if (posix_memalign(&buf, 16, len) != 0) { - return NULL; - } - memset(buf, 0, len); - return buf; -} - -static inline void dealloc_data(void *pt, size_t len) { - free(pt); -} - -// instead of encoding K blocks of data, we divide and subencode blocks of -// 'piecesize' bytes. 
-// 'desc' : liberasurecode handle -// 'data' : the whole data to encode -// 'datalen' : the datalen -// 'piecesize' : the size of little blocks used for encoding -// 'ctx' : contains informations such as the ECN schema (see below) -// -void encode_chunk_prepare(int desc, - char *data, - int datalen, - int piecesize, - struct encode_chunk_context *ctx) -{ - ctx->instance = liberasurecode_backend_instance_get_by_desc(desc); - int i; - const int k = ctx->instance->args.uargs.k; - const int m = ctx->instance->args.uargs.m; - - // here we compute the number of (k) subgroup of 'piecesize' bytes we can create - int block_size = piecesize * k; - ctx->number_of_subgroup = datalen / block_size; - if(ctx->number_of_subgroup * block_size != datalen) { - ctx->number_of_subgroup++; - } - - ctx->chunk_size = piecesize; - - ctx->k = k; - ctx->m = m; - - ctx->datas = calloc(ctx->k, sizeof(char*)); - ctx->codings = calloc(ctx->m, sizeof(char*)); - ctx->frags_len = (sizeof(fragment_header_t) + piecesize) * ctx->number_of_subgroup; - - for (i = 0; i < ctx->k; ++i) { - ctx->datas[i] = alloc_data(ctx->frags_len); - } - - for (i = 0; i < ctx->m; ++i) { - ctx->codings[i] = alloc_data(ctx->frags_len); - } -} - -// return real size of fragment header size -size_t get_fragment_header_size() { - return sizeof(fragment_header_t); -} - -int encode_chunk(int desc, char *data, int datalen, struct encode_chunk_context *ctx, int nth); - -int encode_chunk_all(int desc, char *data, int datalen, struct encode_chunk_context *ctx, int max) { - int i; - for (i = 0; i < max ; i++) { - int err = encode_chunk(desc, data, datalen, ctx, i); - if (err != 0) { - return err; - } - } - return 0; -} - -// encode_chunk will encode a subset of the fragments data. 
-// It has to be considered that all the datas will not be divided in K blocks, but instead, -// they will be divided in N sub-blocks of K*chunksize fragments -// [-------------------data---------------------] -// {s1a | s1b | s1c | s1d}{s2a | s2b | s2c |d2d } -// fragment1 => [header1a|s1a|header2a|s2a] -// fragment2 => [header1b]s1b|header2b|s2b] -// fragment3 => [header1c]s1c|header2c|s2c] -// fragment4 => [header1d]s1d|header2d|s2d] -// this mapping will let be more efficient against get range pattern (when we are only interesting in -// having a small subset of data) especially when a whole fragment will be missing -int encode_chunk(int desc, char *data, int datalen, struct encode_chunk_context *ctx, int nth) -{ - ec_backend_t ec = ctx->instance; - char *k_ref[ctx->k]; - char *m_ref[ctx->m]; - - int one_cell_size = sizeof(fragment_header_t) + ctx->chunk_size; - int i, ret; - char const *const dataend = data + datalen; - char *dataoffset = data + (ctx->k * nth) * ctx->chunk_size; - if (nth >= ctx->number_of_subgroup) { - return -1; - } - - // Do the mapping as described above - int tot_len_sum = 0; - for (i = 0; i < ctx->k; i++) { - char *ptr = &ctx->datas[i][nth * one_cell_size]; - fragment_header_t *hdr = (fragment_header_t*)ptr; - hdr->magic = LIBERASURECODE_FRAG_HEADER_MAGIC; - ptr = (char*) (hdr + 1); - if(dataoffset < dataend) { - int len_to_copy = ctx->chunk_size; - if (len_to_copy > dataend - dataoffset) { - len_to_copy = dataend - dataoffset; - } - tot_len_sum += len_to_copy; - memcpy(ptr, dataoffset, len_to_copy); - } - dataoffset += ctx->chunk_size; - k_ref[i] = ptr; - } - - for (i = 0; i < ctx->m; i++) { - char *ptr = &ctx->codings[i][nth * one_cell_size]; - fragment_header_t *hdr = (fragment_header_t*)ptr; - hdr->magic = LIBERASURECODE_FRAG_HEADER_MAGIC; - ptr = (char*) (hdr + 1); - m_ref[i] = ptr; - } - - // do the true encoding according the backend used (isa-l, cauchy ....) 
- ret = ec->common.ops->encode(ec->desc.backend_desc, k_ref, m_ref, ctx->chunk_size); - if (ret < 0) { - fprintf(stderr, "error encode ret = %d\n", ret); - return -1; - } - - // fill the headers with true len, fragment len .... - ret = finalize_fragments_after_encode(ec, ctx->k, ctx->m, ctx->chunk_size, tot_len_sum, k_ref, m_ref); - if (ret < 0) { - fprintf(stderr, "error encode ret = %d\n", ret); - return -1; - } - return 0; -} - -int my_liberasurecode_encode_cleanup(int desc, - size_t len, - char **encoded_data, - char **encoded_parity) -{ - int i, k, m; - - ec_backend_t instance = liberasurecode_backend_instance_get_by_desc(desc); - if (NULL == instance) { - return -EBACKENDNOTAVAIL; - } - - k = instance->args.uargs.k; - m = instance->args.uargs.m; - - if (encoded_data) { - for (i = 0; i < k; i++) { - dealloc_data(encoded_data[i], len); - } - - free(encoded_data); - } - - if (encoded_parity) { - for (i = 0; i < m; i++) { - dealloc_data(encoded_parity[i], len); - } - free(encoded_parity); - } - - return 0; -} - -// Prepare memory, allocating stuff. Suitable for "buffermatrix": no data fragments allocated. 
-void encode_chunk_buffermatrix_prepare(int desc, - char *data, - int datalen, - int piecesize, - int frags_len, - int number_of_subgroup, - struct encode_chunk_context *ctx) -{ - ctx->instance = liberasurecode_backend_instance_get_by_desc(desc); - int i; - const int k = ctx->instance->args.uargs.k; - const int m = ctx->instance->args.uargs.m; - - ctx->number_of_subgroup = number_of_subgroup; - - ctx->chunk_size = piecesize; - - ctx->k = k; - ctx->m = m; - - ctx->codings = calloc(ctx->m, sizeof(char*)); - ctx->frags_len = frags_len; - - for (i = 0; i < ctx->m; ++i) { - ctx->codings[i] = alloc_data(ctx->frags_len); - } -} - -// Encode a chunk using a buffer matrix as an input -// Same as above with the twist that data is not copied and can be directly -static int encode_chunk_buffermatrix(int desc, char *data, int datalen, int nbFrags, struct encode_chunk_context *ctx, int nth) -{ - ec_backend_t ec = ctx->instance; - char *k_ref[ctx->k]; - char *m_ref[ctx->m]; - int one_cell_size = sizeof(fragment_header_t) + ctx->chunk_size; - int i, ret; - int tot_len_sum = 0; - - if (nth >= ctx->number_of_subgroup) { - return -1; - } - - // Create the array of "data" fragments - // No copy, just prepare the header - for (i = 0 ; i < ctx->k ; i ++) { - k_ref[i] = data + (nth + nbFrags * i) * one_cell_size; - fragment_header_t *hdr = (fragment_header_t*)k_ref[i]; - hdr->magic = LIBERASURECODE_FRAG_HEADER_MAGIC; - char *ptr = (char*) (hdr + 1); - k_ref[i] = ptr; - - // Computes actual data in the fragment - int size = datalen - (nth * ctx->k + i) * ctx->chunk_size; - tot_len_sum += size > 0 ? (size > ctx->chunk_size ? ctx->chunk_size: size) : 0; - } - - // "coding" fragments. 
Those ones are allocated above - for (i = 0; i < ctx->m; i++) { - char *ptr = &ctx->codings[i][nth * one_cell_size]; - fragment_header_t *hdr = (fragment_header_t*)ptr; - hdr->magic = LIBERASURECODE_FRAG_HEADER_MAGIC; - ptr = (char*) (hdr + 1); - m_ref[i] = ptr; - } - - // do the true encoding according the backend used (isa-l, cauchy ....) - ret = ec->common.ops->encode(ec->desc.backend_desc, k_ref, m_ref, ctx->chunk_size); - if (ret < 0) { - fprintf(stderr, "error encode ret = %d\n", ret); - return -1; - } - - ret = finalize_fragments_after_encode(ec, ctx->k, ctx->m, ctx->chunk_size, tot_len_sum, k_ref, m_ref); - if (ret < 0) { - fprintf(stderr, "error encode ret = %d\n", ret); - return -1; - } - return 0; -} - -// Helper function to compute everything in one go -int encode_chunk_buffermatrix_all(int desc, char *data, int datalen, int nbfrags, struct encode_chunk_context *ctx, int max) { - int i; - - for (i = 0; i < max ; i++) { - int err = encode_chunk_buffermatrix(desc, data, datalen, nbfrags, ctx, i); - if (err != 0) { - return err; - } - } - return 0; -} - -int my_liberasurecode_encode_buffermatrix_cleanup(int desc, - size_t len, - char **encoded_data, - char **encoded_parity) -{ - ec_backend_t instance = liberasurecode_backend_instance_get_by_desc(desc); - if (NULL == instance) { - return -EBACKENDNOTAVAIL; - } - - const int m = instance->args.uargs.m; - - if (encoded_parity) { - int i; - for (i = 0; i < m; i++) { - dealloc_data(encoded_parity[i], len); - } - } - free(encoded_parity); - - return 0; -} - +#include "backend.h" */ import "C" import ( @@ -528,7 +122,7 @@ type pool struct { // Assuming a splitSize around 2MiB or less const maxBuffer int = 2 * 1024 * 1024 -func (p *pool) New(size int) (interface{}, []byte) { +func (p *pool) New(size int) (any, []byte) { if size <= p.max { b := p.p.Get().(*bytes.Buffer) return b, b.Bytes() @@ -537,7 +131,7 @@ func (p *pool) New(size int) (interface{}, []byte) { return nil, make([]byte, size) } -func (p *pool) 
Release(b interface{}) { +func (p *pool) Release(b any) { if b != nil { p.p.Put(b) } @@ -545,7 +139,7 @@ func (p *pool) Release(b interface{}) { var globalPool = &pool{ p: sync.Pool{ - New: func() interface{} { + New: func() any { return bytes.NewBuffer(make([]byte, maxBuffer)) }}, max: maxBuffer, @@ -591,25 +185,13 @@ func InitBackend(params Params) (Backend, error) { } else { backend.pool = &pool{ p: sync.Pool{ - New: func() interface{} { + New: func() any { return bytes.NewBuffer(make([]byte, params.MaxBlockSize)) }}, max: params.MaxBlockSize, } } - // Workaround on init bug of Jerasure - // Apparently, jerasure will crash if the - // first encode is done concurrently with other encode. - res, err := backend.Encode(bytes.Repeat([]byte("1"), 1000)) - - if err != nil { - backend.Close() - return Backend{}, err - } - - res.Free() - return backend, nil } @@ -627,8 +209,19 @@ func (backend *Backend) Close() error { // EncodeData is returned by all encode* functions type EncodeData struct { - Data [][]byte // Slice of []bytes ==> our K+N encoded fragments - Free func() // cleanup closure (to free C allocated data once it becomes useless) + Data [][]byte // Slice of []bytes ==> our K+N encoded fragments + Free func() // cleanup closure (to free C allocated data once it becomes useless) + RealDataSize int64 // the real size of the data, without considering the padding +} + +func (e EncodeData) DataLen() int64 { + return e.RealDataSize +} +func (e *EncodeData) GetFragment(index int) []byte { + if index < 0 || index >= len(e.Data) { + return nil + } + return e.Data[index][:e.RealDataSize] } // Encode is the general purpose encoding function. 
It encodes data according @@ -658,13 +251,14 @@ func (backend *Backend) Encode(data []byte) (*EncodeData, error) { return &EncodeData{result, func() { C.my_liberasurecode_encode_cleanup( backend.libecDesc, C.size_t(fragLength), dataFrags, parityFrags) - }}, nil + }, int64(fragLength)}, nil } // EncodeMatrixWithBufferMatrix encodes data in small subpart of chunkSize bytes func (backend *Backend) EncodeMatrixWithBufferMatrix(bm *BufferMatrix, chunkSize int) (*EncodeData, error) { var wg sync.WaitGroup var ctx C.struct_encode_chunk_context + var totLen int64 data := bm.Bytes() dataLen := bm.Length() @@ -675,6 +269,8 @@ func (backend *Backend) EncodeMatrixWithBufferMatrix(bm *BufferMatrix, chunkSize nbFrags := C.int(bm.SubGroups()) + // This prepares the context for encoding + // It allocates the coding fragments (not the data fragments) C.encode_chunk_buffermatrix_prepare(backend.libecDesc, pData, pDataLen, cChunkSize, C.int(bm.FragLen()), nbFrags, &ctx) @@ -684,7 +280,15 @@ func (backend *Backend) EncodeMatrixWithBufferMatrix(bm *BufferMatrix, chunkSize for i := 0; i < int(ctx.number_of_subgroup); i++ { go func(nth int) { - r := C.encode_chunk_buffermatrix(backend.libecDesc, pData, pDataLen, nbFrags, &ctx, C.int(nth)) + fragLen := C.size_t(chunkSize) + // last subgroup has a different size + if i == int(ctx.number_of_subgroup)-1 { + fragLen = C.size_t(bm.FragLenLastSubGroup()) + } + atomic.AddInt64(&totLen, int64(fragLen)+int64(backend.headerSize)) + r := C.encode_chunk_buffermatrix(backend.libecDesc, pData, pDataLen, + nbFrags, &ctx, C.int(nth), fragLen) + if r < 0 { atomic.AddUint32(&errCounter, 1) } @@ -697,7 +301,7 @@ func (backend *Backend) EncodeMatrixWithBufferMatrix(bm *BufferMatrix, chunkSize return &EncodeData{nil, func() { C.my_liberasurecode_encode_buffermatrix_cleanup( backend.libecDesc, C.size_t(ctx.frags_len), ctx.datas, ctx.codings) - }}, + }, totLen}, fmt.Errorf("error encoding chunk (%+v encoding failed)", errCounter) } result := make([][]byte, 
backend.K+backend.M) @@ -716,57 +320,7 @@ func (backend *Backend) EncodeMatrixWithBufferMatrix(bm *BufferMatrix, chunkSize runtime.KeepAlive(bm) C.my_liberasurecode_encode_buffermatrix_cleanup( backend.libecDesc, C.size_t(ctx.frags_len), ctx.datas, ctx.codings) - }}, nil -} - -// EncodeMatrix encodes data in small subpart of chunkSize bytes -func (backend *Backend) EncodeMatrix(data []byte, chunkSize int) (*EncodeData, error) { - var wg sync.WaitGroup - var ctx C.struct_encode_chunk_context - pData := (*C.char)(unsafe.Pointer(&data[0])) - pDataLen := C.int(len(data)) - cChunkSize := C.int(chunkSize) - - C.encode_chunk_prepare(backend.libecDesc, pData, pDataLen, cChunkSize, &ctx) - - var errCounter uint32 - - wg.Add(int(ctx.number_of_subgroup)) - - for i := 0; i < int(ctx.number_of_subgroup); i++ { - go func(nth int) { - r := C.encode_chunk(backend.libecDesc, pData, pDataLen, &ctx, C.int(nth)) - if r < 0 { - atomic.AddUint32(&errCounter, 1) - } - wg.Done() - }(i) - } - wg.Wait() - - if errCounter != 0 { - return &EncodeData{nil, func() { - C.my_liberasurecode_encode_cleanup( - backend.libecDesc, C.size_t(ctx.frags_len), ctx.datas, ctx.codings) - }}, - fmt.Errorf("error encoding chunk (%+v encoding failed)", errCounter) - } - result := make([][]byte, backend.K+backend.M) - fragLen := ctx.frags_len - for i := 0; i < backend.K; i++ { - str := cGetArrayItem(ctx.datas, i) - result[i] = (*[1 << 30]byte)(str)[:int(C.int(fragLen)):int(C.int(fragLen))] - - } - for i := 0; i < backend.M; i++ { - str := cGetArrayItem(ctx.codings, i) - result[i+backend.K] = (*[1 << 30]byte)(str)[:int(C.int(fragLen)):int(C.int(fragLen))] - } - - return &EncodeData{result, func() { - C.my_liberasurecode_encode_cleanup( - backend.libecDesc, C.size_t(ctx.frags_len), ctx.datas, ctx.codings) - }}, nil + }, totLen}, nil } // DecodeData is the structure returned by all Decode* function @@ -774,8 +328,17 @@ func (backend *Backend) EncodeMatrix(data []byte, chunkSize int) (*EncodeData, e // that clean 
some C dynamically allocated objects // If Free is not null, the closure should be used only when the Data is not needed anymore type DecodeData struct { - Data []byte - Free func() + Data []byte + Free func() + RealDataSize int64 +} + +func (d DecodeData) DataLen() int64 { + return d.RealDataSize +} + +func (d *DecodeData) GetFragment() []byte { + return d.Data[:d.RealDataSize] } // // bufPool is a pool of bytes.Buffer of max size maxBuffer. @@ -831,7 +394,6 @@ func (backend *Backend) ChunkInfo(fragRangeLen int, pieceSize int) ChunkInfo { if nrChunks*chunkSize != fragRangeLen { nrChunks++ } - return ChunkInfo{ ChunkSize: chunkSize, NrChunk: nrChunks, @@ -851,7 +413,7 @@ func (backend *Backend) LinearizeMatrix(frags []ValidatedFragment, pieceSize int /* Fragments are sorted beforehand with the index of the first chunk. All chunks of a fragments share the same index. */ fragsIndex := make([]int, len(frags)) - for i := 0; i < len(frags); i += 1 { + for i := range frags { fragsIndex[i] = i } @@ -892,7 +454,7 @@ func (backend *Backend) LinearizeMatrix(frags []ValidatedFragment, pieceSize int return nil, errors.New("gaps in the provided fragments") } - lastDataFragIdxExcl += 1 + lastDataFragIdxExcl++ previousFragIdx = idx } fragsIndex = fragsIndex[:lastDataFragIdxExcl] @@ -946,8 +508,9 @@ func (backend *Backend) LinearizeMatrix(frags []ValidatedFragment, pieceSize int } return &DecodeData{ - data[:totLen:totLen], - func() { + Data: data[:totLen:totLen], + RealDataSize: int64(totLen), + Free: func() { backend.pool.Release(dataB) }}, nil } @@ -955,7 +518,7 @@ func (backend *Backend) LinearizeMatrix(frags []ValidatedFragment, pieceSize int // DecodeMatrix tries to reconstruct the data fragments and returns the linearized data. 
func (backend *Backend) DecodeMatrix(frags []ValidatedFragment, pieceSize int) (*DecodeData, error) { if len(frags) == 0 { - return nil, errors.New("Decoding requires at least one fragment") + return nil, errors.New("decoding requires at least one fragment") } fragRangeLen := len(frags[0]) @@ -972,8 +535,9 @@ func (backend *Backend) DecodeMatrix(frags []ValidatedFragment, pieceSize int) ( var totLen int64 for i := 0; i < chunkInfo.NrChunk; i++ { vect := make([][]byte, len(frags)) + nextBound := min((i+1)*chunkInfo.ChunkSize, fragRangeLen) for j := range frags { - vect[j] = frags[j][i*chunkInfo.ChunkSize : (i+1)*chunkInfo.ChunkSize] + vect[j] = frags[j][i*chunkInfo.ChunkSize : nextBound] } subdata, err := backend.Decode(vect) @@ -985,9 +549,12 @@ func (backend *Backend) DecodeMatrix(frags []ValidatedFragment, pieceSize int) ( subdata.Free() } - return &DecodeData{data[:totLen:totLen], func() { - backend.pool.Release(dataB) - }}, nil + return &DecodeData{ + Data: data[:totLen:totLen], + RealDataSize: int64(totLen), + Free: func() { + backend.pool.Release(dataB) + }}, nil } // RangeMatrix describes information needed to decode a range of encoded frags @@ -1039,7 +606,7 @@ type RangeMatrix struct { * the relevant requested range. The decoded buffer * will look like [p1 p2 p3 p4(*) p1(*) p2(*) p3 p4]. * Chunks not marked with a '*' are discarded. Note how - * the heading and trailing is unecessary and could be + * the heading and trailing is unnecessary and could be * discarded in the ideal case. 
* * Perfect cases occur when the request span a single group (column): @@ -1050,41 +617,69 @@ type RangeMatrix struct { * p4 [-[*]- -] * */ -func (backend *Backend) GetRangeMatrix(startIncl, endIncl, pieceSize, fragSize int) *RangeMatrix { - chunkSize := pieceSize + backend.headerSize - groupSize := pieceSize * backend.K +func (backend *Backend) GetRangeMatrix(startIncl, endIncl, cellDataSize, fragSize int) *RangeMatrix { + nrColumns := backend.K + cellSize := cellDataSize + backend.headerSize + lineSize := cellDataSize * nrColumns /* At this point we don't know what is the true payload size, but we can at least check that it doesn't exceed the maximum payload that this configuration can handle. */ - nrChunkByFrag := fragSize / chunkSize - dataLenPerFrag := fragSize - nrChunkByFrag*backend.headerSize - maxDataLen := dataLenPerFrag * backend.K - if startIncl >= maxDataLen || endIncl >= maxDataLen || startIncl > endIncl { + nrLines := fragSize / cellSize + // Round up so that a trailing partial cell is counted as a line. + if nrLines*cellSize < fragSize { + nrLines++ + } + + /* Inside a fragment (i.e., inside a column), what is the + stored amount of data? */ + fragDataSize := fragSize - nrLines*backend.headerSize + + maxDataSize := fragDataSize * nrColumns + + if startIncl >= maxDataSize || endIncl >= maxDataSize || startIncl > endIncl { return nil } - pieceStartIncl := startIncl / pieceSize - pieceEndIncl := endIncl / pieceSize + /* Convert cell indices to (x,y) indices. */ + + /* Considering our (x,y) matrix as a single row, what + are the indices of the start and end of the range we are interested in? */ + idxStart := startIncl / cellDataSize + idxEnd := endIncl / cellDataSize - groupStartIncl := pieceStartIncl / backend.K - groupEndIncl := pieceEndIncl / backend.K + /* As we have the indices of the first cell and the last cell, + * we can derive the indices of the first line and the last line. + * Based on this first line and last line, we can deduce + * the amount of data to read in each fragment. 
+ */ + lineStart := idxStart / nrColumns + lineEnd := idxEnd / nrColumns - fragFirstIncl := pieceStartIncl % backend.K - fragCount := (pieceEndIncl + 1 - pieceStartIncl) - dataOffset := pieceStartIncl * pieceSize + /* + * as we have the indices of the first cell and the last cell, + * we can compute the first column (i.e. the first fragment) + * where to start the read + */ + columnStart := idxStart % nrColumns + + nrCellsToRead := (idxEnd + 1 - idxStart) + dataOffset := idxStart * cellDataSize + + totalLines := (maxDataSize + lineSize - 1) / lineSize + isLastStripe := (lineEnd == totalLines-1) /* When wrapping around, we read the full groups. */ - if fragFirstIncl+fragCount > backend.K { - fragFirstIncl = 0 - fragCount = backend.K - dataOffset = groupStartIncl * groupSize + if columnStart+nrCellsToRead > nrColumns || isLastStripe { + columnStart = 0 + nrCellsToRead = nrColumns + dataOffset = lineStart * lineSize } /* For each fragment, this is the minimum range to read -- including the header -- to decode or repair the data. */ - inFragRangeStartIncl := groupStartIncl * chunkSize - inFragRangeEndExcl := (groupEndIncl + 1) * chunkSize + inFragRangeStartIncl := lineStart * cellSize + inFragRangeEndExcl := (lineEnd + 1) * cellSize /* The output buffer only contains the data necessary to read the range, and the requested range must be adjusted to be relative @@ -1096,13 +691,13 @@ func (backend *Backend) GetRangeMatrix(startIncl, endIncl, pieceSize, fragSize i linearizedRangeStartIncl := startIncl - dataOffset /* Decoding always works on a group boundary. 
*/ - decodedRangeStartIncl := startIncl - groupStartIncl*groupSize + decodedRangeStartIncl := startIncl - lineStart*lineSize return &RangeMatrix{ ReqStartIncl: startIncl, ReqEndIncl: endIncl, - FragFirstIncl: fragFirstIncl, - FragCount: fragCount, + FragFirstIncl: columnStart, + FragCount: nrCellsToRead, InFragRangeStartIncl: inFragRangeStartIncl, InFragRangeEndExcl: inFragRangeEndExcl, DecodedRangeStartIncl: decodedRangeStartIncl, @@ -1136,8 +731,10 @@ func (backend *Backend) Decode(frags [][]byte) (*DecodeData, error) { runtime.KeepAlive(frags) // prevent frags from being GC-ed during decode C.freeStrArray(cFrags) - return &DecodeData{(*[1 << 30]byte)(unsafe.Pointer(data))[:int(dataLength):int(dataLength)], - func() { + return &DecodeData{ + Data: (*[1 << 30]byte)(unsafe.Pointer(data))[:int(dataLength):int(dataLength)], + RealDataSize: int64(dataLength), + Free: func() { C.liberasurecode_decode_cleanup(backend.libecDesc, data) }}, nil @@ -1199,14 +796,17 @@ func (backend *Backend) ReconstructMatrix(frags [][]byte, fragIndex int, pieceSi dataB, data := backend.pool.New(dlen) var errCounter uint32 + var totLen int64 // TODO use goroutines here to leverage multicore computation wg.Add(chunkNr) for i := 0; i < chunkNr; i++ { go func(chunkIdx int) { vect := make([][]byte, len(frags)) - for j := 0; j < len(frags); j++ { - vect[j] = frags[j][chunkIdx*chunkSize : (chunkIdx+1)*chunkSize] + for j := range frags { + length := min(len(frags[j]), (chunkIdx+1)*chunkSize) + vect[j] = frags[j][chunkIdx*chunkSize : length] } + atomic.AddInt64(&totLen, int64(len(vect[0]))) if err := backend.reconstruct(vect, fragIndex, data[chunkIdx*chunkSize:]); err != nil { atomic.AddUint32(&errCounter, 1) } @@ -1217,9 +817,13 @@ func (backend *Backend) ReconstructMatrix(frags [][]byte, fragIndex int, pieceSi if errCounter != 0 { return nil, errors.New("sub reconstruction failed") } - return &DecodeData{data[:dlen:dlen], func() { - backend.pool.Release(dataB) - }}, nil + + return &DecodeData{ + 
Data: data[:totLen:totLen], + RealDataSize: int64(totLen), + Free: func() { + backend.pool.Release(dataB) + }}, nil } // IsInvalidFragment is a wrapper on C implementation diff --git a/backend.h b/backend.h new file mode 100644 index 0000000..fa6eb7d --- /dev/null +++ b/backend.h @@ -0,0 +1,70 @@ +#ifndef BACKEND_H +#define BACKEND_H + +#include +#include +#include + + +struct encode_chunk_context { + ec_backend_t instance; // backend instance + char **datas; // the K datas + char **codings; // the M codings + unsigned int number_of_subgroup; // number of subchunk in each K part + unsigned int chunk_size; // datasize of each subchunk + unsigned int frags_len; // allocating size of each K+M objects + int blocksize; // k-bounds of data + int k; + int m; + }; + +char **makeStrArray(int n); + +void freeStrArray(char **arr); + +uint64_t getOrigDataSize(struct fragment_header_s *header); + +uint32_t getBackendVersion(struct fragment_header_s *header); + +ec_backend_id_t getBackendID(struct fragment_header_s *header); + +uint32_t getECVersion(struct fragment_header_s *header); + +int getHeaderSize(void); + +char *linearize(int k, char **in, int inlen, char *dest, uint64_t destlen, + uint64_t *outlen); + +bool check_matrix_fragment(char *frag, int frag_len, int piecesize); + +void encode_chunk_prepare(int desc, char *data, int datalen, int piecesize, + struct encode_chunk_context *ctx); + +size_t get_fragment_header_size(void); + +int encode_chunk(int desc, char *data, int datalen, + struct encode_chunk_context *ctx, int nth); + +int encode_chunk_all(int desc, char *data, int datalen, + struct encode_chunk_context *ctx, int max); + +int my_liberasurecode_encode_cleanup(int desc, size_t len, char **encoded_data, + char **encoded_parity); + +void encode_chunk_buffermatrix_prepare(int desc, char *data, int datalen, + int piecesize, int frags_len, + int number_of_subgroup, + struct encode_chunk_context *ctx); + +int encode_chunk_buffermatrix_all(int desc, char *data, int 
datalen, + int nbfrags, struct encode_chunk_context *ctx, + int max); + +int encode_chunk_buffermatrix(int desc, char *data, int datalen, int nbFrags, + struct encode_chunk_context *ctx, int nth, size_t frag_len); + +int my_liberasurecode_encode_buffermatrix_cleanup(int desc, size_t len, + char **encoded_data, + char **encoded_parity); + +#endif \ No newline at end of file diff --git a/backend_test.go b/backend_test.go index 5645367..724a136 100644 --- a/backend_test.go +++ b/backend_test.go @@ -3,7 +3,9 @@ package erasurecode import ( "bytes" cryptorand "crypto/rand" + "encoding/binary" "fmt" + "io" "math/rand" "reflect" "strings" @@ -12,14 +14,10 @@ import ( "testing/quick" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) var validParams = []Params{ - {Name: "liberasurecode_rs_vand", K: 2, M: 1}, - {Name: "liberasurecode_rs_vand", K: 10, M: 4}, - {Name: "liberasurecode_rs_vand", K: 4, M: 3}, - {Name: "liberasurecode_rs_vand", K: 8, M: 4}, - {Name: "liberasurecode_rs_vand", K: 15, M: 4}, {Name: "isa_l_rs_vand", K: 2, M: 1}, {Name: "isa_l_rs_vand", K: 2, M: 1, MaxBlockSize: 1}, {Name: "isa_l_rs_vand", K: 2, M: 1, MaxBlockSize: maxBuffer * 2}, @@ -148,76 +146,77 @@ func TestEncodeDecode(t *testing.T) { continue } backend, err := InitBackend(params) + if err != nil { t.Errorf("Error creating backend %v: %q", params, err) continue } + defer backend.Close() for patternIndex, pattern := range testPatterns { - data, err := backend.Encode(pattern) + bm := NewBufferMatrix(DefaultChunkSize, len(pattern), backend.K) + + _, err = io.Copy(bm, bytes.NewReader(pattern)) if err != nil { - t.Errorf("Error encoding %v: %q", params, err) + t.Errorf("Error copying pattern to buffer matrix: %q", err) break } + bm.Finish() + + data, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(t, err) + defer data.Free() expectedVersion := GetVersion() frags := data.Data for index, frag := range frags { - info := GetFragmentInfo(frag) 
- if info.Index != index { - t.Errorf("Expected frag %v to have index %v; got %v", index, index, info.Index) - } - if info.Size != len(frag)-80 { // 80 == sizeof (struct fragment_header_s) - t.Errorf("Expected frag %v to have size %v; got %v", index, len(frag)-80, info.Size) - } - if info.OrigDataSize != uint64(len(pattern)) { - t.Errorf("Expected frag %v to have orig_data_size %v; got %v", index, len(pattern), info.OrigDataSize) - } - if info.BackendName != params.Name { - t.Errorf("Expected frag %v to have backend %v; got %v", index, params.Name, info.BackendName) - } - if info.ErasureCodeVersion != expectedVersion { - t.Errorf("Expected frag %v to have EC version %v; got %v", index, expectedVersion, info.ErasureCodeVersion) - } - if !info.IsValid { - t.Errorf("Expected frag %v to be valid", index) - } - } - decode := func(frags [][]byte, description string) bool { - decoded, err := backend.Decode(frags) - if err != nil { - t.Errorf("%v: %v: %q for pattern %d", description, backend, err, patternIndex) - return false - } else if !bytes.Equal(decoded.Data, pattern) { - t.Errorf("%v: Expected %v to roundtrip pattern %d, got %q", description, backend, patternIndex, decoded.Data) - return false + for i := range bm.SubGroups() { + + start := i * (DefaultChunkSize + 80) + end := start + DefaultChunkSize + 80 + if i == bm.SubGroups()-1 { + end = start + bm.FragLenLastSubGroup() + 80 + } + + piece := frag[start:end] + require.True(t, backend.ValidateFragmentMatrix(piece, end-start-80)) + + info := GetFragmentInfo(piece) + if info.Index != index { + t.Errorf("Expected frag %v to have index %v; got %v", index, index, info.Index) + } + if info.Size != len(piece)-80 { // 80 == sizeof (struct fragment_header_s) + t.Errorf("Expected frag %v to have size %v; got %v", index, len(piece)-80, info.Size) + } + if info.BackendName != params.Name { + t.Errorf("Expected frag %v to have backend %v; got %v", index, params.Name, info.BackendName) + } + if info.ErasureCodeVersion != 
expectedVersion { + t.Errorf("Expected frag %v to have EC version %v; got %v", index, expectedVersion, info.ErasureCodeVersion) + } + if !info.IsValid { + t.Errorf("Expected frag %v to be valid", index) + } } - decoded.Free() - return true } - var good bool - good = decode(frags, "all frags") - good = good && decode(shuf(frags), "all frags, shuffled") - good = good && decode(frags[:params.K], "data frags") - good = good && decode(shuf(frags[:params.K]), "shuffled data frags") - good = good && decode(frags[params.M:], "with parity frags") - good = good && decode(shuf(frags[params.M:]), "shuffled parity frags") - - if !good { - break + decode := func(frags [][]byte, description string) { + decoded, err := backend.DecodeMatrix(frags, DefaultChunkSize) + require.NoError(t, err) + defer decoded.Free() + require.True(t, bytes.Equal(decoded.Data, pattern), + "%v:%d(%v) pattern: %v, got: %q", + description, patternIndex, backend, pattern, decoded.Data) } - data.Free() - } - if _, err := backend.Decode([][]byte{}); err == nil { - t.Errorf("Expected error when decoding from empty fragment array") - } + decode(frags, "all frags") + decode(shuf(frags), "all frags, shuffled") + decode(frags[:params.K], "data frags") + decode(shuf(frags[:params.K]), "shuffled data frags") + decode(frags[params.M:], "with parity frags") + decode(shuf(frags[params.M:]), "shuffled parity frags") - err = backend.Close() - if err != nil { - t.Errorf("Error closing backend %v: %q", backend, err) } } } @@ -233,41 +232,35 @@ func TestReconstruct(t *testing.T) { _ = backend.Close() continue } - for patternIndex, pattern := range testPatterns { - data, err := backend.Encode(pattern) - frags := data.Data - if err != nil { - t.Errorf("Error encoding %v: %q", params, err) - } - - reconstruct := func(recon_frags [][]byte, frag_index int, description string) bool { - data, err := backend.Reconstruct(recon_frags, frag_index) - if err != nil { - t.Errorf("%v: %v: %q for pattern %d", description, backend, err, 
patternIndex) - return false - } else if !bytes.Equal(data, frags[frag_index]) { - t.Errorf("%v: Expected %v to roundtrip pattern %d, got %q", description, backend, patternIndex, data) - return false - } - return true - } + defer func() { + _ = backend.Close() + }() - var good bool - good = reconstruct(shuf(frags[:params.K]), params.K+params.M-1, "last frag from data frags") - good = good && reconstruct(shuf(frags[params.M:]), 0, "first frag with parity frags") - if !good { - break - } - data.Free() - } + for patternIndex, pattern := range testPatterns { - if _, err := backend.Reconstruct([][]byte{}, 0); err == nil { - t.Errorf("Expected error when reconstructing from empty fragment array") - } + t.Run(fmt.Sprintf("%s_%d_%d-%d-%d", + params.Name, params.K, params.M, + patternIndex, + len(pattern)), + func(t *testing.T) { + bm := NewBufferMatrix(DefaultChunkSize, len(pattern), backend.K) + _, err = io.Copy(bm, bytes.NewReader(pattern)) + require.NoError(t, err) + bm.Finish() + data, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(t, err) + defer data.Free() + frags := data.Data - err = backend.Close() - if err != nil { - t.Errorf("Error closing backend %v: %q", backend, err) + reconstruct := func(recon_frags [][]byte, frag_index int, description string) { + data, err := backend.ReconstructMatrix(recon_frags, frag_index, DefaultChunkSize) + require.NoError(t, err, "%v: %v: %q for pattern %d", description, backend, err, patternIndex) + defer data.Free() + require.True(t, bytes.Equal(data.Data, frags[frag_index]), "%v: Expected %v to roundtrip pattern %d, got %q", description, backend, patternIndex, data.Data) + } + reconstruct(shuf(frags[:params.K]), params.K+params.M-1, "last frag from data frags") + reconstruct(shuf(frags[params.M:]), 0, "first frag with parity frags") + }) } } } @@ -283,112 +276,124 @@ func TestIsInvalidFragment(t *testing.T) { _ = backend.Close() continue } + defer func() { + _ = backend.Close() + }() for 
patternIndex, pattern := range testPatterns { - data, err := backend.Encode(pattern) - if err != nil { - t.Errorf("Error encoding %v: %q", params, err) - continue - } - frags := data.Data - for index, frag := range frags { - if backend.IsInvalidFragment(frag) { - t.Errorf("%v: frag %v unexpectedly invalid for pattern %d", backend, index, patternIndex) - } - fragCopy := make([]byte, len(frag)) - copy(fragCopy, frag) - - // corrupt the frag - corruptedByte := rand.Intn(len(frag)) //nolint:gosec - for 71 <= corruptedByte && corruptedByte < 80 { - // in the alignment padding -- try again - corruptedByte = rand.Intn(len(frag)) //nolint:gosec - } - frag[corruptedByte] ^= 0xff - if !backend.IsInvalidFragment(frag) { - t.Errorf("%v: frag %v unexpectedly valid after inverting byte %d for pattern %d", backend, index, corruptedByte, patternIndex) - } - if corruptedByte < 4 || 8 <= corruptedByte && corruptedByte <= 59 { - /** corruption is in metadata; claim we were created by a version of - * libec that predates metadata checksums. Note that - * Note that a corrupted fragment size (bytes 4-7) will lead to a - * segfault when we try to verify the fragment -- there's a reason - * we added metadata checksums! - */ - copy(frag[63:67], []byte{9, 1, 1, 0}) - if 20 <= corruptedByte && corruptedByte <= 53 { - /** Corrupted data checksum type or data checksum - * We may or may not detect this type of error; in particular, - * - if data checksum type is not in ec_checksum_type_t, - * it is ignored - * - if data checksum is mangled, we may still be valid - * under the "alternative" CRC32; this seems more likely - * with the byte inversion when the data is short - * Either way, though, clearing the checksum type should make - * it pass. 
- */ - frag[20] = 0 - if backend.IsInvalidFragment(frag) { - t.Errorf("%v: frag %v unexpectedly invalid after clearing metadata crc and disabling data crc", backend, index) - } - } else if corruptedByte >= 54 || 0 <= corruptedByte && corruptedByte < 4 { - /** Some corruptions of some bytes are still detectable. Since we're - * inverting the byte, we can detect: - * - frag index -- bytes 0-3 - * - data checksum type -- byte 20 - * - data checksum mismatch -- byte 54 - * - backend id -- byte 55 - * - backend version -- bytes 56-59 + bm := NewBufferMatrix(DefaultChunkSize, len(pattern), backend.K) + _, err = io.Copy(bm, bytes.NewReader(pattern)) + require.NoError(t, err) + bm.Finish() + data, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(t, err) + defer data.Free() + + parts := data.Data + for index, part := range parts { + for i := range bm.SubGroups() { + + start := i * (DefaultChunkSize + 80) + end := start + DefaultChunkSize + 80 + if i == bm.SubGroups()-1 { + end = start + bm.FragLenLastSubGroup() + 80 + } + + frag := part[start:end] + + if backend.IsInvalidFragment(frag) { + t.Errorf("%v: frag %v unexpectedly invalid for pattern %d", backend, index, patternIndex) + } + fragCopy := make([]byte, len(frag)) + copy(fragCopy, frag) + + // corrupt the frag + corruptedByte := rand.Intn(len(frag)) //nolint:gosec + for 71 <= corruptedByte && corruptedByte < 80 { + // in the alignment padding -- try again + corruptedByte = rand.Intn(len(frag)) //nolint:gosec + } + frag[corruptedByte] ^= 0xff + if !backend.IsInvalidFragment(frag) { + t.Errorf("%v: frag %v unexpectedly valid after inverting byte %d for pattern %d", backend, index, corruptedByte, patternIndex) + } + if corruptedByte < 4 || 8 <= corruptedByte && corruptedByte <= 59 { + /** corruption is in metadata; claim we were created by a version of + * libec that predates metadata checksums. 
Note that + * a corrupted fragment size (bytes 4-7) will lead to a + * segfault when we try to verify the fragment -- there's a reason + * we added metadata checksums! */ - if !backend.IsInvalidFragment(frag) { - t.Errorf("%v: frag %v unexpectedly still valid after clearing metadata crc", backend, index) + copy(frag[63:67], []byte{9, 1, 1, 0}) + if 20 <= corruptedByte && corruptedByte <= 53 { + /** Corrupted data checksum type or data checksum + * We may or may not detect this type of error; in particular, + * - if data checksum type is not in ec_checksum_type_t, + * it is ignored + * - if data checksum is mangled, we may still be valid + * under the "alternative" CRC32; this seems more likely + * with the byte inversion when the data is short + * Either way, though, clearing the checksum type should make + * it pass. + */ + frag[20] = 0 + if backend.IsInvalidFragment(frag) { + t.Errorf("%v: frag %v unexpectedly invalid after clearing metadata crc and disabling data crc", backend, index) + } + } else if corruptedByte >= 54 || 0 <= corruptedByte && corruptedByte < 4 { + /** Some corruptions of some bytes are still detectable. Since we're + * inverting the byte, we can detect: + * - frag index -- bytes 0-3 + * - data checksum type -- byte 20 + * - data checksum mismatch -- byte 54 + * - backend id -- byte 55 + * - backend version -- bytes 56-59 + */ + if !backend.IsInvalidFragment(frag) { + t.Errorf("%v: frag %v unexpectedly still valid after clearing metadata crc", backend, index) + } + } else { + if backend.IsInvalidFragment(frag) { + t.Errorf("%v: frag %v unexpectedly invalid after clearing metadata crc", backend, index) + } } - } else { + } else if corruptedByte >= 67 { + copy(frag[20:25], []byte{1, 0, 0, 0, 0}) + // And since we've changed the metadata, roll back version as above... 
+ copy(frag[63:67], []byte{9, 1, 1, 0}) if backend.IsInvalidFragment(frag) { - t.Errorf("%v: frag %v unexpectedly invalid after clearing metadata crc", backend, index) + t.Errorf("%v: frag %v unexpectedly invalid after clearing data crc", backend, index) + t.FailNow() } } - } else if corruptedByte >= 67 { - copy(frag[20:25], []byte{1, 0, 0, 0, 0}) - // And since we've changed the metadata, roll back version as above... - copy(frag[63:67], []byte{9, 1, 1, 0}) - if backend.IsInvalidFragment(frag) { - t.Errorf("%v: frag %v unexpectedly invalid after clearing data crc", backend, index) - t.FailNow() - } - } - frag[corruptedByte] ^= 0xff - copy(frag[63:67], fragCopy[63:67]) - copy(frag[20:25], fragCopy[20:25]) - - if !bytes.Equal(frag, fragCopy) { - for i, orig := range fragCopy { - if frag[i] != orig { - t.Logf("%v != %v at index %v", frag[i], orig, i) + frag[corruptedByte] ^= 0xff + copy(frag[63:67], fragCopy[63:67]) + copy(frag[20:25], fragCopy[20:25]) + + if !bytes.Equal(frag, fragCopy) { + for i, orig := range fragCopy { + if frag[i] != orig { + t.Logf("%v != %v at index %v", frag[i], orig, i) + } } + t.Fatal(corruptedByte, frag, fragCopy) } - t.Fatal(corruptedByte, frag, fragCopy) - } - frag[corruptedByte]++ - if !backend.IsInvalidFragment(frag) { - t.Errorf("%v: frag %v unexpectedly valid after incrementing byte %d for pattern %d", backend, index, corruptedByte, patternIndex) - } - frag[corruptedByte] -= 2 - if corruptedByte >= 63 && corruptedByte < 67 && frag[corruptedByte] != 0xff { - if backend.IsInvalidFragment(frag) { - t.Errorf("%v: frag %v unexpectedly invalid after decrementing version byte %d for pattern %d", backend, index, corruptedByte, patternIndex) - } - } else { + frag[corruptedByte]++ if !backend.IsInvalidFragment(frag) { - t.Errorf("%v: frag %v unexpectedly valid after decrementing byte %d for pattern %d", backend, index, corruptedByte, patternIndex) + t.Errorf("%v: frag %v unexpectedly valid after incrementing byte %d for pattern %d", backend, 
index, corruptedByte, patternIndex) + } + frag[corruptedByte] -= 2 + if corruptedByte >= 63 && corruptedByte < 67 && frag[corruptedByte] != 0xff { + if backend.IsInvalidFragment(frag) { + t.Errorf("%v: frag %v unexpectedly invalid after decrementing version byte %d for pattern %d", backend, index, corruptedByte, patternIndex) + } + } else { + if !backend.IsInvalidFragment(frag) { + t.Errorf("%v: frag %v unexpectedly valid after decrementing byte %d for pattern %d", backend, index, corruptedByte, patternIndex) + } } } } - data.Free() - } - err = backend.Close() - if err != nil { - t.Errorf("Error closing backend %v: %q", backend, err) } } } @@ -428,7 +433,7 @@ func TestGC(t *testing.T) { input := bytes.Repeat([]byte("X"), 1000000) backend, err := InitBackend( Params{ - Name: "liberasurecode_rs_vand", + Name: "isa_l_rs_vand", K: 2, M: 1, }) @@ -444,52 +449,41 @@ func TestGC(t *testing.T) { }{{ "Reconstruct", func() { - encoded, err := backend.Encode(input) + bm := NewBufferMatrix(DefaultChunkSize, len(input), backend.K) + _, err = io.Copy(bm, bytes.NewReader(input)) + require.NoError(t, err) + bm.Finish() + encoded, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(t, err) + defer encoded.Free() - if err != nil { - t.Fatal("cannot encode data") - return - } vect := encoded.Data - defer encoded.Free() oldData := vect[0][:] // force a copy - data, err := backend.Reconstruct(vect[1:3], 0) - if err != nil { - t.Fatalf("cannot reconstruct data, %s", err) - return - } - - if len(data) != len(oldData) { - t.Fatal("reconstructing failed") - return - } + data, err := backend.ReconstructMatrix(vect[1:3], 0, DefaultChunkSize) + require.NoError(t, err) + defer data.Free() + require.True(t, bytes.Equal(data.Data, oldData)) }, }, { "Decode", func() { - encoded, err := backend.Encode(input) - - if err != nil { - t.Fatal("cannot encode data") - return - } + bm := NewBufferMatrix(DefaultChunkSize, len(input), backend.K) + _, err = io.Copy(bm, 
bytes.NewReader(input)) + require.NoError(t, err) + bm.Finish() + encoded, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(t, err) defer encoded.Free() + vect := encoded.Data - decoded, err := backend.Decode(vect[0:2]) - if err != nil { - t.Fatalf("cannot decode data: %v", err) - return - } + decoded, err := backend.DecodeMatrix(vect[0:2], DefaultChunkSize) + require.NoError(t, err) defer decoded.Free() - data := decoded.Data - if len(data) != len(input) { - t.Fatal("decoding failed") - return - } + require.True(t, bytes.Equal(decoded.Data, input)) }, }, } @@ -510,7 +504,7 @@ func TestGC(t *testing.T) { wg.Wait() }) } - backend.Close() + _ = backend.Close() } func TestAvailableBackends(t *testing.T) { @@ -530,14 +524,15 @@ func BenchmarkEncode(b *testing.B) { buf := bytes.Repeat([]byte("A"), 1024*1024) b.ResetTimer() for i := 0; i < b.N; i++ { - encoded, err := backend.Encode(buf) - - if err != nil { - b.Fatal(err) - } - encoded.Free() + bm := NewBufferMatrix(DefaultChunkSize, len(buf), backend.K) + _, err := io.Copy(bm, bytes.NewReader(buf)) + require.NoError(b, err) + bm.Finish() + encoded, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(b, err) + defer encoded.Free() } - backend.Close() + _ = backend.Close() } const DefaultChunkSize = 32768 @@ -569,29 +564,25 @@ func BenchmarkLinearizeM(b *testing.B) { if err != nil { b.Fatal("cannot create backend", err) } - defer backend.Close() + defer func() { + _ = backend.Close() + }() buf := bytes.Repeat([]byte("A"), test.size) - encoded, err := backend.EncodeMatrix(buf, DefaultChunkSize) - - if err != nil { - b.Fatal(err) - } + bm := NewBufferMatrix(DefaultChunkSize, len(buf), backend.K) + _, err = io.Copy(bm, bytes.NewReader(buf)) + require.NoError(b, err) + bm.Finish() + encoded, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(b, err) defer encoded.Free() b.ResetTimer() for i := 0; i < b.N; i++ { decoded, err := 
backend.LinearizeMatrix(encoded.Data, DefaultChunkSize) - if err != nil { - b.Fatal(err) - } - if decoded != nil { - if decoded.Free != nil { - decoded.Free() - } - } else { - b.Fatal("decoded is nil") - } + require.NoError(b, err) + defer decoded.Free() + require.True(b, bytes.Equal(decoded.Data, buf)) } }) } @@ -604,30 +595,27 @@ func BenchmarkDecodeM(b *testing.B) { if err != nil { b.Fatal("cannot create backend", err) } - defer backend.Close() + defer func() { + _ = backend.Close() + }() buf := bytes.Repeat([]byte("A"), test.size) - encoded, err := backend.EncodeMatrix(buf, DefaultChunkSize) + bm := NewBufferMatrix(DefaultChunkSize, len(buf), backend.K) + _, err = io.Copy(bm, bytes.NewReader(buf)) + require.NoError(b, err) + bm.Finish() + encoded, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(b, err) - if err != nil { - b.Fatal(err) - } defer encoded.Free() data := encoded.Data[1:] b.ResetTimer() for i := 0; i < b.N; i++ { decoded, err := backend.DecodeMatrix(data, DefaultChunkSize) - if err != nil { - b.Fatal(err) - } - if decoded != nil { - if decoded.Free != nil { - decoded.Free() - } - } else { - b.Fatal("decoded is nil") - } + require.NoError(b, err) + defer decoded.Free() + require.True(b, bytes.Equal(decoded.Data, buf)) } }) } @@ -640,51 +628,25 @@ func BenchmarkReconstruct(b *testing.B) { if err != nil { b.Fatal("cannot create backend", err) } - defer backend.Close() + defer func() { + _ = backend.Close() + }() buf := bytes.Repeat([]byte("A"), test.size) - encoded, err := backend.Encode(buf) - - if err != nil { - b.Fatal(err) - } + bm := NewBufferMatrix(DefaultChunkSize, len(buf), backend.K) + _, err = io.Copy(bm, bytes.NewReader(buf)) + require.NoError(b, err) + bm.Finish() + encoded, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(b, err) defer encoded.Free() flags := encoded.Data[1:] b.ResetTimer() for i := 0; i < b.N; i++ { - _, err := backend.Reconstruct(flags, 0) - if err != 
nil { - b.Fatal(err) - } - } - }) - } -} - -func BenchmarkReconstructM(b *testing.B) { - for _, test := range decodeTests { - b.Run(test.String(), func(b *testing.B) { - backend, err := InitBackend(test.p) - if err != nil { - b.Fatal("cannot create backend", err) - } - defer backend.Close() - - buf := bytes.Repeat([]byte("A"), test.size) - encoded, err := backend.EncodeMatrix(buf, DefaultChunkSize) - - if err != nil { - b.Fatal(err) - } - defer encoded.Free() - flags := encoded.Data[1:] - b.ResetTimer() - for i := 0; i < b.N; i++ { - ddata, err := backend.ReconstructMatrix(flags, 0, DefaultChunkSize) - if err != nil { - b.Fatal(err) - } - ddata.Free() + data, err := backend.ReconstructMatrix(flags, 0, DefaultChunkSize) + require.NoError(b, err, "cannot reconstruct matrix: %v", err) + defer data.Free() + require.True(b, bytes.Equal(data.Data, encoded.Data[0])) } }) } @@ -703,18 +665,28 @@ func BenchmarkMatrix(b *testing.B) { func(b *testing.B) { dtest.p.Checksum = crc backend, _ := InitBackend(dtest.p) + defer func() { + _ = backend.Close() + }() b.Run("Encode", func(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - encoded, err := backend.EncodeMatrix(buf, blockSize) - if err != nil { - b.Fatal(err) - } - encoded.Free() + bm := NewBufferMatrix(DefaultChunkSize, len(buf), backend.K) + _, err := io.Copy(bm, bytes.NewReader(buf)) + require.NoError(b, err) + bm.Finish() + encoded, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(b, err) + defer encoded.Free() } }) b.Run("Decode", func(b *testing.B) { - encoded, _ := backend.EncodeMatrix(buf, blockSize) + bm := NewBufferMatrix(DefaultChunkSize, len(buf), backend.K) + _, err := io.Copy(bm, bytes.NewReader(buf)) + require.NoError(b, err) + bm.Finish() + encoded, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(b, err) defer encoded.Free() b.ResetTimer() @@ -727,18 +699,21 @@ func BenchmarkMatrix(b *testing.B) { } }) b.Run("Reconstruct", func(b 
*testing.B) { - encoded, _ := backend.EncodeMatrix(buf, blockSize) + bm := NewBufferMatrix(DefaultChunkSize, len(buf), backend.K) + _, err := io.Copy(bm, bytes.NewReader(buf)) + require.NoError(b, err) + bm.Finish() + encoded, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(b, err) defer encoded.Free() b.ResetTimer() for i := 0; i < b.N; i++ { + decoded, err := backend.ReconstructMatrix(encoded.Data[1:], 0, blockSize) - if err != nil { - b.Fatal(err) - } - decoded.Free() + require.NoError(b, err) + defer decoded.Free() } }) - backend.Close() }) } }) @@ -747,21 +722,25 @@ func BenchmarkMatrix(b *testing.B) { func BenchmarkDecode(b *testing.B) { backend, _ := InitBackend(Params{Name: "isa_l_rs_vand", K: 4, M: 2, W: 8, HD: 5}) - + defer func() { + _ = backend.Close() + }() buf := bytes.Repeat([]byte("A"), 1024*1024) - res, _ := backend.Encode(buf) + bm := NewBufferMatrix(DefaultChunkSize, len(buf), backend.K) + _, err := io.Copy(bm, bytes.NewReader(buf)) + require.NoError(b, err) + bm.Finish() + res, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(b, err) defer res.Free() for i := 0; i < b.N; i++ { - decoded, err := backend.Decode(res.Data) + decoded, _ := backend.DecodeMatrix(res.Data, DefaultChunkSize) + defer decoded.Free() - if err != nil { - b.Fatal(err) - } - decoded.Free() } - backend.Close() + } func TestEncodeM(t *testing.T) { @@ -772,7 +751,7 @@ func TestEncodeM(t *testing.T) { } buf := make([]byte, 1024*1024) - cryptorand.Read(buf) + _, _ = cryptorand.Read(buf) testParams := []struct { chunkUnit int @@ -789,20 +768,21 @@ func TestEncodeM(t *testing.T) { testName := fmt.Sprintf("TestEncodeB-%d-%d", p.chunkUnit, p.lenToDecode) t.Run(testName, func(t *testing.T) { // Do the matrix encoding - result, err := backend.EncodeMatrix(buf, p.chunkUnit) - - if err != nil { - t.Errorf("failed to encode %+v", err) - } + bm := NewBufferMatrix(p.chunkUnit, len(buf), backend.K) + _, err := io.Copy(bm, 
bytes.NewReader(buf)) + require.NoError(t, err) + bm.Finish() + result, err := backend.EncodeMatrixWithBufferMatrix(bm, p.chunkUnit) + require.NoError(t, err) + defer result.Free() // Check that our linearized buffer // contains expected data when there is all data fragment. ddata, err := backend.LinearizeMatrix(result.Data, p.chunkUnit) - assert.NoError(t, err) - assert.Equal(t, len(buf), len(ddata.Data), "data mismatch") - assert.Equalf(t, buf, ddata.Data, "data mismatch") - - ddata.Free() + require.NoError(t, err) + defer ddata.Free() + require.Equal(t, len(buf), len(ddata.Data), "data mismatch") + require.True(t, bytes.Equal(buf, ddata.Data), "data mismatch") /* now do the same but with the slow path*/ /* we will run a matrix decoding but withtout some data part, to enforce repairing*/ @@ -813,26 +793,147 @@ func TestEncodeM(t *testing.T) { vect = append(vect, result.Data[5]) ddata2, _ := backend.DecodeMatrix(vect, p.chunkUnit) - assert.Equal(t, buf, ddata2.Data, "data mismatch") - ddata2.Free() + require.True(t, bytes.Equal(buf, ddata2.Data), "data mismatch") + defer ddata2.Free() + }) + } + _ = backend.Close() +} + +func TestLinearizeMatrixAndReconstruct(t *testing.T) { + backend, err := InitBackend(Params{Name: "isa_l_rs_vand", K: 2, M: 1, W: 8, HD: 5}) + require.NoError(t, err) + defer func() { + _ = backend.Close() + }() + + testParams := []struct { + chunkSize int + dataSize int + startIncl int + endIncl int + useOldFormat bool + }{ + { + chunkSize: 512, + dataSize: 512*2 + 10, + startIncl: 512*2 - 3, + endIncl: 512*2 - 1, + useOldFormat: false, + }, + { + chunkSize: 512, + dataSize: 512*2 + 10, + startIncl: 512*2 - 3, + endIncl: 512*2 - 1, + useOldFormat: true, + }, + { + chunkSize: DefaultChunkSize, + dataSize: 105623, + startIncl: 59441, + endIncl: 64149, + useOldFormat: false, + }, + + { + chunkSize: DefaultChunkSize, + dataSize: 105623, + startIncl: 59441, + endIncl: 64149, + useOldFormat: true, + }, + { + chunkSize: DefaultChunkSize, + dataSize: 
105623, + startIncl: 105610, + endIncl: 105622, + useOldFormat: true, + }, + { + chunkSize: DefaultChunkSize, + dataSize: 105623, + startIncl: 105610, + endIncl: 105622, + useOldFormat: false, + }, + } + + for _, param := range testParams { + p := param + testName := fmt.Sprintf("TestLinearizeMatrixAndReconstruct(oldformat=%v)-%d-%d-%d-%d", + p.useOldFormat, p.chunkSize, p.dataSize, p.startIncl, p.endIncl, + ) + t.Run(testName, func(t *testing.T) { + currentChunkSize := p.chunkSize + dataSize := p.dataSize + startIncl := p.startIncl + endIncl := p.endIncl + + data := make([]byte, dataSize) + for i := range dataSize { + data[i] = byte('A' + i%26) + } + bm := NewBufferMatrix(currentChunkSize, len(data), backend.K) + if p.useOldFormat { + bm.UseOldFormat() + } + _, err = io.Copy(bm, bytes.NewReader(data)) + require.NoError(t, err) + bm.Finish() + encoded, err := backend.EncodeMatrixWithBufferMatrix(bm, currentChunkSize) + require.NoError(t, err) + defer encoded.Free() + + rangeM := backend.GetRangeMatrix(startIncl, endIncl, currentChunkSize, len(encoded.Data[0])) + require.NotNil(t, rangeM) + + /* Decode the matrix as if it was requested and + checks that the result matches the payload on the requested range. 
*/ + frags := make([][]byte, 0) + for i := 0; i < rangeM.FragCount; i++ { + fragIdx := (rangeM.FragFirstIncl + i) % backend.K + buffer := encoded.Data[fragIdx][rangeM.InFragRangeStartIncl:rangeM.InFragRangeEndExcl] + frags = append(frags, buffer) + } + + decoded, err := backend.LinearizeMatrix(frags, currentChunkSize) + require.NoError(t, err) + defer decoded.Free() - result.Free() + expected := data[startIncl:endIncl] + + linearizedRangeEndExcl := rangeM.LinearizedRangeStartIncl + (endIncl - startIncl) + found := decoded.Data[rangeM.LinearizedRangeStartIncl:linearizedRangeEndExcl] + + require.True(t, bytes.Equal(expected, found)) + + frags2 := make([][]byte, 0) + for i := 0; i < backend.K+backend.M; i++ { + frags2 = append(frags2, encoded.Data[i][rangeM.InFragRangeStartIncl:rangeM.InFragRangeEndExcl]) + } + + // now do the same test, but this time, insted of linearizing, we are going to reconstruct the stripes + reconstructed, err := backend.ReconstructMatrix(frags2[1:], 0, currentChunkSize) + + require.NoError(t, err) + defer reconstructed.Free() + + require.True(t, bytes.Equal(frags2[0], reconstructed.Data)) }) } - backend.Close() } func TestLinearizeMatrix(t *testing.T) { - assert := assert.New(t) - pieceSize := DefaultChunkSize k := 4 m := 1 backend, err := InitBackend(Params{Name: "isa_l_rs_vand", K: k, M: m, W: 8, HD: m}) - if err != nil { - t.Fatalf("cannot init backend: (%v)", err) - } + require.NoError(t, err) + defer func() { + _ = backend.Close() + }() rangeValues := func(values []reflect.Value, rng *rand.Rand) { dataSize := 1 + rng.Intn(7*1024*1024) @@ -856,29 +957,31 @@ func TestLinearizeMatrix(t *testing.T) { t.Logf("TestLinearizeMatrix check %d-%d-%d", startIncl, endIncl, dataSize) data := make([]byte, dataSize) - cryptorand.Read(data) - - encoded, err := backend.EncodeMatrix(data, DefaultChunkSize) - if err != nil { - t.Fatalf("failed to encode buffer: (%v)", err) - } + _, _ = cryptorand.Read(data) + + bm := NewBufferMatrix(DefaultChunkSize, 
len(data), backend.K) + _, err := io.Copy(bm, bytes.NewReader(data)) + require.NoError(t, err) + bm.Finish() + encoded, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(t, err) defer encoded.Free() fragSize := len(encoded.Data[0]) rangeM := backend.GetRangeMatrix(startIncl, endIncl, pieceSize, fragSize) - assert.NotNil(rangeM) + require.NotNil(t, rangeM) /* Decode the matrix as if it was requested and checks that the result matches the payload on the requested range. */ frags := make([][]byte, 0) - for i := 0; i < rangeM.FragCount; i += 1 { + for i := 0; i < rangeM.FragCount; i++ { fragIdx := (rangeM.FragFirstIncl + i) % k buffer := encoded.Data[fragIdx][rangeM.InFragRangeStartIncl:rangeM.InFragRangeEndExcl] frags = append(frags, buffer) } decoded, err := backend.LinearizeMatrix(frags, pieceSize) - assert.Nil(err) + require.NoError(t, err) defer decoded.Free() expected := data[startIncl : endIncl+1] @@ -892,9 +995,8 @@ func TestLinearizeMatrix(t *testing.T) { Values: rangeValues, } - if err := quick.Check(checkRange, &config); err != nil { - t.Error(err) - } + require.NoError(t, quick.Check(checkRange, &config)) + } func TestDecodeMatrix(t *testing.T) { @@ -932,12 +1034,14 @@ func TestDecodeMatrix(t *testing.T) { t.Logf("TestDecodeMatrix check %d-%d-%d-%d", startIncl, endIncl, dataSize, failedFragIdx) data := make([]byte, dataSize) - cryptorand.Read(data) - - encoded, err := backend.EncodeMatrix(data, DefaultChunkSize) - if err != nil { - t.Fatalf("failed to encode buffer: (%v)", err) - } + _, _ = cryptorand.Read(data) + + bm := NewBufferMatrix(DefaultChunkSize, len(data), backend.K) + _, err := io.Copy(bm, bytes.NewReader(data)) + require.NoError(t, err) + bm.Finish() + encoded, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(t, err) defer encoded.Free() fragSize := len(encoded.Data[0]) @@ -947,7 +1051,7 @@ func TestDecodeMatrix(t *testing.T) { /* Decode the matrix as if it was requested and 
checks that the result matches the payload on the requested range. */ frags := make([][]byte, 0) - for i := 0; i < (k + m); i += 1 { + for i := 0; i < (k + m); i++ { fragIdx := i if fragIdx == failedFragIdx { continue @@ -991,16 +1095,18 @@ func TestValidateFragmentMatrix(t *testing.T) { dataSize := 7 * 1024 * 1024 data := make([]byte, dataSize) - cryptorand.Read(data) - - encoded, err := backend.EncodeMatrix(data, DefaultChunkSize) - if err != nil { - t.Fatalf("failed to encode buffer: (%v)", err) - } + _, _ = cryptorand.Read(data) + + bm := NewBufferMatrix(DefaultChunkSize, len(data), backend.K) + _, err = io.Copy(bm, bytes.NewReader(data)) + require.NoError(t, err) + bm.Finish() + encoded, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(t, err) defer encoded.Free() fragSize := len(encoded.Data[0]) - for i := 0; i < len(encoded.Data); i += 1 { + for i := 0; i < len(encoded.Data); i++ { rangeMatrix := backend.GetRangeMatrix(0, dataSize-1, pieceSize, fragSize) assert.NotNil(rangeMatrix) @@ -1011,7 +1117,7 @@ func TestValidateFragmentMatrix(t *testing.T) { chunkSize := pieceSize + backend.headerSize offset := 0 for offset < len(frag) { - for altered := 0; altered < backend.headerSize; altered += 1 { + for altered := 0; altered < backend.headerSize; altered++ { t.Logf("frag %d altered offset %d altered %d", i, offset, altered) previous := frag[offset+altered] frag[offset+altered] = previous + 1 @@ -1063,14 +1169,15 @@ func TestReconstructM(t *testing.T) { testName := fmt.Sprintf("TestReconstruct-%d-%d", p.chunkUnit, p.fragNumber) t.Run(testName, func(t *testing.T) { // Do the matrix encoding - result, err := backend.EncodeMatrix(buf, p.chunkUnit) + bm := NewBufferMatrix(p.chunkUnit, len(buf), backend.K) + _, err := io.Copy(bm, bytes.NewReader(buf)) + require.NoError(t, err) + bm.Finish() + result, err := backend.EncodeMatrixWithBufferMatrix(bm, p.chunkUnit) + require.NoError(t, err) defer result.Free() - if err != nil { - 
t.Errorf("failed to encode %+v", err) - } - var vect [][]byte for i := 0; i < backend.K+backend.M; i++ { if i != p.fragNumber { @@ -1079,22 +1186,15 @@ func TestReconstructM(t *testing.T) { } ddata, err := backend.ReconstructMatrix(vect, p.fragNumber, p.chunkUnit) - if err != nil { - t.Errorf("cannot reconstruct fragment %d cause=%v", p.fragNumber, err) - return - } - if ddata == nil { - t.Fatal("unexpected error / fragment rebuilt is nil") - } + require.NoError(t, err) + require.NotNil(t, ddata) res := bytes.Compare(ddata.Data, result.Data[p.fragNumber]) - ddata.Free() - if res != 0 { - t.Errorf("Error, fragment rebuilt is different from the original one") - } + require.Equal(t, 0, res) + defer ddata.Free() }) } - backend.Close() + _ = backend.Close() } func TestEncodeDecodeMatrix(t *testing.T) { @@ -1111,74 +1211,51 @@ func TestEncodeDecodeMatrix(t *testing.T) { t.Errorf("Error creating backend %v: %q", params, err) continue } - + defer func() { + _ = backend.Close() + }() for patternIndex, pattern := range testPatterns { t.Run(fmt.Sprintf("%s_%d_%d-%d-%d", params.Name, params.K, params.M, patternIndex, len(pattern)), func(t *testing.T) { - data, err := backend.EncodeMatrix(pattern, 32768) - if err != nil { - t.Errorf("Error encoding %v: %q", params, err) - return - } + bm := NewBufferMatrix(DefaultChunkSize, len(pattern), backend.K) + _, err := io.Copy(bm, bytes.NewReader(pattern)) + require.NoError(t, err) + bm.Finish() + data, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(t, err) defer data.Free() frags := data.Data - decode := func(frags [][]byte, description string) bool { - decoded, err := backend.DecodeMatrix(frags, 32768) - if err != nil { - t.Errorf("%v: %v: %q for pattern %d", description, backend, err, patternIndex) - return false - } else if !bytes.Equal(decoded.Data, pattern) { - t.Errorf("%v: Expected %v to roundtrip pattern %d, got %q", description, backend, patternIndex, decoded.Data) - return false - } - 
decoded.Free() - return true + decode := func(frags [][]byte, description string) { + decoded, err := backend.DecodeMatrix(frags, DefaultChunkSize) + require.NoError(t, err) + require.True(t, bytes.Equal(decoded.Data, pattern), "%v: Expected %v to roundtrip pattern %d, got %q", description, backend, patternIndex, decoded.Data) + defer decoded.Free() } - var good bool - good = decode(frags, "all frags") - good = good && decode(shuf(frags), "all frags, shuffled") - good = good && decode(frags[:params.K], "data frags") - good = good && decode(shuf(frags[:params.K]), "shuffled data frags") - good = good && decode(frags[params.M:], "with parity frags") - good = good && decode(shuf(frags[params.M:]), "shuffled parity frags") - - if !good { - return - } - - // remove := func(s [][]byte, i int) [][]byte { - // s[i] = s[len(s)-1] - // return s[:len(s)-1] - // } + decode(frags, "all frags") + decode(shuf(frags), "all frags, shuffled") + decode(frags[:params.K], "data frags") + decode(shuf(frags[:params.K]), "shuffled data frags") + decode(frags[params.M:], "with parity frags") + decode(shuf(frags[params.M:]), "shuffled parity frags") for fIdx := 0; fIdx < params.K; fIdx++ { newFrags := frags[fIdx+1:] if fIdx >= 1 { newFrags = append(newFrags, frags[0:fIdx]...) 
} - ddata, err := backend.ReconstructMatrix(newFrags, fIdx, 32768) - if err != nil { - t.Fatal("cannot reconstruct ", err) - } - if !bytes.Equal(ddata.Data, frags[fIdx]) { - ddata.Free() - t.Fatalf("part %d reconstructed not equal to original len: %q != %q", fIdx, ddata.Data, frags[fIdx]) - } - ddata.Free() + ddata, err := backend.ReconstructMatrix(newFrags, fIdx, DefaultChunkSize) + require.NoError(t, err) + require.True(t, bytes.Equal(ddata.Data, frags[fIdx]), "part %d reconstructed not equal to original len: %q != %q", fIdx, ddata.Data, frags[fIdx]) + defer ddata.Free() } }) } - - err = backend.Close() - if err != nil { - t.Errorf("Error closing backend %v: %q", backend, err) - } } } @@ -1205,7 +1282,7 @@ func TestRangeMatrix(t *testing.T) { startIncl := 0 endIncl := 0 rangeM = backend.GetRangeMatrix(startIncl, endIncl, pieceSize, fragSize) - rangeM = backend.GetRangeMatrix(startIncl, endIncl, pieceSize, fragSize) + assert.Equal(t, rangeM.FragCount, 1) assert.Equal(t, rangeM.FragFirstIncl, 0) assert.Equal(t, rangeM.ReqStartIncl, startIncl) @@ -1242,3 +1319,313 @@ func TestRangeMatrix(t *testing.T) { totalRead = rangeM.FragCount * (rangeM.InFragRangeEndExcl - rangeM.InFragRangeStartIncl) assert.Equal(t, expectedTotalRead, totalRead) } + +func TestGetRangeMatrix(t *testing.T) { + type testCase struct { + name string + start int + end int + chunksize int + fragSize int + payloadSize int + expectedFragStart int + expectedFragEnd int + expectedDecStart int + expectedDecEnd int + } + + backend, _ := InitBackend(Params{Name: "isa_l_rs_vand", K: 2, M: 1}) + defer func() { + _ = backend.Close() + }() + + testCases := []testCase{ + { + name: "First 128 bytes, 100k payload", + start: 0, + end: 128, + chunksize: 32768, + fragSize: 1048576, + payloadSize: 100000, + expectedFragStart: 0, + expectedFragEnd: 32768 + backend.headerSize, + expectedDecStart: 0, + expectedDecEnd: 128, + }, + { + name: "First 128 bytes, 1MB payload", + start: 0, + end: 128, + chunksize: 32768, + 
fragSize: 1048576, + payloadSize: 1000000, + expectedFragStart: 0, + expectedFragEnd: 32768 + backend.headerSize, + expectedDecStart: 0, + expectedDecEnd: 128, + }, + { + name: "64k Block in the middle, 100k payload", + start: 32768, + end: 32768 + 65536, + chunksize: 32768, + fragSize: 1048576, + payloadSize: 100000, + expectedFragStart: 0, + expectedFragEnd: 65536 + 2*backend.headerSize, + expectedDecStart: 32768, + expectedDecEnd: 65536 + 32768, + }, + { + name: "64k Block in the middle, 1MB payload", + start: 500000, + end: 500000 + 65536, + chunksize: 32768, + fragSize: 1048576, + payloadSize: 1000000, + expectedFragStart: 7 * (32768 + 80), + expectedFragEnd: 7*(32768+80) + 65536 + backend.headerSize*2, + expectedDecStart: 500000 - 458752, + expectedDecEnd: 500000 - 458752 + 65536, + }, + { + name: "Last 80 bytes, 100k payload", + start: 100000 - 80, + end: 100000, + chunksize: 32768, + fragSize: 1048576, + payloadSize: 100000, + expectedFragStart: 32768 + 80, + expectedFragEnd: 32768 + 80 + 80 + 32768, + expectedDecStart: 100000 - 65536 - 80, + expectedDecEnd: 100000 - 65536, + }, + { + name: "Last 80 bytes, 1MB payload", + start: 1000000 - 80, + end: 1000000, + chunksize: 32768, + fragSize: 1048576, + payloadSize: 1000000, + expectedFragStart: (1000000 / 32768 / 2) * (32768 + 80), + expectedFragEnd: (1000000/32768/2)*(32768+80) + 32768 + backend.headerSize, + expectedDecStart: 1000000 - ((1000000 / 32768) * 32768) - 80, + expectedDecEnd: 1000000 - ((1000000 / 32768) * 32768), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + length := tc.end - tc.start + rm := backend.GetRangeMatrix(tc.start, tc.end, tc.chunksize, tc.fragSize) + require.NotNil(t, rm, "GetRangeMatrix returned nil") + assert.Equal(t, tc.expectedFragStart, rm.InFragRangeStartIncl, "FragRangeStart mismatch") + assert.Equal(t, tc.expectedFragEnd, rm.InFragRangeEndExcl, "FragRangeEnd mismatch") + assert.Equal(t, tc.expectedDecStart, rm.DecodedRangeStartIncl, 
"DecodedRangeStart mismatch") + assert.Equal(t, tc.expectedDecEnd, rm.DecodedRangeStartIncl+length, "DecodedRangeEnd mismatch") + }) + } +} + +func TestRange(t *testing.T) { + testCases := []struct { + name string + useNewFormat bool + }{ + {"OldFormat", false}, + {"NewFormat", true}, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + testRangeHelper(t, testCase.useNewFormat) + }) + } +} + +func testRangeHelper(t *testing.T, useNewFormat bool) { + backend, _ := InitBackend(Params{Name: "isa_l_rs_vand", K: 2, M: 1}) + defer func() { + _ = backend.Close() + }() + + chunkSize := 32768 + size := 1020300 + + rm := backend.GetRangeMatrix(10, 20, chunkSize, size) + require.NotNil(t, rm, "GetRangeMatrix returned nil") + + buf := make([]byte, size) + for i := range buf { + buf[i] = byte('A' + i%26) + } + + bm := NewBufferMatrix(chunkSize, len(buf), backend.K) + if !useNewFormat { + bm.UseOldFormat() + } + _, err := io.Copy(bm, bytes.NewReader(buf)) + require.NoError(t, err) + bm.Finish() + encodedData, err := backend.EncodeMatrixWithBufferMatrix(bm, chunkSize) + require.NoError(t, err) + defer encodedData.Free() + stripes := make([][]byte, backend.K+backend.M) + for i := range backend.K + backend.M { + stripes[i] = encodedData.Data[i] + stripes[i] = stripes[i][rm.InFragRangeStartIncl:rm.InFragRangeEndExcl] + } + + // Test decode + decodedData, err := backend.DecodeMatrix(stripes, chunkSize) + require.NoError(t, err) + defer decodedData.Free() + assert.Equal(t, buf[10:20], decodedData.Data[rm.DecodedRangeStartIncl:rm.DecodedRangeStartIncl+20-10], "Decoded data mismatch") + + // Test repair + decodedData2, err := backend.DecodeMatrix(stripes[1:], chunkSize) + require.NoError(t, err) + defer decodedData2.Free() + assert.Equal(t, buf[10:20], decodedData2.Data[rm.DecodedRangeStartIncl:rm.DecodedRangeStartIncl+20-10], "Decoded data mismatch") + + // Test last 80 bytes + rm = backend.GetRangeMatrix(size-80, size, chunkSize, size) + 
require.NotNil(t, rm, "GetRangeMatrix returned nil") + + for i := range backend.K + backend.M { + stripes[i] = encodedData.Data[i] + stripes[i] = stripes[i][rm.InFragRangeStartIncl:rm.InFragRangeEndExcl] + } + + // Test decode + decodedData3, err := backend.DecodeMatrix(stripes, chunkSize) + require.NoError(t, err) + defer decodedData3.Free() + assert.Equal(t, buf[size-80:size], decodedData3.Data[rm.DecodedRangeStartIncl:rm.DecodedRangeStartIncl+80], "Decoded data mismatch") + // Test repair + decodedData4, err := backend.DecodeMatrix(stripes[1:], chunkSize) + require.NoError(t, err) + defer decodedData4.Free() + assert.Equal(t, buf[size-80:size], decodedData4.Data[rm.DecodedRangeStartIncl:rm.DecodedRangeStartIncl+80], "Decoded data mismatch") +} + +// TestFormatOldNew tests the compatibility of the new format with the old one +// It uses the buffer matrix to encode the data in the old/new format and +// then decodes it using the backend. It checks that the data is the same +// and that the format is correct. 
+func TestFormatOldNew(t *testing.T) { + testCases := []struct { + useNewFormat bool + k, n int + }{ + {true, 2, 1}, + {true, 5, 1}, + {false, 2, 1}, + {false, 5, 1}, + } + for _, testCase := range testCases { + t.Run(fmt.Sprintf("%v-%d-%d", testCase.useNewFormat, testCase.k, testCase.n), func(t *testing.T) { + // use buffermatrix to storage format in new format and see if we can decode it + backend, err := InitBackend(Params{Name: "isa_l_rs_vand", K: testCase.k, M: testCase.n}) + require.NoError(t, err) + defer backend.Close() + buf := bytes.Repeat([]byte("A"), 1024*1024+rand.Intn(1024*1024)) //nolint:gosec + + bm := NewBufferMatrix(32768, len(buf), backend.K) + if testCase.useNewFormat { + bm.UseNewFormat() + } + _, err = io.Copy(bm, bytes.NewReader(buf)) + require.NoError(t, err) + bm.Finish() + + require.Equal(t, len(buf), bm.Length()) + + e, err := backend.EncodeMatrixWithBufferMatrix(bm, 32768) + require.NoError(t, err) + defer e.Free() + + // check the format / first 80 is the header, lets check it + for i := range len(e.Data) { + hdr := e.Data[i][0:80] + + var f fragheader + err = f.UnmarshalBinary(hdr) + require.NoError(t, err) + require.Equal(t, 32768, int(f.meta.size)) + require.Equal(t, 32768*testCase.k, int(f.meta.origDataSize)) //nolint:gosec + } + // case 1; fast decode + ddata, err := backend.DecodeMatrix(e.Data, 32768) + require.NoError(t, err) + require.Equal(t, buf, ddata.Data) + defer ddata.Free() + // case 2: missing data + rdata, err := backend.ReconstructMatrix(e.Data[1:], 0, 32768) + require.NoError(t, err) + require.Equal(t, e.Data[0], rdata.Data) + defer rdata.Free() + // case 3: slow decode + ddata2, err := backend.DecodeMatrix(e.Data[1:], 32768) + require.NoError(t, err) + require.Equal(t, buf, ddata2.Data) + defer ddata2.Free() + // case 4: rebuild missing coding + require.Equal(t, testCase.k, len(e.Data[:testCase.k])) + rdata2, err := backend.ReconstructMatrix(e.Data[:testCase.k], testCase.k, 32768) + require.NoError(t, err) + 
require.Equal(t, e.Data[testCase.k], rdata2.Data) + }) + } +} + +// duplicate fragment_header_t from libec +type fragheader struct { + meta fragmeta + magic uint32 + libecVersion uint32 + metadataChksum uint32 + padding [9]byte +} + +func (f *fragheader) UnmarshalBinary(data []byte) error { + if len(data) != 80 { + return fmt.Errorf("invalid size for fragment header: %d", len(data)) + } + if err := f.meta.UnmarshalBinary(data[0:63]); err != nil { + return err + } + f.magic = binary.BigEndian.Uint32(data[63:67]) + f.libecVersion = binary.BigEndian.Uint32(data[67:71]) + f.metadataChksum = binary.BigEndian.Uint32(data[71:75]) + copy(f.padding[:], data[75:80]) + return nil +} + +func (f *fragmeta) UnmarshalBinary(data []byte) error { + if len(data) != 63 { + return fmt.Errorf("invalid size for fragment metadata: %d", len(data)) + } + f.idx = binary.BigEndian.Uint32(data[0:4]) + f.size = binary.LittleEndian.Uint32(data[4:8]) + f.fragBackendMetadataSize = binary.LittleEndian.Uint32(data[8:12]) + f.origDataSize = binary.LittleEndian.Uint64(data[12:20]) + f.checksumType = data[20] + copy(f.checksum[:], data[21:53]) + f.checksumMismatch = data[53] + f.backendID = data[54] + f.backendVersion = binary.BigEndian.Uint32(data[55:59]) + return nil +} + +type fragmeta struct { + idx uint32 + size uint32 + fragBackendMetadataSize uint32 + origDataSize uint64 + checksumType uint8 + checksum [32]byte + checksumMismatch uint8 + backendID uint8 + backendVersion uint32 +} diff --git a/buffer.go b/buffer.go index e737e02..433b52d 100644 --- a/buffer.go +++ b/buffer.go @@ -5,53 +5,96 @@ import ( "io" ) -type BufferMatrix struct { - b []byte - zero []byte - hdrSize, bufSize int - len int // len of input - k int - curBlock int - leftInBlock int - finished bool +type BufferInfo struct { + hdrSize, bufSize int + len int + k int + curBlock int + leftInBlock int + sizeOfLastSubGroup int + newStyle bool } // FragLen returns the size of a "fragment" aligned to a block size (data + header) -func 
(b BufferMatrix) FragLen() int { +func (b BufferInfo) FragLen() int { return b.SubGroups() * (b.bufSize + b.hdrSize) } // SubGroups returns the number of blocks inside a single fragment -func (b BufferMatrix) SubGroups() int { +func (b BufferInfo) SubGroups() int { nbBlocks := (b.len + b.bufSize - 1) / b.bufSize nbStripes := (nbBlocks + b.k - 1) / b.k return nbStripes } -func (b BufferMatrix) maxLen() int { +func (b BufferInfo) maxLen() int { return (b.SubGroups() * b.k) * (b.bufSize + b.hdrSize) } +func (b BufferInfo) IsBlockInLastSubGroup(block int) bool { + cur := block / b.k + return cur == b.SubGroups()-1 +} + +func (b BufferInfo) ComputeSizeOfLastSubGroup() int { + // total of size already in previous subgroups + lastSubGroup := b.SubGroups() - 1 + totalSizeInPreviousSubGroups := lastSubGroup * b.k * (b.bufSize) + leftSize := b.len - totalSizeInPreviousSubGroups + return leftSize +} + +func (b BufferInfo) FragLenLastSubGroup() int { + if !b.newStyle { + return b.bufSize + } + r := b.ComputeSizeOfLastSubGroup() / b.k + if b.ComputeSizeOfLastSubGroup()%b.k != 0 { + r++ + } + return r +} + +func (b *BufferInfo) init(bufSize int, length int, k int) { + b.newStyle = true + + hdrSize := fragmentHeaderSize() + b.hdrSize = hdrSize + b.bufSize = bufSize + b.len = length + b.k = k + b.leftInBlock = -1 + b.curBlock = 0 + + b.sizeOfLastSubGroup = b.FragLenLastSubGroup() +} + +func NewBufferInfo(bufSize int, length int, k int) *BufferInfo { + var b BufferInfo + b.init(bufSize, length, k) + return &b +} + +type BufferMatrix struct { + b []byte + zero []byte + finished bool + BufferInfo +} + // NewBufferMatrix returns a new buffer suitable for data and organized // such as it can be injected into EncodeMatrixWithBuffer without allocation/copying // the data into shards -func NewBufferMatrix(bufSize int, l int, k int) *BufferMatrix { +func NewBufferMatrix(bufSize int, length int, k int) *BufferMatrix { var b BufferMatrix - b.Reset(bufSize, l, k) + b.Reset(bufSize, length, k) 
return &b } // Reset serves the same purpose as NewBufferMatrix but use the existing buffer and // tries to avoid allocation of the underlying buffer. func (b *BufferMatrix) Reset(bufSize int, length int, k int) { - hdrSize := fragmentHeaderSize() - b.hdrSize = hdrSize - b.bufSize = bufSize - b.len = length - b.k = k - b.leftInBlock = -1 - b.curBlock = 0 - b.finished = false + b.init(bufSize, length, k) maxLen := b.maxLen() @@ -66,30 +109,38 @@ func (b *BufferMatrix) Reset(bufSize int, length int, k int) { if len(b.zero) < bufSize { b.zero = make([]byte, bufSize) } + b.finished = false } -var emptyErasureHeader = bytes.Repeat([]byte{0}, fragmentHeaderSize()) - -// getOffset returns current offset in buffer and size left in the current block -// So that it is safe to copy bytes at . -// If we are at a boundary, it will init the header and skip it. -func (b *BufferMatrix) getOffset() (int, int) { - realCurBlock := b.getRealBlock(b.curBlock) - blockSize := b.hdrSize + b.bufSize - blockOffset := realCurBlock * blockSize - if b.leftInBlock == -1 { - // Start of a block - copy(b.b[blockOffset:], emptyErasureHeader) - b.leftInBlock = b.bufSize +// UseNewFormat sets the buffer to use the new format. +// The new format is more efficient for the last stripe/subgroup. +// Note: will panic if called after any Write() or ReadFrom() +func (b *BufferMatrix) UseNewFormat() { + if b.curBlock != 0 || b.leftInBlock != -1 || b.finished { + panic("UseNewFormat must be called before any Write") } + b.newStyle = true +} - curOffset := blockOffset + (b.bufSize - b.leftInBlock) + b.hdrSize +func (b *BufferMatrix) UseOldFormat() { + if b.curBlock != 0 || b.leftInBlock != -1 || b.finished { + panic("UseOldFormat must be called before any Write") + } + b.newStyle = false +} - return curOffset, b.leftInBlock +// getOffset is a wrapper around getOffsetOld and getOffsetNew. +// It will call the right one depending on the newStyle flag. 
+func (b *BufferMatrix) getOffset() (int, int) { + if b.newStyle { + return b.getOffsetNew() + } + return b.getOffsetOld() } +var emptyErasureHeader = bytes.Repeat([]byte{0}, fragmentHeaderSize()) + // Finish *must* be called after the final Write() *before* using the buffer -// in EncodeMatrix // It is safe to call it multiple times. func (b *BufferMatrix) Finish() { if b.finished { @@ -111,9 +162,84 @@ func (b *BufferMatrix) Finish() { b.finished = true } +// In b.b buffer, the data is organized as follow: +// - for each block, we have a header of size hdrSize +// - then the data of size bufSize +// - then the next block +// - etc. +// The data is organized in stripes of k blocks. +// It is meant to be split later and stored as shards. +// Shard 0 will contain block 0, k, 2k, 3k, etc. +// Shard 1 will contain block 1, k+1, 2k+1, 3k+1, etc. +// etc. +// So b.b is organized as follow: +// [hdr][block 0][hdr][block k][hdr][block 2k][hdr][block 3k] ... [hdr][block 1][hdr][block k+1] etc... +// When writing to buffer, we will write in the current block until it is full. +// Then we will skip the header and write in the next block. +// For example when block 0 is full, we will skip the header and write in block 1. When block 1 is full, we will skip the header and write in block 2, etc. +// Them, when all blocks from 0 to k-1 are full, we will write in block k, k+1, etc. + +// getRealBlock returns the real block index in the buffer. +// For example, if we have k=2 and 5 blocks in total, the buffer will be organized as follow: +// [hdr][block 0] [hdr][block 2] [hdr][block 4] [hdr][block 1] [hdr][block 3] +// So getRealBlock(0) will return 0, getRealBlock(1) will return 3, getRealBlock(2) will return 1, getRealBlock(3) will return 4, getRealBlock(4) will return 2. + +// getRealBlock returns the real block index in the buffer. 
+// blockidx is the block index in the incoming data (0-indexed) +// the return value is the block index in the buffer (0-indexed) func (b BufferMatrix) getRealBlock(blockidx int) int { - subgroup := b.SubGroups() - return (blockidx%b.k)*subgroup + (blockidx / b.k) + nbStripes := b.SubGroups() + return (blockidx%b.k)*nbStripes + (blockidx / b.k) +} + +// getOffSetNew returns current offset in buffer and size left in the current block +// Same a getOffset when blocks are not in the last stripe/subgroup +// When blocks are in the last stripe/subgroup, it will split the size left in k parts +// and return the offset and size left for the current block. +// For example, if we have 5*blocksize bytes and k=2, the buffer will be organized as follow: +// [hdr][block 0] [hdr][block 2] [hdr][block 4] / [hdr][block 1] [hdr][block 3] [hdr][block 5] +// where size(block4) + size(block5) == len - (4 * blocksize) == size of last subgroup +// when/if the size of last subgroup is not divisible by k, the block 4 may be one byte longer than block 5 +func (b *BufferMatrix) getOffsetNew() (int, int) { + realCurBlock := b.getRealBlock(b.curBlock) + blockSize := b.hdrSize + b.bufSize + blockOffset := realCurBlock * blockSize + if b.leftInBlock == -1 { + // Start of a block + copy(b.b[blockOffset:], emptyErasureHeader) + if b.IsBlockInLastSubGroup(b.curBlock) { + b.leftInBlock = b.FragLenLastSubGroup() + } else { + b.leftInBlock = b.bufSize + } + } + + bufSize := b.bufSize + if b.IsBlockInLastSubGroup(b.curBlock) { + bufSize = b.FragLenLastSubGroup() + } + + curOffset := blockOffset + (bufSize - b.leftInBlock) + b.hdrSize + + return curOffset, b.leftInBlock +} + +// getOffset returns current offset in buffer and size left in the current block +// So that it is safe to copy bytes at . +// If we are at a boundary, it will init the header and skip it. 
+func (b *BufferMatrix) getOffsetOld() (int, int) { + realCurBlock := b.getRealBlock(b.curBlock) + blockSize := b.hdrSize + b.bufSize + blockOffset := realCurBlock * blockSize + if b.leftInBlock == -1 { + // Start of a block + copy(b.b[blockOffset:], emptyErasureHeader) + b.leftInBlock = b.bufSize + } + + curOffset := blockOffset + (b.bufSize - b.leftInBlock) + b.hdrSize + + return curOffset, b.leftInBlock } func (b *BufferMatrix) Write(p []byte) (int, error) { @@ -122,13 +248,7 @@ func (b *BufferMatrix) Write(p []byte) (int, error) { for len(p) > 0 { curOffset, leftToCopy := b.getOffset() - var m int - - if len(p) > leftToCopy { - m = leftToCopy - } else { - m = len(p) - } + m := min(len(p), leftToCopy) n := copy(b.b[curOffset:], p[:m]) @@ -176,7 +296,12 @@ func (b BufferMatrix) RealData() []byte { for block := 0; len(res) < b.len; block++ { blockSize := b.hdrSize + b.bufSize curOffset := b.getRealBlock(block)*blockSize + b.hdrSize - res = append(res, b.b[curOffset:curOffset+b.bufSize]...) + if b.newStyle && b.IsBlockInLastSubGroup(block) { + amountToCopy := min(b.FragLenLastSubGroup(), b.len-len(res)) + res = append(res, b.b[curOffset:curOffset+amountToCopy]...) + } else { + res = append(res, b.b[curOffset:curOffset+b.bufSize]...) 
+ } } return res[:b.len] @@ -189,3 +314,28 @@ func (b BufferMatrix) Bytes() []byte { func (b BufferMatrix) Length() int { return b.len } + +func (b *BufferMatrix) IsBlockInLastSubGroup(block int) bool { + cur := block / b.k + return cur == b.SubGroups()-1 +} + +func (b *BufferMatrix) ComputeSizeOfLastSubGroup() int { + // total of size already in previous subgroups + lastSubGroup := b.SubGroups() - 1 + totalSizeInPreviousSubGroups := lastSubGroup * b.k * (b.bufSize) + leftSize := b.len - totalSizeInPreviousSubGroups + return leftSize +} + +func (b *BufferMatrix) FragLenLastSubGroup() int { + if !b.newStyle { + return b.bufSize + } + + r := b.ComputeSizeOfLastSubGroup() / b.k + if b.ComputeSizeOfLastSubGroup()%b.k != 0 { + r++ + } + return r +} diff --git a/buffer_test.go b/buffer_test.go index 9cbded0..f8ea204 100644 --- a/buffer_test.go +++ b/buffer_test.go @@ -71,49 +71,6 @@ func TestBuffer(t *testing.T) { } } -func TestComparisonEncode(t *testing.T) { - for _, test := range bufferTests { - t.Run(test.name, func(t *testing.T) { - size := test.size - k := test.k - m := test.m - blockSize := test.blockSize - b := NewBufferMatrix(blockSize, size, k) - - data := make([]byte, size) - for i := 0; i < size; i++ { - data[i] = byte(i) - } - n, err := io.Copy(b, bytes.NewReader(data)) - assert.Equal(t, size, int(n)) - b.Finish() - assert.NoError(t, err) - - defer runtime.KeepAlive(b) - - backend, _ := InitBackend(Params{Name: "isa_l_rs_vand", K: k, M: m, W: 8, HD: 5}) - defer backend.Close() - - encoded2, err := backend.EncodeMatrixWithBufferMatrix(b, blockSize) - assert.NoError(t, err) - defer encoded2.Free() - - encoded, err := backend.EncodeMatrix(data, blockSize) - assert.NoError(t, err) - defer encoded.Free() - - for j := 0; j < len(encoded2.Data); j++ { - for i := 0; i < len(encoded2.Data[0]); i++ { - assert.Equal(t, encoded2.Data[j][i], encoded.Data[j][i]) - } - } - for i := 0; i < k+m; i++ { - assert.Equal(t, (encoded2.Data[i]), (encoded.Data[i])) - } - }) - } -} - 
func TestEncodeBufferMatrix(t *testing.T) { for _, test := range bufferTests { t.Run(test.name, func(t *testing.T) { @@ -131,7 +88,9 @@ func TestEncodeBufferMatrix(t *testing.T) { defer runtime.KeepAlive(b) backend, _ := InitBackend(Params{Name: "isa_l_rs_vand", K: k, M: m, W: 8, HD: 5}) - defer backend.Close() + defer func() { + _ = backend.Close() + }() encoded, err := backend.EncodeMatrixWithBufferMatrix(b, blockSize) assert.NoError(t, err) @@ -160,82 +119,164 @@ func TestEncodeBufferMatrix(t *testing.T) { } } -// BenchmarkEncodeMatrix compares speeds of both style of encoding -// using a generic buffer (and then requiring some allocations in the C shim) -// using a specific buffer (less allocations) -func BenchmarkEncodeMatrix(b *testing.B) { +func TestBufferNew(t *testing.T) { for _, test := range bufferTests { - b.Run(test.name, func(b *testing.B) { + t.Run(test.name, func(t *testing.T) { size := test.size k := test.k - m := test.m blockSize := test.blockSize - backend, _ := InitBackend(Params{Name: "isa_l_rs_vand", K: k, M: m, W: 8, HD: 5}) - defer backend.Close() - b.Run("original", func(b *testing.B) { - buf := bytes.Repeat([]byte("A"), size) - b.ResetTimer() - for i := 0; i < b.N; i++ { - encoded, err := backend.EncodeMatrix(buf, blockSize) - - if err != nil { - b.Fatal(err) - } - encoded.Free() - } - }) - - b.Run("no copy", func(b *testing.B) { - buf := NewBufferMatrix(blockSize, size, k) - data := bytes.Repeat([]byte("A"), size) - _, err := io.Copy(buf, bytes.NewReader(data)) - assert.NoError(b, err) - buf.Finish() - - b.ResetTimer() - for i := 0; i < b.N; i++ { - encoded, err := backend.EncodeMatrixWithBufferMatrix(buf, blockSize) - - if err != nil { - b.Fatal(err) - } - encoded.Free() - } - }) + b := NewBufferMatrix(blockSize, size, k) + b.UseNewFormat() + data := make([]byte, size) + for i := range size { + data[i] = byte(i) + } + n, err := io.Copy(b, bytes.NewReader(data)) + + assert.NoError(t, err) + assert.Equal(t, int64(size), n) + + newData := 
b.RealData() + assert.Equal(t, len(data), len(newData)) + assert.Equal(t, data, newData) + + b2 := NewBufferMatrix(blockSize, size, k) + b2.UseNewFormat() + n, err = b2.ReadFrom(bytes.NewReader(data)) + + assert.NoError(t, err) + assert.Equal(t, int64(size), n) + + newData = b2.RealData() + assert.Equal(t, len(data), len(newData)) + assert.Equal(t, data, newData) }) } } -// BenchmarkBufferCopy compares basic buffer vs matrix implementation -// w.r.t. filling speeds. Please note the matrix implementation is expected -// to be slower. Speed gains will occur during encoding phase -func BenchmarkBufferCopy(b *testing.B) { - for _, test := range bufferTests { - b.Run(test.name, func(b *testing.B) { - originalData := bytes.Repeat([]byte("A"), test.size) - reader := bytes.NewReader(originalData) - b.Run("original", func(b *testing.B) { - b.ResetTimer() - for i := 0; i < b.N; i++ { - reader.Seek(0, 0) - buf := bytes.NewBuffer(make([]byte, 0, test.size)) - if _, err := io.Copy(buf, reader); err != nil { - b.Fatal("cannot read buffer") - } - } - }) - b.Run("buffermatrix", func(b *testing.B) { - b.ResetTimer() - buf := NewBufferMatrix(test.blockSize, test.size, test.k) - for i := 0; i < b.N; i++ { - reader.Seek(0, 0) - buf.Reset(test.blockSize, test.size, test.k) - if _, err := io.Copy(buf, reader); err != nil { - b.Fatal("cannot read buffer") - } - buf.Finish() - } - }) - }) - } +func TestGetRealBlock(t *testing.T) { + b := NewBufferMatrix(32, 32*5, 2) + assert.Equal(t, 0, b.getRealBlock(0)) + assert.Equal(t, 3, b.getRealBlock(1)) + assert.Equal(t, 1, b.getRealBlock(2)) + assert.Equal(t, 4, b.getRealBlock(3)) + assert.Equal(t, 2, b.getRealBlock(4)) + assert.Equal(t, 5, b.getRealBlock(5)) +} + +func TestGetOffset(t *testing.T) { + b := NewBufferMatrix(32, 32*5, 2) + b.curBlock = 0 + off, size := b.getOffset() + assert.Equal(t, b.hdrSize, off) + assert.Equal(t, 32, size) + + // 2nd block is located after the first part. 
+ // 1st part is 3 blocks long, headers included (3*(hdrSize+32)) + // and the block data itself starts after its header (hdrSize) and is 32 bytes long + b.curBlock = 1 + off, size = b.getOffset() + assert.Equal(t, 3*(b.hdrSize+32)+b.hdrSize, off) + assert.Equal(t, 32, size) +} + +// TestGetOffsetNew tests the new offset calculation +// for the case where the data size is not a multiple of k * block size. +// It ensures that the offsets are calculated correctly +// for the first and last blocks in the buffer. +func TestGetOffsetNew(t *testing.T) { + b := NewBufferMatrix(32, 32*5, 2) + b.UseNewFormat() + b.curBlock = 0 + b.leftInBlock = -1 + off, size := b.getOffset() + assert.Equal(t, b.hdrSize, off) + assert.Equal(t, 32, size) + + // 2nd block is located after the first part. + // 1st part is 3 blocks long, headers included (3*(hdrSize+32)) + // and the block data itself starts after its header (hdrSize) and is 32 bytes long + b.curBlock = 1 + b.leftInBlock = -1 + off, size = b.getOffset() + assert.Equal(t, 3*(b.hdrSize+32)+b.hdrSize, off) + assert.Equal(t, 32, size) + + // 5th block is located at the end of the first part + b.curBlock = 4 + b.leftInBlock = -1 + off, size = b.getOffset() + assert.Equal(t, 2*(b.hdrSize+32)+b.hdrSize, off) + assert.Equal(t, 16, size) + + // 6th block is located at the end of the second part + b.curBlock = 5 + b.leftInBlock = -1 + off, size = b.getOffset() + assert.Equal(t, 5*(b.hdrSize+32)+b.hdrSize, off) + assert.Equal(t, 16, size) +} + +// TestGetOffsetNew200 tests the new offset calculation for the case +// of size=200 and block size=32. +func TestGetOffsetNew200(t *testing.T) { + b := NewBufferMatrix(32, 200, 2) + b.UseNewFormat() + b.curBlock = 0 + b.leftInBlock = -1 + off, size := b.getOffsetNew() + assert.Equal(t, b.hdrSize, off) + assert.Equal(t, 32, size) + + // 2nd block is located after the first part. 
+ // 1st part is 4 blocks long, headers included (4*(hdrSize+32)) + // and the block data itself starts after its header (hdrSize) and is 32 bytes long + b.curBlock = 1 + b.leftInBlock = -1 + off, size = b.getOffsetNew() + assert.Equal(t, 4*(b.hdrSize+32)+b.hdrSize, off) + assert.Equal(t, 32, size) + + b.curBlock = 4 + b.leftInBlock = -1 + off, size = b.getOffsetNew() + assert.Equal(t, 2*(b.hdrSize+32)+b.hdrSize, off) + assert.Equal(t, 32, size) + + b.curBlock = 6 + b.leftInBlock = -1 + off, size = b.getOffsetNew() + assert.Equal(t, 3*(b.hdrSize+32)+b.hdrSize, off) + assert.Equal(t, 4, size) + + // 8th block (index 7) is located at the end of the second part + b.curBlock = 7 + b.leftInBlock = -1 + off, size = b.getOffsetNew() + assert.Equal(t, 7*(b.hdrSize+32)+b.hdrSize, off) + assert.Equal(t, 4, size) +} + +// TestIsBlockInLastSubGroup tests the IsBlockInLastSubGroup method +func TestIsBlockInLastSubGroup(t *testing.T) { + b := NewBufferMatrix(32, 32*5, 2) + nbSubGroups := b.SubGroups() + assert.Equal(t, 3, nbSubGroups) + assert.True(t, b.IsBlockInLastSubGroup(4)) + assert.True(t, b.IsBlockInLastSubGroup(5)) + + b = NewBufferMatrix(32, 32*4+2, 4) + nbSubGroups = b.SubGroups() + assert.Equal(t, 2, nbSubGroups) + assert.False(t, b.IsBlockInLastSubGroup(3)) + assert.True(t, b.IsBlockInLastSubGroup(4)) +} + +func TestComputeSizeOfLastSubGroup(t *testing.T) { + b := NewBufferMatrix(32, 200, 2) + b.UseNewFormat() + size := b.FragLenLastSubGroup() + assert.Equal(t, 4, size) + s := b.ComputeSizeOfLastSubGroup() + assert.Equal(t, 8, s) } diff --git a/memalign.go b/memalign.go index db8eb96..63cc62a 100644 --- a/memalign.go +++ b/memalign.go @@ -6,6 +6,7 @@ import ( ) func getAlignDifference(b []byte, align int) int { + // nolint:gosec return int(uintptr(unsafe.Pointer(&b[0])) & uintptr(align-1)) }