This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
make build # Build server binary to bin/snmcp
make docker-build # Build local Docker image (both streamnative/mcp-server and streamnative/snmcp tags)
make docker-build-push # Build and push multi-platform image (linux/amd64,linux/arm64)
make docker-build-multiplatform # Build multi-platform image locally
make docker-buildx-setup # Setup Docker buildx for multi-platform builds
make license-check # Check license headers
make license-fix # Fix license headers
go test -race ./... # Run all tests with race detection
go test -race ./pkg/mcp/builders/... # Run specific package tests
go test -v -run TestName ./pkg/... # Run a single testThe StreamNative MCP Server implements the Model Context Protocol using the mark3labs/mcp-go library to enable AI agents to interact with Apache Kafka, Apache Pulsar, and StreamNative Cloud resources.
Client Request → MCP Server (pkg/mcp/server.go)
↓
SSE/stdio transport layer (pkg/cmd/mcp/)
↓
Tool Handler (from builders)
↓
Context Functions (pkg/mcp/ctx.go)
↓
Service Client (Kafka/Pulsar/SNCloud)
-
Server & Sessions (
pkg/mcp/server.go)Serverstruct holdsMCPServer,KafkaSession,PulsarSession, andSNCloudSession- Sessions provide lazy-initialized clients for each service
- Context functions (
pkg/mcp/ctx.go) inject/retrieve sessions from request context
-
Tool Builders Framework (
pkg/mcp/builders/)ToolBuilderinterface:GetName(),GetRequiredFeatures(),BuildTools(),Validate()BaseToolBuilderprovides common feature validation logicToolRegistrymanages all tool builders with concurrent-safe registrationToolBuildConfigspecifies build parameters (ReadOnly, Features, Options)ToolMetadatadescribes tool information (Name, Version, Description, Category, Tags)
-
Tool Builders Organization
builders/kafka/- Kafka-specific tool builders (connect, consume, groups, partitions, produce, schema_registry, topics)builders/pulsar/- Pulsar-specific tool builders (brokers, brokers_stats, cluster, functions, functions_worker, namespace, namespace_policy, nsisolationpolicy, packages, resourcequotas, schema, sinks, sources, subscription, tenant, topic, topic_policy)builders/streamnative/- StreamNative Cloud tool builders
-
Tool Registration (
pkg/mcp/*_tools.go)- Each
*_tools.gofile creates a builder, builds tools, and adds them to the server - Tools are conditionally registered based on
--featuresflag - Feature constants defined in
pkg/mcp/features.go
- Each
-
PFTools - Functions as Tools (
pkg/mcp/pftools/)PulsarFunctionManagerdynamically converts Pulsar Functions to MCP tools- Polls for function changes and auto-registers/unregisters tools
- Circuit breaker pattern (
circuit_breaker.go) for fault tolerance - Schema conversion (
schema.go) for input/output handling
-
Session Management (
pkg/mcp/session/)pulsar_session_manager.go- LRU session cache with TTL cleanup for multi-session mode
-
Transport Layer (
pkg/cmd/mcp/)sse.go- SSE transport with health endpoints (/healthz,/readyz) and auth middlewareserver.go- Stdio transport and common server initialization
- Builder Pattern: Tool builders create tools based on features and read-only mode
- Registry Pattern: ToolRegistry provides centralized management of all builders
- Context Injection: Sessions passed via
context.Contextusing typed keys - Feature Flags: Tools enabled/disabled via string feature identifiers
- Circuit Breaker: PFTools uses failure thresholds to prevent cascading failures
- Multi-Session Pattern: Per-user Pulsar sessions with LRU caching for SSE mode
-
Create Builder in
pkg/mcp/builders/kafka/orpkg/mcp/builders/pulsar/:type MyToolBuilder struct { *builders.BaseToolBuilder } func NewMyToolBuilder() *MyToolBuilder { metadata := builders.ToolMetadata{ Name: "my_tool", Description: "Tool description", Category: "kafka_admin", } features := []string{"kafka-admin", "all", "all-kafka"} return &MyToolBuilder{ BaseToolBuilder: builders.NewBaseToolBuilder(metadata, features), } } func (b *MyToolBuilder) BuildTools(ctx context.Context, config builders.ToolBuildConfig) ([]server.ServerTool, error) { if !b.HasAnyRequiredFeature(config.Features) { return nil, nil } // Build and return tools }
-
Add Feature Constant in
pkg/mcp/features.goif needed -
Create Registration File
pkg/mcp/my_tools.go:func AddMyTools(s *server.MCPServer, readOnly bool, features []string) { builder := kafkabuilders.NewMyToolBuilder() config := builders.ToolBuildConfig{ReadOnly: readOnly, Features: features} tools, _ := builder.BuildTools(context.Background(), config) for _, tool := range tools { s.AddTool(tool.Tool, tool.Handler) } }
-
Get Session in Handler:
session := mcp.GetKafkaSession(ctx) // or GetPulsarSession if session == nil { return mcp.NewToolResultError("session not found"), nil } admin, err := session.GetAdminClient()
Handlers receive sessions via context (see pkg/mcp/ctx.go):
mcp.GetKafkaSession(ctx)→*kafka.Sessionmcp.GetPulsarSession(ctx)→*pulsar.Sessionmcp.GetSNCloudSession(ctx)→*config.Sessionmcp.GetSNCloudOrganization(ctx)→ organization stringmcp.GetSNCloudInstance(ctx)→ instance stringmcp.GetSNCloudCluster(ctx)→ cluster string
From sessions:
session.GetAdminClient()/session.GetAdminV3Client()for Pulsar adminsession.GetPulsarClient()for Pulsar messagingsession.GetAdminClient()for Kafka admin (via franz-go/kadm)
- StreamNative Cloud:
--organization+--key-file - External Kafka:
--use-external-kafka+ Kafka params - External Pulsar:
--use-external-pulsar+ Pulsar params - Multi-Session Pulsar (SSE only):
--use-external-pulsar+--multi-session-pulsar
Pre-configured context: --pulsar-instance + --pulsar-cluster disables context management tools.
When --multi-session-pulsar is enabled (SSE server with external Pulsar only):
- No global PulsarSession: Each request must provide its own token via
Authorization: Bearer <token>header - HTTP 401 on auth failure: Requests without valid tokens are rejected with HTTP 401 Unauthorized
- Per-user session caching: Sessions are cached using LRU with configurable size and TTL
- Session management: See
pkg/mcp/session/pulsar_session_manager.go
Key files:
pkg/cmd/mcp/sse.go- Auth middleware wraps SSEHandler()/MessageHandler(), health endpointspkg/mcp/session/pulsar_session_manager.go- LRU session cache with TTL cleanuppkg/cmd/mcp/server.go- Skips global PulsarSession when multi-session enabled
SSE server exposes health check endpoints:
GET /mcp/healthz- Liveness probe (always returns "ok")GET /mcp/readyz- Readiness probe (always returns "ready")
Available feature flags (defined in pkg/mcp/features.go):
| Feature | Description |
|---|---|
all |
Enable all features |
all-kafka |
All Kafka features |
all-pulsar |
All Pulsar features |
kafka-client |
Kafka produce/consume |
kafka-admin |
Kafka admin operations (all admin tools) |
kafka-admin-schema-registry |
Schema Registry |
kafka-admin-kafka-connect |
Kafka Connect |
kafka-admin-topics |
Manage Kafka topics |
kafka-admin-partitions |
Manage Kafka partitions |
kafka-admin-groups |
Manage Kafka consumer groups |
pulsar-admin |
Pulsar admin operations (all admin tools) |
pulsar-client |
Pulsar produce/consume |
pulsar-admin-brokers |
Manage Pulsar brokers |
pulsar-admin-brokers-status |
Pulsar broker status |
pulsar-admin-broker-stats |
Access Pulsar broker statistics |
pulsar-admin-clusters |
Manage Pulsar clusters |
pulsar-admin-functions |
Manage Pulsar Functions |
pulsar-admin-functions-worker |
Manage Pulsar Function workers |
pulsar-admin-namespaces |
Manage Pulsar namespaces |
pulsar-admin-namespace-policy |
Configure namespace policies |
pulsar-admin-ns-isolation-policy |
Manage namespace isolation policies |
pulsar-admin-packages |
Manage Pulsar packages |
pulsar-admin-resource-quotas |
Configure resource quotas |
pulsar-admin-schemas |
Manage Pulsar schemas |
pulsar-admin-subscriptions |
Manage Pulsar subscriptions |
pulsar-admin-tenants |
Manage Pulsar tenants |
pulsar-admin-topics |
Manage Pulsar topics |
pulsar-admin-sinks |
Manage Pulsar IO sinks |
pulsar-admin-sources |
Manage Pulsar Sources |
pulsar-admin-topic-policy |
Configure topic policies |
streamnative-cloud |
StreamNative Cloud context management |
functions-as-tools |
Dynamic Pulsar Functions as MCP tools |
The project includes a Helm chart for Kubernetes deployment at charts/snmcp/:
# Basic installation
helm install snmcp ./charts/snmcp \
--set pulsar.webServiceURL=http://pulsar.example.com:8080
# With TLS
helm install snmcp ./charts/snmcp \
--set pulsar.webServiceURL=https://pulsar:8443 \
--set pulsar.tls.enabled=true \
--set pulsar.tls.secretName=pulsar-tlsThe chart runs MCP Server in Multi-Session Pulsar mode with authentication via Authorization: Bearer <token> header.
The project includes generated SDK packages:
sdk/sdk-apiserver/- StreamNative Cloud API server clientsdk/sdk-kafkaconnect/- Kafka Connect client
- Wrap errors:
fmt.Errorf("failed to X: %w", err) - Return tool errors:
mcp.NewToolResultError("message") - Check session nil before operations
- For PFTools, use circuit breaker to handle repeated failures