Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ ThisBuild / organization := "app.softnetwork"

name := "notification"

ThisBuild / version := "0.9.2"
ThisBuild / version := "0.9-SNAPSHOT"

ThisBuild / scalaVersion := scala212

Expand Down
10 changes: 10 additions & 0 deletions common/src/main/protobuf/model/notifications.proto
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ message Mail{
repeated NotificationStatusResult results = 18;
required NotificationType type = 19 [default = MAIL_TYPE];
repeated Attachment attachments = 20;
// Story 13.7 — cross-service correlation id, set by the producer (e.g. license-server
// mailNotification) from the originating checkout/issuance flow. Read at send time to emit the
// notification_sent / notification_failed audit line. ScalaPB → correlationId: Option[String].
optional string correlation_id = 21;
}

message SMS{
Expand All @@ -112,6 +116,8 @@ message SMS{
required NotificationStatus status = 17 [default = Pending];
repeated NotificationStatusResult results = 18;
required NotificationType type = 19 [default = SMS_TYPE];
// Story 13.7 — cross-service correlation id (see Mail.correlation_id).
optional string correlation_id = 20;
}

message Push{
Expand All @@ -136,6 +142,8 @@ message Push{
required int32 badge = 22 [default = 0];
optional string sound = 23;
optional string application = 24;
// Story 13.7 — cross-service correlation id (see Mail.correlation_id).
optional string correlation_id = 25;
}

message Ws{
Expand All @@ -156,6 +164,8 @@ message Ws{
repeated NotificationStatusResult results = 18;
required NotificationType type = 19 [default = WS_TYPE];
optional string channel = 20;
// Story 13.7 — cross-service correlation id (see Mail.correlation_id).
optional string correlation_id = 21;
}

message PushPayload{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ trait Notification extends State with NotificationDecorator {

def results: Seq[NotificationStatusResult]

/** Story 13.7 — cross-service correlation id, set by the producer (e.g. license-server
* mailNotification) from the originating flow; read at send time to emit the audit line. Backed
* by the `optional string correlation_id` proto field on each notification message.
*/
def correlationId: Option[String]

def removeOnSuccess(): Option[Boolean] = None

def removeAfterMaxTries(): Option[Boolean] = None
Expand Down Expand Up @@ -66,6 +72,8 @@ trait NotificationDecorator { _: Notification =>
} else {
recipients().mkString(",")
}

def withCorrelationId(correlationId: String): Notification with NotificationDecorator
}

trait NotificationAckDecorator { _: NotificationAck =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import app.softnetwork.scheduler.message.SchedulerEvents.{
}
import app.softnetwork.scheduler.message.{AddSchedule, RemoveSchedule}
import app.softnetwork.scheduler.model.Schedule
import app.softnetwork.persistence.audit.AuditLog
import app.softnetwork.persistence.now
import app.softnetwork.persistence.typed._
import app.softnetwork.notification.message._
Expand Down Expand Up @@ -45,6 +46,11 @@ trait NotificationBehavior[T <: Notification]

private[this] val delay = 1

/** Story 13.7 — structured audit trail. service = "notification"; correlationId is threaded as
* data on the notification (proto field, exposed on the Notification trait), never via MDC.
*/
private[this] lazy val audit: AuditLog = AuditLog("notification")

/** Set event tags, which will be used in persistence query
*
* @param entityId
Expand Down Expand Up @@ -469,18 +475,30 @@ trait NotificationBehavior[T <: Notification]
)
Effect
.persist(events)
.thenRun(_ =>
{
.thenRun(_ => {
// Story 13.7 — emit the terminal audit line, but only when an actual send/ack happened this
// round (skip deferred / already-sent no-ops, and the non-terminal Pending outcome). The
// correlation id rides the notification (proto field on the Notification trait); the channel
// is its NotificationType.
if (maybeAckWithNumberOfRetries.isDefined) {
val cid = updatedNotification.correlationId.getOrElse("-")
val channel = updatedNotification.`type`.name
updatedNotification.status match {
case Rejected => NotificationRejected(entityId, updatedNotification.results)
case Undelivered => NotificationUndelivered(entityId, updatedNotification.results)
case Sent => NotificationSent(entityId, updatedNotification.results)
case Delivered => NotificationDelivered(entityId, updatedNotification.results)
case _ => NotificationPending(entityId, updatedNotification.results)
case Sent | Delivered =>
audit.event(cid, "notification_sent", "channel" -> channel)
case Undelivered | Rejected =>
audit.event(cid, "notification_failed", "channel" -> channel)
case _ => // still pending — no terminal audit line yet
}
}
~> replyTo
)
(updatedNotification.status match {
case Rejected => NotificationRejected(entityId, updatedNotification.results)
case Undelivered => NotificationUndelivered(entityId, updatedNotification.results)
case Sent => NotificationSent(entityId, updatedNotification.results)
case Delivered => NotificationDelivered(entityId, updatedNotification.results)
case _ => NotificationPending(entityId, updatedNotification.results)
}) ~> replyTo
})
}

}
4 changes: 2 additions & 2 deletions project/Versions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ object Versions {

val logback = "1.4.14"

val genericPersistence = "0.8.5"
val genericPersistence = "0.9-SNAPSHOT"

val scheduler = "0.8.0"
val scheduler = "0.8-SNAPSHOT"

val scalatest = "3.2.16"
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package app.softnetwork.notification.handlers
import app.softnetwork.notification.message._
import app.softnetwork.notification.model.Attachment
import app.softnetwork.notification.scalatest.SimpleMailNotificationsTestKit
import app.softnetwork.persistence.audit.AuditLog
import ch.qos.logback.classic.{Logger => LogbackLogger}
import ch.qos.logback.classic.spi.ILoggingEvent
import ch.qos.logback.core.read.ListAppender
import org.scalatest.wordspec.AnyWordSpecLike
import org.slf4j.{Logger, LoggerFactory}

Expand Down Expand Up @@ -142,6 +146,34 @@ class SimpleMailNotificationsHandlerSpec
assert(probe.receiveMessage().schedule.uuid == s"MailNotification#$uuid#NotificationTimerKey")
}

"emit a notification_sent audit line carrying the correlation id (Story 13.7)" in {
val cid = "notif-corr-13-7"
val auditLogger = LoggerFactory.getLogger(AuditLog.LoggerName).asInstanceOf[LogbackLogger]
val appender = new ListAppender[ILoggingEvent]()
appender.start()
auditLogger.addAppender(appender)
try {
val uuid = "auditedSend"
this ? (uuid, SendNotification(generateMail(uuid).withCorrelationId(cid))) await {
case n: NotificationSent => n.uuid shouldBe uuid
case _ => fail()
}
// the emission runs in the behavior's thenRun BEFORE the reply, so it is captured by now
val sentLine =
appender.list.toArray.toList.collect { case e: ILoggingEvent => e }.find { e =>
val fields = e.getArgumentArray.map(_.toString).toSet
fields.contains("event_type=notification_sent") && fields.contains(
s"correlation_id=$cid"
)
}
assert(sentLine.isDefined, "expected a notification_sent audit line carrying the cid")
sentLine.get.getArgumentArray.map(_.toString) should contain("service=notification")
} finally {
auditLogger.detachAppender(appender)
appender.stop()
}
}

}

}
Loading