Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,4 @@ src/scaffolding.config
*.sln.iml

# Visual Studio Code
.vscode
.vscode
2 changes: 2 additions & 0 deletions src/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
<PackageVersion Include="GitHubActionsTestLogger" Version="3.0.1" />
<PackageVersion Include="HdrHistogram" Version="2.5.0" />
<PackageVersion Include="Microsoft.AspNetCore.Mvc.Testing" Version="8.0.21" />
<PackageVersion Include="MongoDB.Driver" Version="3.6.0" />
<PackageVersion Include="Microsoft.AspNetCore.SignalR.Client" Version="8.0.21" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection" Version="8.0.1" />
<PackageVersion Include="Microsoft.Extensions.DependencyModel" Version="8.0.2" />
Expand Down Expand Up @@ -78,6 +79,7 @@
<PackageVersion Include="System.Reactive.Linq" Version="6.0.1" />
<PackageVersion Include="System.Reflection.MetadataLoadContext" Version="8.0.1" />
<PackageVersion Include="System.ServiceProcess.ServiceController" Version="8.0.1" />
<PackageVersion Include="Testcontainers.MongoDb" Version="4.3.0" />
<PackageVersion Include="Validar.Fody" Version="1.9.0" />
<PackageVersion Include="Yarp.ReverseProxy" Version="2.3.0" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
namespace ServiceControl.Audit.Persistence.MongoDB.BodyStorage
{
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Auditing.BodyStorage;

/// <summary>
/// Placeholder <see cref="IBodyStorage"/> intended to keep message bodies on the file system
/// instead of in the database. Neither operation is implemented yet; both throw.
/// </summary>
class FileSystemBodyStorage : IBodyStorage
{
    // TODO: Implement file system body storage
    // - Store bodies as files in a configurable directory
    // - Use bodyId as filename (with appropriate sanitization)
    // - Handle expiration via file timestamps or separate cleanup process

    /// <summary>Not implemented. Throws synchronously instead of returning a faulted task.</summary>
    /// <exception cref="NotImplementedException">Always thrown.</exception>
    public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken)
    {
        throw new NotImplementedException(NotImplementedMessage);
    }

    /// <summary>Not implemented. Throws synchronously instead of returning a faulted task.</summary>
    /// <exception cref="NotImplementedException">Always thrown.</exception>
    public Task<StreamResult> TryFetch(string bodyId, CancellationToken cancellationToken)
    {
        throw new NotImplementedException(NotImplementedMessage);
    }

    // Shared by both members so the two exceptions carry the same text.
    const string NotImplementedMessage = "File system body storage not yet implemented";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
namespace ServiceControl.Audit.Persistence.MongoDB.BodyStorage
{
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Auditing.BodyStorage;
using Collections;
using Documents;
using global::MongoDB.Driver;

/// <summary>
/// Read side of body storage: looks message bodies up in the messageBodies collection.
/// Bodies are written asynchronously by BodyStorageWriter via a channel, so
/// <see cref="Store"/> is deliberately a no-op.
/// </summary>
class MongoBodyStorage(IMongoClientProvider clientProvider) : IBodyStorage
{
    /// <summary>Does nothing and returns an already completed task.</summary>
    public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken) =>
        // Bodies are written by BodyStorageWriter, not through IBodyStorage.Store()
        Task.CompletedTask;

    /// <summary>
    /// Looks up the body document whose id equals <paramref name="bodyId"/>.
    /// </summary>
    /// <returns>
    /// A result with <c>HasResult = false</c> when no document exists or the document carries
    /// neither a text nor a binary body; otherwise an in-memory stream over the body bytes,
    /// with <paramref name="bodyId"/> used as the ETag.
    /// </returns>
    public async Task<StreamResult> TryFetch(string bodyId, CancellationToken cancellationToken)
    {
        var bodies = clientProvider.Database.GetCollection<MessageBodyDocument>(CollectionNames.MessageBodies);
        var byId = Builders<MessageBodyDocument>.Filter.Eq(body => body.Id, bodyId);

        var stored = await bodies
            .Find(byId)
            .FirstOrDefaultAsync(cancellationToken)
            .ConfigureAwait(false);

        if (stored == null)
        {
            return NotFound();
        }

        // A text body wins over a binary body when both are present; text is served as UTF-8 bytes.
        var payload = stored.TextBody != null
            ? System.Text.Encoding.UTF8.GetBytes(stored.TextBody)
            : stored.BinaryBody;

        if (payload == null)
        {
            return NotFound();
        }

        return new StreamResult
        {
            HasResult = true,
            Stream = new MemoryStream(payload),
            ContentType = stored.ContentType ?? "text/plain",
            BodySize = stored.BodySize,
            Etag = bodyId
        };
    }

