From 25c05c30a0cf5c20d8f0a0c9372ad6d27cd98453 Mon Sep 17 00:00:00 2001 From: Andrei Smirnov Date: Wed, 11 Mar 2026 18:20:36 +0300 Subject: [PATCH] core: new deadliner (#4386) Deadliner component refactoring. category: refactor ticket: none --- core/deadline.go | 162 +++++++++++++++++------------------------- core/deadline_test.go | 11 +-- 2 files changed, 72 insertions(+), 101 deletions(-) diff --git a/core/deadline.go b/core/deadline.go index cd948e485..dbee972c3 100644 --- a/core/deadline.go +++ b/core/deadline.go @@ -4,6 +4,7 @@ package core import ( "context" + "sync" "testing" "time" @@ -19,7 +20,9 @@ import ( const ( // marginFactor defines the fraction of the slot duration to use as a margin. // This is to consider network delays and other factors that may affect the timing. - marginFactor = 12 + marginFactor = 12 + expiredBufferSize = 10 + tickerInterval = time.Second ) // DeadlineFunc is a function that returns the deadline for a duty. @@ -39,19 +42,15 @@ type Deadliner interface { C() <-chan Duty } -// deadlineInput represents the input to inputChan. -type deadlineInput struct { - duty Duty - success chan<- bool -} - // deadliner implements the Deadliner interface. type deadliner struct { + lock sync.Mutex label string - inputChan chan deadlineInput - deadlineChan chan Duty + deadlineFunc DeadlineFunc + duties map[Duty]time.Time + expiredChan chan Duty clock clockwork.Clock - quit chan struct{} + done chan struct{} } // NewDeadlinerForT returns a Deadline for use in tests. @@ -64,7 +63,7 @@ func NewDeadlinerForT(ctx context.Context, t *testing.T, deadlineFunc DeadlineFu // NewDeadliner returns a new instance of Deadline. // // It also starts a goroutine which is responsible for reading and storing duties, -// and sending the deadlined duty to receiver's deadlineChan until the context is closed. +// and sending the deadlined duty to receiver's expiredChan until the context is closed. func NewDeadliner(ctx context.Context, label string, deadlineFunc DeadlineFunc) Deadliner { return newDeadliner(ctx, label, deadlineFunc, clockwork.NewRealClock()) } @@ -113,134 +112,103 @@ func NewDutyDeadlineFunc(ctx context.Context, eth2Cl eth2wrap.Client) (DeadlineF // newDeadliner returns a new Deadliner, this is for internal use only. func newDeadliner(ctx context.Context, label string, deadlineFunc DeadlineFunc, clock clockwork.Clock) Deadliner { - // outputBuffer big enough to support all duty types, which can expire at the same time - // while external consumer is synchronously adding duties (so not reading output). - const outputBuffer = 10 - d := &deadliner{ label: label, - inputChan: make(chan deadlineInput), // Not buffering this since writer wait for response. - deadlineChan: make(chan Duty, outputBuffer), + deadlineFunc: deadlineFunc, + duties: make(map[Duty]time.Time), + expiredChan: make(chan Duty, expiredBufferSize), clock: clock, - quit: make(chan struct{}), + done: make(chan struct{}), } - go d.run(ctx, deadlineFunc) + go d.run(ctx) return d } -func (d *deadliner) run(ctx context.Context, deadlineFunc DeadlineFunc) { - duties := make(map[Duty]bool) - currDuty, currDeadline := getCurrDuty(duties, deadlineFunc) - currTimer := d.clock.NewTimer(currDeadline.Sub(d.clock.Now())) +func (d *deadliner) run(ctx context.Context) { + defer close(d.done) - defer func() { - close(d.quit) - currTimer.Stop() - }() - - setCurrState := func() { - currTimer.Stop() - - currDuty, currDeadline = getCurrDuty(duties, deadlineFunc) - currTimer = d.clock.NewTimer(currDeadline.Sub(d.clock.Now())) - } + // The simple approach does not require a min-heap or priority queue to store the duties and their deadlines, + // but it is sufficient for our use case as the number of duties is expected to be small. + // A disadvantage of this approach is the expiration precision is rounded to the nearest second. + timer := d.clock.NewTicker(tickerInterval) + defer timer.Stop() - // TODO(dhruv): optimise getCurrDuty and updating current state if earlier deadline detected, - // using min heap or ordered map for { select { case <-ctx.Done(): return - case input := <-d.inputChan: - log.Debug(ctx, "Deadliner.run() received duty", z.Any("duty", input.duty)) - - deadline, canExpire := deadlineFunc(input.duty) - if !canExpire { - // Drop duties that never expire - input.success <- false - continue - } - - expired := deadline.Before(d.clock.Now()) - - input.success <- !expired - - // Ignore expired duties - if expired { + case <-timer.Chan(): + // Get all expired duties at the current time. + expiredDuties := d.getExpiredDuties(d.clock.Now()) + if len(expiredDuties) == 0 { continue } - duties[input.duty] = true + log.Debug(ctx, "Deadliner.run() got expired duties", z.Int("count", len(expiredDuties))) - if deadline.Before(currDeadline) { - setCurrState() + for _, expiredDuty := range expiredDuties { + // Send the expired duty to the receiver. + select { + case <-ctx.Done(): + return + case d.expiredChan <- expiredDuty: + } } - case <-currTimer.Chan(): - log.Debug(ctx, "Deadliner.run() currTimer expired", z.Any("currDuty", currDuty), z.Int("deadlineChanLen", len(d.deadlineChan))) - - // Send deadlined duty to receiver. - select { - case <-ctx.Done(): - return - case d.deadlineChan <- currDuty: - default: - log.Warn(ctx, "Deadliner output channel full", nil, - z.Str("label", d.label), - z.Any("duty", currDuty), - ) - } - - delete(duties, currDuty) - setCurrState() } } } // Add adds a duty to be notified of the deadline. It returns true if the duty was added successfully. func (d *deadliner) Add(duty Duty) bool { - log.Debug(context.Background(), "Deadliner.Add() adding duty", z.Any("duty", duty)) - - success := make(chan bool) + log.Debug(context.Background(), "Deadliner.Add()", z.Any("duty", duty)) select { - case <-d.quit: + case <-d.done: + // Run goroutine has stopped, ignore new duties. return false - case d.inputChan <- deadlineInput{duty: duty, success: success}: + default: } - select { - case <-d.quit: + deadline, canExpire := d.deadlineFunc(duty) + if !canExpire { + // Drop duties that never expire + return false + } + + expired := deadline.Before(d.clock.Now()) + if expired { + // Drop expired duties return false - case ok := <-success: - return ok } + + d.lock.Lock() + defer d.lock.Unlock() + + d.duties[duty] = deadline + + return true } // C returns the deadline channel. func (d *deadliner) C() <-chan Duty { - return d.deadlineChan + return d.expiredChan } -// getCurrDuty gets the duty to process next along-with the duty deadline. It selects duty with the latest deadline. -func getCurrDuty(duties map[Duty]bool, deadlineFunc DeadlineFunc) (Duty, time.Time) { - var currDuty Duty - - currDeadline := time.Date(9999, 1, 1, 0, 0, 0, 0, time.UTC) +// getExpiredDuties selects all expired duties. +func (d *deadliner) getExpiredDuties(now time.Time) []Duty { + expiredDuties := []Duty{} - for duty := range duties { - dutyDeadline, ok := deadlineFunc(duty) - if !ok { - // Ignore the duties that never expire. - continue - } + d.lock.Lock() + defer d.lock.Unlock() - if currDeadline.After(dutyDeadline) { - currDuty = duty - currDeadline = dutyDeadline + for duty, deadline := range d.duties { + if deadline.Before(now) { + expiredDuties = append(expiredDuties, duty) + delete(d.duties, duty) } } - return currDuty, currDeadline + return expiredDuties } diff --git a/core/deadline_test.go b/core/deadline_test.go index 258158784..756416571 100644 --- a/core/deadline_test.go +++ b/core/deadline_test.go @@ -18,8 +18,6 @@ import ( "github.com/obolnetwork/charon/testutil/beaconmock" ) -//go:generate go test . - func TestDeadliner(t *testing.T) { ctx, cancel := context.WithCancel(t.Context()) defer cancel() @@ -45,6 +43,10 @@ func TestDeadliner(t *testing.T) { deadliner := core.NewDeadlinerForT(ctx, t, deadlineFuncProvider(), clock) + // Wait for the run goroutine to be waiting on the ticker before interacting. + err := clock.BlockUntilContext(ctx, 1) + require.NoError(t, err) + wg := &sync.WaitGroup{} // Add our duties to the deadliner. @@ -73,8 +75,9 @@ func TestDeadliner(t *testing.T) { } } - // Advance clock to trigger deadline of all non-expired duties. - clock.Advance(time.Duration(maxSlot) * time.Second) + // Advance clock past the latest deadline to trigger expiration of all non-expired duties. + // Use maxSlot+1 because Before() is strict (not <=). + clock.Advance(time.Duration(maxSlot+1) * time.Second) var actualDuties []core.Duty for range len(nonExpiredDuties) {