Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions examples/consumers.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,31 @@ async def main() -> None:
# We publish a single message
await js.publish("stream.example.test", "message for stream")

# We use messages() to get async iterator which we
# use to get messages for push_consumer.
async for push_message in await push_consumer.messages():
print(f"[FROM_PUSH] {push_message.payload!r}") # noqa: T201
await push_message.ack()
break
async with push_consumer.consume() as messages:
async for push_message in messages:
print(f"[FROM_PUSH] {push_message.payload!r}") # noqa: T201
break

# Pull consumers have to request batches of messages.
# Pull consumers have 2 different APIs.
# 1. You can use fetch directly.
# 2. Use async iterator API.

# Here's how to call pull-consumer fetch method.
# It returns a batch of messages.
# However, please be careful, this method has worse opentelemetry
# instrumentation. Because essentailly it's the same as just calling a function.
# with no scope.
for pull_message in await pull_consumer.fetch(max_messages=10):
print(f"[FROM_PULL] {pull_message.payload!r}") # noqa: T201
await pull_message.ack()

# This API is more prefered, because it has better
# Opentelemetry instrumentation.
async with pull_consumer.consume() as messages:
async for message in messages:
print(f"[FROM_PULL] {message.payload!r}") # noqa: T201
break

# Cleanup
await stream.consumers.delete(push_consumer.name)
await stream.consumers.delete(pull_consumer.name)
Expand Down
16 changes: 16 additions & 0 deletions examples/opentelemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from natsrpy.instrumentation import NatsrpyInstrumentor

NatsrpyInstrumentor().instrument(
# If true, then message payload will be attached
# to some spans.
capture_body=False,
# If true, then message headers will be attached
# to some spans.
capture_headers=False,
)

# We also support zero-code instrumentation.
# In case if you're using it, you can specify those parameters
# by setting the following environment variables:
# * `OTEL_PYTHON_NATSRPY_CAPTURE_BODY=true`
# * `OTEL_PYTHON_NATSRPY_CAPTURE_HEADERS=true`
2 changes: 1 addition & 1 deletion python/natsrpy/_natsrpy_rs/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class SubscriptionCtxManager(Generic[_T]):
"""

def __aenter__(self) -> Future[_T]: ...
async def __aexit__(
def __aexit__(
self,
_exc_type: type[BaseException] | None = None,
_exc_val: BaseException | None = None,
Expand Down
52 changes: 48 additions & 4 deletions python/natsrpy/_natsrpy_rs/js/consumers.pyi
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from asyncio import Future
from datetime import timedelta
from types import TracebackType
from typing import final

from natsrpy._natsrpy_rs.js import JetStreamMessage
Expand All @@ -12,8 +13,11 @@ __all__ = [
"PriorityPolicy",
"PullConsumer",
"PullConsumerConfig",
"PullConsumerContextManager",
"PullConsumerFetcher",
"PushConsumer",
"PushConsumerConfig",
"PushConsumerContextManager",
"ReplayPolicy",
]

Expand Down Expand Up @@ -283,6 +287,28 @@ class MessagesIterator:
def __aiter__(self) -> Self: ...
def __anext__(self) -> Future[JetStreamMessage]: ...

@final
class PushConsumerContextManager:
"""
Context manager for consuming messages from push-based consumer.

This class is used to scope the message consumption.
Mostly used for opentelemetry support.
"""

def __aenter__(self) -> Future[MessagesIterator]:
"""Get an async iterator for consuming messages.

:return: an async iterator over JetStream messages.
"""

def __aexit__(
self,
_exc_type: type[BaseException] | None = None,
_exc_val: BaseException | None = None,
_exc_tb: TracebackType | None = None,
) -> Future[None]: ...

@final
class PushConsumer:
"""A push-based JetStream consumer.
Expand All @@ -298,11 +324,26 @@ class PushConsumer:
def stream_name(self) -> str:
"""Get stream name that this consumer attached to."""

def messages(self) -> Future[MessagesIterator]:
"""Get an async iterator for consuming messages.
def consume(self) -> PushConsumerContextManager:
"""Start consuming messages."""

:return: an async iterator over JetStream messages.
"""
@final
class PullConsumerFetcher:
def __aiter__(self) -> Self:
"""Returns this very object."""

def __anext__(self) -> Future[JetStreamMessage]:
"""Get a next message from the stream."""

@final
class PullConsumerContextManager:
def __aenter__(self) -> Future[PullConsumerFetcher]: ...
def __aexit__(
self,
_exc_type: type[BaseException] | None = None,
_exc_val: BaseException | None = None,
_exc_tb: TracebackType | None = None,
) -> Future[None]: ...

