diff --git a/CHANGELOG.md b/CHANGELOG.md index e3ffb82b..5b27aad2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +* Added Stream Realtime Database Methods and Component * Added type hints to all public methods and functions ### Changed diff --git a/src/compas_xr/ghpython/components/Cx_StreamRealtimeDatabase/code.py b/src/compas_xr/ghpython/components/Cx_StreamRealtimeDatabase/code.py new file mode 100644 index 00000000..5eb74187 --- /dev/null +++ b/src/compas_xr/ghpython/components/Cx_StreamRealtimeDatabase/code.py @@ -0,0 +1,131 @@ +# r: compas_xr>=2.0.0 +""" +Component to stream data from Realtime Database. + +Streams updates from Firebase Realtime Database using a background worker. + +COMPAS XR v1.0.0 +""" + +import time + +import Grasshopper +from compas_eve.ghpython import BackgroundWorker + +from compas_xr.realtime_database import RealtimeDatabase + + +def start_rtdb_stream(worker, config_filepath, rtdb_path, return_full_data): + worker.config_filepath = config_filepath + worker.rtdb_path = rtdb_path + worker.return_full_data = bool(return_full_data) + worker.db = RealtimeDatabase(config_filepath) + worker.stream_id = None + worker.update_count = 0 + worker.display_message("Connecting...") + + def on_message(message): + evt = message.get("event") + pth = message.get("path") + raw_dat = message.get("data") + + if worker.return_full_data: + # Pull the full current subtree from the subscribed path. + try: + dat = worker.db.get_data(worker.rtdb_path) + except Exception: + dat = raw_dat + else: + # Use raw event delta payload. + dat = raw_dat + + worker.update_count += 1 + worker.update_result((evt, pth, dat), delay=1) + + mode = "full" if worker.return_full_data else "delta" + worker.display_message("Received Update #{} ({})".format(worker.update_count, mode)) + + stream_obj = worker.db.stream_data(rtdb_path, on_message) + worker.stream_id = getattr(stream_obj, "_stream_id", None) + + mode = "full" if worker.return_full_data else "delta" + worker.display_message("Streaming... waiting for updates ({})".format(mode)) + + while not worker.has_requested_cancellation(): + time.sleep(0.1) + + return None + + +def stop_rtdb_stream(worker): + # Called by worker.dispose(). + if hasattr(worker, "db") and worker.db and hasattr(worker, "stream_id") and worker.stream_id: + try: + worker.db.close_stream(worker.stream_id) + except Exception as e: + worker.display_message("Stop error: {}".format(e)) + worker.display_message("Stopped") + + +class StreamRealtimeDatabaseComponent(Grasshopper.Kernel.GH_ScriptInstance): + def RunScript(self, config_filepath, path, stream, return_full_data): + event = None + event_path = None + data = None + status = "stopped" + + if stream is None: + stream = False + + if return_full_data is None: + return_full_data = True + + if not stream: + BackgroundWorker.stop_instance_by_component(ghenv) # noqa: F821 + return event, event_path, data, status + + if not config_filepath or not path: + status = "error: provide config_filepath and path" + return event, event_path, data, status + + worker = BackgroundWorker.instance_by_component( + ghenv, # noqa: F821 + start_rtdb_stream, + dispose_function=stop_rtdb_stream, + auto_set_done=False, + force_new=False, + args=(config_filepath, path, return_full_data), + ) + + must_restart = False + if hasattr(worker, "config_filepath") and hasattr(worker, "rtdb_path"): + if worker.config_filepath != config_filepath or worker.rtdb_path != path: + must_restart = True + elif hasattr(worker, "return_full_data") and worker.return_full_data != bool(return_full_data): + must_restart = True + + if must_restart: + worker = BackgroundWorker.instance_by_component( + ghenv, # noqa: F821 + start_rtdb_stream, + dispose_function=stop_rtdb_stream, + auto_set_done=False, + force_new=True, + args=(config_filepath, path, return_full_data), + ) + + if not worker.is_working(): + worker.start_work() + + if hasattr(worker, "result") and worker.result: + event, event_path, data = worker.result + + if worker.is_working(): + mode = "full" if bool(return_full_data) else "delta" + status = "streaming ({}, {})".format(getattr(worker, "stream_id", None), mode) + elif worker.is_done(): + status = "done" + else: + status = "idle" + + return event, event_path, data, status diff --git a/src/compas_xr/ghpython/components/Cx_StreamRealtimeDatabase/icon.png b/src/compas_xr/ghpython/components/Cx_StreamRealtimeDatabase/icon.png new file mode 100644 index 00000000..459834b5 Binary files /dev/null and b/src/compas_xr/ghpython/components/Cx_StreamRealtimeDatabase/icon.png differ diff --git a/src/compas_xr/ghpython/components/Cx_StreamRealtimeDatabase/metadata.json b/src/compas_xr/ghpython/components/Cx_StreamRealtimeDatabase/metadata.json new file mode 100644 index 00000000..6a552d6a --- /dev/null +++ b/src/compas_xr/ghpython/components/Cx_StreamRealtimeDatabase/metadata.json @@ -0,0 +1,51 @@ +{ + "name": "Stream RealtimeDatabase", + "nickname": "RTDB Stream", + "category": "COMPAS XR", + "subcategory": "0 Firebase", + "description": "Stream data from Firebase Realtime Database with optional full-node output.", + "exposure": 2, + + "ghpython": { + "isAdvancedMode": true, + "iconDisplay": 2, + "inputParameters": [ + { + "name": "config_filepath", + "description": "Path to Firebase Realtime Database configuration JSON file." + }, + { + "name": "path", + "description": "Realtime Database path to stream." + }, + { + "name": "stream", + "description": "Turn stream ON or OFF.", + "typeHintID": "bool" + }, + { + "name": "return_full_data", + "description": "If true, output full subtree at subscribed path; if false, output raw event delta.", + "typeHintID": "bool" + } + ], + "outputParameters": [ + { + "name": "event", + "description": "Stream event type (for example put/patch)." + }, + { + "name": "event_path", + "description": "Path in the streamed node where the event occurred." + }, + { + "name": "data", + "description": "Event payload (delta or full subtree depending on return_full_data)." + }, + { + "name": "status", + "description": "Worker and stream status." + } + ] + } +} diff --git a/src/compas_xr/ghpython/components/ghuser/Cx_AppSettings.ghuser b/src/compas_xr/ghpython/components/ghuser/Cx_AppSettings.ghuser index 47c943c9..7bdb14b6 100644 Binary files a/src/compas_xr/ghpython/components/ghuser/Cx_AppSettings.ghuser and b/src/compas_xr/ghpython/components/ghuser/Cx_AppSettings.ghuser differ diff --git a/src/compas_xr/ghpython/components/ghuser/Cx_Firebase_Config.ghuser b/src/compas_xr/ghpython/components/ghuser/Cx_Firebase_Config.ghuser index 0c276289..d7266d54 100644 Binary files a/src/compas_xr/ghpython/components/ghuser/Cx_Firebase_Config.ghuser and b/src/compas_xr/ghpython/components/ghuser/Cx_Firebase_Config.ghuser differ diff --git a/src/compas_xr/ghpython/components/ghuser/Cx_GetTrajectoryRequest.ghuser b/src/compas_xr/ghpython/components/ghuser/Cx_GetTrajectoryRequest.ghuser index 059c4c28..cd7fdf74 100644 Binary files a/src/compas_xr/ghpython/components/ghuser/Cx_GetTrajectoryRequest.ghuser and b/src/compas_xr/ghpython/components/ghuser/Cx_GetTrajectoryRequest.ghuser differ diff --git a/src/compas_xr/ghpython/components/ghuser/Cx_MqttTrajectoryResult.ghuser b/src/compas_xr/ghpython/components/ghuser/Cx_MqttTrajectoryResult.ghuser index bbf24acc..a119e9f6 100644 Binary files a/src/compas_xr/ghpython/components/ghuser/Cx_MqttTrajectoryResult.ghuser and b/src/compas_xr/ghpython/components/ghuser/Cx_MqttTrajectoryResult.ghuser differ diff --git a/src/compas_xr/ghpython/components/ghuser/Cx_PlanningServiceResponse.ghuser b/src/compas_xr/ghpython/components/ghuser/Cx_PlanningServiceResponse.ghuser index 536ca5da..171697c4 100644 Binary files a/src/compas_xr/ghpython/components/ghuser/Cx_PlanningServiceResponse.ghuser and b/src/compas_xr/ghpython/components/ghuser/Cx_PlanningServiceResponse.ghuser differ diff --git a/src/compas_xr/ghpython/components/ghuser/Cx_SendTrajectory.ghuser b/src/compas_xr/ghpython/components/ghuser/Cx_SendTrajectory.ghuser index ba35f58c..f28bff4a 100644 Binary files a/src/compas_xr/ghpython/components/ghuser/Cx_SendTrajectory.ghuser and b/src/compas_xr/ghpython/components/ghuser/Cx_SendTrajectory.ghuser differ diff --git a/src/compas_xr/ghpython/components/ghuser/Cx_StreamRealtimeDatabase.ghuser b/src/compas_xr/ghpython/components/ghuser/Cx_StreamRealtimeDatabase.ghuser new file mode 100644 index 00000000..c9059820 Binary files /dev/null and b/src/compas_xr/ghpython/components/ghuser/Cx_StreamRealtimeDatabase.ghuser differ diff --git a/src/compas_xr/ghpython/components/ghuser/Cx_XrOptions.ghuser b/src/compas_xr/ghpython/components/ghuser/Cx_XrOptions.ghuser index 329655ff..74687f52 100644 Binary files a/src/compas_xr/ghpython/components/ghuser/Cx_XrOptions.ghuser and b/src/compas_xr/ghpython/components/ghuser/Cx_XrOptions.ghuser differ diff --git a/src/compas_xr/realtime_database/realtime_database.py b/src/compas_xr/realtime_database/realtime_database.py index 2319ba30..8d54daaa 100644 --- a/src/compas_xr/realtime_database/realtime_database.py +++ b/src/compas_xr/realtime_database/realtime_database.py @@ -1,5 +1,6 @@ import json import os +import uuid from typing import Any from typing import Callable @@ -36,6 +37,7 @@ class RealtimeDatabase: def __init__(self, config_path: str): self.config_path = config_path + self._active_streams = {} self._ensure_database() def _ensure_database(self) -> None: @@ -119,8 +121,71 @@ def get_data_from_reference(self, database_reference: pyrebase.pyrebase.Database data_dict = dict(data) return data_dict + @staticmethod + def normalize_stream_message(message: Any) -> dict: + """Normalize a Firebase stream message to a predictable payload. + + Parameters + ---------- + message : Any + Raw stream message from pyrebase. + + Returns + ------- + dict + Dictionary with keys: ``event``, ``path``, ``data``, ``message``. + + """ + if isinstance(message, dict): + return { + "event": message.get("event"), + "path": message.get("path"), + "data": message.get("data"), + "message": message, + } + + return { + "event": getattr(message, "event", None), + "path": getattr(message, "path", None), + "data": getattr(message, "data", None), + "message": message, + } + + @staticmethod + def default_stream_callback(message: dict) -> None: + """Default callback for quickly inspecting stream events.""" + print("Event: {}".format(message.get("event"))) + print("Path: {}".format(message.get("path"))) + print("Data: {}".format(message.get("data"))) + print("Message: {}".format(message.get("message"))) + print("-" * 40) + def stream_data_from_reference(self, callback: Callable, database_reference: pyrebase.pyrebase.Database) -> Any: - raise NotImplementedError("Function Under Developement") + """Stream data from a previously constructed database reference. + + Parameters + ---------- + callback : callable + Callable that accepts one normalized message dictionary. + database_reference : pyrebase.pyrebase.Database + Database reference to stream from. + + Returns + ------- + Any + Stream object returned by pyrebase. + + """ + self._ensure_database() + + def wrapped_callback(raw_message): + callback(self.normalize_stream_message(raw_message)) + + stream = database_reference.stream(wrapped_callback) + stream_id = str(uuid.uuid4()) + self._active_streams[stream_id] = stream + setattr(stream, "_stream_id", stream_id) + return stream def stream_data(self, path: str, callback: Callable) -> Any: """ @@ -142,6 +207,24 @@ def stream_data(self, path: str, callback: Callable) -> Any: database_reference = self.construct_reference(path) return self.stream_data_from_reference(callback, database_reference) + def close_stream(self, stream_id: str) -> bool: + """Close one active stream by its stream id.""" + stream = self._active_streams.get(stream_id) + if not stream: + return False + + stream.close() + del self._active_streams[stream_id] + return True + + def close_all_streams(self) -> int: + """Close all active streams and return the number closed.""" + count = len(self._active_streams) + for stream in list(self._active_streams.values()): + stream.close() + self._active_streams.clear() + return count + def upload_data_to_reference(self, data: Any, database_reference: pyrebase.pyrebase.Database) -> None: """ Method for uploading data to a constructed database reference.