Skip to content

Remove channel registry#1371

Open
simonvoelcker wants to merge 16 commits intofrequenz-floss:v1.x.xfrom
simonvoelcker:remove-channel-registry
Open

Remove channel registry#1371
simonvoelcker wants to merge 16 commits intofrequenz-floss:v1.x.xfrom
simonvoelcker:remove-channel-registry

Conversation

@simonvoelcker
Copy link
Copy Markdown
Contributor

@simonvoelcker simonvoelcker commented Mar 5, 2026

This removes the ChannelRegistry class.

Background

This is part of an initiative to close unused channels to save resources. The ChannelRegistry was a de-facto singleton class for storing channels by name. Request objects such as ComponentMetricRequest or ReportRequest have a get_channel_name() method, and the returned channel name acted as a contract between sender and receiver - With the name, they could obtain the same channel from the ChannelRegistry. By getting rid of the registry, we are moving towards a target state where only references to senders and receivers, but no channels are being held.

Implementation

ComponentMetricRequest

ComponentMetricRequest objects represent a request to receive metrics from a component - Sometimes also referred to as telemetry stream. To serve such a request without relying on the ChannelRegistry, the request got a new attribute: telem_stream_sender: Sender[Receiver[Sample[Quantity]]]. This is the sender of a one-shot channel that is used to send the receiver of the telemetry stream back to the original requester - Similar to a callback function. The receiver of a component metric request should therefore create (and own) the channel that sends the metric data, and use the one-shot channel to send a receiver of that channel "back".

In a few places, component metric requests were created and sent from synchronous functions which returned the receiver. To make this work with the round-trip outlined above, an additional channel was added that merely forwards the metric data via a Pipe. This allows us to immediately return a sender while the channel that serves the metric data is being constructed asynchronously.

ReportRequest

ReportRequest objects are a different kind of request that was handled similarly.

Channel ownership

The receivers of request objects create the channels and own them. For instance, in case of ComponentMetricRequests, MicrogridApiSource now maintains a dictionary (channel_lookup: dict[str, Broadcast]) as a replacement for the channel registry. It is necessary to have this lookup dictionary to avoid creating duplicate channels for similar metric requests. In this case, a new receiver is created from an existing channel found in the lookup dictionary.

Forwarding channels are owned by their creator (e.g. GridFrequency). These (and the Pipes that connect them to the other channels) are currently never closed. Closing these when all receivers or senders are closed is a TODO for a later iteration.

@github-actions github-actions bot added part:tests Affects the unit, integration and performance (benchmarks) tests part:tooling Affects the development tooling (CI, deployment, dependency management, etc.) part:data-pipeline Affects the data pipeline part:core Affects the SDK core components (data structures, etc.) part:microgrid Affects the interactions with the microgrid labels Mar 5, 2026
@simonvoelcker simonvoelcker force-pushed the remove-channel-registry branch 2 times, most recently from f4e6125 to 9186ad8 Compare March 5, 2026 19:16
@github-actions github-actions bot added the part:docs Affects the documentation label Mar 5, 2026
@simonvoelcker simonvoelcker force-pushed the remove-channel-registry branch 4 times, most recently from a878b7a to 22cd7c5 Compare March 6, 2026 14:42
pyproject.toml Outdated
"frequenz-microgrid-component-graph >= 0.3.4, < 0.4",
"frequenz-client-common >= 0.3.6, < 0.4.0",
"frequenz-channels >= 1.6.1, < 2.0.0",
"frequenz-channels @ git+https://github.com/shsms/frequenz-channels-python.git@oneshot",
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I won't merge before this is merged 👀

@simonvoelcker simonvoelcker marked this pull request as ready for review March 6, 2026 15:17
@simonvoelcker simonvoelcker requested a review from a team as a code owner March 6, 2026 15:17
@simonvoelcker simonvoelcker requested review from florian-wagner-frequenz and removed request for a team March 6, 2026 15:17
@llucax
Copy link
Copy Markdown
Contributor

llucax commented Mar 9, 2026

Sorry to be a party pooper. I just started with the review and looking at the data sourcing actor and I think we might be overcomplicating things.

Why can't we just reverse the ownership, and make the requester own the "data receiving channel"? So instead of creating a oneshot channel1, the requester creates a normal channel and gives the other end the sender. That's it! No back and forth sending and receiving receivers.