@final
class PullConsumer:
Expand All @@ -319,6 +360,9 @@ class PullConsumer:
def stream_name(self) -> str:
"""Get stream name that this consumer attached to."""

def consume(self) -> PullConsumerContextManager:
"""Start consuming messages."""

def fetch(
self,
max_messages: int | None = None,
Expand Down
6 changes: 3 additions & 3 deletions python/natsrpy/_natsrpy_rs/js/kv.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,10 @@ class KVConfig:
"""

bucket: str
description: str
description: str | None
max_value_size: int | None
history: int | None
max_age: float | None
max_age: timedelta | None
max_bytes: int | None
storage: StorageType | None
num_replicas: int | None
Expand All @@ -135,7 +135,7 @@ class KVConfig:
mirror_direct: bool | None
compression: bool | None
placement: Placement | None
limit_markers: float | None
limit_markers: timedelta | None

def __new__(
cls,
Expand Down
4 changes: 1 addition & 3 deletions python/natsrpy/_natsrpy_rs/js/stream.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@ class SubjectTransform:
source: str
destination: str

def __new__(cls, source: str, destination: str) -> Self: ...

@final
class Source:
"""Configuration for a stream source or mirror origin.
Expand Down Expand Up @@ -454,7 +452,7 @@ class StreamInfo:
"""

config: StreamConfig
created: float
created: int
state: StreamState
cluster: ClusterInfo | None
mirror: SourceInfo | None
Expand Down
35 changes: 34 additions & 1 deletion python/natsrpy/instrumentation/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@ async def main() -> None:
"""

import logging
import os
from collections.abc import Collection
from importlib import metadata
from typing import Any

from .js_consumer import JSConsumerInstrumentation
from .js_publish import JSPublishInstrumentation
from .nats_core import NatsCoreInstrumentator

try:
Expand Down Expand Up @@ -68,12 +71,42 @@ def instrumentation_dependencies(self) -> Collection[str]:

def _instrument(self, **kwargs: Any) -> None:
tracer_provider = kwargs.get("tracer_provider")
capture_body = (
os.environ.get(
"OTEL_PYTHON_NATSRPY_CAPTURE_BODY",
str(kwargs.get("capture_body", False)),
).lower()
== "true"
)
capture_headers = (
os.environ.get(
"OTEL_PYTHON_NATSRPY_CAPTURE_HEADERS",
str(kwargs.get("capture_headers", False)),
).lower()
== "true"
)
tracer = trace.get_tracer(
_INSTRUMENTATION_MODULE_NAME,
metadata.version("natsrpy"),
tracer_provider,
)
NatsCoreInstrumentator(tracer).instrument()
NatsCoreInstrumentator(
tracer,
capture_body=capture_body,
capture_headers=capture_headers,
).instrument()
JSConsumerInstrumentation(
tracer,
capture_body=capture_body,
capture_headers=capture_headers,
).instrument()
JSPublishInstrumentation(
tracer,
capture_body=capture_body,
capture_headers=capture_headers,
).instrument()

def _uninstrument(self, **kwargs: Any) -> None:
NatsCoreInstrumentator.uninstrument()
JSConsumerInstrumentation.uninstrument()
JSPublishInstrumentation.uninstrument()
42 changes: 42 additions & 0 deletions python/natsrpy/instrumentation/ctx_manager_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from collections.abc import Callable
from typing import Any, Concatenate

from typing_extensions import ParamSpec
from wrapt import ObjectProxy

_P = ParamSpec("_P")


class AsyncCtxManagerProxy(ObjectProxy): # type: ignore
"""
Proxy object for context managers.

This class wraps a context manager,
wrapping returned values on __aenter__,
and calling __cancel_ctx__ at the exit.
"""

def __init__(
self,
wrapped: Any,
sub_wrappers: dict[type[Any], Callable[Concatenate[Any, _P], Any]],
*args: _P.args,
**kwargs: _P.kwargs,
) -> None:
super().__init__(wrapped)
self._self_sub_args = args
self._self_sub_kwargs = kwargs
self._self_sub = None
self._self_subwrappers = sub_wrappers

async def __aenter__(self) -> Any:
sub: Any = await self.__wrapped__.__aenter__()
sub_wrapper = self._self_subwrappers.get(type(sub))
if sub_wrapper:
sub = sub_wrapper(sub, *self._self_sub_args, **self._self_sub_kwargs)
self._self_sub = sub
return sub

async def __aexit__(self, *args: object, **kwargs: dict[Any, Any]) -> Any:
if self._self_sub and hasattr(self._self_sub, "__cancel_ctx__"):
self._self_sub.__cancel_ctx__(*args, **kwargs)
Loading
Loading