
ParaGroup/MoStream


License: LGPL v3

MoStream

MoStream is a Mojo library for building stream-processing pipelines with sequential and parallel stages. A pipeline is composed of user-defined stages connected by inter-stage communicators and is executed by Mojo async tasks.

The library is intended for workloads that naturally fit a flow such as:

source -> stage -> stage -> ... -> sink

Each stage declares its input and output message types and can run either as a sequential node (one thread) or as a set of parallel nodes (several threads, each running its own replica of the stage).

Features

  • Source, transform, one-to-many transform, and sink stages.
  • Sequential nodes with seq(stage).
  • Replicated parallel nodes with parallel(stage, degree).
  • Optional CPU pinning through a small C helper library.
  • Configurable communicators (e.g., customizable queue size).

Requirements

  • Mojo toolchain (version >= 0.26.3)
  • A C compiler such as gcc for the CPU-affinity helper.
  • Linux-style pthread CPU affinity support for thread pinning.

The current runtime expects the helper library at:

$MOSTREAM_HOME/MoStream/lib/libFuncC.so

Setup

Build the C helper and point MOSTREAM_HOME at the repository root:

make -C MoStream/lib
export MOSTREAM_HOME="$PWD"

Then run one of the included examples:

mojo -O3 -I. Tests/test_pipe_1.mojo

If MOSTREAM_HOME is not set, MoStream falls back to the current directory.
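To make the lookup rule concrete, here is a minimal Python sketch of how the helper path resolves. It models the documented behavior and is not MoStream's code; the function name `helper_library_path` is hypothetical, and treating an empty `MOSTREAM_HOME` as unset is an assumption.

```python
import os

def helper_library_path(environ=None, cwd=None):
    """Hypothetical model of where the runtime expects libFuncC.so.

    Uses $MOSTREAM_HOME when it is set; otherwise falls back to the
    current directory. Treating an empty value as unset is an assumption.
    """
    environ = os.environ if environ is None else environ
    home = environ.get("MOSTREAM_HOME") or (cwd if cwd is not None else os.getcwd())
    return os.path.join(home, "MoStream", "lib", "libFuncC.so")
```

Calling `os.path.exists(helper_library_path())` after `make -C MoStream/lib` is a quick way to confirm the helper is where the runtime will look.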

Quick Example

from std.collections import Optional
from MoStream import Pipeline, StageKind, StageTrait, seq, parallel

struct NumberSource(StageTrait):
    comptime kind = StageKind.SOURCE
    comptime InType = Int
    comptime OutType = Int
    comptime name = "NumberSource"
    var next: Int

    def __init__(out self):
        self.next = 1

    def next_element(mut self) -> Optional[Int]:
        if self.next > 100:
            return None
        var value = self.next
        self.next = self.next + 1
        return value

@fieldwise_init
struct AddOne(StageTrait):
    comptime kind = StageKind.TRANSFORM
    comptime InType = Int
    comptime OutType = Int
    comptime name = "AddOne"

    def compute(mut self, var input: Int) -> Optional[Int]:
        return input + 1

@fieldwise_init
struct PrintSink(StageTrait):
    comptime kind = StageKind.SINK
    comptime InType = Int
    comptime OutType = Int
    comptime name = "PrintSink"

    def consume_element(mut self, var input: Int):
        print(input)

def main() raises:
    var source = NumberSource()
    var add_one = AddOne()
    var sink = PrintSink()

    var pipeline = Pipeline((
        seq(source),
        parallel(add_one, 4),
        seq(sink),
    ))

    pipeline.setPinning(False)
    pipeline.run()
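As a sanity check of what this example should produce, the following Python sketch mirrors the three stages (with a collecting sink in place of `PrintSink`) and drives them one element at a time. It is a sequential reference model, not MoStream: it ignores the four `AddOne` replicas and makes no claim about the order in which a real parallel run prints its values, so compare against it as a sorted list. `CollectSink` and `run_sequentially` are hypothetical names.

```python
from typing import Optional

class NumberSource:
    """Mirrors NumberSource: yields 1..100, then None to signal end of stream."""
    def __init__(self):
        self.next = 1

    def next_element(self) -> Optional[int]:
        if self.next > 100:
            return None
        value = self.next
        self.next += 1
        return value

class AddOne:
    def compute(self, value: int) -> Optional[int]:
        return value + 1

class CollectSink:
    """Hypothetical stand-in for PrintSink: collects values instead of printing."""
    def __init__(self):
        self.seen = []

    def consume_element(self, value: int) -> None:
        self.seen.append(value)

def run_sequentially(source, transform, sink):
    """Single-threaded reference run: source -> transform -> sink."""
    while (item := source.next_element()) is not None:
        out = transform.compute(item)
        if out is not None:  # None means "no output for this input"
            sink.consume_element(out)

sink = CollectSink()
run_sequentially(NumberSource(), AddOne(), sink)
```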

Defining Stages

Every stage implements StageTrait and sets these compile-time fields:

comptime kind: Int
comptime InType: MessageTrait
comptime OutType: MessageTrait
comptime name: String

Supported stage kinds:

  • StageKind.SOURCE: produces stream elements with next_element().
  • StageKind.TRANSFORM: consumes one input and returns zero or one output with compute().
  • StageKind.TRANSFORM_MANY: consumes one input and emits zero or more outputs with compute_many(input, emitter).
  • StageKind.SINK: consumes stream elements with consume_element().
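Because `compute()` returns an `Optional`, a TRANSFORM stage can act as a filter by returning an empty value. The Python sketch below models that zero-or-one contract; `KeepEven` and `apply_transform` are hypothetical names, and `None` plays the role of an empty Mojo `Optional`.

```python
from typing import List, Optional

class KeepEven:
    """Hypothetical TRANSFORM stage: forwards even inputs, drops odd ones."""
    def compute(self, value: int) -> Optional[int]:
        return value if value % 2 == 0 else None

def apply_transform(stage, inputs: List[int]) -> List[int]:
    """Feed each input to compute() and keep only the present results."""
    outputs = []
    for value in inputs:
        result = stage.compute(value)
        if result is not None:  # empty Optional -> zero outputs for this input
            outputs.append(result)
    return outputs
```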

Stages can optionally implement:

def received_eos(mut self):
    ...

MoStream calls this hook when a stage observes the end of the stream.
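The hook is a natural place to flush buffered state or report a summary. Below is a Python model of a sink that uses it; `CountingSink` and `drain` are hypothetical, and the sketch covers a single replica that calls the hook once after its last element, which is an assumption about timing rather than documented behavior.

```python
class CountingSink:
    """Hypothetical SINK stage that reports a summary at end of stream."""
    def __init__(self):
        self.count = 0
        self.total = 0
        self.summary = None

    def consume_element(self, value):
        self.count += 1
        self.total += value

    def received_eos(self):
        # Assumed timing: invoked after this replica's last element.
        self.summary = (self.count, self.total)

def drain(items, sink):
    """Hypothetical driver: feed every element, then fire the optional hook."""
    for value in items:
        sink.consume_element(value)
    hook = getattr(sink, "received_eos", None)  # stages may omit the hook
    if hook is not None:
        hook()

sink = CountingSink()
drain([3, 1, 4, 1, 5], sink)
```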

One-to-Many Transforms

TRANSFORM_MANY stages use Emitter to produce any number of output elements for a single input:

from MoStream import Emitter, StageKind, StageTrait

@fieldwise_init
struct Duplicate(StageTrait):
    comptime kind = StageKind.TRANSFORM_MANY
    comptime InType = Int
    comptime OutType = Int
    comptime name = "Duplicate"

    def compute_many(mut self, var input: Int, mut emitter: Emitter[Int]):
        emitter.emit(input)
        emitter.emit(input)
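To illustrate the contract, here is a Python stand-in for `Emitter` that simply collects emitted elements into a list. This is an assumption for illustration: the real `Emitter` presumably forwards elements to the downstream communicator, which this sketch does not model. `SkipNegatives` and `run_many` are hypothetical helpers showing that a stage may also emit nothing for an input.

```python
from typing import Generic, List, TypeVar

T = TypeVar("T")

class Emitter(Generic[T]):
    """Python stand-in for MoStream's Emitter: buffers emitted elements."""
    def __init__(self) -> None:
        self.pending: List[T] = []

    def emit(self, value: T) -> None:
        self.pending.append(value)

class Duplicate:
    def compute_many(self, value: int, emitter: Emitter[int]) -> None:
        emitter.emit(value)
        emitter.emit(value)

class SkipNegatives:
    """Hypothetical stage: emits zero outputs for negative inputs."""
    def compute_many(self, value: int, emitter: Emitter[int]) -> None:
        if value >= 0:
            emitter.emit(value)

def run_many(stage, inputs):
    """Hypothetical driver: one emitter shared across all inputs."""
    emitter: Emitter[int] = Emitter()
    for value in inputs:
        stage.compute_many(value, emitter)
    return emitter.pending
```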

Parallelism

Use seq(stage) for a single stage instance and parallel(stage, degree) for replicated execution:

var pipeline = Pipeline((
    seq(source),
    parallel(filter, 4),
    parallel(mapper, 8),
    seq(sink),
))

The source stage must be the first stage of the pipeline, and the sink stage must be the last. With the standard runtime (pipeline.run()), the total number of node replicas must fit within the parallelism available in Mojo's async runtime.
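These structural rules can be checked before running anything. The Python function below is a hypothetical validator, not part of MoStream: it represents each node as a `(kind, degree)` pair and uses `os.cpu_count()` as a stand-in for the parallelism of Mojo's async runtime, which is an assumption. It also assumes a pipeline has exactly one source and one sink.

```python
import os

SOURCE, TRANSFORM, TRANSFORM_MANY, SINK = "SOURCE", "TRANSFORM", "TRANSFORM_MANY", "SINK"

def check_pipeline(nodes, available_parallelism=None):
    """Hypothetical validator. nodes: list of (kind, degree) pairs,
    e.g. [(SOURCE, 1), (TRANSFORM, 4), (SINK, 1)].
    Returns the total number of node replicas or raises ValueError."""
    if len(nodes) < 2:
        raise ValueError("a pipeline needs at least a source and a sink")
    if nodes[0][0] != SOURCE or nodes[-1][0] != SINK:
        raise ValueError("the source must come first and the sink last")
    if any(kind in (SOURCE, SINK) for kind, _ in nodes[1:-1]):
        raise ValueError("sources and sinks may only appear at the ends")
    if any(degree < 1 for _, degree in nodes):
        raise ValueError("every degree must be at least 1")
    total = sum(degree for _, degree in nodes)
    # Assumption: os.cpu_count() approximates the async runtime's parallelism.
    limit = available_parallelism or os.cpu_count() or 1
    if total > limit:
        raise ValueError(f"{total} replicas exceed the available parallelism ({limit})")
    return total
```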


Runtime Backends

MoStream provides two execution runtimes: the standard runtime and the cooperative runtime.

Standard Runtime

The standard runtime is selected with:

pipeline.run()

In this runtime, each pipeline node replica is executed by one long-lived Mojo async task. For example:

var pipeline = Pipeline((
    seq(source),
    parallel(stage_a, 2),
    parallel(stage_b, 4),
    seq(sink),
))

pipeline.run()

creates one task for the source, two tasks for stage_a, four tasks for stage_b, and one task for the sink.

Each task repeatedly executes the logic of its stage and communicates with the previous and next stages through bounded communicators. This runtime is simple and direct, but each node replica occupies one task for the whole lifetime of the pipeline. Therefore, the total number of node replicas should not exceed the parallelism available in Mojo's async runtime.
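The following Python sketch models the standard runtime's structure: one long-lived thread per node replica, bounded FIFO queues between stages, and an end-of-stream marker. The EOS protocol (the last upstream replica enqueues one marker per downstream replica) and every name in it are assumptions chosen for the sketch, not MoStream's implementation, and Python threads stand in for Mojo async tasks.

```python
import queue
import threading

EOS = object()  # end-of-stream marker used by this sketch

class Link:
    """Bounded queue between two stages, plus end-of-stream bookkeeping."""
    def __init__(self, n_producers, n_consumers, size):
        self.q = queue.Queue(maxsize=size)  # put() blocks while the queue is full
        self.n_producers = n_producers
        self.n_consumers = n_consumers
        self.finished = 0
        self.lock = threading.Lock()

    def producer_done(self):
        with self.lock:
            self.finished += 1
            last = self.finished == self.n_producers
        if last:
            # The last upstream replica sends one EOS per downstream replica,
            # after every data element already put into the FIFO queue.
            for _ in range(self.n_consumers):
                self.q.put(EOS)

def run_pipeline(source, transforms, sink, queue_size=1024):
    """source: iterable; transforms: list of (function, degree); sink: callable.
    A transform returning None produces no output for that input."""
    degrees = [1] + [degree for _, degree in transforms] + [1]
    links = [Link(degrees[i], degrees[i + 1], queue_size) for i in range(len(degrees) - 1)]

    def source_node():
        for item in source:
            links[0].q.put(item)
        links[0].producer_done()

    def transform_node(fn, inp, out):
        while (item := inp.q.get()) is not EOS:
            result = fn(item)
            if result is not None:
                out.q.put(result)
        out.producer_done()

    def sink_node(inp):
        while (item := inp.q.get()) is not EOS:
            sink(item)

    # One long-lived thread per node replica.
    threads = [threading.Thread(target=source_node)]
    for i, (fn, degree) in enumerate(transforms):
        threads += [threading.Thread(target=transform_node, args=(fn, links[i], links[i + 1]))
                    for _ in range(degree)]
    threads.append(threading.Thread(target=sink_node, args=(links[-1],)))
    for t in threads:
        t.start()
    for t in threads:
        t.join()

results = []
run_pipeline(range(1, 101), [(lambda x: x + 1, 4)], results.append, queue_size=8)
```

Because replicas of the same stage pull from a shared queue, the sketch gives no ordering guarantee across replicas, so any check on its output should sort the results first.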

Use the standard runtime when the total number of node replicas naturally matches, and does not exceed, the number of available runtime threads.

Cooperative Runtime

The cooperative runtime is selected with:

pipeline.run_cooperative(n_workers)

where n_workers is the number of scheduler workers used to execute the pipeline actors.

In this runtime, each node replica is represented as an actor. Actors do not own a runtime thread permanently. Instead, a smaller number of scheduler workers repeatedly pick ready actors, run one non-blocking activation, and then either reschedule the actor or park it if it cannot make progress.

For example:

var pipeline = Pipeline((
    seq(source),
    parallel(stage_a, 8),
    parallel(stage_b, 8),
    seq(sink),
))

pipeline.run_cooperative(4)

creates 18 logical actors (1 + 8 + 8 + 1), but only four scheduler workers execute them. This makes it possible to experiment with pipeline configurations in which the number of logical stage replicas exceeds the number of runtime worker threads.

The cooperative runtime is especially useful when some stages frequently block on input or output. In that case, blocked actors do not need to occupy a worker thread while waiting.
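A Python model of the cooperative scheme follows. Each actor exposes a non-blocking `activate()` step; a fixed pool of worker threads takes actors from a ready queue, activates them, and puts unfinished actors back. This is a sketch under stated assumptions: the names (`Channel`, `Actor`, `run_cooperative`) are hypothetical, and where MoStream parks an actor that cannot make progress, this sketch simply requeues it, which trades CPU time for simplicity.

```python
import queue
import threading
from collections import deque

_NOTHING = object()  # "no element" marker, distinct from any real value

class Channel:
    def __init__(self, capacity, n_producers):
        self.buf, self.capacity = deque(), capacity
        self.n_producers, self.closed = n_producers, 0
        self.lock = threading.Lock()

    def try_push(self, item):
        with self.lock:
            if len(self.buf) >= self.capacity:
                return False  # full: the caller must retry later
            self.buf.append(item)
            return True

    def try_pop(self):
        """Return ("item", x), ("empty", None) or ("eos", None)."""
        with self.lock:
            if self.buf:
                return "item", self.buf.popleft()
            if self.closed == self.n_producers:
                return "eos", None
            return "empty", None

    def close(self):
        with self.lock:
            self.closed += 1

class Actor:
    """One node replica; activate() performs one non-blocking step."""
    def __init__(self, fn, inp, out):
        self.fn, self.inp, self.out = fn, inp, out  # inp: Channel or iterator; out: None for the sink
        self.pending = _NOTHING  # produced element not yet pushed downstream

    def activate(self):  # returns True once the actor has finished
        if self.pending is _NOTHING:
            if isinstance(self.inp, Channel):
                status, item = self.inp.try_pop()
            else:  # source: pull from a plain iterator
                item = next(self.inp, _NOTHING)
                status = "eos" if item is _NOTHING else "item"
            if status == "eos":
                if self.out is not None:
                    self.out.close()
                return True
            if status == "empty":
                return False  # cannot make progress right now
            result = self.fn(item)
            if result is None or self.out is None:
                return False  # element dropped, or consumed by the sink
            self.pending = result
        if self.out.try_push(self.pending):
            self.pending = _NOTHING
        return False

def run_cooperative(actors, n_workers):
    ready = queue.Queue()
    for actor in actors:
        ready.put(actor)
    remaining = [len(actors)]
    lock = threading.Lock()

    def worker():
        while True:
            try:
                actor = ready.get(timeout=0.01)
            except queue.Empty:
                with lock:
                    if remaining[0] == 0:
                        return
                continue
            if actor.activate():
                with lock:
                    remaining[0] -= 1
            else:
                ready.put(actor)  # simplification: requeue instead of parking

    workers = [threading.Thread(target=worker) for _ in range(n_workers)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()

results = []
c1 = Channel(capacity=8, n_producers=1)  # source -> add_one replicas
c2 = Channel(capacity=8, n_producers=4)  # add_one replicas -> sink
actors = [Actor(lambda x: x, iter(range(1, 101)), c1)]
actors += [Actor(lambda x: x + 1, c1, c2) for _ in range(4)]
actors.append(Actor(results.append, c2, None))
run_cooperative(actors, n_workers=2)
```

Here six logical actors (one source, four `add_one` replicas, one sink) share two workers, mirroring the situation described above where logical replicas outnumber workers.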

Runtime Configuration

Queue Size

MoStream currently provides a single communicator type, which connects stages through bounded MPMC (multi-producer, multi-consumer) queues. The default queue size is 1024; queue sizes must be powers of two and at least 2.

pipeline.setQueueSize(2048)
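The README does not say why sizes must be powers of two; a common reason in ring-buffer queues is that the slot index can be computed with a bit mask instead of a modulo. The Python sketch below shows the validity check and that masking trick. It illustrates the general technique, not MoStream's queue, and `is_valid_queue_size` and `RingIndex` are hypothetical names.

```python
def is_valid_queue_size(size: int) -> bool:
    """Power of two and at least 2, matching the documented constraint."""
    return size >= 2 and (size & (size - 1)) == 0

class RingIndex:
    """Maps an ever-growing counter to a slot of a power-of-two ring buffer."""
    def __init__(self, size: int):
        if not is_valid_queue_size(size):
            raise ValueError(f"queue size must be a power of two >= 2, got {size}")
        self.mask = size - 1

    def slot(self, counter: int) -> int:
        return counter & self.mask  # equals counter % size when size is a power of two
```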

CPU Pinning

CPU pinning is disabled by default.

pipeline.setPinning(True)

By default, MoStream assigns CPU IDs in order, from 0 to N-1, where N is the number of detected CPUs. You can provide a custom CPU order with the MOSTREAM_PINNING environment variable:

export MOSTREAM_PINNING="0,2,4,6"
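The following Python sketch models how a CPU order could be derived from `MOSTREAM_PINNING` or from the detected CPU count. Only the variable name and the comma-separated format come from this README; the function names are hypothetical, and the wrap-around assignment of replicas to CPUs is an assumption about the policy, not documented behavior.

```python
import os
from typing import List, Optional

def cpu_order(pinning: Optional[str], n_cpus: int) -> List[int]:
    """CPU list from a MOSTREAM_PINNING-style string, else 0..n_cpus-1."""
    if pinning:
        return [int(tok) for tok in pinning.split(",") if tok.strip()]
    return list(range(n_cpus))

def cpu_for_replica(index: int, order: List[int]) -> int:
    """Assumed policy: replicas take CPUs in order, wrapping around."""
    return order[index % len(order)]

order = cpu_order(os.environ.get("MOSTREAM_PINNING"), os.cpu_count() or 1)
```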

Benchmarks

The image benchmark demonstrates a deeper stream-processing pipeline:

TimedImageSource -> Grayscale -> GaussianBlur -> Sharpen -> ImageSink

Run it by passing one parallelism degree per transform stage:

mojo Benchmarks/ImagePipeline/test_image_pipeline.mojo 2 4 2
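With the standard runtime each replica occupies one task, so it can help to count replicas before picking degrees. This Python sketch assumes the three arguments map, in order, to Grayscale, GaussianBlur and Sharpen, and that the source and sink run as single replicas; neither assumption is confirmed by the README. `image_pipeline_layout` is a hypothetical helper.

```python
STAGES = ["TimedImageSource", "Grayscale", "GaussianBlur", "Sharpen", "ImageSink"]

def image_pipeline_layout(args):
    """Hypothetical: map three CLI degrees onto the three transform stages
    (assumed order: Grayscale, GaussianBlur, Sharpen; source and sink sequential)."""
    if len(args) != 3:
        raise ValueError("expected three degrees: Grayscale GaussianBlur Sharpen")
    degrees = [int(a) for a in args]
    if any(d < 1 for d in degrees):
        raise ValueError("degrees must be positive")
    return list(zip(STAGES, [1] + degrees + [1]))

layout = image_pipeline_layout(["2", "4", "2"])
total_replicas = sum(degree for _, degree in layout)
```

Under these assumptions, `2 4 2` yields 1 + 2 + 4 + 2 + 1 = 10 node replicas.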

License

Source file headers state that MoStream is distributed under the GNU Lesser General Public License version 3.

Requests for Modifications

If you are using MoStream and are interested in specific modifications to the API (or to the runtime system), please email the maintainer.

Contributors

The main developer and maintainer of MoStream is Gabriele Mencagli (Department of Computer Science, University of Pisa, Italy).
