Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions collector/receiver/telemetryapireceiver/README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# Telemetry API Receiver

| Status | |
| ------------------------ |--------------|
| Stability | [alpha] |
| Supported pipeline types | traces, logs |
| Distributions | [extension] |
| Status | |
|--------------------------|-----------------------|
| Stability | [alpha] |
| Supported pipeline types | traces, logs, metrics |
| Distributions | [extension] |

This receiver generates telemetry in response to events from the [Telemetry API](https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api.html). It does this by setting up an endpoint and registering itself with the Telemetry API on startup.

Expand All @@ -15,22 +15,26 @@ Supported events:

## Configuration

| Field | Default | Description |
|---------|---------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------|
| `port` | 4325 | HTTP server port to receive Telemetry API data. |
| `types` | ["platform", "function", "extension"] | [Types](https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api-reference.html#telemetry-subscribe-api) of telemetry to subscribe to |
| Field | Default | Description |
|-----------------------|---------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `port` | 4325 | HTTP server port to receive Telemetry API data. |
| `types` | ["platform", "function", "extension"] | [Types](https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api-reference.html#telemetry-subscribe-api) of telemetry to subscribe to |
| `metrics_temporality` | cumulative | The [aggregation temporality](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#temporality) to use for metrics. Supported values: `delta`, `cumulative`. |
| `export_interval_ms` | 60000 | The interval in milliseconds at which metrics are exported. If set to 0, metrics are exported immediately upon receipt. |


```yaml
receivers:
telemetryapi:
telemetryapi/1:
port: 4326
export_interval_ms: 30000
telemetryapi/2:
port: 4327
types:
- platform
- function
metrics_temporality: delta
telemetryapi/3:
port: 4328
types: ["platform", "function"]
Expand Down
4 changes: 4 additions & 0 deletions collector/receiver/telemetryapireceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,14 @@ type Config struct {
Types []string `mapstructure:"types"`
LogReport bool `mapstructure:"log_report"`
MetricsTemporality string `mapstructure:"metrics_temporality"`
ExportInterval int `mapstructure:"export_interval_ms"`
}

// Validate validates the configuration by checking for missing or invalid fields
func (cfg *Config) Validate() error {
if cfg.ExportInterval < 0 {
return fmt.Errorf("export_interval_ms must be non-negative: %d", cfg.ExportInterval)
}
for _, t := range cfg.Types {
if t != platform && t != function && t != extension {
return fmt.Errorf("unknown extension type: %s", t)
Expand Down
7 changes: 4 additions & 3 deletions collector/receiver/telemetryapireceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ func TestLoadConfig(t *testing.T) {
// Helper function to create expected Config
createExpectedConfig := func(types []string) *Config {
return &Config{
extensionID: "extensionID",
Port: 12345,
Types: types,
extensionID: "extensionID",
Port: 12345,
Types: types,
ExportInterval: defaultExportInterval,
}
}

Expand Down
20 changes: 11 additions & 9 deletions collector/receiver/telemetryapireceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ import (
)

const (
typeStr = "telemetryapi"
stability = component.StabilityLevelDevelopment
defaultPort = 4325
platform = "platform"
function = "function"
extension = "extension"
typeStr = "telemetryapi"
stability = component.StabilityLevelDevelopment
defaultPort = 4325
defaultExportInterval = 60000
platform = "platform"
function = "function"
extension = "extension"
)

var (
Expand All @@ -44,9 +45,10 @@ func NewFactory(extensionID string) receiver.Factory {
Type,
func() component.Config {
return &Config{
extensionID: extensionID,
Port: defaultPort,
Types: []string{platform, function, extension},
extensionID: extensionID,
Port: defaultPort,
Types: []string{platform, function, extension},
ExportInterval: defaultExportInterval,
}
},
receiver.WithTraces(createTracesReceiver, stability),
Expand Down
7 changes: 6 additions & 1 deletion collector/receiver/telemetryapireceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@ func TestNewFactory(t *testing.T) {
testFunc: func(t *testing.T) {
factory := NewFactory("test")

var expectedCfg component.Config = &Config{extensionID: "test", Port: defaultPort, Types: []string{platform, function, extension}}
var expectedCfg component.Config = &Config{
extensionID: "test",
Port: defaultPort,
Types: []string{platform, function, extension},
ExportInterval: defaultExportInterval,
}

require.Equal(t, expectedCfg, factory.CreateDefaultConfig())
},
Expand Down
95 changes: 65 additions & 30 deletions collector/receiver/telemetryapireceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ type telemetryAPIReceiver struct {
faaSMetricBuilders *FaaSMetricBuilders
currentFaasInvocationID string
logReport bool
exportInterval time.Duration
stopCh chan struct{}
wg sync.WaitGroup
}

func (r *telemetryAPIReceiver) Start(ctx context.Context, host component.Host) error {
Expand All @@ -94,6 +97,11 @@ func (r *telemetryAPIReceiver) Start(ctx context.Context, host component.Host) e
_ = r.httpServer.ListenAndServe()
}()

if r.exportInterval > 0 {
r.wg.Add(1)
go r.startMetricsExporter()
}

telemetryClient := telemetryapi.NewClient(r.logger)
if len(r.types) > 0 {
_, err := telemetryClient.Subscribe(ctx, r.types, r.extensionID, fmt.Sprintf("http://%s/", address))
Expand All @@ -106,9 +114,60 @@ func (r *telemetryAPIReceiver) Start(ctx context.Context, host component.Host) e
}

func (r *telemetryAPIReceiver) Shutdown(ctx context.Context) error {
close(r.stopCh)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be shutting down the httpServer here first?

Suggested change
close(r.stopCh)
r.httpServer.Shutdown(ctx)
close(r.stopCh)

r.wg.Wait()
if r.exportInterval > 0 {
r.flushMetrics(ctx)
}
return nil
}

// startMetricsExporter is the body of the background export goroutine.
// It flushes the accumulated metrics once per exportInterval and returns as
// soon as stopCh is closed. It marks itself done on r.wg when it exits, so a
// caller waiting on the WaitGroup knows the loop has fully stopped.
//
// time.NewTicker panics on a non-positive duration; Start only launches this
// goroutine when exportInterval > 0.
func (r *telemetryAPIReceiver) startMetricsExporter() {
	defer r.wg.Done()

	exportTicker := time.NewTicker(r.exportInterval)
	defer exportTicker.Stop()

	for {
		select {
		case <-r.stopCh:
			// Stop requested: leave without a final flush here.
			return
		case <-exportTicker.C:
			// Periodic export is not tied to any request, so it uses a
			// background context.
			r.flushMetrics(context.Background())
		}
	}
}

// flushMetrics acquires r.mu, delegates to flushMetricsLocked with the given
// context, and releases the mutex on return (including on panic, via defer).
// NOTE(review): presumably meant for callers that do not already hold r.mu,
// with flushMetricsLocked used by those that do; confirm against call sites.
func (r *telemetryAPIReceiver) flushMetrics(ctx context.Context) {
r.mu.Lock()
defer r.mu.Unlock()
r.flushMetricsLocked(ctx)
}

func (r *telemetryAPIReceiver) flushMetricsLocked(ctx context.Context) {
metric := pmetric.NewMetrics()
resourceMetric := metric.ResourceMetrics().AppendEmpty()
r.resource.CopyTo(resourceMetric.Resource())
scopeMetric := resourceMetric.ScopeMetrics().AppendEmpty()
scopeMetric.Scope().SetName(scopeName)
scopeMetric.SetSchemaUrl(semconv.SchemaURL)

ts := pcommon.NewTimestampFromTime(time.Now())
r.faaSMetricBuilders.coldstartsMetric.AppendDataPoints(scopeMetric, ts)
r.faaSMetricBuilders.errorsMetric.AppendDataPoints(scopeMetric, ts)
r.faaSMetricBuilders.timeoutsMetric.AppendDataPoints(scopeMetric, ts)
r.faaSMetricBuilders.initDurationMetric.AppendDataPoints(scopeMetric, ts)
r.faaSMetricBuilders.memUsageMetric.AppendDataPoints(scopeMetric, ts)
r.faaSMetricBuilders.invocationsMetric.AppendDataPoints(scopeMetric, ts)
r.faaSMetricBuilders.invokeDurationMetric.AppendDataPoints(scopeMetric, ts)

if metric.MetricCount() > 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Edge case here but think of the following scenario:

  • User configures delta temporality
  • User removes their metrics pipeline but forgets to remove the delta temporality config
    Their collector config could look something like this:
receivers:
  telemetryapi:
    types:
      - platform
    metrics_temporality: "delta"
...
service:
  pipelines:
    traces:
      receivers: [telemetryapi]
      exporters: [otlp]

This leads to r.nextMetrics being nil (see factory.go where the createMetricsReceiver method is not called in this case).

Whenever the ticker fires and the flushMetricsLocked method is called, the AppendDataPoints methods will not return early, because for delta temporality specifically the export variable will be true:

export := h.temporality == pmetric.AggregationTemporalityDelta

I believe that is problematic, because now following calls inside that same AppendDataPoints method are causing an empty Metric struct to be appended to scopeMetrics:

metric := scopeMetrics.Metrics().AppendEmpty()

So the check for metric.MetricCount() > 0 here will always evaluate to true when using delta temporality. But since we no longer have that metrics pipeline, the r.nextMetrics.ConsumeMetrics(ctx, metric) call below will panic because r.nextMetrics is nil.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, even when properly using an actual metrics pipeline, we run the risk of just producing metrics with 0 datapoints when temporality is delta, which seems useless / wasteful.

err := r.nextMetrics.ConsumeMetrics(ctx, metric)
if err != nil {
r.logger.Error("error flushing metrics", zap.Error(err))
}
}
}

func newSpanID() pcommon.SpanID {
sid := pcommon.SpanID{}
_, _ = crand.Read(sid[:])
Expand Down Expand Up @@ -198,13 +257,9 @@ func (r *telemetryAPIReceiver) httpHandler(w http.ResponseWriter, req *http.Requ
}
// Metrics
if r.nextMetrics != nil {
if metrics, err := r.createMetrics(slice); err == nil {
if metrics.MetricCount() > 0 {
err := r.nextMetrics.ConsumeMetrics(context.Background(), metrics)
if err != nil {
r.logger.Error("error receiving metrics", zap.Error(err))
}
}
r.recordMetrics(slice)
if r.exportInterval == 0 {
r.flushMetricsLocked(context.Background())
}
}

Expand Down Expand Up @@ -233,37 +288,22 @@ func (r *telemetryAPIReceiver) getRecordRequestId(record map[string]interface{})
return ""
}

func (r *telemetryAPIReceiver) createMetrics(slice []event) (pmetric.Metrics, error) {
metric := pmetric.NewMetrics()
resourceMetric := metric.ResourceMetrics().AppendEmpty()
r.resource.CopyTo(resourceMetric.Resource())
scopeMetric := resourceMetric.ScopeMetrics().AppendEmpty()
scopeMetric.Scope().SetName(scopeName)
scopeMetric.SetSchemaUrl(semconv.SchemaURL)

func (r *telemetryAPIReceiver) recordMetrics(slice []event) {
for _, el := range slice {
r.logger.Debug(fmt.Sprintf("Event: %s", el.Type), zap.Any("event", el))
record, ok := el.Record.(map[string]any)
if !ok {
continue
}
ts, err := time.Parse(time.RFC3339, el.Time)
if err != nil {
continue
}

switch el.Type {
case string(telemetryapi.PlatformInitStart):
r.faaSMetricBuilders.coldstartsMetric.Add(1)
r.faaSMetricBuilders.coldstartsMetric.AppendDataPoints(scopeMetric, pcommon.NewTimestampFromTime(ts))
case string(telemetryapi.PlatformInitReport):
status, _ := record["status"].(string)
if status == telemetryFailureStatus || status == telemetryErrorStatus {
r.faaSMetricBuilders.errorsMetric.Add(1)
r.faaSMetricBuilders.errorsMetric.AppendDataPoints(scopeMetric, pcommon.NewTimestampFromTime(ts))
} else if status == telemetryTimeoutStatus {
r.faaSMetricBuilders.timeoutsMetric.Add(1)
r.faaSMetricBuilders.timeoutsMetric.AppendDataPoints(scopeMetric, pcommon.NewTimestampFromTime(ts))
}

metrics, ok := record["metrics"].(map[string]any)
Expand All @@ -277,7 +317,6 @@ func (r *telemetryAPIReceiver) createMetrics(slice []event) (pmetric.Metrics, er
}

r.faaSMetricBuilders.initDurationMetric.Record(durationMs / 1000.0)
r.faaSMetricBuilders.initDurationMetric.AppendDataPoints(scopeMetric, pcommon.NewTimestampFromTime(ts))
case string(telemetryapi.PlatformReport):
metrics, ok := record["metrics"].(map[string]any)
if !ok {
Expand All @@ -287,20 +326,16 @@ func (r *telemetryAPIReceiver) createMetrics(slice []event) (pmetric.Metrics, er
maxMemoryUsedMb, ok := metrics["maxMemoryUsedMB"].(float64)
if ok {
r.faaSMetricBuilders.memUsageMetric.Record(maxMemoryUsedMb * 1000000.0)
r.faaSMetricBuilders.memUsageMetric.AppendDataPoints(scopeMetric, pcommon.NewTimestampFromTime(ts))
}
case string(telemetryapi.PlatformRuntimeDone):
status, _ := record["status"].(string)

if status == telemetrySuccessStatus {
r.faaSMetricBuilders.invocationsMetric.Add(1)
r.faaSMetricBuilders.invocationsMetric.AppendDataPoints(scopeMetric, pcommon.NewTimestampFromTime(ts))
} else if status == telemetryFailureStatus || status == telemetryErrorStatus {
r.faaSMetricBuilders.errorsMetric.Add(1)
r.faaSMetricBuilders.errorsMetric.AppendDataPoints(scopeMetric, pcommon.NewTimestampFromTime(ts))
} else if status == telemetryTimeoutStatus {
r.faaSMetricBuilders.timeoutsMetric.Add(1)
r.faaSMetricBuilders.timeoutsMetric.AppendDataPoints(scopeMetric, pcommon.NewTimestampFromTime(ts))
}

metrics, ok := record["metrics"].(map[string]any)
Expand All @@ -311,11 +346,9 @@ func (r *telemetryAPIReceiver) createMetrics(slice []event) (pmetric.Metrics, er
durationMs, ok := metrics["durationMs"].(float64)
if ok {
r.faaSMetricBuilders.invokeDurationMetric.Record(durationMs / 1000.0)
r.faaSMetricBuilders.invokeDurationMetric.AppendDataPoints(scopeMetric, pcommon.NewTimestampFromTime(ts))
}
}
}
return metric, nil
}

func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) {
Expand Down Expand Up @@ -675,6 +708,8 @@ func newTelemetryAPIReceiver(
resource: r,
faaSMetricBuilders: NewFaaSMetricBuilders(pcommon.NewTimestampFromTime(time.Now()), getMetricsTemporality(cfg)),
logReport: cfg.LogReport,
exportInterval: time.Duration(cfg.ExportInterval) * time.Millisecond,
stopCh: make(chan struct{}),
}, nil
}

Expand Down
Loading
Loading