Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
// Consensus
consensusController, err := consensus.NewConsensusController(
ctx, eth2Cl, p2pNode, sender, peers, p2pKey,
deadlineFunc, gaterFunc, consensusDebugger)
deadlineFunc, gaterFunc, consensusDebugger, featureset.Enabled(featureset.ChainSplitHalt))
if err != nil {
return err
}
Expand Down
5 changes: 4 additions & 1 deletion app/log/loki/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
batchWait = 1 * time.Second
batchMax = 5 * 1 << 20 // 5MB
maxLogLineLen = 4 << 10 // 4096B
inputBuffer = 1000 // Buffered channel capacity for log lines
)

// lazyLabelsFunc abstracts lazy loading of labels, logs will only be sent when it returns true.
Expand Down Expand Up @@ -59,7 +60,7 @@ func newInternal(endpoint string, serviceLabel string, batchWait time.Duration,
endpoint: endpoint,
done: make(chan struct{}),
quit: make(chan struct{}),
input: make(chan string),
input: make(chan string, inputBuffer),
batchMax: batchMax,
batchWait: batchWait,
maxLogLineLen: maxLogLineLen,
Expand Down Expand Up @@ -156,6 +157,8 @@ func (c *Client) Add(line string) {
select {
case c.input <- line:
case <-c.quit:
default:
droppedTotal.Inc()
}
}

Expand Down
16 changes: 16 additions & 0 deletions app/log/loki/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright © 2022-2026 Obol Labs Inc. Licensed under the terms of a Business Source License 1.1

package loki

import (
"github.com/prometheus/client_golang/prometheus"

"github.com/obolnetwork/charon/app/promauto"
)

var droppedTotal = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "app",
Subsystem: "log_loki",
Name: "dropped_total",
Help: "Total count of dropped log lines due to full buffer",
})
4 changes: 2 additions & 2 deletions core/consensus/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ type consensusController struct {
// NewConsensusController creates a new consensus controller with the default consensus protocol.
func NewConsensusController(ctx context.Context, eth2Cl eth2wrap.Client, p2pNode host.Host, sender *p2p.Sender,
peers []p2p.Peer, p2pKey *k1.PrivateKey, deadlineFunc core.DeadlineFunc,
gaterFunc core.DutyGaterFunc, debugger Debugger,
gaterFunc core.DutyGaterFunc, debugger Debugger, compareAttestations bool,
) (core.ConsensusController, error) {
qbftDeadliner := core.NewDeadliner(ctx, "consensus.qbft", deadlineFunc)

defaultConsensus, err := qbft.NewConsensus(ctx, eth2Cl, p2pNode, sender, peers, p2pKey, qbftDeadliner, gaterFunc, debugger.AddInstance)
defaultConsensus, err := qbft.NewConsensus(ctx, eth2Cl, p2pNode, sender, peers, p2pKey, qbftDeadliner, gaterFunc, debugger.AddInstance, compareAttestations)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion core/consensus/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestConsensusController(t *testing.T) {
bmock, err := beaconmock.New(ctx)
require.NoError(t, err)

controller, err := consensus.NewConsensusController(ctx, bmock, hosts[0], new(p2p.Sender), peers, p2pkeys[0], deadlineFunc, gaterFunc, debugger)
controller, err := consensus.NewConsensusController(ctx, bmock, hosts[0], new(p2p.Sender), peers, p2pkeys[0], deadlineFunc, gaterFunc, debugger, false)
require.NoError(t, err)
require.NotNil(t, controller)

Expand Down
2 changes: 1 addition & 1 deletion core/consensus/instance/instance_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

const (
RecvBufferSize = 512 // Allow buffering some initial messages when this node is late to start an instance.
RecvBufferSize = 100 // Allow buffering some initial messages when this node is late to start an instance.
)

// NewIO returns a new instanceIO.
Expand Down
10 changes: 5 additions & 5 deletions core/consensus/qbft/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func newMsg(pbMsg *pbv1.QBFTMsg, justification []*pbv1.QBFTMsg, values map[[32]b
}
}

var justImpls []qbft.Msg[core.Duty, [32]byte]
var justImpls []qbft.Msg[core.Duty, [32]byte, proto.Message]

for _, j := range justification {
impl, err := newMsg(j, nil, values)
Expand All @@ -62,15 +62,15 @@ func newMsg(pbMsg *pbv1.QBFTMsg, justification []*pbv1.QBFTMsg, values map[[32]b
}, nil
}

// Msg wraps *pbv1.QBFTMsg and justifications and implements qbft.Msg[core.Duty, [32]byte].
// Msg wraps *pbv1.QBFTMsg and justifications and implements qbft.Msg[core.Duty, [32]byte, proto.Message].
type Msg struct {
msg *pbv1.QBFTMsg
valueHash [32]byte
preparedValueHash [32]byte
values map[[32]byte]*anypb.Any

justificationProtos []*pbv1.QBFTMsg
justification []qbft.Msg[core.Duty, [32]byte]
justification []qbft.Msg[core.Duty, [32]byte, proto.Message]
}

func (m Msg) Type() qbft.MsgType {
Expand Down Expand Up @@ -118,7 +118,7 @@ func (m Msg) PreparedValue() [32]byte {
return m.preparedValueHash
}

func (m Msg) Justification() []qbft.Msg[core.Duty, [32]byte] {
func (m Msg) Justification() []qbft.Msg[core.Duty, [32]byte, proto.Message] {
return m.justification
}

Expand Down Expand Up @@ -227,4 +227,4 @@ func toHash32(val []byte) ([32]byte, bool) {
return resp, true
}

var _ qbft.Msg[core.Duty, [32]byte] = Msg{} // Interface assertion
var _ qbft.Msg[core.Duty, [32]byte, proto.Message] = Msg{} // Interface assertion
Loading
Loading