Pipeline API
The pipeline API is the main public entry point for building DSP graphs in PYTHUSA.
If the low-level runtime is the set of motors, belts, and bearings, Pipeline is the factory plan that tells those parts what to build.
Overview
A pipeline represents the flow of data and execution used by PYTHUSA. The simplest mental model is a factory line:
- tasks are the machines
- streams are the conveyor belts between machines
- events are control switches that tell certain machines when they may run
Each task runs in its own worker process. Each stream is a fixed-shape, fixed-dtype shared-memory channel. When you compile the pipeline, PYTHUSA turns that declaration into live rings, events, and worker processes.
Use Pipeline when you want:
- a declarative graph of your processing stages
- typed NumPy frame transport between processes
- automatic ring and worker setup
- a smaller public API than the raw Manager path
Use the lower-level runtime directly only when you need custom orchestration or direct ring control.
Basic Workflow
Most pipelines follow the same sequence:
- Create a Pipeline.
- Declare one or more streams with add_stream(...).
- Declare any control events with add_event(...).
- Register tasks with add_task(...).
- Start the pipeline with start() or run().
- Stop, join, or close it explicitly, or use it as a context manager.
In factory-line terms:
- you name the machines
- you define the conveyor belts between them
- you wire any start/stop switches
- then you turn the line on
Quick Example
from __future__ import annotations
import time
import numpy as np
import pythusa
# One 8-sample frame published by the source task.
FRAME = np.arange(8, dtype=np.float32)
def source(samples) -> None:
    samples.write(FRAME)
def scale(samples, doubled) -> None:
    # Poll until a frame arrives, publish the doubled frame, then exit.
    while True:
        frame = samples.read()
        if frame is None:
            time.sleep(0.001)
            continue
        doubled.write((frame * 2.0).astype(np.float32, copy=False))
        return
def sink(doubled) -> None:
    # Poll until the doubled frame arrives, print it, then exit.
    while True:
        frame = doubled.read()
        if frame is None:
            time.sleep(0.001)
            continue
        print(frame)
        return
def main() -> None:
    with pythusa.Pipeline("demo") as pipe:
        # Two conveyor belts: source -> scale -> sink.
        pipe.add_stream("samples", shape=(8,), dtype=np.float32)
        pipe.add_stream("doubled", shape=(8,), dtype=np.float32)
        pipe.add_task("source", fn=source, writes={"samples": "samples"})
        pipe.add_task(
            "scale",
            fn=scale,
            reads={"samples": "samples"},
            writes={"doubled": "doubled"},
        )
        pipe.add_task("sink", fn=sink, reads={"doubled": "doubled"})
        # Start every worker, then wait for all of them to exit.
        pipe.run()
# Required on spawn-based platforms; see the note below.
if __name__ == "__main__":
    main()
This is a three-machine line:
- source publishes frames
- scale transforms them
- sink consumes the result
On Windows, on macOS (where spawn has been the default multiprocessing start method since Python 3.8), and in any other spawn-based environment, pipe.start() and pipe.run() must live behind an if __name__ == "__main__": guard. Without it, each child process re-imports the main module and tries to create the same shared-memory rings again. On fork-based platforms the guard does not change pipeline semantics; it is simply the correct portable multiprocessing pattern for standalone scripts.
Creating A Pipeline
Create a pipeline by naming it:
pipe = pythusa.Pipeline("radar")
The pipeline owns a runtime Manager internally and is the lifecycle owner for the graph you declare.
It can be used directly or as a context manager:
with pythusa.Pipeline("radar") as pipe:
    ...
The context-manager form is the recommended default because it gives deterministic cleanup.
Streams
Declare a stream with:
pipe.add_stream(
    "samples",
    shape=(4096,),
    dtype=np.float32,
    frames=64,
    cache_align=True,
    min_reader_pos_refresh_interval=64,
    min_reader_pos_refresh_s=0.005,
)
Parameters:
- name: unique stream name within the pipeline
- shape: frame shape passed to task bindings
- dtype: NumPy dtype for one frame
- frames: number of frames to allocate in the backing ring buffer
- cache_align: whether compile-time ring sizing should apply cache-line alignment
- min_reader_pos_refresh_interval: rescan the slowest-reader cache after this many writes
- min_reader_pos_refresh_s: rescan the slowest-reader cache after this many seconds, even if the writer is otherwise idle
Conceptually, a stream is the conveyor belt between two machines.
At compile time, that declaration becomes a shared-memory ring sized for 32 frames by default, or the explicit frames= value you provide.
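To get a rough idea of how much shared memory a declaration costs, multiply frames by the number of elements per frame and by the bytes per element. The sketch below does that arithmetic. frame_nbytes and ring_payload_nbytes are hypothetical helpers, not PYTHUSA API. The cache-alignment rule (rounding the total up to a multiple of 64 bytes) is an assumption, not the documented sizing rule. Real rings may also reserve header or index space.

```python
import math

CACHE_LINE_BYTES = 64  # assumption: common cache-line size on x86-64 and most ARM64 CPUs


def frame_nbytes(shape: tuple[int, ...], itemsize: int) -> int:
    """Bytes in one frame; itemsize is e.g. np.dtype(np.float32).itemsize == 4."""
    return math.prod(shape) * itemsize


def ring_payload_nbytes(
    shape: tuple[int, ...],
    itemsize: int,
    frames: int = 32,  # the documented default frame count
    cache_align: bool = True,
) -> int:
    """Approximate payload size of one ring (hypothetical helper)."""
    nbytes = frames * frame_nbytes(shape, itemsize)
    if cache_align:
        # Round up to a whole number of cache lines (assumed alignment rule).
        nbytes = -(-nbytes // CACHE_LINE_BYTES) * CACHE_LINE_BYTES
    return nbytes


# The add_stream example above: 64 frames of 4096 float32 samples.
print(ring_payload_nbytes((4096,), 4, frames=64))  # 1048576 bytes (1 MiB)
```

The figure scales linearly with frames, so a deeper ring buys more tolerance for slow readers at a proportional memory cost.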
By default, the writer refreshes cached backpressure state every 64 writes or every 5 ms, whichever comes first.
You only need to tune these refresh knobs if you are trading off writer-side scan overhead against how quickly external reader progress becomes visible.
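The "every N writes or every T seconds, whichever comes first" policy can be modeled as a small cache object. This is a toy under stated assumptions. SlowestReaderCache, poll, and on_write are hypothetical names. Reader positions come from a caller-supplied function, and an injectable clock keeps the demonstration deterministic. A smaller interval or timeout makes the cache fresher but costs more scans. A larger one scans less often but lets the writer act on a staler view of reader progress.

```python
import time
from typing import Callable, Sequence


class SlowestReaderCache:
    """Toy model of a writer-side cache of the slowest reader position.

    Assumption: the cache is rescanned after `refresh_interval` writes or
    after `refresh_s` seconds, whichever comes first. Not PYTHUSA's code.
    """

    def __init__(
        self,
        reader_positions: Callable[[], Sequence[int]],
        refresh_interval: int = 64,
        refresh_s: float = 0.005,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._reader_positions = reader_positions
        self._refresh_interval = refresh_interval
        self._refresh_s = refresh_s
        self._clock = clock
        self.scans = 0
        self._rescan()

    def _rescan(self) -> None:
        # The expensive part: inspect every reader's position.
        self.cached_min = min(self._reader_positions())
        self._last_scan = self._clock()
        self._writes_since_scan = 0
        self.scans += 1

    def poll(self) -> int:
        # Time-based refresh, usable even when the writer is idle.
        if self._clock() - self._last_scan >= self._refresh_s:
            self._rescan()
        return self.cached_min

    def on_write(self) -> int:
        # Count-based refresh, falling back to the time-based check.
        self._writes_since_scan += 1
        if self._writes_since_scan >= self._refresh_interval:
            self._rescan()
            return self.cached_min
        return self.poll()
```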
Current rules:
- one stream has exactly one writer task
- one stream may have zero or more reader tasks at declaration time
- compile requires each stream to have at least one reader
If you register the same stream name twice, add_stream(...) raises ValueError.
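The stream rules above can be checked mechanically from the reads and writes mappings alone. The sketch below is a hypothetical validator over a plain-dict description of tasks that mirrors add_task(...) bindings. It is not PYTHUSA's compile step, although it raises ValueError as the documented compile-time checks do. A stream that fails the reader check is exactly the case terminator(...) exists to cover.

```python
from collections import defaultdict


def validate_streams(streams: list[str], tasks: dict[str, dict]) -> None:
    """Check the documented stream rules against a toy graph description.

    `tasks` maps a task name to {"reads": {...}, "writes": {...}}, where each
    mapping goes from local argument name to stream name, as in add_task(...).
    Hypothetical helper; PYTHUSA performs its own checks at compile time.
    """
    writers: dict[str, list[str]] = defaultdict(list)
    readers: dict[str, list[str]] = defaultdict(list)
    for task, bindings in tasks.items():
        for stream in bindings.get("writes", {}).values():
            writers[stream].append(task)
        for stream in bindings.get("reads", {}).values():
            readers[stream].append(task)
    for stream in streams:
        # Exactly one writer per stream.
        if len(writers[stream]) != 1:
            raise ValueError(f"stream {stream!r} needs exactly one writer, found {writers[stream]}")
        # At least one reader per stream before compiling.
        if not readers[stream]:
            raise ValueError(f"stream {stream!r} has no reader")
```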
Events
Declare an event with:
pipe.add_event("shutdown")
pipe.add_event("armed", initial_state=True)
Events are process-shared control primitives. In the factory-line analogy, they are the switches and gates that tell machines whether they may run.
Current guidance:
- many signalers are acceptable
- one consumer-side owner is the intended pattern
- many consumers on one event are discouraged
If an event is bound into more than two tasks, compile emits a warning because that usually indicates a design that should be split into separate events.
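The fan-out warning amounts to counting how many distinct tasks bind each event name. Below is a minimal sketch under that reading. check_event_fanout is hypothetical, and the UserWarning category and message text are assumptions rather than PYTHUSA's actual warning.

```python
import warnings
from collections import Counter


def check_event_fanout(tasks: dict[str, dict], limit: int = 2) -> Counter:
    """Warn when one event is bound into more than `limit` tasks.

    Hypothetical helper mirroring the documented compile-time warning.
    """
    counts: Counter = Counter()
    for bindings in tasks.values():
        # A task counts once per event, however many local names it uses.
        for event in set(bindings.get("events", {}).values()):
            counts[event] += 1
    for event, n in counts.items():
        if n > limit:
            warnings.warn(
                f"event {event!r} is bound into {n} tasks; consider separate events",
                UserWarning,
                stacklevel=2,
            )
    return counts
```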
Tasks
Register a task with:
pipe.add_task(
    "fft_worker",
    fn=fft_worker,
    reads={"samples": "raw_adc"},
    writes={"fft": "spectra"},
    events={"shutdown": "shutdown"},
)
Parameters:
- name: unique task name within the pipeline
- fn: callable that runs in the worker process
- reads: mapping from local function argument names to stream names
- writes: mapping from local function argument names to stream names
- events: mapping from local function argument names to event names
The mapping direction matters:
- keys are the local argument names seen by the function
- values are the real stream or event names registered on the pipeline
So this:
reads={"samples": "raw_adc"}
means:
- the pipeline stream is named raw_adc
- the task function receives it as samples=...
That lets reusable task functions stay stable even when pipeline names change.
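The mapping direction works the same way as building keyword arguments: look up each pipeline name, then pass the result under the local name. In the toy below, call_with_bindings is hypothetical, and plain strings stand in for the reader and writer binding objects the runtime actually supplies.

```python
from typing import Any, Callable


def call_with_bindings(
    fn: Callable[..., Any],
    reads: dict[str, str],
    writes: dict[str, str],
    resources: dict[str, Any],
) -> Any:
    """Resolve pipeline names to objects and pass them under local names.

    Keys are local argument names; values are pipeline stream names.
    `resources` stands in for the runtime's real stream bindings.
    """
    kwargs = {local: resources[name] for local, name in {**reads, **writes}.items()}
    return fn(**kwargs)


def describe(samples, fft):  # a reusable task function with stable argument names
    return f"samples<-{samples}, fft->{fft}"


resources = {"raw_adc": "ring:raw_adc", "spectra": "ring:spectra"}
print(call_with_bindings(describe, {"samples": "raw_adc"}, {"fft": "spectra"}, resources))
# samples<-ring:raw_adc, fft->ring:spectra
```

Renaming raw_adc in the pipeline changes only the dictionary values. describe itself stays untouched.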
Decorator Form
add_task(...) also supports decorator registration for plain tasks:
@pipe.add_task(
    "sink",
    reads={"samples": "samples"},
)
def sink(samples) -> None:
    ...
This decorator form is only for plain add_task(...).
Controlled-task registration uses explicit fn=....
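A common way for one method to serve as both a direct call and a decorator is to return a decorator when fn is omitted. The sketch below shows that pattern with a hypothetical TaskRegistry class. It is not PYTHUSA's Pipeline, and the real add_task may be implemented differently.

```python
from typing import Any, Callable, Optional


class TaskRegistry:
    """Hypothetical stand-in showing how add_task can double as a decorator."""

    def __init__(self) -> None:
        self.tasks: dict[str, tuple[Callable[..., Any], dict[str, Any]]] = {}

    def add_task(self, name: str, fn: Optional[Callable[..., Any]] = None, **bindings: Any):
        if fn is None:
            # Decorator form: defer registration until the function is known.
            def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
                self.add_task(name, fn=func, **bindings)
                return func  # the decorated name stays bound to the plain function
            return decorator
        if name in self.tasks:
            raise ValueError(f"duplicate task name {name!r}")
        self.tasks[name] = (fn, bindings)
        return fn
```

Returning the original function from the decorator keeps it importable under its own name. That matters if the pipeline is later saved.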
Controlled Tasks
Three controlled task forms are available:
- pipe.add_task.switchable(...)
- pipe.add_task.toggleable(...)
- pipe.add_task.terminator(...)
Example:
def flush_buffer(output) -> None:
    output.write(np.zeros(8, dtype=np.float32))
pipe.add_event("flush")
pipe.add_stream("output", shape=(8,), dtype=np.float32)
pipe.add_task.toggleable(
    "flush_once",
    activate_on="flush",
    fn=flush_buffer,
    writes={"output": "output"},
    events={"flush": "flush"},
)
Behavior:
- switchable: waits for the event, then keeps rerunning without resetting it
- toggleable: waits for the event, resets it, then runs once per activation
Rules:
- activate_on must be one of the task's bound event names
- controlled tasks are event-driven wrappers around the task function
- the control event is not passed through to the task function itself
If activate_on is missing from the task's event bindings, registration raises ValueError.
terminator(...) is a built-in no-op reader task. It binds one or more streams, marks those readers inactive, and then returns. Use it when a stream needs a reader for compilation but has no real downstream consumer.
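The switchable and toggleable behaviors can be modeled with a threading.Event standing in for PYTHUSA's process-shared event. That substitution is an assumption made only to keep the sketch single-process and self-contained. run_toggleable, run_switchable, activations, and should_stop are hypothetical names. The bounded loops exist only so the demonstration terminates.

```python
import threading
from typing import Callable


def run_toggleable(event: threading.Event, fn: Callable[[], None], activations: int) -> None:
    """Toggleable model: wait, reset the event, run fn once; repeat per activation."""
    for _ in range(activations):
        event.wait()
        event.clear()  # consume this activation so the next run needs a new set()
        fn()


def run_switchable(
    event: threading.Event, fn: Callable[[], None], should_stop: Callable[[], bool]
) -> None:
    """Switchable model: wait once, then keep rerunning fn; the event stays set."""
    event.wait()
    while not should_stop():
        fn()


flush = threading.Event()
calls: list[str] = []
flush.set()
run_toggleable(flush, lambda: calls.append("flush"), activations=1)
print(calls, flush.is_set())  # ['flush'] False
```

The key difference is who clears the event. The toggleable wrapper clears it itself, so each set() buys exactly one run. The switchable wrapper leaves it set.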
Stream Bindings Inside Tasks
At runtime, tasks do not receive raw arrays directly. They receive stream binding objects.
Reader bindings support:
- read() -> np.ndarray | None
- read_into(out) -> bool
- look() -> memoryview | None
- increment()
- set_blocking(bool)
- is_blocking()
- .raw
- .ring
Writer bindings support:
- write(array) -> bool
- look() -> memoryview | None
- increment()
- .raw
- .ring
Examples:
read():
def worker(samples, fft) -> None:
    while True:
        frame = samples.read()
        if frame is None:
            time.sleep(0.001)
            continue
        spectrum = np.fft.rfft(frame).astype(np.complex64, copy=False)
        if fft.write(spectrum):
            return
read_into(out):
def worker(samples, fft) -> None:
    # Preallocated once and refilled on every successful read_into().
    frame = np.empty((4096,), dtype=np.float32)
    while True:
        if not samples.read_into(frame):
            time.sleep(0.001)
            continue
        spectrum = np.fft.rfft(frame).astype(np.complex64, copy=False)
        if fft.write(spectrum):
            return
look() and increment() on the reader:
def worker(samples, fft) -> None:
    while True:
        frame_view = samples.look()
        if frame_view is None:
            time.sleep(0.001)
            continue
        frame = np.frombuffer(frame_view, dtype=np.float32).reshape((4096,))
        spectrum = np.fft.rfft(frame).astype(np.complex64, copy=False)
        # The view is no longer needed once the spectrum is computed.
        samples.increment()
        if fft.write(spectrum):
            return
set_blocking() and is_blocking():
def worker(samples) -> None:
    samples.set_blocking(False)
    if not samples.is_blocking():
        latest = samples.look()
        if latest is not None:
            print(np.frombuffer(latest, dtype=np.float32))
            samples.increment()
    samples.set_blocking(True)
write():
def worker(samples, fft) -> None:
    frame = samples.read()
    if frame is None:
        return
    spectrum = np.fft.rfft(frame).astype(np.complex64, copy=False)
    fft.write(spectrum)
look() and increment() on the writer:
def worker(samples, fft) -> None:
    frame = samples.read()
    if frame is None:
        return
    fft_view = fft.look()
    if fft_view is None:
        return
    # 4096 real samples give 4096 // 2 + 1 = 2049 rfft bins.
    spectrum = np.frombuffer(fft_view, dtype=np.complex64).reshape((2049,))
    spectrum[:] = np.fft.rfft(frame).astype(np.complex64, copy=False)
    fft.increment()
Notes:
- read() returns an owned NumPy array copy
- read_into(...) avoids that allocation by filling a provided array
- look() returns a zero-copy memoryview for the next contiguous frame and does not advance the reader
- call increment() after you finish using the view from look()
- look() returns None when the next frame is not available or is wrapped across the ring boundary
- writer.look() returns a zero-copy writable memoryview for the next contiguous frame and does not advance the writer
- call writer.increment() after you fill the view from writer.look()
- writer.look() returns None when the next frame would wrap across the ring boundary
- write(...) validates shape and dtype before publishing
- .raw and .ring expose the underlying shared-memory ring for direct low-level access
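Why look() can return None for a wrapped frame: a zero-copy view must be one contiguous slice of the ring. A frame that straddles the end of the buffer is split into two slices. The sketch assumes a generic byte ring whose position is a monotonically increasing byte counter. contiguous_span is a hypothetical helper, and PYTHUSA's actual layout may differ.

```python
from typing import Optional


def contiguous_span(capacity: int, frame_nbytes: int, position: int) -> Optional[tuple[int, int]]:
    """Byte range of the frame at `position`, or None if it crosses the ring end.

    Assumes `position` is a monotonically increasing byte counter and the ring
    is `capacity` bytes long; a generic ring model, not PYTHUSA's layout.
    """
    start = position % capacity
    end = start + frame_nbytes
    if end > capacity:
        return None  # the frame is split in two, so no single zero-copy view exists
    return start, end


ring = bytearray(10)                       # 10-byte toy ring
span = contiguous_span(len(ring), 4, 4)    # frame at byte position 4
view = memoryview(ring)[span[0]:span[1]]   # zero-copy window, like writer.look()
view[:] = b"\x01\x02\x03\x04"              # filling the view writes into the ring
print(ring[4:8] == b"\x01\x02\x03\x04")    # True
print(contiguous_span(len(ring), 4, 8))    # None: bytes 8..11 would wrap
```

In this model, a ring whose capacity is an exact multiple of the frame size never produces a wrapped frame.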
Blocking And Backpressure
Readers participate in writer backpressure by default. That is usually what you want.
If you need a reader to stop holding writers back:
samples.set_blocking(False)
When a reader is made non-blocking:
- it is marked inactive
- writers stop treating it as a backpressure participant
When it is re-enabled:
samples.set_blocking(True)
the reader jumps to the current writer position. Unread backlog is discarded rather than replayed.
This is an advanced control. It is useful for "latest frame only" style consumers, but it is not lossless.
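The blocking rules can be captured in a frame-counting toy. A blocking reader stops the writer once it falls a full ring behind. A non-blocking reader is ignored. Re-enabling it jumps it to the writer and drops the backlog. ToyRing and its methods are hypothetical, and payloads are not modeled at all. This is only a sketch of the documented semantics, not PYTHUSA's ring.

```python
class ToyRing:
    """Frame-counting model of writer backpressure and set_blocking semantics."""

    def __init__(self, frames: int) -> None:
        self.frames = frames
        self.write_pos = 0
        self.read_pos: dict[str, int] = {}
        self.blocking: dict[str, bool] = {}

    def add_reader(self, name: str) -> None:
        self.read_pos[name] = self.write_pos
        self.blocking[name] = True  # readers participate in backpressure by default

    def write(self) -> bool:
        active = [self.read_pos[r] for r, flag in self.blocking.items() if flag]
        if active and self.write_pos - min(active) >= self.frames:
            return False  # ring is full for the slowest blocking reader
        self.write_pos += 1
        return True

    def read(self, name: str) -> bool:
        if self.read_pos[name] >= self.write_pos:
            return False  # nothing new for this reader
        self.read_pos[name] += 1
        return True

    def set_blocking(self, name: str, flag: bool) -> None:
        if flag and not self.blocking[name]:
            self.read_pos[name] = self.write_pos  # jump to the writer; backlog is dropped
        self.blocking[name] = flag
```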
Lifecycle Methods
The main lifecycle methods are:
- compile()
- start()
- run()
- stop()
- join(timeout=None)
- close()
Typical usage:
with pythusa.Pipeline("demo") as pipe:
    ...
    pipe.start()
    pipe.join()
or, for short-running pipelines:
with pythusa.Pipeline("demo") as pipe:
    ...
    pipe.run()
Method behavior:
- compile(): validates the graph and registers runtime objects
- start(): compiles if needed, then starts tasks in reverse topological order
- run(): convenience method for start() followed by join()
- stop(): requests shutdown of running worker processes
- join(): waits for worker processes to exit
- close(): stops, joins, and closes the owned runtime manager
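"Reverse topological order" means consumers are started before the producers that feed them. The sketch derives that order with the standard library's graphlib (Python 3.9+). Each task depends on the writer of every stream it reads. start_order is a hypothetical helper, and PYTHUSA's scheduler may compute this differently. One plausible motivation is that readers are attached before their upstream writers begin publishing, but that is an inference, not a documented guarantee. As a side effect, graphlib rejects cycles with CycleError, which is a subclass of ValueError.

```python
from graphlib import TopologicalSorter


def start_order(tasks: dict[str, dict]) -> list[str]:
    """Reverse topological order of tasks: consumers first, producers last.

    `tasks` maps task name to {"reads": {...}, "writes": {...}} as in
    add_task(...). Hypothetical helper, not PYTHUSA's scheduler.
    """
    writer_of = {
        stream: task
        for task, bindings in tasks.items()
        for stream in bindings.get("writes", {}).values()
    }
    # Each task depends on the writers of the streams it reads.
    graph = {
        task: {writer_of[s] for s in bindings.get("reads", {}).values()}
        for task, bindings in tasks.items()
    }
    producers_first = list(TopologicalSorter(graph).static_order())
    return producers_first[::-1]


quick = {
    "source": {"writes": {"samples": "samples"}},
    "scale": {"reads": {"samples": "samples"}, "writes": {"doubled": "doubled"}},
    "sink": {"reads": {"doubled": "doubled"}},
}
print(start_order(quick))  # ['sink', 'scale', 'source']
```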
Current lifecycle model:
- a pipeline instance is compile-once
- a pipeline instance is start-once
- calling compile() twice raises RuntimeError
- calling start() twice raises RuntimeError
- close() is idempotent
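These lifecycle rules form a small state machine. The toy below encodes them: compile-once, start-once (compiling on demand), idempotent close, and close on context-manager exit. ToyLifecycle is hypothetical. A real pipeline would also stop workers, join them, and release shared memory inside close().

```python
class ToyLifecycle:
    """Hypothetical model of the documented compile/start/close rules."""

    def __init__(self) -> None:
        self.compiled = False
        self.started = False
        self.closed = False

    def compile(self) -> None:
        if self.compiled:
            raise RuntimeError("pipeline already compiled")
        self.compiled = True

    def start(self) -> None:
        if self.started:
            raise RuntimeError("pipeline already started")
        if not self.compiled:
            self.compile()  # start() compiles if needed
        self.started = True

    def close(self) -> None:
        if self.closed:
            return  # idempotent: a second close() is a no-op
        self.closed = True  # a real pipeline would stop, join, and free resources here

    def __enter__(self) -> "ToyLifecycle":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        self.close()  # deterministic cleanup, even if the block raised
        return False  # do not swallow exceptions
```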
Metrics And Monitoring
Start monitoring with:
pipe.start_monitor(interval_s=0.05)
Read metrics with:
all_metrics = pipe.metrics()
worker_metrics = pipe.metrics("worker")
Metrics expose snapshots of:
- PID
- CPU percent
- RSS memory
- nice level
- ring pressure
Use this for operational visibility, not as a hard real-time timing guarantee.
If you ask for an unknown task name, metrics(task_name) raises KeyError.
Saving And Reconstructing Pipelines
Pipelines can be serialized to TOML:
pipe.save("radar.toml")
restored = pythusa.Pipeline.reconstruct("radar.toml")
This persists the declaration, not a live runtime.
Saved content includes:
- pipeline name
- stream declarations
- event declarations
- task bindings
- controlled-task metadata
- callable module and qualified name
Important limitation:
- saved task callables must be importable top-level functions
- lambdas, nested functions, and other non-importable callables are not supported
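The "module and qualified name" record explains the importability rule. A reference can only be restored if importing the module and looking up the name yields the same function. The helpers below are hypothetical and only illustrate that round trip. Lambdas report the qualified name "<lambda>", and nested functions report names such as "outer.<locals>.inner", so neither can be looked up that way.

```python
import importlib
import json
from typing import Any, Callable


def callable_ref(fn: Callable[..., Any]) -> tuple[str, str]:
    """Return (module, qualified name) for an importable top-level function."""
    qualname = fn.__qualname__
    # Lambdas are named "<lambda>"; nested functions and methods contain dots
    # (e.g. "outer.<locals>.inner"), so none of them is a top-level name.
    if qualname == "<lambda>" or "." in qualname:
        raise ValueError(f"{qualname!r} is not an importable top-level function")
    return fn.__module__, qualname


def resolve_callable(module: str, qualname: str) -> Callable[..., Any]:
    """Import the module and look the function up by name."""
    return getattr(importlib.import_module(module), qualname)


ref = callable_ref(json.dumps)
print(ref)  # ('json', 'dumps')
print(resolve_callable(*ref) is json.dumps)  # True
```

A function defined in the script you run directly reports __module__ as "__main__". Keeping task functions in an importable module avoids depending on which script is the main module when the pipeline is reconstructed.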
Errors And Current Limits
PYTHUSA v0 is intentionally narrow. The constraints are there to keep the execution model explicit.
Current unsupported or constrained patterns:
- cyclic task graphs are not supported
- multiple writers to one stream are not supported
- arbitrary Python objects as stream payloads are not supported
- variable-shape frames on one stream are not supported
- automatic fan-in coordination is not provided
- many independent consumers on one event are not the intended event model
Some important consequences:
- if a task reads from multiple input streams, that task is responsible for deciding how to synchronize them
- if two upstream producers run at different rates, the downstream task must define the join logic
- if you need counted per-activation semantics, design the event ownership carefully instead of treating one event like a broadcast queue
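As an example of task-defined join logic, the sketch below implements a "latest value" join. It emits a pair whenever either input delivers a new frame and both inputs have produced at least once, reusing the slower stream's last frame. ListReader is a hypothetical stand-in for a reader binding whose read() returns a frame or None. Other valid policies include lockstep pairing (wait for one frame from each) and timestamp alignment. Which one is right depends on the application.

```python
from typing import Any, Optional


class ListReader:
    """Hypothetical stand-in for a reader binding: read() returns a frame or None."""

    def __init__(self, frames: list[Any]) -> None:
        self._frames = list(frames)

    def read(self) -> Optional[Any]:
        return self._frames.pop(0) if self._frames else None


def latest_value_join(
    a: ListReader, b: ListReader, state: dict[str, Any]
) -> Optional[tuple[Any, Any]]:
    """One polling step of a latest-value join across two input streams."""
    got_new = False
    for key, reader in (("a", a), ("b", b)):
        frame = reader.read()
        if frame is not None:
            state[key] = frame  # remember the most recent frame per input
            got_new = True
    if got_new and "a" in state and "b" in state:
        return state["a"], state["b"]
    return None
```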
Common failure modes:
- duplicate stream, task, or event names raise ValueError
- invalid task bindings raise ValueError
- compile-time topology problems raise ValueError
- trying to compile or start the same pipeline twice raises RuntimeError
When To Drop Lower
Pipeline should be the default.
Drop to the low-level runtime only when you specifically need:
- direct Manager control
- manual ring construction
- custom bootstrap behavior
- experiments that depend on raw ring semantics
If your application still fits the factory-line model of machines plus conveyor belts, stay with Pipeline.
If you need to rewire the motors themselves, use the runtime layer.