Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .claude/scheduled_tasks.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"sessionId":"0776f672-ce66-4052-b399-2ac0d76a9665","pid":91166,"procStart":"Thu May 7 08:09:44 2026","acquiredAt":1778142475272}
102 changes: 96 additions & 6 deletions playbooks/robusta_playbooks/common_actions.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,71 @@
from collections import defaultdict
from string import Template
from typing import Dict, Optional
import logging
import re
from typing import Dict, List, Optional

from robusta.api import ActionParams, ExecutionBaseEvent, Finding, FindingSeverity, action
from pydantic import BaseModel

from robusta.api import (
ActionParams,
EnrichmentType,
ExecutionBaseEvent,
Finding,
FindingSeverity,
SlackAnnotations,
TableBlock,
action,
)
from robusta.core.reporting.blocks import TableBlockFormat
from robusta.utils.parsing import format_event_templated_string


class FindingLabelRule(BaseModel):
"""
A single rule that derives a label value from finding subject fields, modelled after Prometheus relabel_config.

:var source_fields: One or more source fields whose values are concatenated (with ``separator``) to form
the input string. Supported fields: ``namespace``, ``name``, ``node``, ``kind``, or dotted paths
``labels.<key>``, ``annotations.<key>``.
:var regex: A Python ``re.fullmatch`` pattern tested against the concatenated source value.
Capture groups are available as ``$1``, ``$2`` … in ``replacement``.
:var target_label: The label key to set on the finding when the regex matches.
:var replacement: The value to write to ``target_label``. Use ``$1`` / ``$2`` … for capture groups.
:var separator: String used to join multiple source values before matching. Defaults to ``;``.
:example source_fields: ["namespace"]
:example regex: "infra-.*|kube-system"
:example target_label: "team"
:example replacement: "infra-team"
"""

source_fields: List[str]
regex: str
target_label: str
replacement: str
separator: str = ";"


class FindingOverrides(ActionParams):
"""
:var title: Overriding finding title. Title can be templated with name/namespace/kind/node of the resource, if applicable
:var description: Overriding finding description. Description can be templated with name/namespace/kind/node of the resource, if applicable
:var severity: Overriding finding severity. Allowed values: DEBUG, INFO, LOW, HIGH
:var finding_label_rules: A list of rules (modelled after Prometheus ``relabel_config``) that derive label
values from finding subject fields and attach them to the finding. Rules are evaluated in order; later
rules may overwrite earlier ones. Supported source fields: ``namespace``, ``name``, ``node``, ``kind``,
``labels.<key>``, ``annotations.<key>``.
:example severity: DEBUG
:example title: Resource $kind/$namespace/$name is in trouble
:example finding_label_rules: |
- source_fields: [namespace]
regex: 'infra-.*|kube-system'
target_label: team
replacement: infra-team
"""

title: Optional[str] = None
description: Optional[str] = None
severity: Optional[str] = None
aggregation_key: Optional[str] = None
finding_label_rules: Optional[List[FindingLabelRule]] = None


@action
Expand All @@ -35,11 +82,54 @@ def customise_finding(event: ExecutionBaseEvent, params: FindingOverrides):
subject = event.get_subject()
title = format_event_templated_string(subject, params.title) if params.title else None
description = format_event_templated_string(subject, params.description) if params.description else None

aggregation_key = format_event_templated_string(subject, params.aggregation_key) if params.aggregation_key else None

event.override_finding_attributes(title, description, severity, aggregation_key)

if params.finding_label_rules:
source_map: Dict[str, str] = {
"namespace": subject.namespace or "",
"name": subject.name or "",
"node": subject.node or "",
"kind": subject.subject_type.value if subject.subject_type else "",
**{f"labels.{k}": v for k, v in subject.labels.items()},
**{f"annotations.{k}": v for k, v in subject.annotations.items()},
}

labels_to_inject: Dict[str, str] = {}
for rule in params.finding_label_rules:
source_value = rule.separator.join(source_map.get(src, "") for src in rule.source_fields)
try:
m = re.fullmatch(rule.regex, source_value)
except re.error:
logging.warning(
"[customise_finding] invalid finding_label_rules regex %r for target_label %r — skipping",
rule.regex,
rule.target_label,
)
continue
if m:
replacement = rule.replacement
for i, group in enumerate(m.groups(), 1):
replacement = replacement.replace(f"${i}", group or "")
labels_to_inject[rule.target_label] = replacement

if labels_to_inject:
event.inject_finding_labels(labels_to_inject)
event.add_enrichment(
[
TableBlock(
[[k, v] for (k, v) in labels_to_inject.items()],
["label", "value"],
table_format=TableBlockFormat.vertical,
table_name="*labels*",
),
],
annotations={SlackAnnotations.ATTACHMENT: True},
enrichment_type=EnrichmentType.alert_labels,
title="labels",
)


