Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion indexer/pkg/readers/verifier_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ func NewVerifierReader(ctx context.Context, verifier protocol.VerifierResultsAPI
verifier: verifier,
demux: common.NewDemultiplexer[protocol.Bytes32, protocol.VerifierResult](),
batcher: batcher.NewBatcher[protocol.Bytes32](
batcherCtx,
config.BatchSize,
time.Duration(config.MaxBatchWaitTime)*time.Millisecond,
1,
Expand Down Expand Up @@ -82,6 +81,7 @@ func (v *VerifierReader) ProcessMessage(messageID protocol.Bytes32) (chan common
// Start returns immediately after spawning the background goroutine. It does not
// wait for the goroutine to complete.
func (v *VerifierReader) Start(ctx context.Context) error {
v.batcher.Start(v.batcherCtx)
runCtx, cancel := context.WithCancel(ctx)
v.runCancel = cancel
v.runWg.Go(func() {
Expand Down
3 changes: 2 additions & 1 deletion integration/pkg/constructors/committee_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
// Signing is passed in because it's managed differently in the CL node vs standalone modes.
// The DataSource is passed in from CL node, or created by standalone mode.
func NewVerificationCoordinator(
ctx context.Context,
lggr logger.Logger,
cfg commit.Config,
aggregatorSecret *hmac.ClientConfig,
Expand Down Expand Up @@ -172,7 +173,7 @@ func NewVerificationCoordinator(

// Create verification coordinator
verifierCoordinator, err := verifier.NewCoordinator(
context.TODO(),
ctx,
lggr,
commitVerifier,
sourceReaders,
Expand Down
45 changes: 32 additions & 13 deletions protocol/common/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ package batcher

import (
"context"
"errors"
"sync"
"time"
)

var errBatcherNotStarted = errors.New("batcher not started: call Start(ctx) before Add/Retry/AddImmediate")

// Batcher accumulates items and flushes them in batches based on size or time thresholds.
// It maintains insertion order (FIFO) within batches and is thread-safe.
// The batcher also supports delayed retries of failed items via the Retry method.
// This implementation follows Go's CSP approach using channels for communication.
// Call Start(ctx) with the lifecycle context before using Add, Retry, or AddImmediate.
type Batcher[T any] struct {
maxSize int
maxWait time.Duration
Expand All @@ -18,8 +22,9 @@ type Batcher[T any] struct {
retryCh chan []retryItem[T]
outCh chan BatchResult[T]

ctx context.Context
wg sync.WaitGroup
ctx context.Context
wg sync.WaitGroup
once sync.Once
}

// BatchResult carries a batch of items with an optional error.
Expand Down Expand Up @@ -48,35 +53,42 @@ type retryItem[T any] struct {
retryTime time.Time
}

// NewBatcher creates a new Batcher instance.
// The batcher will automatically flush when ctx is canceled.
// NewBatcher creates a new Batcher instance. Call Start(ctx) with the lifecycle context
// before using Add, Retry, or AddImmediate. The batcher will flush and stop when ctx is canceled.
// maxSize: maximum number of items before triggering a flush
// maxWait: maximum duration to wait before flushing incomplete batch
// outChannelSize: size of the output channel buffer (0 for unbuffered, consider your use case
// providing it the right buffer if needed).
func NewBatcher[T any](ctx context.Context, maxSize int, maxWait time.Duration, outChannelSize int) *Batcher[T] {
b := &Batcher[T]{
func NewBatcher[T any](maxSize int, maxWait time.Duration, outChannelSize int) *Batcher[T] {
return &Batcher[T]{
maxSize: maxSize,
maxWait: maxWait,
outCh: make(chan BatchResult[T], outChannelSize),
addCh: make(chan []T),
retryCh: make(chan []retryItem[T]),
ctx: ctx,
}
}

b.wg.Add(1)
go b.run()

return b
// Start begins background batching using ctx as the batcher's lifecycle
// context: once ctx is canceled the batcher flushes and stops. It must be
// invoked before Add, Retry, or AddImmediate. Repeated calls are harmless;
// only the first one takes effect.
func (b *Batcher[T]) Start(ctx context.Context) {
	launch := func() {
		// Assign the lifecycle context before spawning run(), which reads it.
		b.ctx = ctx //nolint:fatcontext // lifecycle context for run() cancellation only
		b.wg.Add(1)
		go b.run()
	}
	b.once.Do(launch)
}

// OutChannel returns the receive-only channel on which flushed batches
// (including results sent via AddImmediate) are delivered to the consumer.
func (b *Batcher[T]) OutChannel() <-chan BatchResult[T] {
return b.outCh
}

// Add adds an item to the batcher. It may trigger a flush if the batch size is reached.
// This method is thread-safe and non-blocking.
// This method is thread-safe and non-blocking. Returns errBatcherNotStarted if Start was not called.
func (b *Batcher[T]) Add(item ...T) error {
if b.ctx == nil {
return errBatcherNotStarted
}
select {
case b.addCh <- item:
return nil
Expand All @@ -86,8 +98,11 @@ func (b *Batcher[T]) Add(item ...T) error {
}

// AddImmediate sends a batch result immediately to the output channel.
// This method is thread-safe and non-blocking.
// This method is thread-safe and non-blocking. Returns errBatcherNotStarted if Start was not called.
func (b *Batcher[T]) AddImmediate(item BatchResult[T]) error {
if b.ctx == nil {
return errBatcherNotStarted
}
select {
case b.outCh <- item:
return nil
Expand All @@ -100,7 +115,11 @@ func (b *Batcher[T]) AddImmediate(item BatchResult[T]) error {
// The items will be moved to the main buffer after the delay expires.
// This method is thread-safe and non-blocking. Keep in mind that minDelay is approximate,
// because the actual retry processing depends on the background goroutine's timing.
// Returns errBatcherNotStarted if Start was not called.
func (b *Batcher[T]) Retry(minDelay time.Duration, items ...T) error {
if b.ctx == nil {
return errBatcherNotStarted
}
retryTime := time.Now().Add(minDelay)
retryItems := make([]retryItem[T], 0, len(items))
for _, item := range items {
Expand Down
32 changes: 24 additions & 8 deletions protocol/common/batcher/batcher_retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ func TestBatcher_RetryBasic(t *testing.T) {
maxSize := 10
maxWait := 50 * time.Millisecond // Short maxWait so retry ticker runs every 100ms

batcher := NewBatcher[int](ctx, maxSize, maxWait, 10)
b := NewBatcher[int](maxSize, maxWait, 10)
b.Start(ctx)
batcher := b
t.Cleanup(func() {
cancel()
_ = batcher.Close()
Expand Down Expand Up @@ -47,7 +49,9 @@ func TestBatcher_RetryWithSizeBasedFlush(t *testing.T) {
maxSize := 5
maxWait := 50 * time.Millisecond // Short wait for retry ticker

batcher := NewBatcher[int](ctx, maxSize, maxWait, 10)
b := NewBatcher[int](maxSize, maxWait, 10)
b.Start(ctx)
batcher := b
t.Cleanup(func() {
cancel()
_ = batcher.Close()
Expand Down Expand Up @@ -80,7 +84,9 @@ func TestBatcher_RetryMixedWithAdd(t *testing.T) {
maxSize := 10
maxWait := 100 * time.Millisecond

batcher := NewBatcher[int](ctx, maxSize, maxWait, 10)
b := NewBatcher[int](maxSize, maxWait, 10)
b.Start(ctx)
batcher := b
t.Cleanup(func() {
cancel()
_ = batcher.Close()
Expand Down Expand Up @@ -130,7 +136,9 @@ func TestBatcher_RetryMultipleBatchesWithDifferentDelays(t *testing.T) {
maxSize := 10
maxWait := 50 * time.Millisecond

batcher := NewBatcher[int](ctx, maxSize, maxWait, 10)
b := NewBatcher[int](maxSize, maxWait, 10)
b.Start(ctx)
batcher := b

// Schedule first retry batch with short delay
firstRetry := []int{1, 2}
Expand Down Expand Up @@ -184,7 +192,9 @@ func TestBatcher_RetryPreservesOrder(t *testing.T) {
maxSize := 10
maxWait := 100 * time.Millisecond

batcher := NewBatcher[int](ctx, maxSize, maxWait, 10)
b := NewBatcher[int](maxSize, maxWait, 10)
b.Start(ctx)
batcher := b

// Retry items in specific order
retryItems := []int{5, 3, 9, 1, 7}
Expand Down Expand Up @@ -217,7 +227,9 @@ func TestBatcher_RetryEmptySlice(t *testing.T) {
maxSize := 10
maxWait := 100 * time.Millisecond

batcher := NewBatcher[int](ctx, maxSize, maxWait, 10)
b := NewBatcher[int](maxSize, maxWait, 10)
b.Start(ctx)
batcher := b

// Retry empty slice should not cause issues
err := batcher.Retry(50 * time.Millisecond)
Expand Down Expand Up @@ -246,7 +258,9 @@ func TestBatcher_RetryZeroDelay(t *testing.T) {
maxSize := 10
maxWait := 50 * time.Millisecond // Short maxWait so retry ticker runs every 100ms

batcher := NewBatcher[int](ctx, maxSize, maxWait, 10)
b := NewBatcher[int](maxSize, maxWait, 10)
b.Start(ctx)
batcher := b

// Retry with zero delay - should be processed on next retry ticker
retryItems := []int{1, 2, 3}
Expand Down Expand Up @@ -278,7 +292,9 @@ func TestBatcher_ConcurrentRetries(t *testing.T) {
maxSize := 50
maxWait := 100 * time.Millisecond

batcher := NewBatcher[int](ctx, maxSize, maxWait, 100)
b := NewBatcher[int](maxSize, maxWait, 100)
b.Start(ctx)
batcher := b

// Concurrently schedule retries from multiple goroutines
numGoroutines := 5
Expand Down
24 changes: 18 additions & 6 deletions protocol/common/batcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ func TestBatcher_SizeBasedFlush(t *testing.T) {
maxSize := 5
maxWait := 1 * time.Second

batcher := NewBatcher[int](ctx, maxSize, maxWait, 10)
b := NewBatcher[int](maxSize, maxWait, 10)
b.Start(ctx)
batcher := b

// Add exactly maxSize items
for i := range maxSize {
Expand Down Expand Up @@ -45,7 +47,9 @@ func TestBatcher_TimeBasedFlush(t *testing.T) {
maxSize := 100
maxWait := 50 * time.Millisecond

batcher := NewBatcher[int](ctx, maxSize, maxWait, 10)
b := NewBatcher[int](maxSize, maxWait, 10)
b.Start(ctx)
batcher := b

// Add just 3 items (well below maxSize)
for i := range 3 {
Expand Down Expand Up @@ -77,7 +81,9 @@ func TestBatcher_InsertionOrder(t *testing.T) {
maxSize := 10
maxWait := 1 * time.Second

batcher := NewBatcher[int](ctx, maxSize, maxWait, 10)
b := NewBatcher[int](maxSize, maxWait, 10)
b.Start(ctx)
batcher := b

// Add items in specific order
expectedOrder := []int{5, 3, 9, 1, 7, 2, 8, 4, 6, 0}
Expand Down Expand Up @@ -106,7 +112,9 @@ func TestBatcher_MultipleBatches(t *testing.T) {
maxSize := 3
maxWait := 1 * time.Second

batcher := NewBatcher[int](ctx, maxSize, maxWait, 10)
b := NewBatcher[int](maxSize, maxWait, 10)
b.Start(ctx)
batcher := b

// Add items that will trigger multiple batches
totalItems := 9
Expand Down Expand Up @@ -141,7 +149,9 @@ func TestBatcher_EmptyClose(t *testing.T) {
maxSize := 10
maxWait := 1 * time.Second

batcher := NewBatcher[int](ctx, maxSize, maxWait, 10)
b := NewBatcher[int](maxSize, maxWait, 10)
b.Start(ctx)
batcher := b

// Cancel context first
cancel()
Expand All @@ -167,7 +177,9 @@ func TestBatcher_ConcurrentAdds(t *testing.T) {
maxSize := 50
maxWait := 100 * time.Millisecond

batcher := NewBatcher[int](ctx, maxSize, maxWait, 100)
b := NewBatcher[int](maxSize, maxWait, 100)
b.Start(ctx)
batcher := b

// Concurrently add items from multiple goroutines
numGoroutines := 10
Expand Down
1 change: 0 additions & 1 deletion verifier/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@ func newTestSRS(
lggr := logger.Test(t)

srs, err := NewSourceReaderService(
t.Context(),
reader,
chainSelector,
chainStatusMgr,
Expand Down
3 changes: 1 addition & 2 deletions verifier/source_reader_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ type SourceReaderService struct {

// NewSourceReaderService Constructor: same style as SRS.
func NewSourceReaderService(
ctx context.Context,
sourceReader chainaccess.SourceReader,
chainSelector protocol.ChainSelector,
chainStatusManager protocol.ChainStatusManager,
Expand Down Expand Up @@ -128,7 +127,6 @@ func NewSourceReaderService(

batchSize, batchTimeout := readerConfigWithDefaults(lggr, sourceCfg)
readyTaskBatcher := batcher.NewBatcher[VerificationTask](
ctx,
batchSize,
batchTimeout,
1,
Expand Down Expand Up @@ -166,6 +164,7 @@ func (r *SourceReaderService) ReadyTasksChannel() <-chan batcher.BatchResult[Ver
func (r *SourceReaderService) Start(ctx context.Context) error {
return r.StartOnce(r.Name(), func() error {
r.logger.Infow("Starting SourceReaderService")
r.readyTasksBatcher.Start(ctx)

startBlock, err := r.initializeStartBlock(ctx)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion verifier/source_reader_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1480,7 +1480,6 @@ func TestSRS_DisableFinalityChecker(t *testing.T) {
lggr := logger.Test(t)

srs, err := NewSourceReaderService(
context.Background(),
reader,
chain,
chainStatusMgr,
Expand Down
3 changes: 1 addition & 2 deletions verifier/storage_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ type StorageWriterProcessor struct {
}

func NewStorageBatcherProcessor(
ctx context.Context,
lggr logger.Logger,
verifierID string,
messageTracker MessageLatencyTracker,
Expand All @@ -50,7 +49,6 @@ func NewStorageBatcherProcessor(
) (*StorageWriterProcessor, *batcher.Batcher[protocol.VerifierNodeResult], error) {
storageBatchSize, storageBatchTimeout, retryDelay := configWithDefaults(lggr, config)
storageBatcher := batcher.NewBatcher[protocol.VerifierNodeResult](
ctx,
storageBatchSize,
storageBatchTimeout,
1,
Expand Down Expand Up @@ -93,6 +91,7 @@ func configWithDefaults(lggr logger.Logger, config CoordinatorConfig) (int, time

func (s *StorageWriterProcessor) Start(ctx context.Context) error {
return s.StartOnce(s.Name(), func() error {
s.batcher.Start(ctx)
s.wg.Go(func() {
s.run(ctx)
})
Expand Down
Loading
Loading