Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 65 additions & 97 deletions core/deadline.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package core

import (
"context"
"sync"
"testing"
"time"

Expand All @@ -19,7 +20,9 @@ import (
const (
// marginFactor defines the fraction of the slot duration to use as a margin.
// This is to consider network delays and other factors that may affect the timing.
marginFactor = 12
marginFactor = 12
expiredBufferSize = 10
tickerInterval = time.Second
)

// DeadlineFunc is a function that returns the deadline for a duty.
Expand All @@ -39,19 +42,15 @@ type Deadliner interface {
C() <-chan Duty
}

// deadlineInput represents the input to inputChan.
type deadlineInput struct {
duty Duty
success chan<- bool
}

// deadliner implements the Deadliner interface.
type deadliner struct {
lock sync.Mutex
label string
inputChan chan deadlineInput
deadlineChan chan Duty
deadlineFunc DeadlineFunc
duties map[Duty]time.Time
expiredChan chan Duty
clock clockwork.Clock
quit chan struct{}
done chan struct{}
}

// NewDeadlinerForT returns a Deadline for use in tests.
Expand All @@ -64,7 +63,7 @@ func NewDeadlinerForT(ctx context.Context, t *testing.T, deadlineFunc DeadlineFu
// NewDeadliner returns a new instance of Deadline.
//
// It also starts a goroutine which is responsible for reading and storing duties,
// and sending the deadlined duty to receiver's deadlineChan until the context is closed.
// and sending the deadlined duty to receiver's expiredChan until the context is closed.
func NewDeadliner(ctx context.Context, label string, deadlineFunc DeadlineFunc) Deadliner {
return newDeadliner(ctx, label, deadlineFunc, clockwork.NewRealClock())
}
Expand Down Expand Up @@ -113,134 +112,103 @@ func NewDutyDeadlineFunc(ctx context.Context, eth2Cl eth2wrap.Client) (DeadlineF

// newDeadliner returns a new Deadliner, this is for internal use only.
func newDeadliner(ctx context.Context, label string, deadlineFunc DeadlineFunc, clock clockwork.Clock) Deadliner {
// outputBuffer big enough to support all duty types, which can expire at the same time
// while external consumer is synchronously adding duties (so not reading output).
const outputBuffer = 10

d := &deadliner{
label: label,
inputChan: make(chan deadlineInput), // Not buffering this since writer wait for response.
deadlineChan: make(chan Duty, outputBuffer),
deadlineFunc: deadlineFunc,
duties: make(map[Duty]time.Time),
expiredChan: make(chan Duty, expiredBufferSize),
clock: clock,
quit: make(chan struct{}),
done: make(chan struct{}),
}

go d.run(ctx, deadlineFunc)
go d.run(ctx)

return d
}

func (d *deadliner) run(ctx context.Context, deadlineFunc DeadlineFunc) {
duties := make(map[Duty]bool)
currDuty, currDeadline := getCurrDuty(duties, deadlineFunc)
currTimer := d.clock.NewTimer(currDeadline.Sub(d.clock.Now()))
func (d *deadliner) run(ctx context.Context) {
defer close(d.done)

defer func() {
close(d.quit)
currTimer.Stop()
}()

setCurrState := func() {
currTimer.Stop()

currDuty, currDeadline = getCurrDuty(duties, deadlineFunc)
currTimer = d.clock.NewTimer(currDeadline.Sub(d.clock.Now()))
}
// The simple approach does not require a min-heap or priority queue to store the duties and their deadlines,
// but it is sufficient for our use case as the number of duties is expected to be small.
// A disadvantage of this approach is the expiration precision is rounded to the nearest second.
timer := d.clock.NewTicker(tickerInterval)
defer timer.Stop()

// TODO(dhruv): optimise getCurrDuty and updating current state if earlier deadline detected,
// using min heap or ordered map
for {
select {
case <-ctx.Done():
return
case input := <-d.inputChan:
log.Debug(ctx, "Deadliner.run() received duty", z.Any("duty", input.duty))

deadline, canExpire := deadlineFunc(input.duty)
if !canExpire {
// Drop duties that never expire
input.success <- false
continue
}

expired := deadline.Before(d.clock.Now())

input.success <- !expired

// Ignore expired duties
if expired {
case <-timer.Chan():
// Get all expired duties at the current time.
expiredDuties := d.getExpiredDuties(d.clock.Now())
if len(expiredDuties) == 0 {
continue
}

duties[input.duty] = true
log.Debug(ctx, "Deadliner.run() got expired duties", z.Int("count", len(expiredDuties)))

if deadline.Before(currDeadline) {
setCurrState()
for _, expiredDuty := range expiredDuties {
// Send the expired duty to the receiver.
select {
case <-ctx.Done():
return
case d.expiredChan <- expiredDuty:
}
}
case <-currTimer.Chan():
log.Debug(ctx, "Deadliner.run() currTimer expired", z.Any("currDuty", currDuty), z.Int("deadlineChanLen", len(d.deadlineChan)))

// Send deadlined duty to receiver.
select {
case <-ctx.Done():
return
case d.deadlineChan <- currDuty:
default:
log.Warn(ctx, "Deadliner output channel full", nil,
z.Str("label", d.label),
z.Any("duty", currDuty),
)
}

delete(duties, currDuty)
setCurrState()
}
}
}

// Add adds a duty to be notified of the deadline. It returns true if the duty was added successfully.
func (d *deadliner) Add(duty Duty) bool {
log.Debug(context.Background(), "Deadliner.Add() adding duty", z.Any("duty", duty))

success := make(chan bool)
log.Debug(context.Background(), "Deadliner.Add()", z.Any("duty", duty))

select {
case <-d.quit:
case <-d.done:
// Run goroutine has stopped, ignore new duties.
return false
case d.inputChan <- deadlineInput{duty: duty, success: success}:
default:
}

select {
case <-d.quit:
deadline, canExpire := d.deadlineFunc(duty)
if !canExpire {
// Drop duties that never expire
return false
}

expired := deadline.Before(d.clock.Now())
if expired {
// Drop expired duties
return false
case ok := <-success:
return ok
}

d.lock.Lock()
defer d.lock.Unlock()

d.duties[duty] = deadline

return true
}

// C returns the deadline channel.
func (d *deadliner) C() <-chan Duty {
return d.deadlineChan
return d.expiredChan
}

// getCurrDuty gets the duty to process next along-with the duty deadline. It selects duty with the latest deadline.
func getCurrDuty(duties map[Duty]bool, deadlineFunc DeadlineFunc) (Duty, time.Time) {
var currDuty Duty

currDeadline := time.Date(9999, 1, 1, 0, 0, 0, 0, time.UTC)
// getExpiredDuties selects all expired duties.
func (d *deadliner) getExpiredDuties(now time.Time) []Duty {
expiredDuties := []Duty{}

for duty := range duties {
dutyDeadline, ok := deadlineFunc(duty)
if !ok {
// Ignore the duties that never expire.
continue
}
d.lock.Lock()
defer d.lock.Unlock()

if currDeadline.After(dutyDeadline) {
currDuty = duty
currDeadline = dutyDeadline
for duty, deadline := range d.duties {
if deadline.Before(now) {
expiredDuties = append(expiredDuties, duty)
delete(d.duties, duty)
}
}

return currDuty, currDeadline
return expiredDuties
}
11 changes: 7 additions & 4 deletions core/deadline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (
"github.com/obolnetwork/charon/testutil/beaconmock"
)

//go:generate go test .

func TestDeadliner(t *testing.T) {
ctx, cancel := context.WithCancel(t.Context())
defer cancel()
Expand All @@ -45,6 +43,10 @@ func TestDeadliner(t *testing.T) {

deadliner := core.NewDeadlinerForT(ctx, t, deadlineFuncProvider(), clock)

// Wait for the run goroutine to be waiting on the ticker before interacting.
err := clock.BlockUntilContext(ctx, 1)
require.NoError(t, err)

wg := &sync.WaitGroup{}

// Add our duties to the deadliner.
Expand Down Expand Up @@ -73,8 +75,9 @@ func TestDeadliner(t *testing.T) {
}
}

// Advance clock to trigger deadline of all non-expired duties.
clock.Advance(time.Duration(maxSlot) * time.Second)
// Advance clock past the latest deadline to trigger expiration of all non-expired duties.
// Use maxSlot+1 because Before() is strict (not <=).
clock.Advance(time.Duration(maxSlot+1) * time.Second)

var actualDuties []core.Duty
for range len(nonExpiredDuties) {
Expand Down
Loading