The only disadvantage of this approach is we can't de-duplicate channels so to speak, so no broadcasting like we do now (the data sourcing actor creates one channel per metric-cid combination I think, and create new receivers for any new party asking for data). We can still de-duplicate by subscribing to the microgrid API once and send sending to multiple senders, but we'll still have a channel per sender. But for broadcast channels we actually use one queue per receiver, so I guess in terms of memory usage it shouldn't change much.

So, this is a very early thought, I didn't think of all the other potential issues, but by looking at the changes it really feels to me that we are introducing non-trivial complexities with this change. For example the new requirement to being async to subscribe to data is gone with this approach. I think that's my main concern, we are converting something that is inherently sync (create a new receiver to receive data) into something async, and that's very disruptive.

I will continue with my review, but I wanted to mention this early, as we are planning to build on top of this, I want to raise it as soon as possible.

Footnotes

  1. See @sahas-subramanian-frequenz? I didn't thought about it and wrote basically create_oneshot_channel(), that must be a good name :trollface:

@simonvoelcker simonvoelcker force-pushed the remove-channel-registry branch 2 times, most recently from 38fb0ab to a28fab2 Compare March 9, 2026 13:56
@florian-wagner-frequenz florian-wagner-frequenz requested review from llucax and shsms and removed request for florian-wagner-frequenz March 9, 2026 13:56
@simonvoelcker
Copy link
Copy Markdown
Contributor Author

@llucax I completely agree that this change adds regrettable complexity, and we should explore ways of avoiding it.

It is my understanding that channels must be de-duplicated somehow because we might be dealing with hundreds of components in practice. The channels must then be owned centrally, whether that's near the source or in a singleton registry. Maybe we can revisit the registry idea, but store senders there instead of channels? That would avoid the oneshot channels.

Regarding sync-vs-async: The creation of channels was async before and stays async, no? We're sending ComponentMetricRequests either way, it's just that in one case we discussed today (GridFrequency.new_receiver) a receiver must be returned synchronously, so we have to use the forwarding channel.

@llucax
Copy link
Copy Markdown
Contributor

llucax commented Mar 9, 2026

The problem with the singleton channel registry for me is when things go wrong it is very hard to find out when a channel was first created, who requested it, etc. This problem came up and was very hard to debug. This is why I created the option to log when a channel is created in the registry (including the stack trace), but that turns out to pollute the logs excessively when enabled, as you can't really filter it by channel name, so it is printed for all channels.

This is why I think getting rid of the channel registry is a good goal, but I think we agree that not at all cost.

About the de-duplication, as I mentioned, it might be worth taking a deeper look at how much overhead really means having one channel per metric-cid vs. the current approach of having one receiver per metric-cid. As for broadcast channels we have a queue per receiver, and the sender just iterates over all receiver and put the new message there, I'm not sure it will be a big difference between having 1 broadcast channel with N receivers vs having N "unicast channels" (for now I guess we'll use a broadcast channel too.

Note

We explored in the past implementing a unicast channel but I think it was as slow as the broadcast channel so we kept the broadcast channel only. I guess for unicast we can just use a asyncio queue, not sure if that was what we used to implement the unicast channel.

Copy link
Copy Markdown
Contributor

@llucax llucax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will send this partial review, as all commits were split while I was mid-review. Will wait until things settle down to continue the review.

@simonvoelcker simonvoelcker force-pushed the remove-channel-registry branch from 905bf05 to 11d0e26 Compare March 16, 2026 13:43
Copy link
Copy Markdown
Contributor

@llucax llucax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see this now needs rebasing and it is using the old oneshot names, so I will wait until it is updated for the final review. Can you requrest a review from me again when it is ready @simonvoelcker?

Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
…lected

Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
Signed-off-by: Simon Völcker <simon.voelcker@frequenz.com>
@simonvoelcker simonvoelcker force-pushed the remove-channel-registry branch from f455d48 to 4eb6278 Compare March 17, 2026 13:21
@simonvoelcker
Copy link
Copy Markdown
Contributor Author

@llucax rebased. Regarding the "old" names, IIRC the plan was to make the changes in two steps. And I'll have to wait until something is merged in the channels repo anyways, so I can update the dependency.

Copy link
Copy Markdown
Contributor