class FindingFields(ActionParams):
"""
Expand All @@ -52,7 +142,7 @@ class FindingFields(ActionParams):
|
Generally, each instance of create_finding in your playbooks should specify a unique Aggregation Key, like "Crashing Pod" or "OOMKill".
|
Aggregation Keys should generally not include Pod names or other strings that change. If you include dynamic data in the Aggregation Key, each unique Aggregation Key will create its own grouping.
Aggregation Keys should generally not include Pod names or other strings that change. If you include dynamic data in the Aggregation Key, each unique Aggregation Key will create it's own grouping.
:var description: Finding description. Description can be templated
:var severity: Finding severity. Allowed values: DEBUG, INFO, LOW, HIGH

Expand Down
5 changes: 5 additions & 0 deletions src/robusta/core/model/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ def override_finding_attributes(
if aggregation_key:
finding.aggregation_key = aggregation_key

def inject_finding_labels(self, labels: Dict[str, str]):
for sink in self.named_sinks:
for finding in self.sink_findings[sink]:
finding.subject.labels.update(labels)
Comment thread
coderabbitai[bot] marked this conversation as resolved.

def extend_description(self, text: str):
for sink in self.named_sinks:
for finding in self.sink_findings[sink]:
Expand Down
2 changes: 2 additions & 0 deletions src/robusta/core/reporting/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ def __init__(
add_silence_url: bool = False,
silence_labels: Dict[Any, Any] = None,
) -> None:
if subject is None:
subject = FindingSubject()
self.id: uuid.UUID = uuid.uuid4()
self.title = title
self.finding_type = finding_type
Expand Down
216 changes: 216 additions & 0 deletions tests/test_customise_finding_relabel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
from robusta.core.model.events import ExecutionBaseEvent
from robusta.core.reporting.base import Finding, FindingSeverity, FindingSubject
from robusta.core.reporting.consts import FindingSubjectType
from playbooks.robusta_playbooks.common_actions import FindingLabelRule, FindingOverrides, customise_finding


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _make_event(subject: FindingSubject) -> ExecutionBaseEvent:
finding = Finding(
title="Test Finding",
aggregation_key="TestKey",
severity=FindingSeverity.HIGH,
subject=subject,
)
event = ExecutionBaseEvent(named_sinks=["default"])
event.sink_findings["default"].append(finding)
event.get_subject = lambda: subject
return event


def _finding(event: ExecutionBaseEvent) -> Finding:
return event.sink_findings["default"][0]


def _run(subject: FindingSubject, rules: list) -> Finding:
event = _make_event(subject)
params = FindingOverrides(finding_label_rules=[FindingLabelRule(**r) for r in rules])
customise_finding(event, params)
return _finding(event)


# ---------------------------------------------------------------------------
# Basic match / no-match
# ---------------------------------------------------------------------------

def test_namespace_match_sets_label():
subject = FindingSubject(name="pod", namespace="infra-monitoring")
finding = _run(subject, [
{"source_fields": ["namespace"], "regex": "infra-.*", "target_label": "team", "replacement": "infra-team"},
])
assert finding.subject.labels["team"] == "infra-team"


def test_namespace_no_match_leaves_label_absent():
subject = FindingSubject(name="pod", namespace="app-prod")
finding = _run(subject, [
{"source_fields": ["namespace"], "regex": "infra-.*", "target_label": "team", "replacement": "infra-team"},
])
assert "team" not in finding.subject.labels


def test_alternation_regex_matches_second_branch():
subject = FindingSubject(name="pod", namespace="kube-system")
finding = _run(subject, [
{"source_fields": ["namespace"], "regex": "infra-.*|kube-system", "target_label": "team", "replacement": "infra-team"},
])
assert finding.subject.labels["team"] == "infra-team"


# ---------------------------------------------------------------------------
# Capture-group replacement
# ---------------------------------------------------------------------------

def test_capture_group_replacement():
subject = FindingSubject(name="pod", namespace="team-backend")
finding = _run(subject, [
{"source_fields": ["namespace"], "regex": "team-(.*)", "target_label": "team", "replacement": "$1"},
])
assert finding.subject.labels["team"] == "backend"


def test_multiple_capture_groups():
subject = FindingSubject(name="pod", namespace="eu-backend-prod")
finding = _run(subject, [
{"source_fields": ["namespace"], "regex": "(\\w+)-(\\w+)-(\\w+)", "target_label": "env", "replacement": "$3-$1"},
])
assert finding.subject.labels["env"] == "prod-eu"


# ---------------------------------------------------------------------------
# Source: name, kind, node
# ---------------------------------------------------------------------------

def test_source_from_name():
subject = FindingSubject(name="payments-worker", namespace="default")
finding = _run(subject, [
{"source_fields": ["name"], "regex": "payments-.*", "target_label": "team", "replacement": "payments"},
])
assert finding.subject.labels["team"] == "payments"


def test_source_from_kind():
subject = FindingSubject(name="pod", namespace="default", subject_type=FindingSubjectType.TYPE_POD)
finding = _run(subject, [
{"source_fields": ["kind"], "regex": "pod", "target_label": "resource_type", "replacement": "pod"},
])
assert finding.subject.labels["resource_type"] == "pod"


# ---------------------------------------------------------------------------
# Source: pod labels, annotations, namespace_labels
# ---------------------------------------------------------------------------

def test_source_from_pod_label():
subject = FindingSubject(name="pod", namespace="default", labels={"app": "payments"})
finding = _run(subject, [
{"source_fields": ["labels.app"], "regex": "payments", "target_label": "team", "replacement": "payments-team"},
])
assert finding.subject.labels["team"] == "payments-team"


def test_source_from_annotation():
subject = FindingSubject(name="pod", namespace="default", annotations={"owner": "platform"})
finding = _run(subject, [
{"source_fields": ["annotations.owner"], "regex": "platform", "target_label": "team", "replacement": "platform-team"},
])
assert finding.subject.labels["team"] == "platform-team"


# ---------------------------------------------------------------------------
# Multiple source_fields (concatenation)
# ---------------------------------------------------------------------------

def test_multiple_source_fields_concatenated():
subject = FindingSubject(name="pod", namespace="eu", labels={"env": "prod"})
finding = _run(subject, [
{"source_fields": ["namespace", "labels.env"], "regex": "eu;prod", "target_label": "region", "replacement": "eu-prod"},
])
assert finding.subject.labels["region"] == "eu-prod"


def test_custom_separator():
subject = FindingSubject(name="pod", namespace="eu", labels={"env": "prod"})
finding = _run(subject, [
{"source_fields": ["namespace", "labels.env"], "separator": "/", "regex": "eu/prod", "target_label": "region", "replacement": "eu-prod"},
])
assert finding.subject.labels["region"] == "eu-prod"


# ---------------------------------------------------------------------------
# Multiple rules (ordering & overwrite)
# ---------------------------------------------------------------------------

def test_multiple_rules_both_match():
subject = FindingSubject(name="pod", namespace="infra-db")
finding = _run(subject, [
{"source_fields": ["namespace"], "regex": "infra-.*", "target_label": "team", "replacement": "infra"},
{"source_fields": ["namespace"], "regex": "infra-.*", "target_label": "cost_center", "replacement": "cc-infra"},
])
assert finding.subject.labels["team"] == "infra"
assert finding.subject.labels["cost_center"] == "cc-infra"


def test_later_rule_overwrites_earlier():
subject = FindingSubject(name="pod", namespace="infra-db")
finding = _run(subject, [
{"source_fields": ["namespace"], "regex": "infra-.*", "target_label": "team", "replacement": "first"},
{"source_fields": ["namespace"], "regex": "infra-db", "target_label": "team", "replacement": "second"},
])
assert finding.subject.labels["team"] == "second"


def test_only_matching_rules_apply():
subject = FindingSubject(name="pod", namespace="app-prod")
finding = _run(subject, [
{"source_fields": ["namespace"], "regex": "infra-.*", "target_label": "team", "replacement": "infra"},
{"source_fields": ["namespace"], "regex": "app-.*", "target_label": "team", "replacement": "app"},
])
assert finding.subject.labels["team"] == "app"


# ---------------------------------------------------------------------------
# Edge cases
# ---------------------------------------------------------------------------

def test_no_namespace_does_not_crash():
subject = FindingSubject(name="pod", namespace=None)
finding = _run(subject, [
{"source_fields": ["namespace"], "regex": "infra-.*", "target_label": "team", "replacement": "infra"},
])
assert "team" not in finding.subject.labels


def test_missing_source_label_key_treated_as_empty():
subject = FindingSubject(name="pod", namespace="default")
finding = _run(subject, [
{"source_fields": ["labels.nonexistent"], "regex": "", "target_label": "team", "replacement": "x"},
])
assert finding.subject.labels["team"] == "x"


def test_existing_pod_label_not_overwritten_when_no_match():
subject = FindingSubject(name="pod", namespace="app-prod", labels={"team": "original"})
finding = _run(subject, [
{"source_fields": ["namespace"], "regex": "infra-.*", "target_label": "team", "replacement": "infra"},
])
assert finding.subject.labels["team"] == "original"


def test_no_rules_is_noop():
subject = FindingSubject(name="pod", namespace="infra-x", labels={"team": "original"})
event = _make_event(subject)
customise_finding(event, FindingOverrides())
assert _finding(event).subject.labels["team"] == "original"


def test_invalid_regex_is_skipped_and_does_not_crash():
subject = FindingSubject(name="pod", namespace="infra-db", labels={"team": "original"})
finding = _run(subject, [
{"source_fields": ["namespace"], "regex": "infra-[", "target_label": "team", "replacement": "broken"},
{"source_fields": ["namespace"], "regex": "infra-.*", "target_label": "team", "replacement": "infra"},
])
assert finding.subject.labels["team"] == "infra"
Loading