diff --git a/go.mod b/go.mod index cb3990e1e0..27f1711e04 100644 --- a/go.mod +++ b/go.mod @@ -47,6 +47,7 @@ require ( github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4 github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260622152157-c8e129347b8b github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b + github.com/smartcontractkit/chainlink-protos/metering/go v0.0.0-20260626162307-b56f9af1d196 github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260205130626-db2a2aab956b github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528173149-f5b8336b19d9 diff --git a/go.sum b/go.sum index f7e3962cf2..8d6b2809f4 100644 --- a/go.sum +++ b/go.sum @@ -266,6 +266,8 @@ github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260622152157-c8e129 github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260622152157-c8e129347b8b/go.mod h1:vTFHTCbLui4Vn8fTmAadfE3rdnvfrDwOmMujmW857D0= github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b h1:QuI6SmQFK/zyUlVWEf0GMkiUYBPY4lssn26nKSd/bOM= github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b/go.mod h1:qSTSwX3cBP3FKQwQacdjArqv0g6QnukjV4XuzO6UyoY= +github.com/smartcontractkit/chainlink-protos/metering/go v0.0.0-20260626162307-b56f9af1d196 h1:5SZtASpkbCyK7xtxqiUNwLxYEt7rXE6mkjZKecve2yY= +github.com/smartcontractkit/chainlink-protos/metering/go v0.0.0-20260626162307-b56f9af1d196/go.mod h1:z7lx7wI3XZ4u9kmUtAVdwn1BCC9T8aieWSDcuDgPTdQ= github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260205130626-db2a2aab956b h1:36knUpKHHAZ86K4FGWXtx8i/EQftGdk2bqCoEu/Cha8= github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260205130626-db2a2aab956b/go.mod h1:dkR2uYg9XYJuT1JASkPzWE51jjFkVb86P7a/yXe5/GM= github.com/smartcontractkit/chainlink-protos/storage-service v0.3.0 h1:B7itmjy+CMJ26elVw/cAJqqhBQ3Xa/mBYWK0/rQ5MuI= diff --git a/pkg/durableemitter/durable_emitter.go b/pkg/durableemitter/durable_emitter.go index 5fa68bd62d..f90888b3b9 100644 --- a/pkg/durableemitter/durable_emitter.go +++ b/pkg/durableemitter/durable_emitter.go @@ -19,6 +19,12 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/services" ) +// nodeCSAKeyExtension is the CloudEvent extension name carrying the node's CSA +// public key on durable events. CloudEvents extension names must be lowercase +// alphanumeric, so this is the durable-path equivalent of beholder's +// node_csa_key attribute. +const nodeCSAKeyExtension = "nodecsakey" + // BatchEmitter is the transport interface DurableEmitter delegates to for // batched delivery of CloudEvents to Chip Ingress. // @@ -45,6 +51,10 @@ type BatchEmitter interface { // Config configures the DurableEmitter behaviour. type Config struct { + // NodeCSAKey is the node's CSA public key (hex). When non-empty it is + // stamped as the node_csa_key attribute on every emitted event, providing a + // durable node identity for downstream billing/lookup + NodeCSAKey string // RetransmitInterval controls how often the retransmit loop ticks. RetransmitInterval time.Duration // RetransmitAfter is the minimum age of an event before the retransmit @@ -376,6 +386,11 @@ func (d *DurableEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) } event.SetExtension("emitter", "DurableEmitter") + // Include the node's CSA public key so every durable event carries a + // durable node identity for downstream billing/lookup + if d.cfg.NodeCSAKey != "" { + event.SetExtension(nodeCSAKeyExtension, d.cfg.NodeCSAKey) + } eventPb, err := chipingress.EventToProto(event) if err != nil { diff --git a/pkg/durableemitter/durable_emitter_test.go b/pkg/durableemitter/durable_emitter_test.go index d47f3961a0..bb13fceca4 100644 --- a/pkg/durableemitter/durable_emitter_test.go +++ b/pkg/durableemitter/durable_emitter_test.go @@ -1290,3 +1290,78 @@ func (m *MemDurableEventStore) ObserveDurableQueue(_ context.Context, eventTTL, st.OldestPendingAge = now.Sub(oldest) return st, nil } + +// capturingBatchEmitter records every CloudEvent handed to QueueMessage so tests +// can assert on the emitted event's attributes/extensions. +type capturingBatchEmitter struct { + mu sync.Mutex + events []*chipingress.CloudEventPb +} + +func (b *capturingBatchEmitter) QueueMessage(event *chipingress.CloudEventPb, cb func(error)) error { + b.mu.Lock() + b.events = append(b.events, event) + b.mu.Unlock() + if cb != nil { + cb(nil) + } + return nil +} + +func (b *capturingBatchEmitter) Start(_ context.Context) {} +func (b *capturingBatchEmitter) Stop() {} + +func (b *capturingBatchEmitter) last() *chipingress.CloudEventPb { + b.mu.Lock() + defer b.mu.Unlock() + if len(b.events) == 0 { + return nil + } + return b.events[len(b.events)-1] +} + +func TestDurableEmitter_StampsNodeCSAKey(t *testing.T) { + be := &capturingBatchEmitter{} + + cfg := DefaultConfig() + cfg.DisablePruning = true + cfg.InsertBatchSize = 0 // inline insert for a deterministic single event + cfg.MarkBatchSize = 0 + cfg.NodeCSAKey = "deadbeef" + + em := newTestDurableEmitter(t, NewMemDurableEventStore(), be, &cfg) + require.NoError(t, em.Start(t.Context())) + t.Cleanup(func() { _ = em.Close() }) + + require.NoError(t, em.Emit(t.Context(), []byte("body"), testEmitAttrs()...)) + + require.Eventually(t, func() bool { return be.last() != nil }, 2*time.Second, 10*time.Millisecond) + + ev := be.last() + require.NotNil(t, ev) + attr := ev.GetAttributes()[nodeCSAKeyExtension] + require.NotNil(t, attr, "nodecsakey extension should be present on durable events") + assert.Equal(t, "deadbeef", attr.GetCeString()) +} + +func TestDurableEmitter_OmitsNodeCSAKeyWhenUnset(t *testing.T) { + be := &capturingBatchEmitter{} + + cfg := DefaultConfig() + cfg.DisablePruning = true + cfg.InsertBatchSize = 0 + cfg.MarkBatchSize = 0 + // NodeCSAKey intentionally left empty. + + em := newTestDurableEmitter(t, NewMemDurableEventStore(), be, &cfg) + require.NoError(t, em.Start(t.Context())) + t.Cleanup(func() { _ = em.Close() }) + + require.NoError(t, em.Emit(t.Context(), []byte("body"), testEmitAttrs()...)) + require.Eventually(t, func() bool { return be.last() != nil }, 2*time.Second, 10*time.Millisecond) + + ev := be.last() + require.NotNil(t, ev) + _, ok := ev.GetAttributes()[nodeCSAKeyExtension] + assert.False(t, ok, "nodecsakey extension must be absent when NodeCSAKey is unset") +} diff --git a/pkg/durableemitter/setup.go b/pkg/durableemitter/setup.go index 5a8c5a8027..2d0ca7b3e8 100644 --- a/pkg/durableemitter/setup.go +++ b/pkg/durableemitter/setup.go @@ -128,6 +128,10 @@ func Setup( emitterCfg = *cfg.EmitterConfig } + if emitterCfg.NodeCSAKey == "" { + emitterCfg.NodeCSAKey = cfg.Auth.AuthPublicKeyHex + } + emitter, err := NewDurableEmitter(store, batchClient, fallbackClient, cfg.RetransmitEnabled, emitterCfg, lggr, cfg.Meter) if err != nil { batchClient.Stop() diff --git a/pkg/loop/config.go b/pkg/loop/config.go index 396c7d979d..a38d6921a5 100644 --- a/pkg/loop/config.go +++ b/pkg/loop/config.go @@ -85,6 +85,12 @@ const ( envTelemetryPrometheusBridgeEnabled = "CL_TELEMETRY_PROMETHEUS_BRIDGE_ENABLED" envTelemetryPrometheusBridgePrefixes = "CL_TELEMETRY_PROMETHEUS_BRIDGE_PREFIXES" envTelemetryLogCompressor = "CL_TELEMETRY_LOG_COMPRESSOR" + envMeterRecordsEnabled = "CL_METER_RECORDS_ENABLED" + envMeterSnapshotsEnabled = "CL_METER_SNAPSHOTS_ENABLED" + envMeteringProduct = "CL_METERING_PRODUCT" + envMeteringEnvironment = "CL_METERING_ENVIRONMENT" + envMeteringZone = "CL_METERING_ZONE" + envMeteringNodeID = "CL_METERING_NODE_ID" envChipIngressEndpoint = "CL_CHIP_INGRESS_ENDPOINT" envChipIngressInsecureConnection = "CL_CHIP_INGRESS_INSECURE_CONNECTION" @@ -171,6 +177,24 @@ type EnvConfig struct { TelemetryPrometheusBridgeEnabled bool TelemetryPrometheusBridgePrefixes []string TelemetryLogCompressor string + MeterRecordsEnabled bool + MeterSnapshotsEnabled bool + + // MeteringProduct / MeteringEnvironment / MeteringZone / MeteringNodeID are + // the static deployment+node identity dimensions used as coarse + // metering/billing rollup dimensions. They are resolved once from node + // config by the host and delivered to every LOOP plugin over the env, the + // same channel as the metering toggles above (rather than the + // standard-capabilities boundary). Any may be empty if the host did not + // provide it. + // + // MeteringNodeID is the node's logical name (e.g. "clp-cre-wf-zone-a-1"), + // not the CSA public key; the CSA key rides emitted events separately as the + // node_csa_key attribute. + MeteringProduct string + MeteringEnvironment string + MeteringZone string + MeteringNodeID string TracingEnabled bool TracingCollectorTarget string @@ -264,6 +288,12 @@ func (e *EnvConfig) AsCmdEnv() (env []string) { add(envTelemetryPrometheusBridgeEnabled, strconv.FormatBool(e.TelemetryPrometheusBridgeEnabled)) add(envTelemetryPrometheusBridgePrefixes, strings.Join(e.TelemetryPrometheusBridgePrefixes, ",")) add(envTelemetryLogCompressor, e.TelemetryLogCompressor) + add(envMeterRecordsEnabled, strconv.FormatBool(e.MeterRecordsEnabled)) + add(envMeterSnapshotsEnabled, strconv.FormatBool(e.MeterSnapshotsEnabled)) + add(envMeteringProduct, e.MeteringProduct) + add(envMeteringEnvironment, e.MeteringEnvironment) + add(envMeteringZone, e.MeteringZone) + add(envMeteringNodeID, e.MeteringNodeID) add(envChipIngressEndpoint, e.ChipIngressEndpoint) add(envChipIngressInsecureConnection, strconv.FormatBool(e.ChipIngressInsecureConnection)) @@ -518,6 +548,20 @@ func (e *EnvConfig) parse() error { e.CRESettings = os.Getenv(envCRESettings) e.CRESettingsDefault = os.Getenv(envCRESettingsDefault) + e.MeterRecordsEnabled, err = getBool(envMeterRecordsEnabled) + if err != nil { + return fmt.Errorf("failed to parse %s: %w", envMeterRecordsEnabled, err) + } + e.MeterSnapshotsEnabled, err = getBool(envMeterSnapshotsEnabled) + if err != nil { + return fmt.Errorf("failed to parse %s: %w", envMeterSnapshotsEnabled, err) + } + + e.MeteringProduct = os.Getenv(envMeteringProduct) + e.MeteringEnvironment = os.Getenv(envMeteringEnvironment) + e.MeteringZone = os.Getenv(envMeteringZone) + e.MeteringNodeID = os.Getenv(envMeteringNodeID) + return nil } diff --git a/pkg/loop/config_test.go b/pkg/loop/config_test.go index d592543fd4..a878ce8114 100644 --- a/pkg/loop/config_test.go +++ b/pkg/loop/config_test.go @@ -85,6 +85,12 @@ func TestEnvConfig_parse(t *testing.T) { envTelemetryLogStreamingEnabled: "false", envTelemetryPrometheusBridgeEnabled: "true", envTelemetryPrometheusBridgePrefixes: "foo,bar", + envMeterRecordsEnabled: "true", + envMeterSnapshotsEnabled: "false", + envMeteringProduct: "cre-mainline", + envMeteringEnvironment: "production", + envMeteringZone: "wf-zone-a", + envMeteringNodeID: "csa-pubkey-1", envChipIngressEndpoint: "chip-ingress.example.com:50051", envChipIngressInsecureConnection: "true", @@ -199,6 +205,12 @@ var envCfgFull = EnvConfig{ TelemetryLogStreamingEnabled: false, TelemetryPrometheusBridgeEnabled: true, TelemetryPrometheusBridgePrefixes: []string{"foo", "bar"}, + MeterRecordsEnabled: true, + MeterSnapshotsEnabled: false, + MeteringProduct: "cre-mainline", + MeteringEnvironment: "production", + MeteringZone: "wf-zone-a", + MeteringNodeID: "csa-pubkey-1", ChipIngressEndpoint: "chip-ingress.example.com:50051", ChipIngressInsecureConnection: true, @@ -264,6 +276,12 @@ func TestEnvConfig_AsCmdEnv(t *testing.T) { assert.Equal(t, "false", got[envTelemetryLogStreamingEnabled]) assert.Equal(t, "true", got[envTelemetryPrometheusBridgeEnabled]) assert.Equal(t, "foo,bar", got[envTelemetryPrometheusBridgePrefixes]) + assert.Equal(t, "true", got[envMeterRecordsEnabled]) + assert.Equal(t, "false", got[envMeterSnapshotsEnabled]) + assert.Equal(t, "cre-mainline", got[envMeteringProduct]) + assert.Equal(t, "production", got[envMeteringEnvironment]) + assert.Equal(t, "wf-zone-a", got[envMeteringZone]) + assert.Equal(t, "csa-pubkey-1", got[envMeteringNodeID]) // Assert ChipIngress environment variables assert.Equal(t, "chip-ingress.example.com:50051", got[envChipIngressEndpoint]) diff --git a/pkg/resourcemanager/idempotency.go b/pkg/resourcemanager/idempotency.go new file mode 100644 index 0000000000..e01e2938d3 --- /dev/null +++ b/pkg/resourcemanager/idempotency.go @@ -0,0 +1,47 @@ +package resourcemanager + +import ( + "math/big" + "strconv" + + meteringpb "github.com/smartcontractkit/chainlink-protos/metering/go" +) + +// UtilizationFields identifies one billed utilization dimension. +type UtilizationFields struct { + ResourceType string + ResourceID string + EventID string + OrgID string +} + +// NewUtilization builds a Utilization with int64 quantity encoded as a decimal +// string value. +func NewUtilization(value int64, fields UtilizationFields) *meteringpb.Utilization { + return NewUtilizationString(strconv.FormatInt(value, 10), fields) +} + +// NewUtilizationString builds a Utilization from a pre-formatted numeric string +// value. +func NewUtilizationString(value string, fields UtilizationFields) *meteringpb.Utilization { + return &meteringpb.Utilization{ + Value: value, + ResourceType: fields.ResourceType, + ResourceId: fields.ResourceID, + EventId: fields.EventID, + OrgId: fields.OrgID, + } +} + +// NewUtilizationBig builds a Utilization from an arbitrary-precision integer. +func NewUtilizationBig(value *big.Int, fields UtilizationFields) *meteringpb.Utilization { + if value == nil { + return NewUtilizationString("0", fields) + } + return NewUtilizationString(value.String(), fields) +} + +// NewUtilizationFloat builds a Utilization from a floating-point value. +func NewUtilizationFloat(value float64, fields UtilizationFields) *meteringpb.Utilization { + return NewUtilizationString(strconv.FormatFloat(value, 'g', -1, 64), fields) +} diff --git a/pkg/resourcemanager/labels.go b/pkg/resourcemanager/labels.go new file mode 100644 index 0000000000..bb8b921cd3 --- /dev/null +++ b/pkg/resourcemanager/labels.go @@ -0,0 +1,15 @@ +package resourcemanager + +import "strings" + +// OwnerLabel returns the canonical form of the "owner" label, used by all +// producers: a leading "0x"/"0X" prefix is stripped and the remainder is +// lowercased. Downstream joins on the owner label depend on every producer +// emitting exactly this form, so producers must not write the owner label +// any other way. +func OwnerLabel(owner string) string { + if strings.HasPrefix(owner, "0x") || strings.HasPrefix(owner, "0X") { + owner = owner[2:] + } + return strings.ToLower(owner) +} diff --git a/pkg/resourcemanager/labels_test.go b/pkg/resourcemanager/labels_test.go new file mode 100644 index 0000000000..5581250493 --- /dev/null +++ b/pkg/resourcemanager/labels_test.go @@ -0,0 +1,30 @@ +package resourcemanager + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestOwnerLabel(t *testing.T) { + tests := []struct { + name string + owner string + want string + }{ + {name: "lowercase 0x prefix stripped", owner: "0xabc123def", want: "abc123def"}, + {name: "uppercase 0X prefix stripped", owner: "0Xabc123def", want: "abc123def"}, + {name: "no prefix unchanged", owner: "abc123def", want: "abc123def"}, + {name: "mixed case lowercased", owner: "0xAbC123dEF", want: "abc123def"}, + {name: "mixed case without prefix lowercased", owner: "AbC123dEF", want: "abc123def"}, + {name: "empty string", owner: "", want: ""}, + {name: "prefix only", owner: "0x", want: ""}, + {name: "only one prefix stripped", owner: "0x0Xabc", want: "0xabc"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.want, OwnerLabel(tt.owner)) + }) + } +} diff --git a/pkg/resourcemanager/meterable.go b/pkg/resourcemanager/meterable.go new file mode 100644 index 0000000000..e6937daca4 --- /dev/null +++ b/pkg/resourcemanager/meterable.go @@ -0,0 +1,124 @@ +package resourcemanager + +import ( + "context" + + meteringpb "github.com/smartcontractkit/chainlink-protos/metering/go" + "google.golang.org/protobuf/proto" +) + +// Meterable is implemented by producers that manage durable billable +// resources (trigger registrations, workflow specs, log filters). A producer +// registers itself with a ResourceManager (see ResourceManager.Register) so it +// is polled once per snapshot tick for the absolute state of its currently +// active resources, in addition to emitting lifecycle edges inline via +// EmitMeterRecord. +type Meterable interface { + // ResourceIdentity returns the producer's base identity: the six coarse + // dimensions (product, environment, zone, don_id, node_id, service) plus + // the service-level resource_pool / resource_pool_id. Per-resource billing + // fields (resource_type/resource_id/org_id/event_id/value) are carried by + // Utilizations on MeterRecord and MeterSnapshot. + ResourceIdentity() ResourceIdentity + + // GetUtilization returns the current level of the producer's currently + // active resources, one SnapshotEntry per resource. The manager emits one + // MeterSnapshot per entry. + // + // It is called on the snapshot tick and MUST be a cheap, non-blocking + // read-snapshot of in-memory state: no network, no disk, no lock held + // across I/O. It must tolerate ctx cancellation (returning promptly, and + // nil/empty is acceptable) and tolerate concurrent registration of new + // resources. An empty or nil return is valid and means nothing is currently + // active: no snapshots are emitted, and billing zeroes the resource out by + // its absence from subsequent snapshots. + GetUtilization(ctx context.Context) []SnapshotEntry +} + +// SnapshotEntry is the current level of one active resource at a snapshot tick. +// Identity is the base resource identity, and Utilizations carries one or more +// billed dimensions for that resource (resource_type/resource_id/org_id/event_id/value). +type SnapshotEntry struct { + Identity ResourceIdentity + Utilizations []*meteringpb.Utilization +} + +// DeploymentIdentity carries the static deployment + node identity dimensions +// that are fixed for a LOOP plugin process. They are resolved once from node +// config by the host and delivered to every LOOP plugin over the environment +// (loop.EnvConfig), not the standard-capabilities boundary, so any LOOP plugin +// can populate the coarse metering rollup dimensions. Any field may be empty if +// the host did not provide it. +type DeploymentIdentity struct { + // Product is the deployment product, e.g. "cre-mainline". + Product string + // Environment is the deployment environment, e.g. "production", "staging". + Environment string + // Zone is the deployment zone, e.g. "wf-zone-a". + Zone string + // NodeID is the node's logical name, e.g. "clp-cre-wf-zone-a-1". It is NOT + // the CSA public key; it is a stable name the billing service can use to + // look up the node's CSA key in the workflow registry. The CSA key itself is + // carried separately as the node_csa_key event attribute. + NodeID string +} + +// ResourceIdentity is the structured, first-class identity of a durable +// resource. Its fields map 1:1 to metering.v1.ResourceIdentity so every +// emitted record carries each dimension as a discrete column rather than a +// parsed dotted string or out-of-band telemetry attribute. +type ResourceIdentity struct { + // Product is the deployment product, e.g. "cre-mainline". A coarse + // billing-rollup dimension. + Product string + + // Environment is the deployment environment, e.g. "production", + // "staging". A coarse billing-rollup dimension. + Environment string + + // Zone is the deployment zone, e.g. "wf-zone-a". A coarse billing-rollup + // dimension. + Zone string + + // DONID is the DON identifier the emitting service belongs to. A coarse + // billing-rollup dimension; used with NodeID to count distinct nodes for + // quorum. + DONID string + + // NodeID is the node identifier (the node's CSA public key). A coarse + // billing-rollup dimension; lets billing dedup a node's retries and count + // distinct nodes. + NodeID string + + // Service is the stable service constant identifying the emitting service, + // e.g. "cron-trigger", "http-trigger", "evm-log-trigger", + // "workflow-syncer-v2". A coarse billing-rollup dimension. It must not + // encode deployment environment or zone. + Service string + + // ResourcePool is the service-level resource pool the record applies to, + // e.g. "trigger_registrations", "log_filters", "workflow_storage". + ResourcePool string + + // ResourcePoolID optionally scopes identity further within the resource pool. + ResourcePoolID string +} + +// toProto converts r to its wire form. Field order mirrors the proto. +func (r ResourceIdentity) toProto() *meteringpb.ResourceIdentity { + pb := &meteringpb.ResourceIdentity{ + Product: r.Product, + Environment: r.Environment, + Zone: r.Zone, + Service: r.Service, + ResourcePool: r.ResourcePool, + ResourcePoolId: r.ResourcePoolID, + } + if r.DONID != "" { + pb.DonId = proto.String(r.DONID) + } + if r.NodeID != "" { + pb.NodeId = proto.String(r.NodeID) + } + return pb +} diff --git a/pkg/resourcemanager/resourcemanager.go b/pkg/resourcemanager/resourcemanager.go new file mode 100644 index 0000000000..670d035bd2 --- /dev/null +++ b/pkg/resourcemanager/resourcemanager.go @@ -0,0 +1,459 @@ +// Package resourcemanager emits metering.v1 events for durable billable +// resources such as trigger registrations, workflow specs, and log filters. +// +// It emits two kinds of records, each covering exactly one resource identified +// by its ResourceIdentity: +// +// - MeterRecord lifecycle edges, inline, via EmitMeterRecord at the point a +// resource is reserved, released, or its utilization changes; and +// - MeterSnapshot records, on a timer, one per resource a registered +// Meterable reports as currently active. Snapshots are the +// liveness/utilization-over-time signal that pure RESERVE/RELEASE cannot +// provide (a node panic would otherwise leak a reservation forever). +// +// The ResourceManager is the single owner of the snapshot tick: each producer +// starts the manager as a sub-service and only Registers itself; producers +// never run their own snapshot loop. +// +// Emission is fail-open by design: EmitMeterRecord and the snapshot loop +// return no error, and a metering failure must never gate, delay, or retry the +// resource operation being metered. Failures surface via error-level logs and +// the resource_manager_*_failure_total counters; billing correctness is +// recovered downstream through idempotency keys and reconciliation. +package resourcemanager + +import ( + "context" + "strconv" + "sync" + "time" + + "github.com/jonboulle/clockwork" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" + meteringpb "github.com/smartcontractkit/chainlink-protos/metering/go" +) + +// Beholder routing attributes. Each entity value is the contract shared with +// the CHiP schema registration and the consumer topic name; all must match +// exactly. +const ( + beholderDomain = "cll-meter" + beholderEntity = "metering.v1.MeterRecord" + beholderDataSchema = "metering.v1.meter_record" + beholderSnapshotEntity = "metering.v1.MeterSnapshot" + beholderSnapshotDataSchema = "metering.v1.meter_snapshot" +) + +// Counter names are a dashboard contract; do not rename. +// +// Success means the record was accepted for asynchronous export (enqueued +// with the OTel batch processor), not that it was delivered to Kafka; +// delivery failures past this point are invisible to these counters. Emit is +// non-blocking only while the batch processors are enabled, which is the +// default. +const ( + emitSuccessCounterName = "resource_manager_emit_success_total" + emitFailureCounterName = "resource_manager_emit_failure_total" + snapshotEmitSuccessCounterName = "resource_manager_snapshot_emit_success_total" + snapshotEmitFailureCounterName = "resource_manager_snapshot_emit_failure_total" +) + +// utilizationGaugeName is the per-resource utilization gauge. Unlike the +// low-cardinality emit counters, it is labeled with EVERY ResourceIdentity +// dimension (including node_id and resource_id) so dashboards can break +// utilization down by any dimension. That is high cardinality by construction; +// it is the intended, requested behavior. +const utilizationGaugeName = "resource_manager_utilization" + +// DefaultSnapshotInterval is the recommended snapshot period. It is NOT +// applied implicitly: a zero ResourceManagerConfig.SnapshotInterval disables +// snapshots. Callers that want snapshots pass this (or their own value) +// explicitly. +const DefaultSnapshotInterval = 60 * time.Second + +// Emitter delivers an encoded metering message with its routing attributes. +// beholder.Emitter satisfies it, so production wiring is +// beholder.GetEmitter(); tests substitute a fake. +type Emitter interface { + Emit(ctx context.Context, body []byte, attrKVs ...any) error +} + +// ResourceManagerConfig configures a ResourceManager. +type ResourceManagerConfig struct { + // MeterRecordsEnabled is the meter-record rollout gate. When false (the + // default), EmitMeterRecord is a no-op. + MeterRecordsEnabled bool + + // MeterSnapshotsEnabled gates snapshot emission. Snapshots are only emitted + // when this is true AND MeterRecordsEnabled is true. + MeterSnapshotsEnabled bool + + // Emitter delivers encoded records, typically beholder.GetEmitter(). A nil + // Emitter makes EmitMeterRecord a no-op even when Enabled is true and keeps + // the snapshot loop from starting. + Emitter Emitter + + // SnapshotInterval is the period between snapshots. Zero (the default) + // DISABLES the snapshot loop; the manager still starts as a service and + // EmitMeterRecord still works. Callers that want snapshots set a positive + // value, e.g. DefaultSnapshotInterval. The default is not substituted for + // zero — zero means off. + SnapshotInterval time.Duration + + // Clock drives snapshot tick timing and record timestamps. Nil selects the + // real clock. + Clock clockwork.Clock +} + +// registration is one registered Meterable. It is a pointer-identified handle +// so Register can return an idempotent unregister closure. +type registration struct { + m Meterable +} + +// ResourceManager emits MeterRecords and periodic MeterSnapshots for durable +// resources. It is a services.Service: callers start it (typically as a +// sub-service of the producer) and Register Meterables to be snapshotted. It +// is safe for concurrent use. +type ResourceManager struct { + services.Service + srvcEng *services.Engine + + lggr logger.SugaredLogger + meterRecordsEnabled bool + meterSnapshotsEnabled bool + emitter Emitter + snapshotInterval time.Duration + clock clockwork.Clock + + mu sync.RWMutex + registrations map[*registration]struct{} + + emitSuccess metric.Int64Counter + emitFailure metric.Int64Counter + snapshotEmitSuccess metric.Int64Counter + snapshotEmitFailure metric.Int64Counter + utilization metric.Int64Gauge +} + +// NewResourceManager returns a ResourceManager. A failure to create a metric +// instrument is logged and that instrument is skipped; it never prevents +// construction. The manager must be Started before its snapshot loop runs; +// EmitMeterRecord works regardless of Start. +func NewResourceManager(lggr logger.Logger, cfg ResourceManagerConfig) *ResourceManager { + meter := beholder.GetMeter() + sugared := logger.Sugared(lggr) + newCounter := func(name string) metric.Int64Counter { + c, err := meter.Int64Counter(name) + if err != nil { + sugared.Errorw("failed to create metering counter", "counter", name, "err", err) + return nil + } + return c + } + gauge, err := meter.Int64Gauge(utilizationGaugeName) + if err != nil { + sugared.Errorw("failed to create metering gauge", "gauge", utilizationGaugeName, "err", err) + gauge = nil + } + + clock := cfg.Clock + if clock == nil { + clock = clockwork.NewRealClock() + } + + meterSnapshotsEnabled := cfg.MeterRecordsEnabled && cfg.MeterSnapshotsEnabled && cfg.SnapshotInterval > 0 + if cfg.MeterSnapshotsEnabled && !cfg.MeterRecordsEnabled { + sugared.Warn("MeterSnapshotsEnabled ignored because MeterRecordsEnabled is false") + } + + rm := &ResourceManager{ + meterRecordsEnabled: cfg.MeterRecordsEnabled, + meterSnapshotsEnabled: meterSnapshotsEnabled, + emitter: cfg.Emitter, + snapshotInterval: cfg.SnapshotInterval, + clock: clock, + registrations: make(map[*registration]struct{}), + emitSuccess: newCounter(emitSuccessCounterName), + emitFailure: newCounter(emitFailureCounterName), + snapshotEmitSuccess: newCounter(snapshotEmitSuccessCounterName), + snapshotEmitFailure: newCounter(snapshotEmitFailureCounterName), + utilization: gauge, + } + rm.Service, rm.srvcEng = services.Config{ + Name: "ResourceManager", + Start: rm.start, + Close: rm.close, + }.NewServiceEngine(lggr) + rm.lggr = logger.Sugared(rm.srvcEng) + return rm +} + +// start launches the snapshot loop. The loop runs only when metering is +// enabled, an emitter is configured, and a positive SnapshotInterval is set; +// otherwise the service starts cleanly with snapshots disabled and +// EmitMeterRecord remains available. +func (rm *ResourceManager) start(_ context.Context) error { + if !rm.meterSnapshotsEnabled || rm.emitter == nil { + rm.lggr.Infow("snapshot loop disabled", + "meterRecordsEnabled", rm.meterRecordsEnabled, + "meterSnapshotsEnabled", rm.meterSnapshotsEnabled, + "hasEmitter", rm.emitter != nil, + "snapshotInterval", rm.snapshotInterval, + ) + return nil + } + rm.srvcEng.Go(func(ctx context.Context) { + ticker := rm.clock.NewTicker(rm.snapshotInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.Chan(): + rm.emitSnapshots(ctx) + } + } + }) + return nil +} + +func (rm *ResourceManager) close() error { return nil } + +// Register adds m to the snapshot registry and returns an idempotent function +// that removes it. Calling the returned function more than once is a no-op. +// The returned closure is safe for concurrent use. +func (rm *ResourceManager) Register(m Meterable) (unregister func()) { + reg := ®istration{m: m} + + rm.mu.Lock() + rm.registrations[reg] = struct{}{} + rm.mu.Unlock() + + var once sync.Once + return func() { + once.Do(func() { + rm.mu.Lock() + delete(rm.registrations, reg) + rm.mu.Unlock() + }) + } +} + +// emitSnapshots is the snapshot tick: it snapshots the registry under a read +// lock, then emits snapshots for every registered Meterable. +func (rm *ResourceManager) emitSnapshots(ctx context.Context) { + rm.mu.RLock() + ms := make([]Meterable, 0, len(rm.registrations)) + for reg := range rm.registrations { + ms = append(ms, reg.m) + } + rm.mu.RUnlock() + + for _, m := range ms { + rm.emitSnapshot(ctx, m) + } +} + +// emitSnapshot emits one MeterSnapshot per active resource reported by m. Each +// snapshot covers exactly one resource, fully identified by its +// ResourceIdentity. The interval bucket (timestamp truncated to the snapshot +// interval) keys the snapshot per interval. An empty utilization list emits +// nothing: billing zeroes a resource out by its absence from later snapshots. +// Fail-open: per-resource errors are logged and counted, never returned. +func (rm *ResourceManager) emitSnapshot(ctx context.Context, m Meterable) { + if !rm.meterSnapshotsEnabled || rm.emitter == nil { + return + } + + now := rm.clock.Now() + ts := timestamppb.New(now) + interval := durationpb.New(rm.snapshotInterval) + + for _, e := range m.GetUtilization(ctx) { + if len(e.Utilizations) == 0 { + continue + } + for _, u := range e.Utilizations { + rm.recordUtilization(ctx, e.Identity, u, meteringpb.MeterAction_METER_ACTION_UPDATE) + } + + snapshot := &meteringpb.MeterSnapshot{ + Timestamp: ts, + Identity: e.Identity.toProto(), + Utilization: e.Utilizations, + Interval: interval, + } + + body, err := proto.Marshal(snapshot) + if err != nil { + rm.lggr.Errorw("failed to marshal snapshot", + "service", e.Identity.Service, + "resourcePool", e.Identity.ResourcePool, + "resourcePoolID", e.Identity.ResourcePoolID, + "err", err, + ) + rm.countSnapshot(ctx, rm.snapshotEmitFailure, e.Identity, attribute.String("reason", "marshal")) + continue + } + + if err := rm.emitter.Emit(ctx, body, + beholder.AttrKeyDataSchema, beholderSnapshotDataSchema, + beholder.AttrKeyDomain, beholderDomain, + beholder.AttrKeyEntity, beholderSnapshotEntity, + ); err != nil { + rm.lggr.Errorw("failed to emit snapshot", + "service", e.Identity.Service, + "resourcePool", e.Identity.ResourcePool, + "resourcePoolID", e.Identity.ResourcePoolID, + "err", err, + ) + rm.countSnapshot(ctx, rm.snapshotEmitFailure, e.Identity, attribute.String("reason", "emit")) + continue + } + + rm.countSnapshot(ctx, rm.snapshotEmitSuccess, e.Identity) + } +} + +// EmitMeterRecord emits a metering.v1.MeterRecord, timestamped now, for action +// on the one resource described by identity. +// +// EmitMeterRecord is fail-open and returns no error: when the manager is +// disabled or has no emitter it does nothing, and marshal or emit failures are +// recorded only via error-level logs and the failure counter. Callers must +// never gate resource allocation or deallocation on emission. +func (rm *ResourceManager) EmitMeterRecord(ctx context.Context, identity ResourceIdentity, action meteringpb.MeterAction, utilizations []*meteringpb.Utilization) { + if !rm.meterRecordsEnabled { + rm.lggr.Debugw("metering disabled; meter record not emitted", + "service", identity.Service, + "resourcePool", identity.ResourcePool, + "action", action.String(), + ) + return + } + + for _, u := range utilizations { + rm.recordUtilization(ctx, identity, u, action) + } + + if rm.emitter == nil { + return + } + + record := &meteringpb.MeterRecord{ + Timestamp: timestamppb.New(rm.clock.Now()), + Identity: identity.toProto(), + Action: action, + Utilizations: utilizations, + } + + body, err := proto.Marshal(record) + if err != nil { + rm.lggr.Errorw("failed to marshal meter record", + "service", identity.Service, + "resourcePool", identity.ResourcePool, + "action", action.String(), + "err", err, + ) + rm.countRecord(ctx, rm.emitFailure, identity, action, attribute.String("reason", "marshal")) + return + } + + if err := rm.emitter.Emit(ctx, body, + beholder.AttrKeyDataSchema, beholderDataSchema, + beholder.AttrKeyDomain, beholderDomain, + beholder.AttrKeyEntity, beholderEntity, + ); err != nil { + rm.lggr.Errorw("failed to emit meter record", + "service", identity.Service, + "resourcePool", identity.ResourcePool, + "action", action.String(), + "err", err, + ) + rm.countRecord(ctx, rm.emitFailure, identity, action, attribute.String("reason", "emit")) + return + } + + rm.countRecord(ctx, rm.emitSuccess, identity, action) +} + +// recordUtilization records value to the per-resource utilization gauge, +// labeled with every ResourceIdentity dimension plus utilization identity. +func (rm *ResourceManager) recordUtilization(ctx context.Context, id ResourceIdentity, u *meteringpb.Utilization, action meteringpb.MeterAction) { + if rm.utilization == nil { + return + } + if u == nil { + return + } + + var value int64 + if action == meteringpb.MeterAction_METER_ACTION_RELEASE { + value = 0 + } else { + parsed, err := strconv.ParseInt(u.GetValue(), 10, 64) + if err != nil { + rm.lggr.Debugw("skipping utilization gauge record for non-int64 value", + "value", u.GetValue(), + "resourceType", u.GetResourceType(), + "resourceID", u.GetResourceId(), + "orgID", u.GetOrgId(), + "err", err, + ) + return + } + value = parsed + } + rm.utilization.Record(ctx, value, metric.WithAttributes( + attribute.String("product", id.Product), + attribute.String("environment", id.Environment), + attribute.String("zone", id.Zone), + attribute.String("don_id", id.DONID), + attribute.String("node_id", id.NodeID), + attribute.String("service", id.Service), + attribute.String("resource_pool", id.ResourcePool), + attribute.String("resource_pool_id", id.ResourcePoolID), + attribute.String("resource_type", u.GetResourceType()), + attribute.String("resource_id", u.GetResourceId()), + attribute.String("org_id", u.GetOrgId()), + )) +} + +// countRecord records a MeterRecord emit outcome. Labels are intentionally +// low-cardinality: service, resource, action (+ reason on failure). node_id +// and resource_id are deliberately excluded here to keep the time series +// bounded; the utilization gauge carries the full identity instead. +func (rm *ResourceManager) countRecord(ctx context.Context, c metric.Int64Counter, identity ResourceIdentity, action meteringpb.MeterAction, extra ...attribute.KeyValue) { + if c == nil { + return + } + attrs := append([]attribute.KeyValue{ + attribute.String("service", identity.Service), + attribute.String("resource_pool", identity.ResourcePool), + attribute.String("action", action.String()), + }, extra...) + c.Add(ctx, 1, metric.WithAttributes(attrs...)) +} + +// countSnapshot records a MeterSnapshot emit outcome. Labels are intentionally +// low-cardinality: service, resource (+ reason on failure). +func (rm *ResourceManager) countSnapshot(ctx context.Context, c metric.Int64Counter, identity ResourceIdentity, extra ...attribute.KeyValue) { + if c == nil { + return + } + attrs := append([]attribute.KeyValue{ + attribute.String("service", identity.Service), + attribute.String("resource_pool", identity.ResourcePool), + }, extra...) + c.Add(ctx, 1, metric.WithAttributes(attrs...)) +} diff --git a/pkg/resourcemanager/resourcemanager_test.go b/pkg/resourcemanager/resourcemanager_test.go new file mode 100644 index 0000000000..b0ab380be1 --- /dev/null +++ b/pkg/resourcemanager/resourcemanager_test.go @@ -0,0 +1,322 @@ +package resourcemanager + +import ( + "context" + "errors" + "math/big" + "sync" + "testing" + "time" + + "github.com/jonboulle/clockwork" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/beholder/beholdertest" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" + meteringpb "github.com/smartcontractkit/chainlink-protos/metering/go" +) + +var testIdentity = ResourceIdentity{ + Product: "cre-mainline", + Environment: "production", + Zone: "wf-zone-a", + DONID: "don-1", + NodeID: "node-1", + Service: "cron-trigger", + ResourcePool: "trigger_registrations", + ResourcePoolID: "", +} + +type emitCall struct { + body []byte + attrKVs []any +} + +type fakeEmitter struct { + mu sync.Mutex + err error + calls []emitCall +} + +func (f *fakeEmitter) Emit(_ context.Context, body []byte, attrKVs ...any) error { + f.mu.Lock() + defer f.mu.Unlock() + f.calls = append(f.calls, emitCall{body: body, attrKVs: attrKVs}) + return f.err +} + +func (f *fakeEmitter) snapshot() []emitCall { + f.mu.Lock() + defer f.mu.Unlock() + return append([]emitCall(nil), f.calls...) +} + +type fakeMeterable struct { + identity ResourceIdentity + + mu sync.Mutex + entries []SnapshotEntry +} + +func (f *fakeMeterable) ResourceIdentity() ResourceIdentity { return f.identity } + +func (f *fakeMeterable) GetUtilization(_ context.Context) []SnapshotEntry { + f.mu.Lock() + defer f.mu.Unlock() + return append([]SnapshotEntry(nil), f.entries...) +} + +func (f *fakeMeterable) set(entries []SnapshotEntry) { + f.mu.Lock() + defer f.mu.Unlock() + f.entries = entries +} + +func attrMap(t *testing.T, attrKVs []any) map[string]any { + t.Helper() + require.Zero(t, len(attrKVs)%2) + m := make(map[string]any, len(attrKVs)/2) + for i := 0; i < len(attrKVs); i += 2 { + key, ok := attrKVs[i].(string) + require.True(t, ok) + m[key] = attrKVs[i+1] + } + return m +} + +func waitForSnapshotCount(t *testing.T, emitter *fakeEmitter, want int) { + t.Helper() + require.Eventually(t, func() bool { + return len(decodeSnapshots(t, emitter.snapshot())) == want + }, time.Second, time.Millisecond) +} + +func TestEmitMeterRecord_Gating(t *testing.T) { + tests := []struct { + name string + recordsEnabled bool + snapshotsEnabled bool + emitter *fakeEmitter + wantEmits int + }{ + {name: "disabled does not emit", recordsEnabled: false, snapshotsEnabled: true, emitter: &fakeEmitter{}, wantEmits: 0}, + {name: "enabled emits", recordsEnabled: true, snapshotsEnabled: true, emitter: &fakeEmitter{}, wantEmits: 1}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := ResourceManagerConfig{ + MeterRecordsEnabled: tt.recordsEnabled, + MeterSnapshotsEnabled: tt.snapshotsEnabled, + SnapshotInterval: time.Minute, + } + if tt.emitter != nil { + cfg.Emitter = tt.emitter + } + rm := NewResourceManager(logger.Test(t), cfg) + u := NewUtilization(1, UtilizationFields{ + ResourceType: "operations", + ResourceID: "trigger-1", + EventID: "event-1", + OrgID: "org-1", + }) + rm.EmitMeterRecord(t.Context(), testIdentity, meteringpb.MeterAction_METER_ACTION_RESERVE, []*meteringpb.Utilization{u}) + assert.Len(t, tt.emitter.calls, tt.wantEmits) + }) + } +} + +func TestEmitMeterRecord_Success(t *testing.T) { + emitter := &fakeEmitter{} + rm := NewResourceManager(logger.Test(t), ResourceManagerConfig{ + MeterRecordsEnabled: true, + Emitter: emitter, + }) + + u := NewUtilization(1, UtilizationFields{ + ResourceType: "operations", + ResourceID: "trigger-1", + EventID: "event-1", + OrgID: "org-1", + }) + rm.EmitMeterRecord(t.Context(), testIdentity, meteringpb.MeterAction_METER_ACTION_RESERVE, []*meteringpb.Utilization{u}) + + require.Len(t, emitter.calls, 1) + call := emitter.calls[0] + attrs := attrMap(t, call.attrKVs) + assert.Equal(t, "metering.v1.meter_record", attrs[beholder.AttrKeyDataSchema]) + assert.Equal(t, "platform", attrs[beholder.AttrKeyDomain]) + assert.Equal(t, "metering.v1.MeterRecord", attrs[beholder.AttrKeyEntity]) + + var record meteringpb.MeterRecord + require.NoError(t, proto.Unmarshal(call.body, &record)) + require.NotNil(t, record.GetIdentity()) + assert.Equal(t, testIdentity.ResourcePool, record.GetIdentity().GetResourcePool()) + assert.Equal(t, meteringpb.MeterAction_METER_ACTION_RESERVE, record.GetAction()) + require.Len(t, record.GetUtilizations(), 1) + assert.Equal(t, "1", record.GetUtilizations()[0].GetValue()) + assert.Equal(t, "operations", record.GetUtilizations()[0].GetResourceType()) + assert.Equal(t, "trigger-1", record.GetUtilizations()[0].GetResourceId()) + assert.Equal(t, "event-1", record.GetUtilizations()[0].GetEventId()) + assert.Equal(t, "org-1", record.GetUtilizations()[0].GetOrgId()) +} + +func TestEmitMeterRecord_EmitFailureIsSwallowed(t *testing.T) { + emitter := &fakeEmitter{err: errors.New("collector unavailable")} + rm := NewResourceManager(logger.Test(t), ResourceManagerConfig{ + MeterRecordsEnabled: true, + Emitter: emitter, + }) + + u := NewUtilization(1, UtilizationFields{ + ResourceType: "operations", + ResourceID: "trigger-1", + EventID: "event-2", + OrgID: "org-1", + }) + require.NotPanics(t, func() { + rm.EmitMeterRecord(t.Context(), testIdentity, meteringpb.MeterAction_METER_ACTION_RELEASE, []*meteringpb.Utilization{u}) + }) + require.Len(t, emitter.calls, 1) +} + +func decodeSnapshots(t *testing.T, calls []emitCall) []*meteringpb.MeterSnapshot { + t.Helper() + var snaps []*meteringpb.MeterSnapshot + for _, c := range calls { + attrs := attrMap(t, c.attrKVs) + if attrs[beholder.AttrKeyEntity] != "metering.v1.MeterSnapshot" { + continue + } + var s meteringpb.MeterSnapshot + require.NoError(t, proto.Unmarshal(c.body, &s)) + snaps = append(snaps, &s) + } + return snaps +} + +func TestEmitSnapshot_OnePerResource(t *testing.T) { + emitter := &fakeEmitter{} + clock := clockwork.NewFakeClockAt(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)) + rm := NewResourceManager(logger.Test(t), ResourceManagerConfig{ + MeterRecordsEnabled: true, + MeterSnapshotsEnabled: true, + Emitter: emitter, + SnapshotInterval: 30 * time.Second, + Clock: clock, + }) + + m := &fakeMeterable{identity: testIdentity} + m.set([]SnapshotEntry{ + { + Identity: testIdentity, + Utilizations: []*meteringpb.Utilization{ + NewUtilization(3, UtilizationFields{ + ResourceType: "operations", + ResourceID: "trigger-1", + OrgID: "org-1", + }), + }, + }, + { + Identity: testIdentity, + Utilizations: []*meteringpb.Utilization{ + NewUtilization(5, UtilizationFields{ + ResourceType: "operations", + ResourceID: "trigger-2", + OrgID: "org-2", + }), + }, + }, + }) + unregister := rm.Register(m) + defer unregister() + + servicetest.Run(t, rm) + require.NoError(t, clock.BlockUntilContext(t.Context(), 1)) + clock.Advance(30 * time.Second) + waitForSnapshotCount(t, emitter, 2) + + snaps := decodeSnapshots(t, emitter.snapshot()) + require.Len(t, snaps, 2) + + byID := map[string]*meteringpb.MeterSnapshot{} + for _, s := range snaps { + require.Len(t, s.GetUtilization(), 1) + byID[s.GetUtilization()[0].GetResourceId()] = s + } + require.NotNil(t, byID["trigger-1"]) + assert.Equal(t, "3", byID["trigger-1"].GetUtilization()[0].GetValue()) + require.NotNil(t, byID["trigger-2"]) + assert.Equal(t, "5", byID["trigger-2"].GetUtilization()[0].GetValue()) +} + +func TestSnapshotsCannotRunWithoutRecords(t *testing.T) { + emitter := &fakeEmitter{} + clock := clockwork.NewFakeClockAt(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)) + rm := NewResourceManager(logger.Test(t), ResourceManagerConfig{ + MeterRecordsEnabled: false, + MeterSnapshotsEnabled: true, + Emitter: emitter, + SnapshotInterval: 10 * time.Second, + Clock: clock, + }) + + m := &fakeMeterable{identity: testIdentity} + m.set([]SnapshotEntry{{ + Identity: testIdentity, + Utilizations: []*meteringpb.Utilization{ + NewUtilization(1, UtilizationFields{ + ResourceType: "operations", + ResourceID: "trigger-1", + }), + }, + }}) + rm.Register(m) + servicetest.Run(t, rm) + assert.Empty(t, decodeSnapshots(t, emitter.snapshot())) +} + +func TestNewUtilizationVariants(t *testing.T) { + fields := UtilizationFields{ + ResourceType: "operations", + ResourceID: "rid-1", + EventID: "evt-1", + OrgID: "org-1", + } + + uInt := NewUtilization(42, fields) + assert.Equal(t, "42", uInt.GetValue()) + assert.Equal(t, "operations", uInt.GetResourceType()) + assert.Equal(t, "rid-1", uInt.GetResourceId()) + assert.Equal(t, "evt-1", uInt.GetEventId()) + assert.Equal(t, "org-1", uInt.GetOrgId()) + + uBig := NewUtilizationBig(big.NewInt(0).Exp(big.NewInt(2), big.NewInt(80), nil), fields) + assert.Equal(t, "1208925819614629174706176", uBig.GetValue()) + + uFloat := NewUtilizationFloat(3.5, fields) + assert.Equal(t, "3.5", uFloat.GetValue()) +} + +func TestEmitMeterRecord_BeholderObserver(t *testing.T) { + observer := beholdertest.NewObserver(t) + rm := NewResourceManager(logger.Test(t), ResourceManagerConfig{ + MeterRecordsEnabled: true, + Emitter: beholder.GetEmitter(), + }) + u := NewUtilization(1, UtilizationFields{ + ResourceType: "operations", + ResourceID: "trigger-1", + EventID: "event-1", + OrgID: "org-1", + }) + rm.EmitMeterRecord(t.Context(), testIdentity, meteringpb.MeterAction_METER_ACTION_RESERVE, []*meteringpb.Utilization{u}) + + msgs := observer.Messages(t, beholder.AttrKeyEntity, "metering.v1.MeterRecord") + require.Len(t, msgs, 1) +} diff --git a/pkg/types/core/gateway_connector.go b/pkg/types/core/gateway_connector.go index b402ad5083..6111435bc3 100644 --- a/pkg/types/core/gateway_connector.go +++ b/pkg/types/core/gateway_connector.go @@ -48,7 +48,7 @@ type GatewayConnectorHandler interface { } var ( - _ GatewayConnector = (*UnimplementedGatewayConnector)(nil) + _ GatewayConnector = (*UnimplementedGatewayConnector)(nil) _ MultiGatewayConnector = (*UnimplementedGatewayConnector)(nil) )