@llucax llucax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, done with the review. Mostly minor comments but a few important too:

  1. Make sure we always reply requests to the oneshot channel (unless I'm missing something)
  2. Add tests for 1
  3. Avoid importing private symbols (underscore prefix)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, did you run the benchmark before and after the changes to see if there are any significant performance changes?


from frequenz.channels import Broadcast, Receiver, ReceiverStoppedError
from frequenz.channels import Broadcast, OneshotChannel, Receiver, ReceiverStoppedError
from frequenz.channels._broadcast import BroadcastReceiver
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
from frequenz.channels._broadcast import BroadcastReceiver
from frequenz.channels import BroadcastReceiver

Always use public imports. If the symbols are not exported publicly, we need to fix it in the channels library.

Comment on lines +9 to +10
from frequenz.channels._broadcast import BroadcastReceiver
from frequenz.channels._oneshot import OneshotSender
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once more, I will stop it here as I guess you get the point, but this should be fixed in all files.

Suggested change
from frequenz.channels._broadcast import BroadcastReceiver
from frequenz.channels._oneshot import OneshotSender
from frequenz.channels import BroadcastReceiver
from frequenz.channels import OneshotSender

name=channel_name
)
self._channels[channel_name] = telem_stream
await request.telem_stream_sender.send(telem_stream.new_receiver())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be outside of the if block? If the channel already exists, we should probably send the receiver too via the oneshot channel right? Or am I missing something?

If what I say is correct, we probably need to add a test for this case.

requests: dict[Metric | TransitionalMetric, list[ComponentMetricRequest]],
) -> list[tuple[Callable[[Any], float], list[Sender[Sample[Quantity]]]]]:
) -> list[tuple[Callable[[Any], float], list[BroadcastSender[Sample[Quantity]]]]]:
"""Get channel senders from the channel registry for each requested metric.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Channel registry" here. It might be worth doing a global search for "channel.*registry" or something like that to make sure to remove all mentions of the channel registry.

# If we are already handling this request, answer the request by sending a
# new receiver from the existing channel.
if data_sink_channel := self._data_sink_channels.get(request_channel_name):
await request.telem_stream_sender.send(data_sink_channel.new_receiver())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to send the reply only in one place, to make it more obvious that we always do, but I guess in this case it would mean removing this short-circuit and put all the following code inside the if block instead, just to send the reply after the if block, so I'm not sure it is worth it/it makes it more readable, so I'm good with leaving it as it is, just bringing it up in case you have other ideas.

self._priority = priority
self._report_stream_receiver: Receiver[Receiver[_Report]] | None = None
# Keep a reference to prevent garbage collector from destroying pipe
self._pipe: Pipe[_Report] | None = None
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, so the battery pool also needs a pipe, is not just the grid frequency? I see the create_task to hack our way outside the sync world already existed so I guess it is the same case as for grid frequency, but here we are wrapping a @Property, so we'll need to change it to a method too to make it async in the future.

So, there is also no stopping of the _pipe for now, ever, right? I guess there is no stopping of the pools either, so we can pretend the problem doesn't exist 😆

Also I see this happens for all the pools.

)
await self._request_sender.send(component_metric_request)
receiver = await telem_stream_receiver.receive()
return self._map_frequency_samples(receiver)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found this a bit confusing, I thought you were somehow mapping values here (so the mapping happened right now, not for each received value), not calling receiver.map().

I would change the function so you can write this instead

Suggested change
return self._map_frequency_samples(receiver)
return receiver.map(receiver, self._map_frequency_samples)

That makes the intention more clear IMHO.

await resampling_actor._resampler.stop() # pylint: disable=protected-access


async def test_resubscribe(fake_time: time_machine.Coordinates) -> None:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, it seems we need this test also for the MicrogridApiSource (although it is going away, so maybe just manual validation is OK for now), and the power manager.


- Fixed stopping formulas: It will now also stop the evaluator and sub-formulas correctly.

- Removed ChannelRegistry. Streams are not being set up using one-shot channels and owned by the data source.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Users should not be affected by this change, so we probably should skip adding release notes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

part:core Affects the SDK core components (data structures, etc.) part:data-pipeline Affects the data pipeline part:docs Affects the documentation part:microgrid Affects the interactions with the microgrid part:tests Affects the unit, integration and performance (benchmarks) tests part:tooling Affects the development tooling (CI, deployment, dependency management, etc.)

Projects

Status: To do

Development

Successfully merging this pull request may close these issues.

3 participants