Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions api/storeapi/store_api.proto
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
syntax = "proto3";

option go_package = "github.com/ozontech/seq-db/pkg/storeapi;storeapi";

package api;

import "google/protobuf/duration.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";

option go_package = "github.com/ozontech/seq-db/pkg/storeapi;storeapi";

service StoreApi {
rpc Bulk(BulkRequest) returns (google.protobuf.Empty) {}

Expand All @@ -20,7 +20,7 @@ service StoreApi {
rpc CancelAsyncSearch(CancelAsyncSearchRequest) returns (CancelAsyncSearchResponse) {}

rpc DeleteAsyncSearch(DeleteAsyncSearchRequest) returns (DeleteAsyncSearchResponse) {}

rpc GetAsyncSearchesList(GetAsyncSearchesListRequest) returns (GetAsyncSearchesListResponse) {}

rpc Fetch(FetchRequest) returns (stream BinaryData) {}
Expand Down Expand Up @@ -86,7 +86,7 @@ message SearchResponse {
}

message IdWithHint {
Id id = 1;
Id id = 1;
string hint = 3;
}

Expand Down Expand Up @@ -187,7 +187,7 @@ message FetchAsyncSearchResultResponse {

repeated AggQuery aggs = 9;
int64 histogram_interval = 10;

string query = 11;
google.protobuf.Timestamp from = 12;
google.protobuf.Timestamp to = 13;
Expand All @@ -196,11 +196,11 @@ message FetchAsyncSearchResultResponse {
int64 size = 16;
}

message CancelAsyncSearchRequest{
message CancelAsyncSearchRequest {
string search_id = 1;
}

message CancelAsyncSearchResponse{}
message CancelAsyncSearchResponse {}

message DeleteAsyncSearchRequest {
string search_id = 1;
Expand All @@ -210,7 +210,7 @@ message DeleteAsyncSearchResponse {}

message GetAsyncSearchesListRequest {
optional AsyncSearchStatus status = 1;
repeated string ids = 2;
repeated string ids = 2;
}

message GetAsyncSearchesListResponse {
Expand All @@ -229,7 +229,7 @@ message AsyncSearchesListItem {

repeated AggQuery aggs = 9;
int64 histogram_interval = 10;

string query = 11;
google.protobuf.Timestamp from = 12;
google.protobuf.Timestamp to = 13;
Expand All @@ -254,6 +254,9 @@ message FetchRequest {
bool explain = 3;
repeated IdWithHint ids_with_hints = 4;
FieldsFilter fields_filter = 5;
// if false, skip masks will be evaluated
// set to true in fetch after search since ids are already skipped
bool noSkipMasks = 6;
}

message StatusRequest {}
Expand Down
4 changes: 2 additions & 2 deletions frac/active.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,15 +393,15 @@ func (f *Active) String() string {
return fracToString(f, "active")
}

func (f *Active) Fetch(ctx context.Context, ids []seq.ID) ([][]byte, error) {
func (f *Active) Fetch(ctx context.Context, ids []seq.ID, noSkipMasks bool) ([][]byte, error) {
if f.Info().DocsTotal == 0 { // it is empty active fraction state
return nil, nil
}

dp := f.createDataProvider(ctx)
defer dp.release()

return dp.Fetch(ids)
return dp.Fetch(ids, noSkipMasks)
}

func (f *Active) Search(ctx context.Context, params processor.SearchParams) (*seq.QPR, error) {
Expand Down
10 changes: 7 additions & 3 deletions frac/active_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (dp *activeDataProvider) getTokenIndex() *activeTokenIndex {
}
}

func (dp *activeDataProvider) Fetch(ids []seq.ID) ([][]byte, error) {
func (dp *activeDataProvider) Fetch(ids []seq.ID, noSkipMasks bool) ([][]byte, error) {
sw := stopwatch.New()

defer sw.Export(
Expand All @@ -88,7 +88,7 @@ func (dp *activeDataProvider) Fetch(ids []seq.ID) ([][]byte, error) {
}}

for _, fi := range indexes {
if err := processor.IndexFetch(ids, sw, &fi, res); err != nil {
if err := processor.IndexFetch(ids, noSkipMasks, sw, &fi, res); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -271,12 +271,16 @@ func (di *activeFetchIndex) GetBlocksOffsets(num uint32) uint64 {
return di.blocksOffsets[num]
}

func (di *activeFetchIndex) GetDocPos(ids []seq.ID) ([]seq.DocPos, error) {
func (di *activeFetchIndex) GetDocPos(ids []seq.ID, noSkipMasks bool) ([]seq.DocPos, error) {
docsPos := make([]seq.DocPos, len(ids))
for i, id := range ids {
docsPos[i] = di.docsPositions.GetSync(id)
}

if noSkipMasks {
return docsPos, nil
}

minLID, maxLID := uint32(0), uint32(math.MaxUint32)
skipLIDsIterator, has, err := di.skipMaskProvider.GetIDsIteratorByFrac(di.fracName, minLID, maxLID, false)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion frac/fraction.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type Fraction interface {
Info() *common.Info
IsIntersecting(from seq.MID, to seq.MID) bool
Contains(mid seq.MID) bool
Fetch(context.Context, []seq.ID) ([][]byte, error)
Fetch(context.Context, []seq.ID, bool) ([][]byte, error)
Search(context.Context, processor.SearchParams) (*seq.QPR, error)
FindLIDs(context.Context, []seq.ID) ([]seq.LID, error)
}
Expand Down
2 changes: 1 addition & 1 deletion frac/fraction_concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func readTest(t *testing.T, fraction Fraction, numReaders, numQueries int, docs
return fmt.Errorf("search failed: %w", err)
}

fetchedResult, err := fraction.Fetch(ctx, qpr.IDs.IDs())
fetchedResult, err := fraction.Fetch(ctx, qpr.IDs.IDs(), false)
if err != nil {
return fmt.Errorf("fetch failed: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions frac/fraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1653,7 +1653,7 @@ func (s *FractionTestSuite) TestSearchLargeFrac() {
qprIDs := qpr.IDs.IDs()
totalIDsScrolled += len(qprIDs)

docs, err := s.fraction.Fetch(context.Background(), qprIDs)
docs, err := s.fraction.Fetch(context.Background(), qprIDs, false)
s.Require().NoError(err, "fetch failed for order=%v", order)

for j, doc := range docs {
Expand Down Expand Up @@ -1962,7 +1962,7 @@ func (s *FractionTestSuite) AssertSearchWithSearchParams(
s.Require().NoError(err, "search failed for query with order=%v", order)
s.Require().Equal(len(expectedIndexes), qpr.IDs.Len(), "doc count doesn't match")

docs, err := s.fraction.Fetch(context.Background(), qpr.IDs.IDs())
docs, err := s.fraction.Fetch(context.Background(), qpr.IDs.IDs(), false)
s.Require().NoError(err, "failed to fetch docs")

if order.IsReverse() {
Expand Down
6 changes: 3 additions & 3 deletions frac/processor/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import (

type fetchIndex interface {
GetBlocksOffsets(uint32) uint64
GetDocPos([]seq.ID) ([]seq.DocPos, error)
GetDocPos([]seq.ID, bool) ([]seq.DocPos, error)
ReadDocs(blockOffset uint64, docOffsets []uint64) ([][]byte, error)
}

func IndexFetch(ids []seq.ID, sw *stopwatch.Stopwatch, fetchIndex fetchIndex, res [][]byte) error {
func IndexFetch(ids []seq.ID, noSkipMasks bool, sw *stopwatch.Stopwatch, fetchIndex fetchIndex, res [][]byte) error {
m := sw.Start("get_docs_pos")
docsPos, err := fetchIndex.GetDocPos(ids)
docsPos, err := fetchIndex.GetDocPos(ids, noSkipMasks)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions frac/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,14 @@ func (f *Remote) Contains(mid seq.MID) bool {
return f.info.IsIntersecting(mid, mid)
}

func (f *Remote) Fetch(ctx context.Context, ids []seq.ID) ([][]byte, error) {
func (f *Remote) Fetch(ctx context.Context, ids []seq.ID, noSkipMasks bool) ([][]byte, error) {
dp, err := f.createDataProvider(ctx)
if err != nil {
return nil, err
}
defer dp.release()

return dp.Fetch(ids)
return dp.Fetch(ids, noSkipMasks)
}

func (f *Remote) Search(ctx context.Context, params processor.SearchParams) (*seq.QPR, error) {
Expand Down
4 changes: 2 additions & 2 deletions frac/sealed.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,10 +471,10 @@ func (f *Sealed) String() string {
return fracToString(f, "sealed")
}

func (f *Sealed) Fetch(ctx context.Context, ids []seq.ID) ([][]byte, error) {
func (f *Sealed) Fetch(ctx context.Context, ids []seq.ID, noSkipMasks bool) ([][]byte, error) {
dp := f.createDataProvider(ctx)
defer dp.release()
return dp.Fetch(ids)
return dp.Fetch(ids, noSkipMasks)
}

func (f *Sealed) Search(ctx context.Context, params processor.SearchParams) (*seq.QPR, error) {
Expand Down
10 changes: 7 additions & 3 deletions frac/sealed_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (dp *sealedDataProvider) release() {
dp.idsProvider.Release()
}

func (dp *sealedDataProvider) Fetch(ids []seq.ID) ([][]byte, error) {
func (dp *sealedDataProvider) Fetch(ids []seq.ID, noSkipMasks bool) ([][]byte, error) {
sw := stopwatch.New()

defer sw.Export(
Expand All @@ -100,7 +100,7 @@ func (dp *sealedDataProvider) Fetch(ids []seq.ID) ([][]byte, error) {
)

res := make([][]byte, len(ids))
if err := processor.IndexFetch(ids, sw, dp.getFetchIndex(), res); err != nil {
if err := processor.IndexFetch(ids, noSkipMasks, sw, dp.getFetchIndex(), res); err != nil {
return nil, err
}

Expand Down Expand Up @@ -280,9 +280,13 @@ func (fi *sealedFetchIndex) GetBlocksOffsets(num uint32) uint64 {
return fi.blocksOffsets[num]
}

func (fi *sealedFetchIndex) GetDocPos(ids []seq.ID) ([]seq.DocPos, error) {
func (fi *sealedFetchIndex) GetDocPos(ids []seq.ID, noSkipMasks bool) ([]seq.DocPos, error) {
allLids := fi.findLIDs(ids)

if noSkipMasks {
return fi.getDocPosByLIDs(allLids), nil
}

minLID, maxLID := uint32(0), uint32(math.MaxUint32)
if len(allLids) > 0 {
// allLids can be not sorted
Expand Down
12 changes: 6 additions & 6 deletions fracmanager/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func NewFetcher(maxWorkersNum int) *Fetcher {
}
}

func (f *Fetcher) FetchDocs(ctx context.Context, fracs List, ids []seq.IDSource) ([][]byte, error) {
func (f *Fetcher) FetchDocs(ctx context.Context, fracs List, ids []seq.IDSource, noSkipMasks bool) ([][]byte, error) {
sw := stopwatch.New()

m := sw.Start("fill_revers_pos")
Expand All @@ -44,7 +44,7 @@ func (f *Fetcher) FetchDocs(ctx context.Context, fracs List, ids []seq.IDSource)
m.Stop()

m = sw.Start("fetch_async")
docsByFracs, err := f.fetchDocsAsync(ctx, fracs, idsByFrac)
docsByFracs, err := f.fetchDocsAsync(ctx, fracs, idsByFrac, noSkipMasks)
m.Stop()

// arrange the result in the original order of ids
Expand All @@ -64,7 +64,7 @@ func (f *Fetcher) FetchDocs(ctx context.Context, fracs List, ids []seq.IDSource)
return result, err
}

func (f *Fetcher) fetchDocsAsync(ctx context.Context, fracs []frac.Fraction, idsByFrac [][]seq.ID) ([][][]byte, error) {
func (f *Fetcher) fetchDocsAsync(ctx context.Context, fracs []frac.Fraction, idsByFrac [][]seq.ID, noSkipMasks bool) ([][][]byte, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

Expand All @@ -84,7 +84,7 @@ loop:
wg.Add(1)
go func() {
var fracErr error
if docs[i], fracErr = fracFetch(ctx, frac, idsByFrac[i]); fracErr != nil {
if docs[i], fracErr = fracFetch(ctx, frac, idsByFrac[i], noSkipMasks); fracErr != nil {
once.Do(func() {
err = fracErr
cancel()
Expand All @@ -105,13 +105,13 @@ loop:
return docs, nil
}

func fracFetch(ctx context.Context, f frac.Fraction, ids []seq.ID) (_ [][]byte, err error) {
func fracFetch(ctx context.Context, f frac.Fraction, ids []seq.ID, noSkipMasks bool) (_ [][]byte, err error) {
defer func() {
if panicData := util.RecoverToError(recover(), metric.StorePanics); panicData != nil {
err = fmt.Errorf("internal error: fetch panicked on fraction %s, error=%w", f.Info().Name(), panicData)
}
}()
return f.Fetch(ctx, ids)
return f.Fetch(ctx, ids, noSkipMasks)
}

func sortIDs(idsOrig seq.IDSources) (seq.IDSources, seq.MID, seq.MID) {
Expand Down
14 changes: 7 additions & 7 deletions fracmanager/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestFetcher_ShouldFetchMultiFrac(t *testing.T) {
{ID: seq.SimpleID(30)},
}

docs, err := fetcher.FetchDocs(context.Background(), List{frac1, frac2, frac3}, fetchIDs)
docs, err := fetcher.FetchDocs(context.Background(), List{frac1, frac2, frac3}, fetchIDs, false)

assert.NoError(t, err)
assert.Equal(t, [][]byte{[]byte("doc1"), []byte("doc2"), []byte("doc4"), []byte("doc3")}, docs)
Expand All @@ -51,7 +51,7 @@ func TestFetcher_DocNotFound(t *testing.T) {
{ID: seq.SimpleID(20)},
}

docs, err := fetcher.FetchDocs(context.Background(), List{frac1}, fetchIDs)
docs, err := fetcher.FetchDocs(context.Background(), List{frac1}, fetchIDs, false)

assert.NoError(t, err)
assert.Len(t, docs, 2)
Expand Down Expand Up @@ -82,7 +82,7 @@ func TestFetcher_ShouldUseHints(t *testing.T) {
{ID: seq.SimpleID(10), Hint: frac1.Info().Name()},
}

docs, err := fetcher.FetchDocs(context.Background(), List{frac1, frac2, frac3}, fetchIDs)
docs, err := fetcher.FetchDocs(context.Background(), List{frac1, frac2, frac3}, fetchIDs, false)

assert.NoError(t, err)
assert.Equal(t, [][]byte{[]byte("apple"), []byte("pineapple"), []byte("orange")}, docs)
Expand Down Expand Up @@ -115,7 +115,7 @@ func TestFetcher_ShouldUseHints_MixedScenario(t *testing.T) {
{ID: seq.SimpleID(50)},
}

docs, err := fetcher.FetchDocs(context.Background(), List{frac1, frac2, frac3}, fetchIDs)
docs, err := fetcher.FetchDocs(context.Background(), List{frac1, frac2, frac3}, fetchIDs, false)

assert.NoError(t, err)
assert.Equal(t, [][]byte{[]byte("apple"), []byte("pineapple"), []byte("orange"), []byte("mango")}, docs)
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestFetcher_OutOfRangeFractions(t *testing.T) {
{ID: seq.SimpleID(20)},
}

docs, err := fetcher.FetchDocs(context.Background(), List{frac1}, fetchIDs)
docs, err := fetcher.FetchDocs(context.Background(), List{frac1}, fetchIDs, false)

assert.NoError(t, err)
assert.Equal(t, [][]byte{[]byte("apple"), nil, []byte("banana"), nil}, docs)
Expand All @@ -170,7 +170,7 @@ func TestFetcher_FetchError(t *testing.T) {

fetchIDs := []seq.IDSource{{ID: seq.SimpleID(20)}}

_, err := fetcher.FetchDocs(context.Background(), List{frac1, frac2}, fetchIDs)
_, err := fetcher.FetchDocs(context.Background(), List{frac1, frac2}, fetchIDs, false)

assert.ErrorContains(t, err, "fetch failed")
}
Expand Down Expand Up @@ -199,7 +199,7 @@ func TestFetcher_ContextCancellation(t *testing.T) {
{ID: seq.SimpleID(20)},
}

_, err := fetcher.FetchDocs(ctx, List{frac1, frac2, frac3, frac4}, ids)
_, err := fetcher.FetchDocs(ctx, List{frac1, frac2, frac3, frac4}, ids, false)

assert.Error(t, err)
assert.ErrorIs(t, context.Canceled, err)
Expand Down
6 changes: 3 additions & 3 deletions fracmanager/proxy_frac.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ func (p *fractionProxy) Contains(mid seq.MID) bool {
return p.impl.Contains(mid)
}

func (p *fractionProxy) Fetch(ctx context.Context, ids []seq.ID) ([][]byte, error) {
func (p *fractionProxy) Fetch(ctx context.Context, ids []seq.ID, noSkipMasks bool) ([][]byte, error) {
p.mu.RLock()
defer p.mu.RUnlock()
return p.impl.Fetch(ctx, ids)
return p.impl.Fetch(ctx, ids, noSkipMasks)
}

func (p *fractionProxy) Search(ctx context.Context, params processor.SearchParams) (*seq.QPR, error) {
Expand Down Expand Up @@ -187,7 +187,7 @@ func (emptyFraction) Contains(mid seq.MID) bool {
return false
}

func (emptyFraction) Fetch(ctx context.Context, ids []seq.ID) ([][]byte, error) {
func (emptyFraction) Fetch(ctx context.Context, ids []seq.ID, noSkipMasks bool) ([][]byte, error) {
return nil, nil
}

Expand Down
Loading
Loading