Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

* Added Stream Realtime Database Methods and Component
* Added type hints to all public methods and functions

### Changed
Expand Down
131 changes: 131 additions & 0 deletions src/compas_xr/ghpython/components/Cx_StreamRealtimeDatabase/code.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# r: compas_xr>=2.0.0
"""
Component to stream data from Realtime Database.

Streams updates from Firebase Realtime Database using a background worker.

COMPAS XR v1.0.0
"""

import time

import Grasshopper
from compas_eve.ghpython import BackgroundWorker

from compas_xr.realtime_database import RealtimeDatabase


def start_rtdb_stream(worker, config_filepath, rtdb_path, return_full_data):
worker.config_filepath = config_filepath
worker.rtdb_path = rtdb_path
worker.return_full_data = bool(return_full_data)
worker.db = RealtimeDatabase(config_filepath)
worker.stream_id = None
worker.update_count = 0
worker.display_message("Connecting...")

def on_message(message):
evt = message.get("event")
pth = message.get("path")
raw_dat = message.get("data")

if worker.return_full_data:
# Pull the full current subtree from the subscribed path.
try:
dat = worker.db.get_data(worker.rtdb_path)
except Exception:
dat = raw_dat
else:
# Use raw event delta payload.
dat = raw_dat

worker.update_count += 1
worker.update_result((evt, pth, dat), delay=1)

mode = "full" if worker.return_full_data else "delta"
worker.display_message("Received Update #{} ({})".format(worker.update_count, mode))

stream_obj = worker.db.stream_data(rtdb_path, on_message)
worker.stream_id = getattr(stream_obj, "_stream_id", None)

mode = "full" if worker.return_full_data else "delta"
worker.display_message("Streaming... waiting for updates ({})".format(mode))

while not worker.has_requested_cancellation():
time.sleep(0.1)

return None


def stop_rtdb_stream(worker):
# Called by worker.dispose().
if hasattr(worker, "db") and worker.db and hasattr(worker, "stream_id") and worker.stream_id:
try:
worker.db.close_stream(worker.stream_id)
except Exception as e:
worker.display_message("Stop error: {}".format(e))
worker.display_message("Stopped")


class StreamRealtimeDatabaseComponent(Grasshopper.Kernel.GH_ScriptInstance):
def RunScript(self, config_filepath, path, stream, return_full_data):
event = None
event_path = None
data = None
status = "stopped"

if stream is None:
stream = False

if return_full_data is None:
return_full_data = True

if not stream:
BackgroundWorker.stop_instance_by_component(ghenv) # noqa: F821
return event, event_path, data, status

if not config_filepath or not path:
status = "error: provide config_filepath and path"
return event, event_path, data, status

worker = BackgroundWorker.instance_by_component(
ghenv, # noqa: F821
start_rtdb_stream,
dispose_function=stop_rtdb_stream,
auto_set_done=False,
force_new=False,
args=(config_filepath, path, return_full_data),
)

must_restart = False
if hasattr(worker, "config_filepath") and hasattr(worker, "rtdb_path"):
if worker.config_filepath != config_filepath or worker.rtdb_path != path:
must_restart = True
elif hasattr(worker, "return_full_data") and worker.return_full_data != bool(return_full_data):
must_restart = True

if must_restart:
worker = BackgroundWorker.instance_by_component(
ghenv, # noqa: F821
start_rtdb_stream,
dispose_function=stop_rtdb_stream,
auto_set_done=False,
force_new=True,
args=(config_filepath, path, return_full_data),
)

if not worker.is_working():
worker.start_work()

if hasattr(worker, "result") and worker.result:
event, event_path, data = worker.result

if worker.is_working():
mode = "full" if bool(return_full_data) else "delta"
status = "streaming ({}, {})".format(getattr(worker, "stream_id", None), mode)
elif worker.is_done():
status = "done"
else:
status = "idle"

