43 changes: 42 additions & 1 deletion docs/architecture/index.rst
@@ -132,6 +132,48 @@ In Kolibri, on the ``FacilityDataset`` model, we generate the certificate as a f
There's flexibility in the application layer for determining the validity of a root certificate, and it's specified on a per-profile basis. For the ``facilitydata`` profile, Kolibri leverages its ``auth`` models for this.


Streaming architecture
----------------------

Morango includes a streaming architecture for memory-efficient processing of sync data. This architecture is implemented in the ``morango.sync.stream`` module and provides a modular, ETL-like pipeline pattern for processing data records one-by-one, significantly reducing memory overhead compared to batch processing approaches.

The streaming architecture is built around several core concepts:

**Stream modules**
Abstract base classes that form the foundation of the streaming pipeline:

- ``Source``: The starting point of a pipeline that yields data items
- ``PipelineModule``: Transform-like modules that process data items
- ``Sink``: Terminal modules that consume data items without yielding further output
- ``ReaderModule``: Modules that can be connected to other modules via the ``pipe()`` method
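The roles above can be sketched with plain Python generators. This is an illustrative model only: the real classes live in ``morango.sync.stream`` and their internals and signatures may differ.

```python
class Source:
    """Starting point of a pipeline; yields data items."""

    def __init__(self, items):
        self._items = items

    def pipe(self, module):
        # Fluent connection: the module reads from this source.
        module.upstream = self
        return module

    def __iter__(self):
        return iter(self._items)


class PipelineModule:
    """Transform-like module that processes items from its upstream."""

    def __init__(self, fn):
        self.fn = fn
        self.upstream = None

    def pipe(self, module):
        module.upstream = self
        return module

    def end(self, sink):
        # Terminate the pipeline and drain it through the sink.
        sink.upstream = self
        return sink.run()

    def __iter__(self):
        for item in self.upstream:
            yield self.fn(item)


class Sink:
    """Terminal module; consumes items without yielding further output."""

    def __init__(self):
        self.upstream = None
        self.received = []

    def run(self):
        for item in self.upstream:
            self.received.append(item)
        return self.received
```

Because each module pulls lazily from its upstream, only one item is in flight at a time until a buffering module deliberately accumulates a chunk.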

**Pipeline composition**
Modules are connected using a fluent interface via the ``pipe()`` method, creating a directed flow of data:

.. code-block:: python

source.pipe(transform1).pipe(transform2).end(sink)

**Key pipeline modules**
Several specialized pipeline modules are provided:

- ``Transform``: Applies a 1:1 transformation to each item
- ``FlatMap``: Maps each item to zero or more output items
- ``Buffer``: Collects items into fixed-size chunks for batch operations
- ``Unbuffer``: Flattens chunks back into individual items
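The semantics of these four modules reduce to simple generator functions. This is a hedged sketch of the behavior described above, not the actual implementations, which wrap this logic in ``pipe()``-able objects:

```python
def transform(items, fn):
    """1:1 transformation of each item."""
    for item in items:
        yield fn(item)


def flat_map(items, fn):
    """Map each item to zero or more output items."""
    for item in items:
        yield from fn(item)


def buffer(items, size):
    """Collect items into fixed-size chunks for batch operations."""
    chunk = []
    for item in items:
        chunk.append(item)
        if len(chunk) == size:
            yield chunk
            chunk = []
    if chunk:  # flush the final, possibly short, chunk
        yield chunk


def unbuffer(chunks):
    """Flatten chunks back into individual items."""
    for chunk in chunks:
        yield from chunk
```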

**Serialization pipeline**
The serialization process uses this streaming architecture through the ``serialize_into_store()`` function, which constructs a pipeline that:

1. Reads dirty app models from the database (``AppModelSource``)
2. Buffers records for efficient database lookups (``Buffer``)
3. Looks up corresponding store records (``StoreLookup``)
4. Updates store records with new data (``StoreUpdate``)
5. Buffers by model type for efficient bulk operations (``ModelPartitionBuffer``)
6. Writes changes to the database (``WriteSink``)

This streaming approach keeps memory usage bounded regardless of dataset size, making Morango suitable for large-scale deployments on hardware with limited resources.
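Composed with the fluent ``pipe()`` interface, the six steps above fit together roughly as follows. This is an illustrative, non-runnable sketch: the module names come from the list above, but the constructor arguments shown are assumptions, not the actual signatures.

```python
# Approximate shape of the pipeline built by serialize_into_store();
# constructor arguments are illustrative assumptions.
pipeline = (
    AppModelSource(profile, sync_filter)  # 1. read dirty app models
    .pipe(Buffer(size=1000))              # 2. chunk records for lookups
    .pipe(StoreLookup())                  # 3. fetch matching Store records
    .pipe(StoreUpdate())                  # 4. apply newly serialized data
    .pipe(ModelPartitionBuffer())         # 5. group by model type
)
pipeline.end(WriteSink())                 # 6. bulk-write to the database
```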

Session controller, contexts, and operations
--------------------------------------------

@@ -142,4 +184,3 @@ A unidirectional sync has several stages: ``INITIALIZING``, ``SERIALIZING``, ``QU
.. image:: ./session-controller-seq.png

The list of operations for each stage is configured through Django settings. The configuration key for each stage follows the pattern ``MORANGO_%STAGE%_OPERATIONS``, so the list/tuple of operations for the ``QUEUING`` stage is read from the ``MORANGO_QUEUING_OPERATIONS`` configuration value. Built-in operations implement a callable ``BaseOperation`` class by overriding its ``handle`` method. The ``BaseOperation`` class supports raising an ``AssertionError`` to defer responsibility to the next operation.

3 changes: 1 addition & 2 deletions docs/syncing/index.rst
Expand Up @@ -15,7 +15,7 @@ Process

Syncing is the actual exchange of data in a sync session. The general steps for syncing data are:

1. **Serialization** - serializing data that is associated with Django models in the Application layer, and storing it in JSON format in a record in the Store
1. **Serialization** - serializing data that is associated with Django models in the Application layer, and storing it in JSON format in a record in the Store. This process uses a streaming architecture that processes records one at a time through a modular pipeline, keeping memory usage bounded regardless of dataset size.
2. **Queuing/Buffering** - storing serialized records and their modification history to a separate Buffers data structure
3. **Transfer/chunking of data** - the actual transfer of data over a request/response cycle in chunks of 500 records at a time
4. **Dequeuing** - merging the data received in the receiving buffers to the receiving store and record-max counter
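The fixed-size transfer in step 3 can be sketched with a simple chunking helper. This is illustrative only, not Morango's actual transfer code:

```python
# Records are sent over the request/response cycle 500 at a time.
CHUNK_SIZE = 500


def iter_chunks(records, chunk_size=CHUNK_SIZE):
    """Yield successive fixed-size chunks from a sequence of records."""
    for start in range(0, len(records), chunk_size):
        yield records[start:start + chunk_size]
```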
@@ -70,4 +70,3 @@ For a push or pull sync lifecycle, the order of the fired signals would be as fo
7) Dequeuing started
8) Dequeuing completed
9) Session completed

4 changes: 4 additions & 0 deletions morango/models/core.py
Expand Up @@ -773,6 +773,10 @@ class RecordMaxCounter(AbstractCounter):

store_model = models.ForeignKey(Store, on_delete=models.CASCADE)

@property
def unique_key(self):
return f"{self.instance_id}:{self.store_model_id}"

class Meta:
unique_together = ("store_model", "instance_id")

10 changes: 10 additions & 0 deletions morango/registry.py
Expand Up @@ -5,7 +5,9 @@
import inspect
import sys
from collections import OrderedDict
from typing import Generator

from django.db.models import QuerySet
from django.db.models.fields.related import ForeignKey

from morango.constants import transfer_stages
@@ -82,6 +84,14 @@ def get_models(self, profile):
self.check_models_ready(profile)
return list(self.profile_models.get(profile, {}).values())

def get_model_querysets(self, profile) -> Generator[QuerySet, None, None]:
"""
Method for future enhancement to iterate over models and their querysets in a fashion
(particularly, an order) that is aware of FK dependencies.
"""
for model in self.get_models(profile):
yield model.syncing_objects.all()

def _insert_model_in_dependency_order(self, model, profile):
# When we add models to be synced, we need to make sure
# that models that depend on other models are synced AFTER
26 changes: 7 additions & 19 deletions morango/sync/controller.py
@@ -6,47 +6,35 @@
from morango.constants import transfer_statuses
from morango.registry import session_middleware
from morango.sync.operations import _deserialize_from_store
from morango.sync.operations import _serialize_into_store
from morango.sync.operations import OperationLogger
from morango.sync.stream.serialize import serialize_into_store
from morango.sync.utils import SyncSignalGroup
from morango.utils import _assert


logger = logging.getLogger(__name__)


def _self_referential_fk(klass_model):
"""
Return whether this model has a self ref FK, and the name for the field
"""
for f in klass_model._meta.concrete_fields:
if f.related_model:
if issubclass(klass_model, f.related_model):
return f.attname
return None


class MorangoProfileController(object):
def __init__(self, profile):
_assert(profile, "profile needs to be defined.")
self.profile = profile

def serialize_into_store(self, filter=None):
def serialize_into_store(self, sync_filter=None):
"""
Takes data from app layer and serializes the models into the store.
"""
with OperationLogger("Serializing records", "Serialization complete"):
_serialize_into_store(self.profile, filter=filter)
serialize_into_store(self.profile, sync_filter=sync_filter)

def deserialize_from_store(self, skip_erroring=False, filter=None):
def deserialize_from_store(self, skip_erroring=False, sync_filter=None):
"""
Takes data from the store and integrates into the application.
"""
with OperationLogger("Deserializing records", "Deserialization complete"):
# we first serialize to avoid deserialization merge conflicts
_serialize_into_store(self.profile, filter=filter)
serialize_into_store(self.profile, sync_filter=sync_filter)
_deserialize_from_store(
self.profile, filter=filter, skip_erroring=skip_erroring
self.profile, filter=sync_filter, skip_erroring=skip_erroring
)

def create_network_connection(self, base_url, **kwargs):
@@ -217,7 +205,7 @@ def proceed_to_and_wait_for(
if tries >= max_interval_tries:
sleep(max_interval)
else:
sleep(0.3 * (2 ** tries - 1))
sleep(0.3 * (2**tries - 1))
result = self.proceed_to(target_stage, context=context)
tries += 1
if callable(callback):
39 changes: 39 additions & 0 deletions morango/sync/db.py
@@ -0,0 +1,39 @@
import logging
from contextlib import contextmanager

from django.db import connection
from django.db import transaction

from morango.sync.backends.utils import load_backend
from morango.sync.utils import lock_partitions


logger = logging.getLogger(__name__)

DBBackend = load_backend(connection)


@contextmanager
def begin_transaction(sync_filter, isolated=False, shared_lock=False):
"""
Starts a transaction, optionally sets the transaction isolation level to repeatable read,
and locks the affected partitions

:param sync_filter: The filter for filtering applicable records of the sync
:type sync_filter: morango.models.certificates.Filter|None
:param isolated: Whether to alter the transaction isolation to repeatable-read
:type isolated: bool
:param shared_lock: Whether the advisory lock should be exclusive or shared
:type shared_lock: bool
"""
if isolated:
# when isolation is requested, we modify the transaction isolation of the connection for the
# duration of the transaction
with DBBackend._set_transaction_repeatable_read():
with transaction.atomic(savepoint=False):
lock_partitions(DBBackend, sync_filter=sync_filter, shared=shared_lock)
yield
else:
with transaction.atomic():
lock_partitions(DBBackend, sync_filter=sync_filter, shared=shared_lock)
yield
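A hypothetical usage sketch of the new ``begin_transaction()`` context manager. The filter value and the work inside the block are placeholders, and this assumes a configured Django/Morango environment, so it is not runnable standalone:

```python
from morango.models.certificates import Filter
from morango.sync.db import begin_transaction

sync_filter = Filter("...")  # placeholder partition filter

with begin_transaction(sync_filter, isolated=True, shared_lock=False):
    # Runs under repeatable-read isolation with the affected partitions
    # locked exclusively; the locks are released when the block exits.
    ...
```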