    // Builds a fresh "no body available" result for every miss.
    static StreamResult NotFound() => new StreamResult { HasResult = false };
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
namespace ServiceControl.Audit.Persistence.MongoDB.BodyStorage
{
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Auditing.BodyStorage;

/// <summary>
/// Body storage used when storing bodies is switched off: writes are ignored and
/// every lookup reports that nothing was found.
/// </summary>
class NullBodyStorage : IBodyStorage
{
    /// <summary>Ignores all arguments and returns an already completed task.</summary>
    public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    /// <summary>Returns a completed task whose result has <c>HasResult = false</c>.</summary>
    public Task<StreamResult> TryFetch(string bodyId, CancellationToken cancellationToken)
    {
        var nothingStored = new StreamResult { HasResult = false };
        return Task.FromResult(nothingStored);
    }
}
}
23 changes: 23 additions & 0 deletions src/ServiceControl.Audit.Persistence.MongoDB/BodyStorageType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace ServiceControl.Audit.Persistence.MongoDB
{
/// <summary>
/// Selects the destination for message bodies.
/// </summary>
public enum BodyStorageType
{
    /// <summary>
    /// Bodies are discarded; nothing is stored.
    /// </summary>
    None = 0,

    /// <summary>
    /// Bodies are kept in the MongoDB database.
    /// </summary>
    Database = 1,

    /// <summary>
    /// Bodies are kept on the file system.
    /// </summary>
    FileSystem = 2
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace ServiceControl.Audit.Persistence.MongoDB.Collections
{
/// <summary>
/// Central list of the MongoDB collection names used by the audit persistence layer,
/// kept in alphabetical order.
/// </summary>
static class CollectionNames
{
    /// <summary>Name of the failed audit imports collection.</summary>
    public const string FailedAuditImports = "failedAuditImports";

    /// <summary>Name of the known endpoints collection.</summary>
    public const string KnownEndpoints = "knownEndpoints";

    /// <summary>Name of the message bodies collection.</summary>
    public const string MessageBodies = "messageBodies";

    /// <summary>Name of the processed messages collection.</summary>
    public const string ProcessedMessages = "processedMessages";

    /// <summary>Name of the saga snapshots collection.</summary>
    public const string SagaSnapshots = "sagaSnapshots";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
namespace ServiceControl.Audit.Persistence.MongoDB.CustomChecks
{
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using global::MongoDB.Bson;

using Microsoft.Extensions.Logging;
using NServiceBus.CustomChecks;

/// <summary>
/// Custom check that decides whether audit ingestion may continue by setting
/// <c>CanIngestMore</c> on the supplied state holder. When the connected product reports
/// WiredTiger cache metrics, the dirty-cache percentage is inspected; otherwise a rolling
/// average of ping round-trip times is used.
/// </summary>
class CheckMongoDbCachePressure(
    IMongoClientProvider clientProvider,
    MinimumRequiredStorageState stateHolder,
    ILogger<CheckMongoDbCachePressure> logger)
    : CustomCheck("MongoDB Storage Pressure", "ServiceControl.Audit Health", TimeSpan.FromSeconds(5))
{
    /// <summary>
    /// Runs whichever pressure probe the product supports and reports its outcome.
    /// </summary>
    /// <returns>
    /// The probe's result, or a failed result when the probe itself threw. In that case
    /// ingestion is left enabled.
    /// </returns>
    public override async Task<CheckResult> PerformCheck(CancellationToken cancellationToken = default)
    {
        try
        {
            var probe = clientProvider.ProductCapabilities.SupportsWiredTigerCacheMetrics
                ? CheckWiredTigerCachePressure(cancellationToken)
                : CheckWriteLatency(cancellationToken);

            return await probe.ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "Failed to check MongoDB storage pressure");

            // A broken monitor must not stop ingestion, so keep the gate open on failure.
            stateHolder.CanIngestMore = true;
            return CheckResult.Failed($"Unable to check MongoDB storage pressure: {ex.Message}");
        }
    }

    /// <summary>
    /// Reads the WiredTiger cache section of <c>serverStatus</c> and pauses ingestion when the
    /// dirty share of the configured cache reaches <see cref="DirtyThresholdPercentage"/>.
    /// </summary>
    async Task<CheckResult> CheckWiredTigerCachePressure(CancellationToken cancellationToken)
    {
        var status = await clientProvider.Database
            .RunCommandAsync<BsonDocument>(new BsonDocument("serverStatus", 1), cancellationToken: cancellationToken)
            .ConfigureAwait(false);

        var cacheStats = status["wiredTiger"].AsBsonDocument["cache"].AsBsonDocument;

        var dirtyBytes = cacheStats["tracked dirty bytes in the cache"].ToDouble();
        var totalBytes = cacheStats["bytes currently in the cache"].ToDouble();
        var maxBytes = cacheStats["maximum bytes configured"].ToDouble();

        // Both percentages stay at zero when no positive maximum is reported.
        double dirtyPercentage = 0;
        double usedPercentage = 0;
        if (maxBytes > 0)
        {
            dirtyPercentage = dirtyBytes / maxBytes * 100;
            usedPercentage = totalBytes / maxBytes * 100;
        }

        logger.LogDebug(
            "MongoDB WiredTiger cache - Dirty: {DirtyPercentage:F1}%, Used: {UsedPercentage:F1}%, Dirty bytes: {DirtyBytes:N0}, Total bytes: {TotalBytes:N0}, Max bytes: {MaxBytes:N0}",
            dirtyPercentage, usedPercentage, dirtyBytes, totalBytes, maxBytes);

        var overThreshold = dirtyPercentage >= DirtyThresholdPercentage;
        if (!overThreshold)
        {
            stateHolder.CanIngestMore = true;
            return CheckResult.Pass;
        }

        logger.LogWarning(
            "Audit message ingestion paused. MongoDB WiredTiger dirty cache at {DirtyPercentage:F1}% (threshold: {Threshold}%). This indicates write pressure is exceeding the storage engine's ability to flush to disk",
            dirtyPercentage, DirtyThresholdPercentage);

        stateHolder.CanIngestMore = false;
        return CheckResult.Failed(
            $"MongoDB WiredTiger dirty cache at {dirtyPercentage:F1}% (threshold: {DirtyThresholdPercentage}%). Ingestion paused to allow the storage engine to recover.");
    }

    /// <summary>
    /// Times a <c>ping</c> command, adds the sample to the rolling window and pauses ingestion
    /// when enough samples exist and their average reaches <see cref="LatencyThresholdMs"/>.
    /// </summary>
    async Task<CheckResult> CheckWriteLatency(CancellationToken cancellationToken)
    {
        var timer = Stopwatch.StartNew();
        _ = await clientProvider.Database
            .RunCommandAsync<BsonDocument>(new BsonDocument("ping", 1), cancellationToken: cancellationToken)
            .ConfigureAwait(false);
        timer.Stop();

        var latencyMs = timer.Elapsed.TotalMilliseconds;
        RecordLatency(latencyMs);

        var sampleCount = Math.Min(latencyIndex, LatencyWindowSize);
        var avgLatency = GetAverageLatency();

        logger.LogDebug(
            "MongoDB ping latency: {LatencyMs:F0}ms, Rolling average: {AvgLatency:F0}ms (samples: {SampleCount}/{WindowSize})",
            latencyMs, avgLatency, sampleCount, LatencyWindowSize);

        var enoughSamples = sampleCount >= MinSamplesBeforeThrottling;
        var tooSlow = enoughSamples && avgLatency >= LatencyThresholdMs;
        if (!tooSlow)
        {
            stateHolder.CanIngestMore = true;
            return CheckResult.Pass;
        }

        logger.LogWarning(
            "Audit message ingestion paused. MongoDB average response latency at {AvgLatency:F0}ms (threshold: {Threshold}ms). This indicates the database is under pressure",
            avgLatency, LatencyThresholdMs);

        stateHolder.CanIngestMore = false;
        return CheckResult.Failed(
            $"MongoDB average response latency at {avgLatency:F0}ms (threshold: {LatencyThresholdMs}ms). Ingestion paused to allow the database to recover.");
    }

    // Writes the sample into the next slot, wrapping around once the window is full.
    void RecordLatency(double latencyMs)
    {
        var slot = latencyIndex % LatencyWindowSize;
        latencyWindow[slot] = latencyMs;
        latencyIndex += 1;
    }

    // Mean of the slots filled so far; zero when nothing has been recorded yet.
    double GetAverageLatency()
    {
        var filled = Math.Min(latencyIndex, LatencyWindowSize);
        if (filled == 0)
        {
            return 0;
        }

        var total = 0d;
        var slot = 0;
        while (slot < filled)
        {
            total += latencyWindow[slot];
            slot++;
        }

        return total / filled;
    }

    // WiredTiger thresholds
    const double DirtyThresholdPercentage = 15;

    // Latency thresholds
    const int LatencyWindowSize = 6; // 30 seconds of history at 5-second intervals
    const int MinSamplesBeforeThrottling = 3; // Need at least 15 seconds of data before throttling
    const double LatencyThresholdMs = 500;

    // Fixed-size window of recent ping latencies; latencyIndex counts every sample recorded.
    readonly double[] latencyWindow = new double[LatencyWindowSize];
    int latencyIndex;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
namespace ServiceControl.Audit.Persistence.MongoDB.Documents
{
using System.Collections.Generic;
using global::MongoDB.Bson;
using global::MongoDB.Bson.Serialization.Attributes;

/// <summary>
/// MongoDB document shape for an audit message that could not be imported.
/// NOTE(review): presumably persisted in the failedAuditImports collection; the writer is not visible here.
/// </summary>
class FailedAuditImportDocument
{
/// <summary>Document identifier, an <see cref="ObjectId"/> marked as the BSON id.</summary>
[BsonId]
public ObjectId Id { get; set; }

/// <summary>Message id, serialized under the element name "messageId".</summary>
[BsonElement("messageId")]
public string MessageId { get; set; }

/// <summary>Header key/value pairs, serialized under the element name "headers".</summary>
[BsonElement("headers")]
public Dictionary<string, string> Headers { get; set; }

/// <summary>Body bytes, serialized under the element name "body".</summary>
[BsonElement("body")]
public byte[] Body { get; set; }

/// <summary>
/// Exception text, serialized under the element name "exceptionInfo".
/// NOTE(review): presumably describes why the import failed; confirm against the code that fills it.
/// </summary>
[BsonElement("exceptionInfo")]
public string ExceptionInfo { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
namespace ServiceControl.Audit.Persistence.MongoDB.Documents
{
using System;
using global::MongoDB.Bson.Serialization.Attributes;

/// <summary>
/// MongoDB document shape describing a known endpoint.
/// NOTE(review): presumably persisted in the knownEndpoints collection; the writer is not visible here.
/// </summary>
class KnownEndpointDocument
{
/// <summary>Document identifier, a string marked as the BSON id.</summary>
[BsonId]
public string Id { get; set; }

/// <summary>Endpoint name, serialized under the element name "name".</summary>
[BsonElement("name")]
public string Name { get; set; }

/// <summary>Host identifier, serialized under "hostId" using the standard GUID representation.</summary>
[BsonElement("hostId")]
[BsonGuidRepresentation(global::MongoDB.Bson.GuidRepresentation.Standard)]
public Guid HostId { get; set; }

/// <summary>Host value, serialized under the element name "host".</summary>
[BsonElement("host")]
public string Host { get; set; }

/// <summary>
/// Timestamp serialized under the element name "lastSeen".
/// NOTE(review): presumably the last time the endpoint was observed; confirm the expected DateTime kind (UTC?).
/// </summary>
[BsonElement("lastSeen")]
public DateTime LastSeen { get; set; }

/// <summary>
/// Timestamp serialized under the element name "expiresAt".
/// NOTE(review): presumably used for retention/expiry of the document; the mechanism is not visible here.
/// </summary>
[BsonElement("expiresAt")]
public DateTime ExpiresAt { get; set; }
}
}
Loading