return event, event_path, data, status
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
{
"name": "Stream RealtimeDatabase",
"nickname": "RTDB Stream",
"category": "COMPAS XR",
"subcategory": "0 Firebase",
"description": "Stream data from Firebase Realtime Database with optional full-node output.",
"exposure": 2,

"ghpython": {
"isAdvancedMode": true,
"iconDisplay": 2,
"inputParameters": [
{
"name": "config_filepath",
"description": "Path to Firebase Realtime Database configuration JSON file."
},
{
"name": "path",
"description": "Realtime Database path to stream."
},
{
"name": "stream",
"description": "Turn stream ON or OFF.",
"typeHintID": "bool"
},
{
"name": "return_full_data",
"description": "If true, output full subtree at subscribed path; if false, output raw event delta.",
"typeHintID": "bool"
}
],
"outputParameters": [
{
"name": "event",
"description": "Stream event type (for example put/patch)."
},
{
"name": "event_path",
"description": "Path in the streamed node where the event occurred."
},
{
"name": "data",
"description": "Event payload (delta or full subtree depending on return_full_data)."
},
{
"name": "status",
"description": "Worker and stream status."
}
]
}
}
Binary file modified src/compas_xr/ghpython/components/ghuser/Cx_AppSettings.ghuser
Binary file not shown.
Binary file modified src/compas_xr/ghpython/components/ghuser/Cx_Firebase_Config.ghuser
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file modified src/compas_xr/ghpython/components/ghuser/Cx_XrOptions.ghuser
Binary file not shown.
85 changes: 84 additions & 1 deletion src/compas_xr/realtime_database/realtime_database.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import os
import uuid
from typing import Any
from typing import Callable

Expand Down Expand Up @@ -36,6 +37,7 @@ class RealtimeDatabase:

def __init__(self, config_path: str):
self.config_path = config_path
self._active_streams = {}
self._ensure_database()

def _ensure_database(self) -> None:
Expand Down Expand Up @@ -119,8 +121,71 @@ def get_data_from_reference(self, database_reference: pyrebase.pyrebase.Database
data_dict = dict(data)
return data_dict

@staticmethod
def normalize_stream_message(message: Any) -> dict:
"""Normalize a Firebase stream message to a predictable payload.

Parameters
----------
message : Any
Raw stream message from pyrebase.

Returns
-------
dict
Dictionary with keys: ``event``, ``path``, ``data``, ``message``.

"""
if isinstance(message, dict):
return {
"event": message.get("event"),
"path": message.get("path"),
"data": message.get("data"),
"message": message,
}

return {
"event": getattr(message, "event", None),
"path": getattr(message, "path", None),
"data": getattr(message, "data", None),
"message": message,
}

@staticmethod
def default_stream_callback(message: dict) -> None:
"""Default callback for quickly inspecting stream events."""
print("Event: {}".format(message.get("event")))
print("Path: {}".format(message.get("path")))
print("Data: {}".format(message.get("data")))
print("Message: {}".format(message.get("message")))
print("-" * 40)

def stream_data_from_reference(self, callback: Callable, database_reference: pyrebase.pyrebase.Database) -> Any:
raise NotImplementedError("Function Under Developement")
"""Stream data from a previously constructed database reference.

Parameters
----------
callback : callable
Callable that accepts one normalized message dictionary.
database_reference : pyrebase.pyrebase.Database
Database reference to stream from.

Returns
-------
Any
Stream object returned by pyrebase.

"""
self._ensure_database()

def wrapped_callback(raw_message):
callback(self.normalize_stream_message(raw_message))

stream = database_reference.stream(wrapped_callback)
stream_id = str(uuid.uuid4())
self._active_streams[stream_id] = stream
setattr(stream, "_stream_id", stream_id)
return stream

def stream_data(self, path: str, callback: Callable) -> Any:
"""
Expand All @@ -142,6 +207,24 @@ def stream_data(self, path: str, callback: Callable) -> Any:
database_reference = self.construct_reference(path)
return self.stream_data_from_reference(callback, database_reference)

def close_stream(self, stream_id: str) -> bool:
"""Close one active stream by its stream id."""
stream = self._active_streams.get(stream_id)
if not stream:
return False

stream.close()
del self._active_streams[stream_id]
return True

def close_all_streams(self) -> int:
"""Close all active streams and return the number closed."""
count = len(self._active_streams)
for stream in list(self._active_streams.values()):
stream.close()
self._active_streams.clear()
return count

def upload_data_to_reference(self, data: Any, database_reference: pyrebase.pyrebase.Database) -> None:
"""
Method for uploading data to a constructed database reference.
Expand Down
Loading