diff --git a/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java b/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java index ff4b3cd218d..15b4367fd54 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java @@ -78,6 +78,7 @@ public final class GeneralConfig { "trace.tracer.metrics.ignored.resources"; public static final String TRACE_STATS_CARDINALITY_LIMITS_ENABLED = "trace.stats.cardinality.limits.enabled"; + public static final String TRACE_STATS_ADDITIONAL_TAGS = "trace.stats.additional.tags"; public static final String AZURE_APP_SERVICES = "azure.app.services"; public static final String INTERNAL_EXIT_ON_FAILURE = "trace.internal.exit.on.failure"; diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/AdditionalTagsMetricsBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/AdditionalTagsMetricsBenchmark.java new file mode 100644 index 00000000000..b07772f7852 --- /dev/null +++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/AdditionalTagsMetricsBenchmark.java @@ -0,0 +1,154 @@ +package datadog.trace.common.metrics; + +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT; +import static java.util.concurrent.TimeUnit.SECONDS; + +import datadog.trace.api.WellKnownTags; +import datadog.trace.core.CoreSpan; +import de.thetaphi.forbiddenapis.SuppressForbidden; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * JMH benchmark exercising the span-derived primary tags pipeline added in CSS v1.3.0. Parallel to + * {@link AdversarialMetricsBenchmark} but configures two additional-tag keys (each with a per-key + * cardinality cap of {@link MetricCardinalityLimits#ADDITIONAL_TAG_VALUE}) and generates unique + * values per op so the cap saturates fast. The benchmark measures the cost of: + * + * + * + *

The aim is not absolute throughput numbers but a regression guard for the additional-tags hot + * path: any future refactor that adds a tag-map lookup, allocates per call, or pulls the + * sentinel-materialization onto the hot path should show up as a step change here. + * + *

Interpreting the {@code limitsEnabled} parameter. The two arms are NOT a fair "cost of + * limiting" comparison and should not be read as one. With this benchmark's unbounded distinct + * values, the per-key budget saturates almost immediately and the two modes diverge into different + * downstream behavior, not just a different branch in {@code register}: + * + *

+ * + *

So {@code limitsEnabled=true} measures lower throughput here precisely because it does MORE + * useful work per span (it keeps the masked data and records it) where the disabled arm drops the + * overflow. A 2026-06-03 run (3 forks, -prof gc) measured {@code false} at ~19.8M ops/s / 820 B/op + * and {@code true} at ~12.4M ops/s / 888 B/op -- the higher per-op allocation under limits is the + * histogram recording, not the sentinel path. Throughput CIs were wide (>20%); the per-op + * allocation figures are the reliable signal. A production workload with a bounded value set never + * saturates the budget and sees neither arm's overflow behavior (cf. {@code + * HighCardinalityResourceMetricsBenchmark}, which is at parity with limits on/off). + */ +@State(Scope.Benchmark) +@Warmup(iterations = 2, time = 15, timeUnit = SECONDS) +@Measurement(iterations = 5, time = 15, timeUnit = SECONDS) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(SECONDS) +@Threads(8) +@Fork(value = 1) +public class AdditionalTagsMetricsBenchmark { + + private ClientStatsAggregator aggregator; + private AdversarialMetricsBenchmark.CountingHealthMetrics health; + + /** + * Whether the {@link TagCardinalityHandler}s inside {@link AdditionalTagsSchema} substitute the + * {@code blocked_by_tracer} sentinel once the per-key budget is exhausted. JMH runs both values + * within the same fork so the two modes see equivalent thermal conditions. + */ + @Param({"false", "true"}) + public boolean limitsEnabled; + + @State(Scope.Thread) + public static class ThreadState { + int cursor; + } + + @Setup + public void setup() { + this.health = new AdversarialMetricsBenchmark.CountingHealthMetrics(); + // Two configured additional tags. Each key gets a TagCardinalityHandler capped at + // MetricCardinalityLimits.ADDITIONAL_TAG_VALUE (512) distinct values per cycle. The benchmark + // generates 65k distinct values per key so the cap saturates quickly and most ops return the + // blocked sentinel -- that's the contention we want to measure. + AdditionalTagsSchema additionalTagsSchema = + AdditionalTagsSchema.from( + new LinkedHashSet<>(Arrays.asList("region", "tenant_id")), this.health, limitsEnabled); + this.aggregator = + new ClientStatsAggregator( + new WellKnownTags("", "", "", "", "", ""), + Collections.emptySet(), + additionalTagsSchema, + new ClientStatsAggregatorBenchmark.FixedAgentFeaturesDiscovery( + Collections.singleton("peer.hostname"), Collections.emptySet()), + this.health, + new ClientStatsAggregatorBenchmark.NullSink(), + 2048, + 2048, + false); + this.aggregator.start(); + } + + @TearDown + @SuppressForbidden + public void tearDown() { + aggregator.close(); + System.err.println("[ADDITIONAL-TAGS] counters (across all threads, single fork):"); + System.err.println(" onStatsInboxFull = " + health.inboxFull.sum()); + System.err.println(" onStatsAggregateDropped = " + health.aggregateDropped.sum()); + } + + @Benchmark + public void publish(ThreadState ts, Blackhole blackhole) { + int idx = ts.cursor++; + ThreadLocalRandom rng = ThreadLocalRandom.current(); + + // Distinct values per op -- 65k regions × 65k tenants × random durations. With the per-key cap + // (ADDITIONAL_TAG_VALUE = 512), the first 512 distinct values per key admit; the rest collapse + // to the blocked sentinel and increment the per-tag block counter via the schema's flush path. + int scrambled = idx * 0x9E3779B1; + String region = "region-" + ((scrambled >>> 4) & 0xFFFF); + String tenant = "tenant-" + ((scrambled >>> 16) & 0xFFFF); + long durationNanos = 1L + (rng.nextLong() & 0x3FFFFFFFL); + + SimpleSpan span = + new SimpleSpan("svc", "op", "res", "web", true, true, false, 0, durationNanos, 200); + span.setTag(SPAN_KIND, SPAN_KIND_CLIENT); + span.setTag("region", region); + span.setTag("tenant_id", tenant); + + List> trace = Collections.singletonList(span); + blackhole.consume(aggregator.publish(trace)); + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AdditionalTagsSchema.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AdditionalTagsSchema.java new file mode 100644 index 00000000000..376adc19a21 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AdditionalTagsSchema.java @@ -0,0 +1,165 @@ +package datadog.trace.common.metrics; + +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; +import datadog.trace.core.monitor.HealthMetrics; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Immutable schema describing the configured span-derived primary tag keys. Built once from {@code + * Config.getTraceStatsAdditionalTags()} at aggregator construction; not replaced at runtime. + * + *

Parallels {@link PeerTagSchema} for shape: a sorted, deduped, validated, capped {@code + * String[]} of names plus per-name {@link TagCardinalityHandler}s for UTF8 caching and value-level + * cardinality limiting. The handlers are reset each reporting cycle via {@link #resetHandlers()}. + * + *

What's pre-built: + * + *

+ */ +final class AdditionalTagsSchema { + + private static final Logger log = LoggerFactory.getLogger(AdditionalTagsSchema.class); + + /** + * Backend stats pipeline supports a small number of primary tag dimensions (4 by default, up to + * ~10 for elevated quotas). Configuring more than this is misuse; we drop the overflow at + * startup. + */ + static final int MAX_ADDITIONAL_TAG_KEYS = 10; + + static final String BLOCKED_VALUE = "blocked_by_tracer"; + + /** Singleton empty schema returned when no additional tags are configured. */ + static final AdditionalTagsSchema EMPTY = + new AdditionalTagsSchema( + new String[0], new UTF8BytesString[0], new TagCardinalityHandler[0], HealthMetrics.NO_OP); + + final String[] names; + final UTF8BytesString[] blockedSentinels; + + /** Per-key handlers providing UTF8 caching and per-cycle cardinality limiting. */ + private final TagCardinalityHandler[] handlers; + + private final HealthMetrics healthMetrics; + + private AdditionalTagsSchema( + String[] names, + UTF8BytesString[] blockedSentinels, + TagCardinalityHandler[] handlers, + HealthMetrics healthMetrics) { + this.names = names; + this.blockedSentinels = blockedSentinels; + this.handlers = handlers; + this.healthMetrics = healthMetrics; + } + + /** + * Builds a schema from the configured tag keys. Validates each key (non-empty, no {@code :}), + * sorts alphabetically, dedupes, and caps at {@link #MAX_ADDITIONAL_TAG_KEYS}. Returns the shared + * empty schema when {@code configured} is null or empty. + */ + /** Test convenience: uses {@link HealthMetrics#NO_OP}. */ + static AdditionalTagsSchema from(Set configured) { + return from(configured, HealthMetrics.NO_OP, AggregateEntry.LIMITS_ENABLED); + } + + static AdditionalTagsSchema from(Set configured, HealthMetrics healthMetrics) { + return from(configured, healthMetrics, AggregateEntry.LIMITS_ENABLED); + } + + static AdditionalTagsSchema from( + Set configured, HealthMetrics healthMetrics, boolean useBlockedSentinel) { + if (configured == null || configured.isEmpty()) { + return EMPTY; + } + List valid = new ArrayList<>(); + for (String key : configured) { + if (key == null || key.isEmpty()) { + log.warn("Ignoring empty additional metric tag key"); + continue; + } + if (key.contains(":")) { + log.warn("Ignoring additional metric tag key '{}': keys must not contain ':'", key); + continue; + } + valid.add(key); + } + if (valid.isEmpty()) { + return EMPTY; + } + Collections.sort(valid); + // Dedup (sort brings duplicates adjacent) + List deduped = new ArrayList<>(valid.size()); + String prev = null; + for (String key : valid) { + if (!key.equals(prev)) { + deduped.add(key); + prev = key; + } + } + if (deduped.size() > MAX_ADDITIONAL_TAG_KEYS) { + log.warn( + "Configured additional metric tag keys ({}) exceeds the supported limit of {}; " + + "dropping extra keys: {}", + deduped.size(), + MAX_ADDITIONAL_TAG_KEYS, + deduped.subList(MAX_ADDITIONAL_TAG_KEYS, deduped.size())); + deduped = deduped.subList(0, MAX_ADDITIONAL_TAG_KEYS); + } + String[] namesArr = deduped.toArray(new String[0]); + UTF8BytesString[] sentinels = new UTF8BytesString[namesArr.length]; + TagCardinalityHandler[] handlersArr = new TagCardinalityHandler[namesArr.length]; + for (int i = 0; i < namesArr.length; i++) { + sentinels[i] = UTF8BytesString.create(namesArr[i] + ":" + BLOCKED_VALUE); + handlersArr[i] = + new TagCardinalityHandler( + namesArr[i], MetricCardinalityLimits.ADDITIONAL_TAG_VALUE, useBlockedSentinel); + } + return new AdditionalTagsSchema(namesArr, sentinels, handlersArr, healthMetrics); + } + + int size() { + return names.length; + } + + String name(int i) { + return names[i]; + } + + UTF8BytesString blockedSentinel(int i) { + return blockedSentinels[i]; + } + + /** + * Canonicalizes {@code value} for the additional tag at slot {@code i} through the per-key {@link + * TagCardinalityHandler}: provides UTF8 caching and returns the per-tag blocked sentinel when the + * per-cycle budget is exhausted. Returns {@link UTF8BytesString#EMPTY} for null inputs. + */ + UTF8BytesString register(int i, String value) { + return handlers[i].register(value); + } + + /** + * Resets every handler's working set and flushes accumulated block counts to {@link + * HealthMetrics}. Must be called on the aggregator thread each cycle. + */ + void resetHandlers() { + for (int i = 0; i < handlers.length; i++) { + long blocked = handlers[i].reset(); + if (blocked > 0) { + healthMetrics.onTagCardinalityBlocked(names[i], blocked); + } + } + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java index 5e9e29e7458..e02ce3f3264 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateEntry.java @@ -6,9 +6,12 @@ import datadog.trace.core.monitor.HealthMetrics; import datadog.trace.util.Hashtable; import datadog.trace.util.LongHashingUtils; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLongArray; import javax.annotation.Nullable; /** @@ -100,6 +103,9 @@ final class AggregateEntry extends Hashtable.Entry { */ static final boolean LIMITS_ENABLED = Config.get().isTraceStatsCardinalityLimitsEnabled(); + /** Shared empty array for entries with no additional-tags schema configured. */ + private static final UTF8BytesString[] EMPTY_ADDITIONAL_TAGS = new UTF8BytesString[0]; + // Per-field cardinality handlers. Limits live on MetricCardinalityLimits -- see that class for // per-field rationale. static final PropertyCardinalityHandler RESOURCE_HANDLER = @@ -138,8 +144,17 @@ final class AggregateEntry extends Hashtable.Entry { final boolean traceRoot; final List peerTags; - // Mutable aggregate state -- single-thread (aggregator) writer. - private final Histogram okLatencies = Histogram.newHistogram(); + /** + * Compact, schema-ordered (alphabetical by key) array of the additional metric tag values the + * span actually set -- only present values, no null slots. Each element is the canonical {@code + * "key:value"} UTF8BytesString (or the schema's blocked sentinel when the per-cycle cardinality + * budget was exhausted). The {@code "key:"} prefix makes the packed form unambiguous. Empty array + * for the no-additional-tags case (shared {@link #EMPTY_ADDITIONAL_TAGS}). + */ + final UTF8BytesString[] additionalTags; + + // Recording state. Mutated only on the aggregator thread. Not thread-safe. + private final Histogram okLatencies; /** * Lazily allocated on the first recorded error. Most entries never see an error and keep this @@ -169,7 +184,8 @@ private AggregateEntry( short httpStatusCode, boolean synthetic, boolean traceRoot, - List peerTags) { + List peerTags, + UTF8BytesString[] additionalTags) { super(keyHash); this.resource = resource; this.service = service; @@ -184,86 +200,8 @@ private AggregateEntry( this.synthetic = synthetic; this.traceRoot = traceRoot; this.peerTags = peerTags; - } - - /** - * Records a single hit. {@code tagAndDuration} carries the duration nanos with optional {@link - * #ERROR_TAG} / {@link #TOP_LEVEL_TAG} bits OR-ed in. - */ - AggregateEntry recordOneDuration(long tagAndDuration) { - ++hitCount; - if ((tagAndDuration & TOP_LEVEL_TAG) == TOP_LEVEL_TAG) { - tagAndDuration ^= TOP_LEVEL_TAG; - ++topLevelCount; - } - if ((tagAndDuration & ERROR_TAG) == ERROR_TAG) { - tagAndDuration ^= ERROR_TAG; - errorLatenciesForWrite().accept(tagAndDuration); - ++errorCount; - } else { - okLatencies.accept(tagAndDuration); - } - duration += tagAndDuration; - return this; - } - - int getErrorCount() { - return errorCount; - } - - int getHitCount() { - return hitCount; - } - - int getTopLevelCount() { - return topLevelCount; - } - - long getDuration() { - return duration; - } - - Histogram getOkLatencies() { - return okLatencies; - } - - /** - * Returns the entry's error-latency histogram, or {@code null} if no error has been recorded. - * Callers serializing this should treat {@code null} as "emit a cached empty histogram"; see - * {@link SerializingMetricWriter}. - */ - @Nullable - Histogram getErrorLatencies() { - return errorLatencies; - } - - /** Lazy-allocates {@link #errorLatencies} on the first error. */ - private Histogram errorLatenciesForWrite() { - Histogram h = errorLatencies; - if (h == null) { - h = Histogram.newHistogram(); - errorLatencies = h; - } - return h; - } - - /** - * Resets the per-cycle counters and histograms. Label fields ({@code resource}, {@code service}, - * ..., {@code peerTags}) are deliberately left intact -- they're the entry's bucket identity and - * must persist so a subsequent snapshot with the same key reuses this entry instead of allocating - * a fresh one. Entries that stay at {@code hitCount == 0} across a cycle are reaped by {@link - * AggregateTable#expungeStaleAggregates}. - */ - void clear() { - this.errorCount = 0; - this.hitCount = 0; - this.topLevelCount = 0; - this.duration = 0; - this.okLatencies.clear(); - // errorLatencies stays null on entries that never errored. Only clear if it was allocated. - if (this.errorLatencies != null) { - this.errorLatencies.clear(); - } + this.additionalTags = additionalTags; + this.okLatencies = Histogram.newHistogram(); } /** @@ -312,7 +250,9 @@ static AggregateEntry of( synthetic, traceRoot, peerTagsArr, - peerTagsArr.length); + peerTagsArr.length, + EMPTY_ADDITIONAL_TAGS, + 0); return new AggregateEntry( keyHash, resourceUtf, @@ -327,7 +267,8 @@ static AggregateEntry of( (short) httpStatusCode, synthetic, traceRoot, - peerTagsList); + peerTagsList, + EMPTY_ADDITIONAL_TAGS); } /** @@ -384,7 +325,9 @@ static long hashOf( boolean synthetic, boolean traceRoot, UTF8BytesString[] peerTags, - int peerTagCount) { + int peerTagCount, + UTF8BytesString[] additionalTags, + int additionalTagCount) { long h = 0; h = LongHashingUtils.addToHash(h, resource); h = LongHashingUtils.addToHash(h, service); @@ -398,6 +341,11 @@ static long hashOf( for (int i = 0; i < peerTagCount; i++) { h = LongHashingUtils.addToHash(h, peerTags[i]); } + // Additional tags are packed compactly in schema order (alphabetical by key); each carries its + // "key:" prefix so the packed form is unambiguous without positional null slots. + for (int i = 0; i < additionalTagCount; i++) { + h = LongHashingUtils.addToHash(h, additionalTags[i]); + } h = LongHashingUtils.addToHash(h, httpStatusCode); h = LongHashingUtils.addToHash(h, synthetic); h = LongHashingUtils.addToHash(h, traceRoot); @@ -489,9 +437,148 @@ List getPeerTags() { return peerTags; } - // Production AggregateEntry intentionally has no equals/hashCode override -- AggregateTable - // bucketing uses keyHash + Canonical.matches and never invokes Object.equals. For tests that - // need value-equality (Spock argument matchers), use AggregateEntryTestUtils in src/test. + /** + * Returns the configured-additional-tag values in schema (alphabetical-by-key) order. Each slot + * is either {@code null} (span didn't set that tag) or the canonical {@code "key:value"} + * UTF8BytesString. The array's length matches the schema; empty array when no additional tags are + * configured. + */ + UTF8BytesString[] getAdditionalTags() { + return additionalTags; + } + + // ----- recording state accessors ----- + + int getHitCount() { + return hitCount; + } + + int getErrorCount() { + return errorCount; + } + + int getTopLevelCount() { + return topLevelCount; + } + + long getDuration() { + return duration; + } + + Histogram getOkLatencies() { + return okLatencies; + } + + /** + * Returns the entry's error latency histogram, or {@code null} if no error has been recorded yet. + * Callers should treat null as "serialize as an empty histogram" (see {@link + * SerializingMetricWriter}). + */ + Histogram getErrorLatencies() { + return errorLatencies; + } + + /** Lazy-allocates {@link #errorLatencies} on the first error. */ + private Histogram errorLatenciesForWrite() { + Histogram h = errorLatencies; + if (h == null) { + h = Histogram.newHistogram(); + errorLatencies = h; + } + return h; + } + + /** + * Records a single hit. {@code tagAndDuration} carries the duration nanos with optional {@link + * #ERROR_TAG} / {@link #TOP_LEVEL_TAG} bits OR-ed in. + */ + AggregateEntry recordOneDuration(long tagAndDuration) { + ++hitCount; + if ((tagAndDuration & TOP_LEVEL_TAG) == TOP_LEVEL_TAG) { + tagAndDuration ^= TOP_LEVEL_TAG; + ++topLevelCount; + } + if ((tagAndDuration & ERROR_TAG) == ERROR_TAG) { + tagAndDuration ^= ERROR_TAG; + errorLatenciesForWrite().accept(tagAndDuration); + ++errorCount; + } else { + okLatencies.accept(tagAndDuration); + } + duration += tagAndDuration; + return this; + } + + /** + * Records {@code count} durations from {@code durations} (positions 0..count-1). Used by + * integration tests; production code uses {@link #recordOneDuration}. + */ + AggregateEntry recordDurations(int count, AtomicLongArray durations) { + this.hitCount += count; + for (int i = 0; i < count && i < durations.length(); ++i) { + long d = durations.getAndSet(i, 0); + if ((d & TOP_LEVEL_TAG) == TOP_LEVEL_TAG) { + d ^= TOP_LEVEL_TAG; + ++topLevelCount; + } + if ((d & ERROR_TAG) == ERROR_TAG) { + d ^= ERROR_TAG; + errorLatenciesForWrite().accept(d); + ++errorCount; + } else { + okLatencies.accept(d); + } + this.duration += d; + } + return this; + } + + /** + * Clears the recording state. The OK histogram is reused; the error histogram (if allocated) is + * reused too, but entries that never saw an error keep their {@code errorLatencies} field null. + */ + @SuppressFBWarnings("AT_NONATOMIC_64BIT_PRIMITIVE") + void clearAggregate() { + this.errorCount = 0; + this.hitCount = 0; + this.topLevelCount = 0; + this.duration = 0; + this.okLatencies.clear(); + if (this.errorLatencies != null) { + this.errorLatencies.clear(); + } + } + + /** + * Equality on the 13 label fields (not on the recording counters). Used only by test mock + * matchers; the {@link Hashtable} does its own bucketing via {@link #keyHash} + {@link + * Canonical#matches} and never calls {@code equals}. + */ + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof AggregateEntry)) return false; + AggregateEntry that = (AggregateEntry) o; + return httpStatusCode == that.httpStatusCode + && synthetic == that.synthetic + && traceRoot == that.traceRoot + && Objects.equals(resource, that.resource) + && Objects.equals(service, that.service) + && Objects.equals(operationName, that.operationName) + && Objects.equals(serviceSource, that.serviceSource) + && Objects.equals(type, that.type) + && Objects.equals(spanKind, that.spanKind) + && Objects.equals(peerTags, that.peerTags) + && Arrays.equals(additionalTags, that.additionalTags) + && Objects.equals(httpMethod, that.httpMethod) + && Objects.equals(httpEndpoint, that.httpEndpoint) + && Objects.equals(grpcStatusCode, that.grpcStatusCode); + } + + @Override + public int hashCode() { + return (int) keyHash; + } /** * Reusable scratch buffer for canonicalizing a {@link SpanSnapshot} into UTF8 fields, computing @@ -525,8 +612,28 @@ static final class Canonical { int peerTagsSize = 0; + /** Schema + per-key blocked sentinels for additional metric tags. Immutable. */ + final AdditionalTagsSchema additionalTagsSchema; + + /** + * Reusable scratch for canonicalized additional-tag values, sized to the schema. Present values + * are packed at the front in schema order (alphabetical by key); {@link #additionalTagsSize} + * gives the count. Each entry is a {@code "key:value"} UTF8BytesString, so packing loses no + * information -- the key prefix disambiguates which key a value belongs to. Mirrors the {@code + * peerTagsBuffer + peerTagsSize} pattern. {@link #toEntry} copies the populated prefix into the + * new entry. + */ + final UTF8BytesString[] additionalTagsBuffer; + + int additionalTagsSize; + long keyHash; + Canonical(AdditionalTagsSchema additionalTagsSchema) { + this.additionalTagsSchema = additionalTagsSchema; + this.additionalTagsBuffer = new UTF8BytesString[additionalTagsSchema.size()]; + } + /** Canonicalize all fields from {@code s} through the handlers into this buffer. */ void populate(SpanSnapshot s) { this.resource = RESOURCE_HANDLER.register(s.resourceName); @@ -542,22 +649,33 @@ void populate(SpanSnapshot s) { this.synthetic = s.synthetic; this.traceRoot = s.traceRoot; populatePeerTags(s.peerTagSchema, s.peerTagValues); - this.keyHash = - hashOf( - resource, - service, - operationName, - serviceSource, - type, - spanKind, - httpMethod, - httpEndpoint, - grpcStatusCode, - httpStatusCode, - synthetic, - traceRoot, - peerTagsBuffer != null ? peerTagsBuffer : EMPTY_PEER_TAGS, - peerTagsSize); + populateAdditionalTags(s.additionalTagValues); + this.keyHash = computeKeyHash(); + } + + /** Recompute the key hash from the current buffer state (used after a blocked-rebuild). */ + void recomputeKeyHash() { + this.keyHash = computeKeyHash(); + } + + private long computeKeyHash() { + return hashOf( + resource, + service, + operationName, + serviceSource, + type, + spanKind, + httpMethod, + httpEndpoint, + grpcStatusCode, + httpStatusCode, + synthetic, + traceRoot, + peerTagsBuffer != null ? peerTagsBuffer : EMPTY_PEER_TAGS, + peerTagsSize, + additionalTagsBuffer, + additionalTagsSize); } /** @@ -567,7 +685,7 @@ void populate(SpanSnapshot s) { * Producer-side {@code capturePeerTagValues} produces sparse-null arrays, so the skip pays off * whenever a span carries only a subset of the configured peer tags. */ - private void populatePeerTags(PeerTagSchema schema, String[] values) { + private void populatePeerTags(@Nullable PeerTagSchema schema, @Nullable String[] values) { peerTagsSize = 0; if (schema == null || values == null) { return; @@ -585,6 +703,27 @@ private void populatePeerTags(PeerTagSchema schema, String[] values) { } } + /** + * Packs canonical {@code "key:value"} UTF8BytesStrings for each present slot of {@code values} + * into the front of {@link #additionalTagsBuffer} (schema order), via {@link + * AdditionalTagsSchema#register}, and sets {@link #additionalTagsSize}. The handler returns the + * per-key blocked sentinel when the per-cycle value budget is exhausted. + */ + private void populateAdditionalTags(@Nullable String[] values) { + additionalTagsSize = 0; + int n = additionalTagsBuffer.length; + if (n == 0 || values == null) { + return; + } + for (int i = 0; i < n; i++) { + String v = values[i]; + if (v == null) { + continue; + } + additionalTagsBuffer[additionalTagsSize++] = additionalTagsSchema.register(i, v); + } + } + /** * Whether this canonicalized snapshot matches the given entry. Compares UTF8 fields via * content-equality (so an entry surviving a handler reset still matches a freshly-canonicalized @@ -608,7 +747,22 @@ boolean matches(AggregateEntry e) { && peerTagsEqual(peerTagsBuffer, peerTagsSize, e.peerTags) && httpStatusCode == e.httpStatusCode && synthetic == e.synthetic - && traceRoot == e.traceRoot; + && traceRoot == e.traceRoot + && additionalTagsEqual(additionalTagsBuffer, additionalTagsSize, e.additionalTags); + } + + /** Compact compare: first {@code aSize} slots of {@code a} against the entry's packed array. */ + private static boolean additionalTagsEqual( + UTF8BytesString[] a, int aSize, UTF8BytesString[] b) { + if (aSize != b.length) { + return false; + } + for (int i = 0; i < aSize; i++) { + if (!a[i].equals(b[i])) { + return false; + } + } + return true; } private static boolean peerTagsEqual(UTF8BytesString[] a, int aSize, List b) { @@ -638,6 +792,10 @@ AggregateEntry toEntry() { } else { snapshottedPeerTags = Arrays.asList(Arrays.copyOf(peerTagsBuffer, n)); } + UTF8BytesString[] snapshottedAdditionalTags = + additionalTagsSize == 0 + ? EMPTY_ADDITIONAL_TAGS + : Arrays.copyOf(additionalTagsBuffer, additionalTagsSize); return new AggregateEntry( keyHash, resource, @@ -652,7 +810,8 @@ AggregateEntry toEntry() { httpStatusCode, synthetic, traceRoot, - snapshottedPeerTags); + snapshottedPeerTags, + snapshottedAdditionalTags); } } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java index dae8e1b33f4..e4e09de2f0e 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateTable.java @@ -27,7 +27,7 @@ final class AggregateTable { private final Hashtable.Entry[] buckets; private final int maxAggregates; - private final AggregateEntry.Canonical canonical = new AggregateEntry.Canonical(); + private final AggregateEntry.Canonical canonical; private int size; /** @@ -38,8 +38,13 @@ final class AggregateTable { private int evictCursor; AggregateTable(int maxAggregates) { + this(maxAggregates, AdditionalTagsSchema.EMPTY); + } + + AggregateTable(int maxAggregates, AdditionalTagsSchema additionalTagsSchema) { this.buckets = Support.create(maxAggregates, Support.MAX_RATIO); this.maxAggregates = maxAggregates; + this.canonical = new AggregateEntry.Canonical(additionalTagsSchema); } int size() { @@ -65,6 +70,7 @@ AggregateEntry findOrInsert(SpanSnapshot snapshot) { return candidate; } } + // Miss path. if (size >= maxAggregates && !evictOneStale()) { return null; } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java index d809d452522..2e2ad578ead 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java @@ -42,6 +42,8 @@ final class Aggregator implements Runnable { justification = "the field is confined to the agent thread running the Aggregator") private boolean dirty; + private final AdditionalTagsSchema additionalTagsSchema; + Aggregator( MetricWriter writer, MessagePassingQueue inbox, @@ -58,7 +60,49 @@ final class Aggregator implements Runnable { reportingIntervalTimeUnit, DEFAULT_SLEEP_MILLIS, healthMetrics, - onReportCycle); + onReportCycle, + AdditionalTagsSchema.EMPTY); + } + + Aggregator( + MetricWriter writer, + MessagePassingQueue inbox, + int maxAggregates, + long reportingInterval, + TimeUnit reportingIntervalTimeUnit, + HealthMetrics healthMetrics, + AdditionalTagsSchema additionalTagsSchema) { + this( + writer, + inbox, + maxAggregates, + reportingInterval, + reportingIntervalTimeUnit, + DEFAULT_SLEEP_MILLIS, + healthMetrics, + null, + additionalTagsSchema); + } + + Aggregator( + MetricWriter writer, + MessagePassingQueue inbox, + int maxAggregates, + long reportingInterval, + TimeUnit reportingIntervalTimeUnit, + HealthMetrics healthMetrics, + Runnable onReportCycle, + AdditionalTagsSchema additionalTagsSchema) { + this( + writer, + inbox, + maxAggregates, + reportingInterval, + reportingIntervalTimeUnit, + DEFAULT_SLEEP_MILLIS, + healthMetrics, + onReportCycle, + additionalTagsSchema); } Aggregator( @@ -69,10 +113,12 @@ final class Aggregator implements Runnable { TimeUnit reportingIntervalTimeUnit, long sleepMillis, HealthMetrics healthMetrics, - Runnable onReportCycle) { + Runnable onReportCycle, + AdditionalTagsSchema additionalTagsSchema) { this.writer = writer; this.inbox = inbox; - this.aggregates = new AggregateTable(maxAggregates); + this.additionalTagsSchema = additionalTagsSchema; + this.aggregates = new AggregateTable(maxAggregates, additionalTagsSchema); this.reportingIntervalNanos = reportingIntervalTimeUnit.toNanos(reportingInterval); this.sleepMillis = sleepMillis; this.healthMetrics = healthMetrics; @@ -174,7 +220,7 @@ private void report(long when, SignalItem signal) { writer, (w, entry) -> { w.add(entry); - entry.clear(); + entry.clearAggregate(); }); // note that this may do IO and block writer.finishBucket(); @@ -185,6 +231,10 @@ private void report(long when, SignalItem signal) { } dirty = false; } + // Reset cardinality handlers each report cycle so the per-field budgets refresh. + // Safe to call on this (aggregator) thread; handlers are HashMap-based and not thread-safe. + AggregateEntry.resetCardinalityHandlers(); + additionalTagsSchema.resetHandlers(); signal.complete(); if (skipped) { log.debug("skipped metrics reporting because no points have changed"); diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ClientStatsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ClientStatsAggregator.java index c44b71edd74..7118131ddee 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ClientStatsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ClientStatsAggregator.java @@ -71,6 +71,7 @@ public final class ClientStatsAggregator implements MetricsAggregator, EventList private final TimeUnit reportingIntervalTimeUnit; private final DDAgentFeaturesDiscovery features; private final HealthMetrics healthMetrics; + private final AdditionalTagsSchema additionalTagsSchema; private final boolean includeEndpointInMetrics; /** @@ -100,6 +101,7 @@ public ClientStatsAggregator( this( config.getWellKnownTags(), config.getMetricsIgnoredResources(), + AdditionalTagsSchema.from(config.getTraceStatsAdditionalTags(), healthMetrics), sharedCommunicationObjects.featuresDiscovery(config), healthMetrics, new OkHttpSink( @@ -126,6 +128,7 @@ public ClientStatsAggregator( this( wellKnownTags, ignoredResources, + AdditionalTagsSchema.EMPTY, features, healthMetric, sink, @@ -136,6 +139,31 @@ public ClientStatsAggregator( includeEndpointInMetrics); } + ClientStatsAggregator( + WellKnownTags wellKnownTags, + Set ignoredResources, + AdditionalTagsSchema additionalTagsSchema, + DDAgentFeaturesDiscovery features, + HealthMetrics healthMetric, + Sink sink, + int maxAggregates, + int queueSize, + boolean includeEndpointInMetrics) { + this( + wellKnownTags, + ignoredResources, + additionalTagsSchema, + features, + healthMetric, + sink, + maxAggregates, + queueSize, + 10, + SECONDS, + includeEndpointInMetrics); + } + + /** Test-only: defaults to no additional tags schema. */ ClientStatsAggregator( WellKnownTags wellKnownTags, Set ignoredResources, @@ -148,7 +176,34 @@ public ClientStatsAggregator( TimeUnit timeUnit, boolean includeEndpointInMetrics) { this( + wellKnownTags, ignoredResources, + AdditionalTagsSchema.EMPTY, + features, + healthMetric, + sink, + maxAggregates, + queueSize, + reportingInterval, + timeUnit, + includeEndpointInMetrics); + } + + ClientStatsAggregator( + WellKnownTags wellKnownTags, + Set ignoredResources, + AdditionalTagsSchema additionalTagsSchema, + DDAgentFeaturesDiscovery features, + HealthMetrics healthMetric, + Sink sink, + int maxAggregates, + int queueSize, + long reportingInterval, + TimeUnit timeUnit, + boolean includeEndpointInMetrics) { + this( + ignoredResources, + additionalTagsSchema, features, healthMetric, sink, @@ -160,6 +215,7 @@ public ClientStatsAggregator( includeEndpointInMetrics); } + /** Test-only: defaults to no additional tags schema. */ ClientStatsAggregator( Set ignoredResources, DDAgentFeaturesDiscovery features, @@ -171,7 +227,34 @@ public ClientStatsAggregator( long reportingInterval, TimeUnit timeUnit, boolean includeEndpointInMetrics) { + this( + ignoredResources, + AdditionalTagsSchema.EMPTY, + features, + healthMetric, + sink, + metricWriter, + maxAggregates, + queueSize, + reportingInterval, + timeUnit, + includeEndpointInMetrics); + } + + ClientStatsAggregator( + Set ignoredResources, + AdditionalTagsSchema additionalTagsSchema, + DDAgentFeaturesDiscovery features, + HealthMetrics healthMetric, + Sink sink, + MetricWriter metricWriter, + int maxAggregates, + int queueSize, + long reportingInterval, + TimeUnit timeUnit, + boolean includeEndpointInMetrics) { this.ignoredResources = ignoredResources; + this.additionalTagsSchema = additionalTagsSchema; this.includeEndpointInMetrics = includeEndpointInMetrics; this.inbox = Queues.mpscArrayQueue(queueSize); this.features = features; @@ -185,7 +268,8 @@ public ClientStatsAggregator( reportingInterval, timeUnit, healthMetric, - this::resetCardinalityHandlers); + this::resetCardinalityHandlers, + additionalTagsSchema); this.thread = newAgentThread(METRICS_AGGREGATOR, aggregator); this.reportingInterval = reportingInterval; this.reportingIntervalTimeUnit = timeUnit; @@ -337,6 +421,8 @@ private boolean publish(CoreSpan span, boolean isTopLevel, PeerTagSchema peer spanPeerTagSchema = null; } + String[] additionalTagValues = captureAdditionalTagValues(span); + SpanSnapshot snapshot = new SpanSnapshot( span.getResourceName(), @@ -353,6 +439,7 @@ private boolean publish(CoreSpan span, boolean isTopLevel, PeerTagSchema peer httpMethod, httpEndpoint, grpcStatusCode, + additionalTagValues, tagAndDuration); if (!inbox.offer(snapshot)) { healthMetrics.onStatsInboxFull(); @@ -361,6 +448,31 @@ private boolean publish(CoreSpan span, boolean isTopLevel, PeerTagSchema peer return error; } + /** + * Captures the span's additional-metric-tag values into a {@code String[]} parallel to {@code + * additionalTagsSchema.names}. Returns {@code null} when no additional tags are configured or + * none of the configured keys are set on the span. Raw values only -- length cap and + * canonicalization run on the aggregator thread. + */ + private String[] captureAdditionalTagValues(CoreSpan span) { + String[] names = additionalTagsSchema.names; + int n = names.length; + if (n == 0) { + return null; + } + String[] values = null; + for (int i = 0; i < n; i++) { + Object v = span.unsafeGetTag(names[i]); + if (v != null) { + if (values == null) { + values = new String[n]; + } + values[i] = v.toString(); + } + } + return values; + } + /** * One-time producer-side bootstrap of {@link #cachedPeerTagSchema}. Synchronized double-check * guards against two producers racing on the very first publish; after this returns, {@code diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricCardinalityLimits.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricCardinalityLimits.java index f7d91343d4b..70d1b9787bd 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricCardinalityLimits.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricCardinalityLimits.java @@ -70,4 +70,10 @@ private MetricCardinalityLimits() {} * peer tag gets its own handler at this limit. */ static final int PEER_TAG_VALUE = 512; + + /** + * Distinct values per additional-tag key (e.g. distinct values of a span-derived primary tag). + * Each configured additional tag gets its own {@link TagCardinalityHandler} at this limit. + */ + static final int ADDITIONAL_TAG_VALUE = 512; } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/PeerTagSchema.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/PeerTagSchema.java index 6c6b6c53060..258e38177d8 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/PeerTagSchema.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/PeerTagSchema.java @@ -38,9 +38,9 @@ * the indexing even if the current schema is replaced between capture and consumption. * *

Thread-safety: all mutable state ({@link TagCardinalityHandler}s, the warn-once set, - * and {@link #state}) is exercised only on the aggregator thread. {@link - * #names} and {@link #handlers} are final and safe to read from any thread; producer threads access - * them through the volatile {@code cachedPeerTagSchema} reference in {@link ClientStatsAggregator}. + * and {@link #state}) is exercised only on the aggregator thread. {@link #names} and {@link + * #handlers} are final and safe to read from any thread; producer threads access them through the + * volatile {@code cachedPeerTagSchema} reference in {@link ClientStatsAggregator}. */ final class PeerTagSchema { diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java index 2bd7ea54887..f20b2364014 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java @@ -43,6 +43,7 @@ public final class SerializingMetricWriter implements MetricWriter { private static final byte[] IS_TRACE_ROOT = "IsTraceRoot".getBytes(ISO_8859_1); private static final byte[] SPAN_KIND = "SpanKind".getBytes(ISO_8859_1); private static final byte[] PEER_TAGS = "PeerTags".getBytes(ISO_8859_1); + private static final byte[] ADDITIONAL_METRIC_TAGS = "AdditionalMetricTags".getBytes(ISO_8859_1); private static final byte[] HTTP_METHOD = "HTTPMethod".getBytes(ISO_8859_1); private static final byte[] HTTP_ENDPOINT = "HTTPEndpoint".getBytes(ISO_8859_1); private static final byte[] GRPC_STATUS_CODE = "GRPCStatusCode".getBytes(ISO_8859_1); @@ -157,12 +158,15 @@ public void add(AggregateEntry entry) { final boolean hasHttpEndpoint = entry.hasHttpEndpoint(); final boolean hasServiceSource = entry.hasServiceSource(); final boolean hasGrpcStatusCode = entry.hasGrpcStatusCode(); + final UTF8BytesString[] additionalTags = entry.getAdditionalTags(); + final boolean hasAdditionalTags = additionalTags.length > 0; final int mapSize = 15 + (hasServiceSource ? 1 : 0) + (hasHttpMethod ? 1 : 0) + (hasHttpEndpoint ? 1 : 0) - + (hasGrpcStatusCode ? 1 : 0); + + (hasGrpcStatusCode ? 1 : 0) + + (hasAdditionalTags ? 1 : 0); writer.startMap(mapSize); @@ -198,6 +202,18 @@ public void add(AggregateEntry entry) { writer.writeUTF8(peerTag); } + // Emit AdditionalMetricTags as repeated string of pre-built "key:value" UTF8BytesStrings, in + // schema (alphabetical-by-key) order. Skip null slots (tags the span didn't set). The whole + // field is omitted when no non-null slots exist so customers who don't configure additional + // metric tags pay zero payload overhead. + if (hasAdditionalTags) { + writer.writeUTF8(ADDITIONAL_METRIC_TAGS); + writer.startArray(additionalTags.length); + for (UTF8BytesString slot : additionalTags) { + writer.writeUTF8(slot); + } + } + if (hasServiceSource) { writer.writeUTF8(SERVICE_SOURCE); writer.writeUTF8(entry.getServiceSource()); diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java index 7b44029cfcd..33bd496c342 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java @@ -40,6 +40,14 @@ final class SpanSnapshot implements InboxItem { final String httpEndpoint; final String grpcStatusCode; + /** + * Additional metric tag values captured from the span, parallel to {@code + * additionalTagsSchema.names}. A {@code null} entry means the span didn't have that tag set. + * {@code null} (the whole array) when no additional tags are configured or none were set on the + * span. Length cap is applied on the aggregator thread; the producer carries raw values only. + */ + final String[] additionalTagValues; + /** Duration in nanoseconds, OR-ed with {@code ERROR_TAG} / {@code TOP_LEVEL_TAG} as needed. */ final long tagAndDuration; @@ -58,6 +66,7 @@ final class SpanSnapshot implements InboxItem { String httpMethod, String httpEndpoint, String grpcStatusCode, + String[] additionalTagValues, long tagAndDuration) { this.resourceName = resourceName; this.serviceName = serviceName; @@ -73,6 +82,7 @@ final class SpanSnapshot implements InboxItem { this.httpMethod = httpMethod; this.httpEndpoint = httpEndpoint; this.grpcStatusCode = grpcStatusCode; + this.additionalTagValues = additionalTagValues; this.tagAndDuration = tagAndDuration; } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java b/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java index 6f9a263f593..efe9b72383f 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java @@ -107,6 +107,14 @@ public void onStatsInboxFull() {} */ public void onTagCardinalityBlocked(String tag, long count) {} + /** + * Fires once per additional-metric-tag value that gets masked with {@code blocked_by_tracer} -- + * either because it exceeded the per-value length cap, or because the bucket's stat-entry + * cardinality cap was reached. {@code tagKey} identifies which configured key the masked value + * belonged to. + */ + public void onAdditionalTagValueCardinalityBlocked(String tagKey) {} + /** * @return Human-readable summary of the current health metrics. */ diff --git a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java index c00ef708abf..1a09c311bd4 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java @@ -99,6 +99,7 @@ public class TracerHealthMetrics extends HealthMetrics implements AutoCloseable private final LongAdder statsAggregateDropped = new LongAdder(); private final LongAdder statsInboxFull = new LongAdder(); + private final LongAdder statsAdditionalTagBlocked = new LongAdder(); private final StatsDClient statsd; private final long interval; @@ -368,6 +369,11 @@ public void onTagCardinalityBlocked(String tag, long count) { statsd.count("stats.tag_cardinality_blocked", count, new String[] {"tag:" + tag}); } + @Override + public void onAdditionalTagValueCardinalityBlocked(String tagKey) { + statsAdditionalTagBlocked.increment(); + } + @Override public void close() { if (null != cancellation) { @@ -387,7 +393,7 @@ private static class Flush implements AgentTaskScheduler.Task(Arrays.asList("region", "tenant_id", "az"))); + assertArrayEquals(new String[] {"az", "region", "tenant_id"}, schema.names); + } + + @Test + void schemaDedupesAndCapsAtMaxTagKeys() { + LinkedHashSet configured = new LinkedHashSet<>(); + // 12 distinct keys, more than MAX_ADDITIONAL_TAG_KEYS (10). Sort by alphabetical, drop the + // last 2. + for (int i = 0; i < 12; i++) { + configured.add(String.format("tag%02d", i)); + } + AdditionalTagsSchema schema = AdditionalTagsSchema.from(configured); + assertEquals(AdditionalTagsSchema.MAX_ADDITIONAL_TAG_KEYS, schema.size()); + assertArrayEquals( + new String[] { + "tag00", "tag01", "tag02", "tag03", "tag04", "tag05", "tag06", "tag07", "tag08", "tag09" + }, + schema.names); + } + + @Test + void blockedSentinelsArePerKey() { + AdditionalTagsSchema schema = + AdditionalTagsSchema.from(new LinkedHashSet<>(Arrays.asList("region", "tenant_id"))); + assertEquals("region:blocked_by_tracer", schema.blockedSentinel(0).toString()); + assertEquals("tenant_id:blocked_by_tracer", schema.blockedSentinel(1).toString()); + } + + @Test + void rejectsEmptyAndColonContainingKeys() { + AdditionalTagsSchema schema = + AdditionalTagsSchema.from( + new LinkedHashSet<>(Arrays.asList("region", "", "bad:key", "tenant_id"))); + // Empty key and "bad:key" are dropped; only the two valid keys remain. + assertArrayEquals(new String[] {"region", "tenant_id"}, schema.names); + } + + @Test + void allInvalidKeysReturnsEmptySchema() { + AdditionalTagsSchema schema = + AdditionalTagsSchema.from(new LinkedHashSet<>(Arrays.asList("", "also:bad"))); + assertSame(AdditionalTagsSchema.EMPTY, schema); + } + + @Test + void emptySchemaHasZeroSizeAndEmptyArrays() { + AdditionalTagsSchema schema = AdditionalTagsSchema.EMPTY; + assertEquals(0, schema.size()); + assertTrue(schema.names.length == 0); + assertTrue(schema.blockedSentinels.length == 0); + } +} diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateEntryTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateEntryTest.java index 7d1aa023a69..a48e7d94d3e 100644 --- a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateEntryTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateEntryTest.java @@ -41,7 +41,7 @@ void clearResetsAllCounters() { entry.recordOneDuration(5L); entry.recordOneDuration(ERROR_TAG | 6L); entry.recordOneDuration(TOP_LEVEL_TAG | 7L); - entry.clear(); + entry.clearAggregate(); assertEquals(0, entry.getDuration()); assertEquals(0, entry.getErrorCount()); assertEquals(0, entry.getTopLevelCount()); diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableAdditionalTagsTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableAdditionalTagsTest.java new file mode 100644 index 00000000000..20402a662c5 --- /dev/null +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableAdditionalTagsTest.java @@ -0,0 +1,84 @@ +package datadog.trace.common.metrics; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertSame; + +import datadog.metrics.agent.AgentMeter; +import datadog.metrics.api.statsd.StatsDClient; +import datadog.metrics.impl.DDSketchHistograms; +import datadog.metrics.impl.MonitoringImpl; +import java.util.Arrays; +import java.util.LinkedHashSet; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +class AggregateTableAdditionalTagsTest { + + @BeforeAll + static void initAgentMeter() { + MonitoringImpl monitoring = new MonitoringImpl(StatsDClient.NO_OP, 1, TimeUnit.SECONDS); + AgentMeter.registerIfAbsent(StatsDClient.NO_OP, monitoring, DDSketchHistograms.FACTORY); + monitoring.newTimer("test.init"); + } + + @Test + void distinctAdditionalTagValuesYieldDistinctEntries() { + AdditionalTagsSchema schema = schemaFor("region"); + AggregateTable table = newTable(schema); + + AggregateEntry usEast = table.findOrInsert(snapshot(schema, "us-east-1")); + AggregateEntry euWest = table.findOrInsert(snapshot(schema, "eu-west-1")); + + assertNotNull(usEast); + assertNotNull(euWest); + assertNotSame(usEast, euWest); + assertEquals(2, table.size()); + } + + @Test + void sameAdditionalTagValuesShareEntry() { + AdditionalTagsSchema schema = schemaFor("region"); + AggregateTable table = newTable(schema); + + AggregateEntry first = table.findOrInsert(snapshot(schema, "us-east-1")); + AggregateEntry second = table.findOrInsert(snapshot(schema, "us-east-1")); + + assertSame(first, second); + assertEquals(1, table.size()); + } + + // ---------- helpers ---------- + + private static AdditionalTagsSchema schemaFor(String... names) { + return AdditionalTagsSchema.from(new LinkedHashSet<>(Arrays.asList(names))); + } + + private static AggregateTable newTable(AdditionalTagsSchema schema) { + return new AggregateTable(256, schema); + } + + private static SpanSnapshot snapshot(AdditionalTagsSchema schema, String regionValue) { + String[] values = new String[schema.size()]; + values[0] = regionValue; + return new SpanSnapshot( + "resource", + "service", + "operation", + null, + "web", + (short) 200, + false, + true, + "client", + null, + null, + null, + null, + null, + values, + 0L); + } +} diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java index 8694892ea84..f9597a91c38 100644 --- a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AggregateTableTest.java @@ -292,6 +292,7 @@ private static SpanSnapshot nullServiceKindSnapshot(String service, String spanK null, null, null, + null, 0L); } @@ -312,6 +313,7 @@ private static SpanSnapshot nullableSnapshot( null, null, null, + null, 0L); } @@ -368,6 +370,7 @@ SpanSnapshot build() { null, null, null, + null, tagAndDuration); } } diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/SerializingMetricWriterAdditionalTagsTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/SerializingMetricWriterAdditionalTagsTest.java new file mode 100644 index 00000000000..c29e880ec90 --- /dev/null +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/SerializingMetricWriterAdditionalTagsTest.java @@ -0,0 +1,225 @@ +package datadog.trace.common.metrics; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; + +import datadog.metrics.agent.AgentMeter; +import datadog.metrics.api.Histograms; +import datadog.metrics.api.statsd.StatsDClient; +import datadog.metrics.impl.DDSketchHistograms; +import datadog.metrics.impl.MonitoringImpl; +import datadog.trace.api.WellKnownTags; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.msgpack.core.MessagePack; +import org.msgpack.core.MessageUnpacker; + +/** + * Verifies the {@code AdditionalMetricTags} wire field: shape is {@code repeated string} of {@code + * "key:value"} entries; field is omitted when no slots are populated; null slots within a populated + * array are skipped. + */ +class SerializingMetricWriterAdditionalTagsTest { + + @BeforeAll + static void initAgentMeter() { + MonitoringImpl monitoring = new MonitoringImpl(StatsDClient.NO_OP, 1, TimeUnit.SECONDS); + AgentMeter.registerIfAbsent(StatsDClient.NO_OP, monitoring, DDSketchHistograms.FACTORY); + monitoring.newTimer("test.init"); + Histograms.register(DDSketchHistograms.FACTORY); + } + + @Test + void additionalMetricTagsEmittedWhenSet() throws Exception { + AdditionalTagsSchema schema = + AdditionalTagsSchema.from(new LinkedHashSet<>(Arrays.asList("region", "tenant_id"))); + AggregateTable table = newTable(schema); + + AggregateEntry entry = table.findOrInsert(snapshot(schema, "us-east-1", "acme-corp")); + entry.recordOneDuration(1L); + + List additionalTags = parseAdditionalMetricTags(writeBucket(table)); + assertEquals(2, additionalTags.size()); + // Order matches schema (alphabetical): region first, then tenant_id. + assertEquals("region:us-east-1", additionalTags.get(0)); + assertEquals("tenant_id:acme-corp", additionalTags.get(1)); + } + + @Test + void additionalMetricTagsFieldOmittedWhenNoneSet() throws Exception { + // Schema configured, but the span doesn't set any of the configured tags. + AdditionalTagsSchema schema = + AdditionalTagsSchema.from(new LinkedHashSet<>(Arrays.asList("region"))); + AggregateTable table = newTable(schema); + + AggregateEntry entry = table.findOrInsert(snapshot(schema, new String[] {null})); + entry.recordOneDuration(1L); + + assertFalse( + containsKey(writeBucket(table), "AdditionalMetricTags"), + "AdditionalMetricTags should be omitted when no slots are populated"); + } + + @Test + void additionalMetricTagsSkipsNullSlots() throws Exception { + AdditionalTagsSchema schema = + AdditionalTagsSchema.from(new LinkedHashSet<>(Arrays.asList("region", "tenant_id"))); + AggregateTable table = newTable(schema); + + // Set only tenant_id; leave region null. + AggregateEntry entry = + table.findOrInsert( + snapshot( + schema, + new String[] { + /*region*/ + null, /*tenant_id*/ "acme-corp" + })); + entry.recordOneDuration(1L); + + List additionalTags = parseAdditionalMetricTags(writeBucket(table)); + assertEquals(1, additionalTags.size()); + assertEquals("tenant_id:acme-corp", additionalTags.get(0)); + } + + // ---------- helpers ---------- + + private static AggregateTable newTable(AdditionalTagsSchema schema) { + return new AggregateTable(64, schema); + } + + private static SpanSnapshot snapshot(AdditionalTagsSchema schema, String... values) { + String[] padded = new String[schema.size()]; + if (values != null) { + System.arraycopy(values, 0, padded, 0, Math.min(values.length, padded.length)); + } + return new SpanSnapshot( + "resource", + "service", + "operation", + null, + "web", + (short) 200, + false, + true, + "client", + null, + null, + null, + null, + null, + padded, + 0L); + } + + /** + * Serializes a single-bucket payload via {@link SerializingMetricWriter} into a {@link + * ByteBuffer}. The test's {@link CapturingSink} keeps the produced buffer for unpack. + */ + private static ByteBuffer writeBucket(AggregateTable table) { + CapturingSink sink = new CapturingSink(); + SerializingMetricWriter writer = + new SerializingMetricWriter( + new WellKnownTags("rid", "host", "env", "svc", "ver", "lang"), sink, 64 * 1024); + writer.startBucket(table.size(), 0L, TimeUnit.SECONDS.toNanos(10)); + table.forEach(writer::add); + writer.finishBucket(); + return sink.buffer; + } + + private static List parseAdditionalMetricTags(ByteBuffer payload) throws Exception { + MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(payload); + // Top-level map: skip to the per-stat entry. Structure mirrors SerializingMetricWriterTest. + int topMapSize = unpacker.unpackMapHeader(); + for (int i = 0; i < topMapSize; i++) { + String key = unpacker.unpackString(); + if ("Stats".equals(key)) { + // Stats is a 1-element array of buckets; each bucket has Start/Duration/Stats(=array of + // per-metric maps). + unpacker.unpackArrayHeader(); + int bucketMapSize = unpacker.unpackMapHeader(); + for (int j = 0; j < bucketMapSize; j++) { + String bucketKey = unpacker.unpackString(); + if ("Stats".equals(bucketKey)) { + int statsCount = unpacker.unpackArrayHeader(); + // Take the first stat entry and walk its map looking for AdditionalMetricTags. + for (int k = 0; k < statsCount; k++) { + int entryMapSize = unpacker.unpackMapHeader(); + for (int m = 0; m < entryMapSize; m++) { + String entryKey = unpacker.unpackString(); + if ("AdditionalMetricTags".equals(entryKey)) { + int n = unpacker.unpackArrayHeader(); + List result = new ArrayList<>(n); + for (int p = 0; p < n; p++) { + result.add(unpacker.unpackString()); + } + return result; + } else { + unpacker.skipValue(); + } + } + if (k == 0) break; // only inspecting the first stat entry + } + } else { + unpacker.skipValue(); + } + } + } else { + unpacker.skipValue(); + } + } + return new ArrayList<>(); + } + + private static boolean containsKey(ByteBuffer payload, String soughtKey) throws Exception { + MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(payload); + int topMapSize = unpacker.unpackMapHeader(); + for (int i = 0; i < topMapSize; i++) { + String key = unpacker.unpackString(); + if ("Stats".equals(key)) { + unpacker.unpackArrayHeader(); + int bucketMapSize = unpacker.unpackMapHeader(); + for (int j = 0; j < bucketMapSize; j++) { + String bucketKey = unpacker.unpackString(); + if ("Stats".equals(bucketKey)) { + int statsCount = unpacker.unpackArrayHeader(); + for (int k = 0; k < statsCount; k++) { + int entryMapSize = unpacker.unpackMapHeader(); + for (int m = 0; m < entryMapSize; m++) { + String entryKey = unpacker.unpackString(); + if (soughtKey.equals(entryKey)) { + return true; + } + unpacker.skipValue(); + } + if (k == 0) return false; // checked the only entry + } + } else { + unpacker.skipValue(); + } + } + } else { + unpacker.skipValue(); + } + } + return false; + } + + private static final class CapturingSink implements Sink { + ByteBuffer buffer; + + @Override + public void register(EventListener listener) {} + + @Override + public void accept(int messageCount, ByteBuffer buffer) { + this.buffer = buffer.duplicate(); + } + } +} diff --git a/internal-api/src/main/java/datadog/trace/api/Config.java b/internal-api/src/main/java/datadog/trace/api/Config.java index 93509159392..bed41fc69f9 100644 --- a/internal-api/src/main/java/datadog/trace/api/Config.java +++ b/internal-api/src/main/java/datadog/trace/api/Config.java @@ -414,6 +414,7 @@ import static datadog.trace.api.config.GeneralConfig.TRACER_METRICS_BUFFERING_ENABLED; import static datadog.trace.api.config.GeneralConfig.TRACER_METRICS_ENABLED; import static datadog.trace.api.config.GeneralConfig.TRACER_METRICS_IGNORED_RESOURCES; +import static datadog.trace.api.config.GeneralConfig.TRACE_STATS_ADDITIONAL_TAGS; import static datadog.trace.api.config.GeneralConfig.TRACER_METRICS_MAX_AGGREGATES; import static datadog.trace.api.config.GeneralConfig.TRACER_METRICS_MAX_PENDING; import static datadog.trace.api.config.GeneralConfig.TRACE_STATS_CARDINALITY_LIMITS_ENABLED; @@ -5171,6 +5172,10 @@ public Set getMetricsIgnoredResources() { return tryMakeImmutableSet(configProvider.getList(TRACER_METRICS_IGNORED_RESOURCES)); } + public Set getTraceStatsAdditionalTags() { + return tryMakeImmutableSet(configProvider.getList(TRACE_STATS_ADDITIONAL_TAGS)); + } + public String getEnv() { // intentionally not thread safe if (env == null) { diff --git a/metadata/supported-configurations.json b/metadata/supported-configurations.json index 7f230aa65db..a5c0317e578 100644 --- a/metadata/supported-configurations.json +++ b/metadata/supported-configurations.json @@ -10609,6 +10609,14 @@ "aliases": [] } ], + "DD_TRACE_STATS_ADDITIONAL_TAGS": [ + { + "version": "A", + "type": "list", + "default": null, + "aliases": [] + } + ], "DD_TRACE_STATUS404DECORATOR_ENABLED": [ { "version": "A",