Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ require (
github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260622152157-c8e129347b8b
github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b
github.com/smartcontractkit/chainlink-protos/metering/go v0.0.0-20260626162307-b56f9af1d196
github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260205130626-db2a2aab956b
github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0
github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528173149-f5b8336b19d9
Expand Down
2 changes: 2 additions & 0 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions pkg/durableemitter/durable_emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/services"

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note to reader: changes made here instead of pkg/beholder as we are trying to leave beholder as is without accruing much more complexity, and durableemitter pkg is completely owned by foundations, so is a cleaner division of responsibility

cc @pkcll

)

// nodeCSAKeyExtension is the CloudEvent extension name carrying the node's CSA
// public key on durable events. CloudEvents extension names must be lowercase
// alphanumeric, so this is the durable-path equivalent of beholder's
// node_csa_key attribute.
const nodeCSAKeyExtension = "nodecsakey"

// BatchEmitter is the transport interface DurableEmitter delegates to for
// batched delivery of CloudEvents to Chip Ingress.
//
Expand All @@ -45,6 +51,10 @@ type BatchEmitter interface {

// Config configures the DurableEmitter behaviour.
type Config struct {
// NodeCSAKey is the node's CSA public key (hex). When non-empty it is
// stamped as the node_csa_key attribute on every emitted event, providing a
// durable node identity for downstream billing/lookup
NodeCSAKey string
// RetransmitInterval controls how often the retransmit loop ticks.
RetransmitInterval time.Duration
// RetransmitAfter is the minimum age of an event before the retransmit
Expand Down Expand Up @@ -376,6 +386,11 @@ func (d *DurableEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any)
}

event.SetExtension("emitter", "DurableEmitter")
// Include the node's CSA public key so every durable event carries a
// durable node identity for downstream billing/lookup
if d.cfg.NodeCSAKey != "" {
event.SetExtension(nodeCSAKeyExtension, d.cfg.NodeCSAKey)
}

eventPb, err := chipingress.EventToProto(event)
if err != nil {
Expand Down
75 changes: 75 additions & 0 deletions pkg/durableemitter/durable_emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1290,3 +1290,78 @@ func (m *MemDurableEventStore) ObserveDurableQueue(_ context.Context, eventTTL,
st.OldestPendingAge = now.Sub(oldest)
return st, nil
}

// capturingBatchEmitter records every CloudEvent handed to QueueMessage so tests
// can assert on the emitted event's attributes/extensions.
type capturingBatchEmitter struct {
mu sync.Mutex
events []*chipingress.CloudEventPb
}

func (b *capturingBatchEmitter) QueueMessage(event *chipingress.CloudEventPb, cb func(error)) error {
b.mu.Lock()
b.events = append(b.events, event)
b.mu.Unlock()
if cb != nil {
cb(nil)
}
return nil
}

func (b *capturingBatchEmitter) Start(_ context.Context) {}
func (b *capturingBatchEmitter) Stop() {}

func (b *capturingBatchEmitter) last() *chipingress.CloudEventPb {
b.mu.Lock()
defer b.mu.Unlock()
if len(b.events) == 0 {
return nil
}
return b.events[len(b.events)-1]
}

func TestDurableEmitter_StampsNodeCSAKey(t *testing.T) {
be := &capturingBatchEmitter{}

cfg := DefaultConfig()
cfg.DisablePruning = true
cfg.InsertBatchSize = 0 // inline insert for a deterministic single event
cfg.MarkBatchSize = 0
cfg.NodeCSAKey = "deadbeef"

em := newTestDurableEmitter(t, NewMemDurableEventStore(), be, &cfg)
require.NoError(t, em.Start(t.Context()))
t.Cleanup(func() { _ = em.Close() })

require.NoError(t, em.Emit(t.Context(), []byte("body"), testEmitAttrs()...))

require.Eventually(t, func() bool { return be.last() != nil }, 2*time.Second, 10*time.Millisecond)

ev := be.last()
require.NotNil(t, ev)
attr := ev.GetAttributes()[nodeCSAKeyExtension]
require.NotNil(t, attr, "nodecsakey extension should be present on durable events")
assert.Equal(t, "deadbeef", attr.GetCeString())
}

func TestDurableEmitter_OmitsNodeCSAKeyWhenUnset(t *testing.T) {
be := &capturingBatchEmitter{}

cfg := DefaultConfig()
cfg.DisablePruning = true
cfg.InsertBatchSize = 0
cfg.MarkBatchSize = 0
// NodeCSAKey intentionally left empty.

em := newTestDurableEmitter(t, NewMemDurableEventStore(), be, &cfg)
require.NoError(t, em.Start(t.Context()))
t.Cleanup(func() { _ = em.Close() })

require.NoError(t, em.Emit(t.Context(), []byte("body"), testEmitAttrs()...))
require.Eventually(t, func() bool { return be.last() != nil }, 2*time.Second, 10*time.Millisecond)

ev := be.last()
require.NotNil(t, ev)
_, ok := ev.GetAttributes()[nodeCSAKeyExtension]
assert.False(t, ok, "nodecsakey extension must be absent when NodeCSAKey is unset")
}
4 changes: 4 additions & 0 deletions pkg/durableemitter/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ func Setup(
emitterCfg = *cfg.EmitterConfig
}

if emitterCfg.NodeCSAKey == "" {
emitterCfg.NodeCSAKey = cfg.Auth.AuthPublicKeyHex
}

emitter, err := NewDurableEmitter(store, batchClient, fallbackClient, cfg.RetransmitEnabled, emitterCfg, lggr, cfg.Meter)
if err != nil {
batchClient.Stop()
Expand Down
44 changes: 44 additions & 0 deletions pkg/loop/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ const (
envTelemetryPrometheusBridgeEnabled = "CL_TELEMETRY_PROMETHEUS_BRIDGE_ENABLED"
envTelemetryPrometheusBridgePrefixes = "CL_TELEMETRY_PROMETHEUS_BRIDGE_PREFIXES"
envTelemetryLogCompressor = "CL_TELEMETRY_LOG_COMPRESSOR"
envMeterRecordsEnabled = "CL_METER_RECORDS_ENABLED"
envMeterSnapshotsEnabled = "CL_METER_SNAPSHOTS_ENABLED"
envMeteringProduct = "CL_METERING_PRODUCT"
envMeteringEnvironment = "CL_METERING_ENVIRONMENT"
envMeteringZone = "CL_METERING_ZONE"
envMeteringNodeID = "CL_METERING_NODE_ID"

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note to reader, DonID isn't here b/c a node can belong to many DONs; it is the responsibility of the loop to properly receive its DonID from Initialise in the standard cap interface (or for non-CRE use cases, elsewhere)

envChipIngressEndpoint = "CL_CHIP_INGRESS_ENDPOINT"
envChipIngressInsecureConnection = "CL_CHIP_INGRESS_INSECURE_CONNECTION"
Expand Down Expand Up @@ -171,6 +177,24 @@ type EnvConfig struct {
TelemetryPrometheusBridgeEnabled bool
TelemetryPrometheusBridgePrefixes []string
TelemetryLogCompressor string
MeterRecordsEnabled bool
MeterSnapshotsEnabled bool

// MeteringProduct / MeteringEnvironment / MeteringZone / MeteringNodeID are
// the static deployment+node identity dimensions used as coarse
// metering/billing rollup dimensions. They are resolved once from node
// config by the host and delivered to every LOOP plugin over the env, the
// same channel as the metering toggles above (rather than the
// standard-capabilities boundary). Any may be empty if the host did not
// provide it.
//
// MeteringNodeID is the node's logical name (e.g. "clp-cre-wf-zone-a-1"),
// not the CSA public key; the CSA key rides emitted events separately as the
// node_csa_key attribute.
MeteringProduct string
MeteringEnvironment string
MeteringZone string
MeteringNodeID string

TracingEnabled bool
TracingCollectorTarget string
Expand Down Expand Up @@ -264,6 +288,12 @@ func (e *EnvConfig) AsCmdEnv() (env []string) {
add(envTelemetryPrometheusBridgeEnabled, strconv.FormatBool(e.TelemetryPrometheusBridgeEnabled))
add(envTelemetryPrometheusBridgePrefixes, strings.Join(e.TelemetryPrometheusBridgePrefixes, ","))
add(envTelemetryLogCompressor, e.TelemetryLogCompressor)
add(envMeterRecordsEnabled, strconv.FormatBool(e.MeterRecordsEnabled))
add(envMeterSnapshotsEnabled, strconv.FormatBool(e.MeterSnapshotsEnabled))
add(envMeteringProduct, e.MeteringProduct)
add(envMeteringEnvironment, e.MeteringEnvironment)
add(envMeteringZone, e.MeteringZone)
add(envMeteringNodeID, e.MeteringNodeID)

add(envChipIngressEndpoint, e.ChipIngressEndpoint)
add(envChipIngressInsecureConnection, strconv.FormatBool(e.ChipIngressInsecureConnection))
Expand Down Expand Up @@ -518,6 +548,20 @@ func (e *EnvConfig) parse() error {
e.CRESettings = os.Getenv(envCRESettings)
e.CRESettingsDefault = os.Getenv(envCRESettingsDefault)

e.MeterRecordsEnabled, err = getBool(envMeterRecordsEnabled)
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envMeterRecordsEnabled, err)
}
e.MeterSnapshotsEnabled, err = getBool(envMeterSnapshotsEnabled)
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envMeterSnapshotsEnabled, err)
}

e.MeteringProduct = os.Getenv(envMeteringProduct)
e.MeteringEnvironment = os.Getenv(envMeteringEnvironment)
e.MeteringZone = os.Getenv(envMeteringZone)
e.MeteringNodeID = os.Getenv(envMeteringNodeID)

return nil
}

Expand Down
18 changes: 18 additions & 0 deletions pkg/loop/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ func TestEnvConfig_parse(t *testing.T) {
envTelemetryLogStreamingEnabled: "false",
envTelemetryPrometheusBridgeEnabled: "true",
envTelemetryPrometheusBridgePrefixes: "foo,bar",
envMeterRecordsEnabled: "true",
envMeterSnapshotsEnabled: "false",
envMeteringProduct: "cre-mainline",
envMeteringEnvironment: "production",
envMeteringZone: "wf-zone-a",
envMeteringNodeID: "csa-pubkey-1",

envChipIngressEndpoint: "chip-ingress.example.com:50051",
envChipIngressInsecureConnection: "true",
Expand Down Expand Up @@ -199,6 +205,12 @@ var envCfgFull = EnvConfig{
TelemetryLogStreamingEnabled: false,
TelemetryPrometheusBridgeEnabled: true,
TelemetryPrometheusBridgePrefixes: []string{"foo", "bar"},
MeterRecordsEnabled: true,
MeterSnapshotsEnabled: false,
MeteringProduct: "cre-mainline",
MeteringEnvironment: "production",
MeteringZone: "wf-zone-a",
MeteringNodeID: "csa-pubkey-1",

ChipIngressEndpoint: "chip-ingress.example.com:50051",
ChipIngressInsecureConnection: true,
Expand Down Expand Up @@ -264,6 +276,12 @@ func TestEnvConfig_AsCmdEnv(t *testing.T) {
assert.Equal(t, "false", got[envTelemetryLogStreamingEnabled])
assert.Equal(t, "true", got[envTelemetryPrometheusBridgeEnabled])
assert.Equal(t, "foo,bar", got[envTelemetryPrometheusBridgePrefixes])
assert.Equal(t, "true", got[envMeterRecordsEnabled])
assert.Equal(t, "false", got[envMeterSnapshotsEnabled])
assert.Equal(t, "cre-mainline", got[envMeteringProduct])
assert.Equal(t, "production", got[envMeteringEnvironment])
assert.Equal(t, "wf-zone-a", got[envMeteringZone])
assert.Equal(t, "csa-pubkey-1", got[envMeteringNodeID])

// Assert ChipIngress environment variables
assert.Equal(t, "chip-ingress.example.com:50051", got[envChipIngressEndpoint])
Expand Down
47 changes: 47 additions & 0 deletions pkg/resourcemanager/idempotency.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package resourcemanager

import (
"math/big"
"strconv"

meteringpb "github.com/smartcontractkit/chainlink-protos/metering/go"
)

// UtilizationFields identifies one billed utilization dimension.
type UtilizationFields struct {
ResourceType string
ResourceID string
EventID string
OrgID string
}

// NewUtilization builds a Utilization with int64 quantity encoded as a decimal
// string value.
func NewUtilization(value int64, fields UtilizationFields) *meteringpb.Utilization {
return NewUtilizationString(strconv.FormatInt(value, 10), fields)
}

// NewUtilizationString builds a Utilization from a pre-formatted numeric string
// value.
func NewUtilizationString(value string, fields UtilizationFields) *meteringpb.Utilization {
return &meteringpb.Utilization{
Value: value,
ResourceType: fields.ResourceType,
ResourceId: fields.ResourceID,
EventId: fields.EventID,
OrgId: fields.OrgID,
}
}

// NewUtilizationBig builds a Utilization from an arbitrary-precision integer.
func NewUtilizationBig(value *big.Int, fields UtilizationFields) *meteringpb.Utilization {
if value == nil {
return NewUtilizationString("0", fields)
}
return NewUtilizationString(value.String(), fields)
}

// NewUtilizationFloat builds a Utilization from a floating-point value.
func NewUtilizationFloat(value float64, fields UtilizationFields) *meteringpb.Utilization {
return NewUtilizationString(strconv.FormatFloat(value, 'g', -1, 64), fields)
}
15 changes: 15 additions & 0 deletions pkg/resourcemanager/labels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package resourcemanager

import "strings"

// OwnerLabel returns the canonical form of the "owner" label, used by all
// producers: a leading "0x"/"0X" prefix is stripped and the remainder is
// lowercased. Downstream joins on the owner label depend on every producer
// emitting exactly this form, so producers must not write the owner label
// any other way.
func OwnerLabel(owner string) string {
if strings.HasPrefix(owner, "0x") || strings.HasPrefix(owner, "0X") {
owner = owner[2:]
}
return strings.ToLower(owner)
}
30 changes: 30 additions & 0 deletions pkg/resourcemanager/labels_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package resourcemanager

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestOwnerLabel(t *testing.T) {
tests := []struct {
name string
owner string
want string
}{
{name: "lowercase 0x prefix stripped", owner: "0xabc123def", want: "abc123def"},
{name: "uppercase 0X prefix stripped", owner: "0Xabc123def", want: "abc123def"},
{name: "no prefix unchanged", owner: "abc123def", want: "abc123def"},
{name: "mixed case lowercased", owner: "0xAbC123dEF", want: "abc123def"},
{name: "mixed case without prefix lowercased", owner: "AbC123dEF", want: "abc123def"},
{name: "empty string", owner: "", want: ""},
{name: "prefix only", owner: "0x", want: ""},
{name: "only one prefix stripped", owner: "0x0Xabc", want: "0xabc"},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, tt.want, OwnerLabel(tt.owner))
})
}
}
Loading
Loading