Runtime
The runtime is the lower-level execution layer underneath Pipeline.
If Pipeline is the factory plan, the runtime is the actual machinery: the rings, worker processes, bootstrap logic, and synchronization objects that make the line move.
Most users should start with Pipeline.
Drop to the runtime only when you need direct control over ring sizing, task startup order, raw shared-memory access, or custom orchestration.
What Runtime Means
At the runtime level, PYTHUSA is built from a small set of explicit parts:
- Manager: owns live rings, events, and worker processes
- RingSpec: describes one shared-memory ring
- TaskSpec: describes one worker task
- EventSpec: describes one process-shared event
- SharedRingBuffer: the actual shared-memory transport object
- WorkerEvent: the actual process-shared event object
Pipeline.compile() eventually creates these lower-level objects for you.
Using the runtime directly means building them yourself.
The tradeoff is simple:
- Pipeline is smaller, safer, and easier to reason about
- the runtime is more explicit and more flexible, but easier to misuse
Runtime Object Model
Manager
Manager is the runtime owner.
It keeps strong references to:
- live creator-side ring handles
- live event objects
- registered task specs
- spawned worker processes
- sampled process metrics
At a high level, Manager does four things:
- register rings, events, and tasks
- assign reader slots to task processes
- build a task bootstrap for each process
- start, stop, join, and monitor those processes
Unlike Pipeline, Manager is imperative.
You register objects and start named tasks yourself.
RingSpec
RingSpec is pure configuration for one ring buffer:
- name
- size in bytes
- num_readers
- cache_align
- cache_size
- min_reader_pos_refresh_interval
- min_reader_pos_refresh_s
Important: at the runtime level, ring size is always in bytes.
The raw ring does not know about NumPy frame shape or dtype beyond whatever the caller chooses to interpret.
The two refresh fields control how often the writer rescans reader positions to recompute the cached slowest-reader boundary.
The defaults are 64 writes for min_reader_pos_refresh_interval and 0.005 seconds for min_reader_pos_refresh_s.
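Because ring size is given in bytes, a capacity expressed in frames has to be converted by hand. The following is a minimal sketch of that arithmetic, assuming fixed-size frames. The helper name ring_size_bytes is hypothetical and is not part of PYTHUSA.

```python
from __future__ import annotations

import numpy as np


def ring_size_bytes(frame_shape: tuple[int, ...], dtype, frames: int) -> int:
    """Bytes needed to hold `frames` fixed-size frames (hypothetical helper)."""
    items_per_frame = int(np.prod(frame_shape))
    return items_per_frame * np.dtype(dtype).itemsize * frames


# 32 frames of 4096 float32 samples: 4096 items * 4 bytes * 32 frames.
size = ring_size_bytes((4096,), np.float32, frames=32)
print(size)
```

The resulting integer is what would be passed as the size field of a RingSpec.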
TaskSpec
TaskSpec declares one worker process:
- name
- fn
- reading_rings
- writing_rings
- events
- args
- kwargs
The runtime starts one process per TaskSpec.
The task callable is invoked exactly once.
If a task should run continuously, the loop belongs inside the task function.
EventSpec
EventSpec is pure configuration:
- name
- initial_state
It tells the manager to create a named WorkerEvent.
SharedRingBuffer
SharedRingBuffer is the actual shared-memory transport.
It is a fixed-size byte ring with one writer position and one reader position per registered reader slot.
The ring is concerned only with bytes.
Shape and dtype interpretation are the caller's responsibility.
It accepts the same min-reader refresh knobs as RingSpec, with the same defaults.
WorkerEvent
WorkerEvent is the runtime control primitive used across processes.
It combines:
- a process-shared Event for blocking wakeup
- a process-shared Semaphore for counted activations
- a shared counter and lock to keep state consistent
That makes it more than a plain binary flag.
How Ring Buffers Work Under The Hood
SharedRingBuffer stores two things in one shared-memory block:
- a header
- the ring payload bytes
The header tracks:
- logical ring size
- pressure
- dropped size
- global write position
- computed writable capacity
- number of readers
- per-reader position, active flag, and last-seen slot data
Conceptually:
- the writer only advances the write position
- each reader advances its own reader position
- writable capacity is determined by the slowest active reader
That last point is the backpressure rule. If one active reader falls behind, it limits how far the writer may advance.
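The backpressure rule can be written down as a small pure function. This is only a sketch of the idea, not the library's header logic. It assumes that positions are monotonically increasing byte counters and that a ring with no active readers is fully writable. The name writable_bytes is hypothetical.

```python
from __future__ import annotations


def writable_bytes(
    ring_size: int,
    write_pos: int,
    reader_pos: list[int],
    active: list[bool],
) -> int:
    """Free space seen by the writer, limited by the slowest active reader.

    Assumption: positions are monotonically increasing byte counters.
    """
    active_positions = [p for p, a in zip(reader_pos, active) if a]
    if not active_positions:
        return ring_size  # assumed: no active reader constrains the writer
    unread = write_pos - min(active_positions)
    return ring_size - unread


# Reader 1 lags 600 bytes behind the writer and limits the free space.
print(writable_bytes(1024, 900, [900, 300], [True, True]))
# Marking the slow reader inactive removes the constraint.
print(writable_bytes(1024, 900, [900, 300], [True, False]))
```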
Writer Path
The writer:
- asks how many bytes are writable
- obtains one or two memoryviews into the ring payload
- copies bytes into those views
- advances the write position
If there is not enough room, the write fails and returns 0.
The caller decides whether to retry, wait, or drop.
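One way to make the retry, wait, or drop decision explicit is a small policy helper around any write function that returns 0 on failure. The sketch below is generic: write_with_retry and its parameters are hypothetical names, and the write argument stands in for a call such as a ring's write_array.

```python
import time
from typing import Callable


def write_with_retry(
    write: Callable[[bytes], int],
    payload: bytes,
    retries: int = 3,
    delay_s: float = 0.001,
) -> bool:
    """Call `write` up to `retries + 1` times; report whether it succeeded."""
    for attempt in range(retries + 1):
        if write(payload) != 0:
            return True
        if attempt < retries:
            time.sleep(delay_s)  # wait for readers to free space
    return False  # the caller may now drop the payload or escalate
```

A bounded retry count keeps a stalled reader from blocking the producer forever. What to do after the final failure remains an application decision.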
Reader Path
The reader:
- checks how many bytes are readable
- obtains one or two memoryviews into the ring payload
- copies or views the readable bytes
- advances its own reader position
Each reader has its own slot. Readers do not contend by rewriting a shared reader cursor.
Wrap-Around
If a read or write spans the end of the payload, the ring exposes two views:
- one to the tail of the buffer
- one to the beginning of the buffer
The helper methods handle this for you.
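The two-view split can be illustrated with a plain memoryview over a bytearray. This is a sketch of the technique only. The function name ring_views is hypothetical and the real ring's view methods may differ in signature and return type.

```python
from __future__ import annotations


def ring_views(payload: memoryview, pos: int, size: int) -> list[memoryview]:
    """Return one or two views covering `size` bytes starting at `pos`."""
    n = len(payload)
    if size > n:
        raise ValueError("request is larger than the ring payload")
    start = pos % n
    first = min(size, n - start)
    views = [payload[start:start + first]]  # up to the tail of the buffer
    if first < size:
        views.append(payload[:size - first])  # remainder from the beginning
    return views


payload = memoryview(bytearray(b"abcdefgh"))
print([bytes(v) for v in ring_views(payload, 2, 3)])  # contiguous: one view
print([bytes(v) for v in ring_views(payload, 6, 4)])  # wrapped: two views
```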
Pressure
Ring pressure is the complement of writable space, expressed as a percentage. If little writable space remains, pressure rises. The monitor thread uses this to report stalled or overloaded stages.
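As a worked formula, pressure under this definition is 100 * (1 - writable / ring_size). The sketch below assumes a floating-point percentage; the value the library actually stores may be rounded or scaled differently. The name pressure_percent is hypothetical.

```python
def pressure_percent(ring_size: int, writable: int) -> float:
    """Complement of writable space as a percentage (assumed float result)."""
    return 100.0 * (1.0 - writable / ring_size)


print(pressure_percent(1024, 1024))  # empty ring
print(pressure_percent(1024, 256))   # three quarters full
```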
How Tasks Work Under The Hood
The task runtime path is:
- you register a TaskSpec
- Manager.start(name) validates the task
- the manager assigns reader slots for the task's input rings
- it builds a TaskBootstrap
- a child process is spawned with that bootstrap as the target
- the child opens its reader and writer ring handles
- the child enters those ring handles as context managers
- worker-local context is installed
- the worker callable is invoked exactly once
The callable itself owns its loop. The runtime does not inject a scheduler around normal tasks.
This is an important design point:
- the runtime provides transport and process orchestration
- task behavior still belongs to the task function
Worker Execution Model
The worker process sees the runtime through module-level context. Inside a worker, helpers like:
- pythusa.get_reader(...)
- pythusa.get_writer(...)
- pythusa.get_event(...)
resolve objects that were installed by bootstrap.
That installation happens once per child process. It is safe because each worker is its own process with its own module state.
The bootstrap also enters every ring under an ExitStack, so reader activation and ring cleanup follow the ring object's context-manager lifecycle.
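The ExitStack pattern described here can be sketched with stand-in objects. FakeRing and run_task are hypothetical names used only for illustration. The real bootstrap opens actual ring handles and installs worker context before invoking the task.

```python
from contextlib import ExitStack


class FakeRing:
    """Stand-in for a ring handle that records enter/exit order (hypothetical)."""

    def __init__(self, name: str, log: list) -> None:
        self.name = name
        self.log = log

    def __enter__(self) -> "FakeRing":
        self.log.append(f"open {self.name}")
        return self

    def __exit__(self, *exc) -> None:
        self.log.append(f"close {self.name}")


def run_task(rings: list, fn) -> None:
    """Enter every ring, call the task once, then unwind in reverse order."""
    with ExitStack() as stack:
        handles = [stack.enter_context(ring) for ring in rings]
        fn(handles)


log: list = []
run_task([FakeRing("in", log), FakeRing("out", log)], lambda hs: log.append("run"))
print(log)
```

Because ExitStack unwinds even when the task raises, cleanup follows the same path on failure as on normal completion.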
One platform note:
- on POSIX, SIGTERM can exit a worker cleanly through the installed handler
- on Windows, process termination is effectively hard termination, so you should prefer normal task completion and explicit lifecycle management where possible
Minimal Raw Runtime Example
This is the simplest end-to-end example using Manager, RingSpec, and TaskSpec directly.
from __future__ import annotations

import time

import numpy as np
import pythusa

FRAME = np.arange(8, dtype=np.float32)
FRAME_NBYTES = FRAME.nbytes


def source() -> None:
    writer = pythusa.get_writer("samples")
    # write_array returns 0 when the ring has no room; retry until it succeeds.
    while writer.write_array(FRAME) == 0:
        time.sleep(0.001)


def sink() -> None:
    reader = pythusa.get_reader("samples")
    while True:
        frame = reader.read_array(FRAME_NBYTES, dtype=np.float32)
        if frame.size == 0:
            # Not enough data yet; poll again shortly.
            time.sleep(0.001)
            continue
        print(frame.reshape(8))
        return


def main() -> None:
    with pythusa.Manager() as manager:
        # Ring size is in bytes: room for 32 frames.
        manager.create_ring(
            pythusa.RingSpec(
                name="samples",
                size=FRAME_NBYTES * 32,
                num_readers=1,
            )
        )
        manager.create_task(
            pythusa.TaskSpec(
                name="source",
                fn=source,
                writing_rings=("samples",),
            )
        )
        manager.create_task(
            pythusa.TaskSpec(
                name="sink",
                fn=sink,
                reading_rings=("samples",),
            )
        )
        # Start the reader first so it is alive before data arrives.
        manager.start("sink")
        manager.start("source")
        manager.join_all()


if __name__ == "__main__":
    main()
Notes:
- the ring size is declared in bytes
- the sink is started before the source so the reader is alive first
- read_array(...) and write_array(...) are raw byte-oriented helpers
- on Windows and other spawn-based environments, the main() guard is required for the same reason as the pipeline examples: child processes re-import the script module during startup
Raw Ring API
The main raw ring methods are:
- write_array(arr) -> int
- read_array(nbytes, dtype) -> np.ndarray
- expose_writer_mem_view(size)
- expose_reader_mem_view(size)
- simple_write(...)
- simple_read(...)
The first two are the most common low-level entry points.
write_array
write_array(arr):
- treats the NumPy array as raw bytes
- writes it if enough space exists
- returns the number of bytes written
- returns 0 if insufficient space exists
It does not validate application-level semantics such as expected shape.
read_array
read_array(nbytes, dtype):
- reads exactly nbytes if available
- returns an empty array if insufficient data exists
- interprets the bytes as dtype
- leaves shape reconstruction to the caller
Example:
frame = reader.read_array(FRAME_NBYTES, dtype=np.float32)
if frame.size:
    frame = frame.reshape(8)
View And Copy Semantics
One important caveat:
- in the non-wrap case, read_array(...) may return an array backed by the shared-memory view
- in the wrap case, it must copy into a temporary contiguous buffer first
This means the raw path is powerful but less forgiving. If you keep arrays backed by shared memory alive for too long, shutdown and cleanup become easier to get wrong.
That is why the higher-level pipeline binding read() returns an owned copy by default.
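The difference between a view and an owned copy can be shown with NumPy alone. In this sketch a bytearray stands in for the shared-memory payload and no PYTHUSA calls are involved.

```python
import struct

import numpy as np

shared = bytearray(16)  # stand-in for a shared-memory payload
view = np.frombuffer(shared, dtype=np.float32)  # zero-copy, backed by `shared`
owned = view.copy()  # independent snapshot of the current contents

# Simulate the writer overwriting the first slot after the read.
struct.pack_into("f", shared, 0, 1.0)

print(view[0], owned[0])  # the view sees the new value, the copy does not
```

Calling .copy() on an array returned by the raw path is the simplest way to get memory that stays valid after the ring is closed or overwritten.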
Raw Ring Access From Pipeline
If you like Pipeline but still want the raw path in a specific task, use .raw or .ring from a stream binding:
def worker(samples, fft) -> None:
    raw_reader = samples.raw
    raw_writer = fft.raw
    frame_nbytes = 4096 * np.dtype(np.float32).itemsize
    while True:
        frame = raw_reader.read_array(frame_nbytes, dtype=np.float32)
        if frame.size == 0:
            time.sleep(0.001)
            continue
        frame = frame.reshape(4096)
        spectrum = np.fft.rfft(frame).astype(np.complex64, copy=False)
        if raw_writer.write_array(spectrum) == spectrum.nbytes:
            return
This is the escape hatch for users who want Pipeline orchestration with lower-level ring semantics inside selected tasks.
Convenience Stream Bindings
The pipeline stream bindings are wrappers over raw rings.
Reader bindings provide:
- read()
- read_into(out)
- look()
- increment()
- set_blocking(bool)
- is_blocking()
- .raw
- .ring
Writer bindings provide:
- write(array)
- look()
- increment()
- .raw
- .ring
Examples:
read():
def worker(samples, fft) -> None:
    while True:
        frame = samples.read()
        if frame is None:
            time.sleep(0.001)
            continue
        spectrum = np.fft.rfft(frame).astype(np.complex64, copy=False)
        if fft.write(spectrum):
            return
read_into(out):
def worker(samples, fft) -> None:
    frame = np.empty((4096,), dtype=np.float32)
    while True:
        if not samples.read_into(frame):
            time.sleep(0.001)
            continue
        spectrum = np.fft.rfft(frame).astype(np.complex64, copy=False)
        if fft.write(spectrum):
            return
look() and increment() on the reader:
def worker(samples, fft) -> None:
    while True:
        frame_view = samples.look()
        if frame_view is None:
            time.sleep(0.001)
            continue
        frame = np.frombuffer(frame_view, dtype=np.float32).reshape((4096,))
        spectrum = np.fft.rfft(frame).astype(np.complex64, copy=False)
        samples.increment()
        if fft.write(spectrum):
            return
set_blocking() and is_blocking():
def worker(samples) -> None:
    samples.set_blocking(False)
    if not samples.is_blocking():
        frame_view = samples.look()
        if frame_view is not None:
            print(np.frombuffer(frame_view, dtype=np.float32))
            samples.increment()
    samples.set_blocking(True)
write():
def worker(samples, fft) -> None:
    frame = samples.read()
    if frame is None:
        return
    spectrum = np.fft.rfft(frame).astype(np.complex64, copy=False)
    fft.write(spectrum)
look() and increment() on the writer:
def worker(samples, fft) -> None:
    frame = samples.read()
    if frame is None:
        return
    fft_view = fft.look()
    if fft_view is None:
        return
    spectrum = np.frombuffer(fft_view, dtype=np.complex64).reshape((2049,))
    spectrum[:] = np.fft.rfft(frame).astype(np.complex64, copy=False)
    fft.increment()
look() returns a memoryview for the next contiguous frame and leaves the reader position unchanged. Call increment() after you are done with the view. If the next frame is wrapped across the ring boundary, look() returns None rather than copying.
Writers have the same pattern: look() returns a writable memoryview for the next contiguous frame, and increment() commits that frame once you are done filling it. If the next slot would wrap, look() returns None rather than copying.
If you need a stream to count as having a reader but do not want that reader to do any work, use pipe.add_task.terminator(...) at the pipeline level. It marks the bound readers inactive and then exits.
Blocking And Backpressure
By default, each reader participates in backpressure. The writer's available space is limited by the slowest active reader.
That is the runtime's default safety model:
- if a reader is alive and active, the writer must respect it
You can opt a reader out temporarily:
samples.set_blocking(False)
This marks the reader inactive. Inactive readers do not constrain the writer's writable space.
When you re-enable it:
samples.set_blocking(True)
the reader jumps to the current writer position. Unread backlog is discarded.
This is useful for "latest frame wins" consumers, but it is not a lossless mode.
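The lossy nature of re-enabling a reader can be seen in a toy model of one reader slot. ToyReaderSlot is hypothetical and does not reflect the real header layout. Unlike the binding's set_blocking(bool), the toy method takes the writer position explicitly so the example stays self-contained.

```python
from dataclasses import dataclass


@dataclass
class ToyReaderSlot:
    """Toy model of one reader slot (hypothetical, not the real header)."""

    pos: int = 0
    active: bool = True

    def set_blocking(self, blocking: bool, write_pos: int) -> int:
        """Toggle backpressure participation; return bytes of backlog dropped."""
        dropped = 0
        if blocking and not self.active:
            dropped = write_pos - self.pos
            self.pos = write_pos  # jump to the writer; unread data is skipped
        self.active = blocking
        return dropped


slot = ToyReaderSlot(pos=100)
slot.set_blocking(False, write_pos=100)  # opt out of backpressure
dropped = slot.set_blocking(True, write_pos=612)  # writer advanced meanwhile
print(slot.pos, dropped)
```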
Events And Controlled Tasks
At the runtime level, WorkerEvent exposes:
- signal()
- wait(timeout=None)
- reset()
- is_open()
- pending
Semantics:
- signal() increments pending activation count and opens the event
- wait() blocks on the wake event
- reset() consumes one pending activation
- the wake event is only cleared when pending count reaches zero
Example:
def controller() -> None:
    event = pythusa.get_event("flush")
    event.signal()


def consumer() -> None:
    event = pythusa.get_event("flush")
    event.wait()
    event.reset()
    print("flush requested")
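The counted semantics listed above can be modeled in a single process with threading primitives. This is an illustrative analogue only: CountedEvent is a hypothetical class, it uses threading rather than process-shared objects, and it omits the semaphore that the real WorkerEvent combines with its event.

```python
import threading


class CountedEvent:
    """In-process toy with signal/wait/reset semantics (hypothetical)."""

    def __init__(self) -> None:
        self._wake = threading.Event()
        self._lock = threading.Lock()
        self._pending = 0

    def signal(self) -> None:
        with self._lock:
            self._pending += 1  # count the activation
            self._wake.set()    # open the event

    def wait(self, timeout=None) -> bool:
        return self._wake.wait(timeout)

    def reset(self) -> None:
        with self._lock:
            if self._pending > 0:
                self._pending -= 1  # consume one activation
            if self._pending == 0:
                self._wake.clear()  # close only when nothing is pending

    def is_open(self) -> bool:
        return self._wake.is_set()

    @property
    def pending(self) -> int:
        with self._lock:
            return self._pending
```

Two signals followed by one reset leave the event open with one activation pending. That is the behavior that distinguishes this primitive from a plain binary flag.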
Important limitation:
- a WorkerEvent is not a general-purpose multi-consumer counted queue
The wake primitive is still event-like. If many consumers all wait on the same event, the ownership and reset semantics become ambiguous.
Recommended pattern:
- many signalers are fine
- one reset-owning consumer is the intended model
- if many downstream tasks need separate activations, use many events
Controlled Tasks
The pipeline control helpers sit above this runtime event model:
- switchable: wait, then rerun without resetting
- toggleable: wait, reset, then run once per activation
These wrappers do not change the worker model. They are still just runtime loops around the task callable.
Monitoring And Metrics
Manager.start_monitor(...) launches a daemon monitor thread that samples:
- ring pressure
- worker CPU usage
- worker RSS
- worker nice level
Per-task snapshots are exposed as ProcessMetrics.
This is operational visibility, not deterministic timing analysis. Sampling can tell you that a stage is falling behind or starving, but it is not a substitute for application-level latency measurement.
Errors And Current Limits
The runtime layer is intentionally narrow and explicit.
Important constraints:
- one TaskSpec maps to one process
- one ring has one writer handle and one reader slot per registered reader
- raw rings move bytes, not semantic frames
- the runtime does not provide automatic fan-in synchronization
- grouped multi-task workers are not currently a public feature
Some important error cases:
- RingSpec(size <= 0) raises ValueError
- RingSpec(num_readers < 1) raises ValueError
- TaskSpec raises ValueError if the same ring appears in both reading_rings and writing_rings
- Manager.create_task(...) raises on duplicate task names
- starting a task that references an unregistered ring or event fails during bootstrap construction
Be especially careful on the raw path with:
- dtype mismatches
- shape mismatches
- assuming read_array(...) returns owned memory
- forgetting that ring size is declared in bytes
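A small guard at the application boundary catches some of these mistakes early. The sketch below assumes the raw bytes arrive as a contiguous NumPy array. The name decode_frame is hypothetical and not part of PYTHUSA.

```python
from __future__ import annotations

import numpy as np


def decode_frame(raw: np.ndarray, dtype, shape: tuple[int, ...]) -> np.ndarray:
    """Reinterpret raw bytes as `dtype`, reshape, and return an owned copy."""
    expected = int(np.prod(shape)) * np.dtype(dtype).itemsize
    if raw.nbytes != expected:
        raise ValueError(f"expected {expected} bytes, got {raw.nbytes}")
    return raw.view(dtype).reshape(shape).copy()  # copy so the result is owned


raw = np.arange(8, dtype=np.float32).view(np.uint8)  # 32 raw bytes
frame = decode_frame(raw, np.float32, (8,))
print(frame)
```

A byte-count check cannot detect every dtype mismatch. Decoding the same 32 bytes as four float64 values passes the size test and silently yields garbage, so producer and consumer still need to agree on dtype out of band.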
When To Bypass Pipeline
Use the runtime directly when you need:
- manual ring sizing in bytes
- direct ring header and pressure behavior
- custom startup order
- custom worker composition experiments
- benchmark or debugging scenarios that should avoid the higher-level binding layer
Stay with Pipeline when:
- your graph fits the machine-and-conveyor-belt model
- typed frame bindings are enough
- you do not need to manage raw ring semantics directly
The runtime is the right place for power users.
Pipeline is still the right default for almost everyone else.