Under the Hood

How does pure Python hit 73 Gbit/s? PYTHUSA ships no C extensions, no Cython, and no ctypes FFI of its own -- NumPy handles the numeric work, and the transport is built from multiprocessing.shared_memory, memoryview, and a ring buffer that never touches the data more than it has to.

This page walks through the hot path -- the code that runs on every frame in a PYTHUSA pipeline.


Ring buffer memory layout

Everything starts with SharedRingBuffer, a subclass of multiprocessing.shared_memory.SharedMemory. A single shared-memory segment (POSIX shared memory on macOS and Linux) holds two regions: a uint64 header for coordination metadata and a byte payload for the actual ring data.

# pythusa/src/pythusa/_buffers/ring.py

class SharedRingBuffer(SharedMemory):
    """Ring buffer stored entirely inside one SharedMemory segment.

    The segment is split into a uint64 header (``self.header``) followed by
    the byte payload (``self.ring_buffer``). Both are views over
    ``self.buf``, so nothing is copied out of shared memory to use them.
    """

    def __init__(self, name, create, size, num_readers, reader, ...):
        # Excerpt: the elided setup presumably computes self.header_size,
        # self.ring_buffer_size and self.shared_mem_size (used below but not
        # assigned in this excerpt).
        ...
        super().__init__(name=name, create=create, size=self.shared_mem_size)

        # Header: a numpy uint64 array backed by the first bytes of
        # the shared-memory buffer. Fields: ring size, pressure counter,
        # dropped-byte counter, write position, max writable, num_readers,
        # and per-reader triples (position, alive flag, last_seen timestamp).
        self.header = np.ndarray(
            (header_u64_length(num_readers),),  # shape: one word per field
            np.uint64,  # dtype
            memoryview(self.buf[0:self.header_size]),  # backing buffer
            0,  # byte offset into that buffer
        )

        ...

        # Payload: a memoryview slice over the rest of the segment.
        # This IS the ring -- readers and writers operate directly on
        # this view. No intermediate buffers, no copies to get here.
        # The end bound is explicit because the OS may round the segment
        # size up (e.g. to a page multiple), so self.buf can be longer than
        # header + payload.
        self.ring_buffer = memoryview(
            self.buf[self.header_size:self.header_size + self.ring_buffer_size]
        )

The header layout is computed up front from the number of readers. It can optionally be padded to a cache-line boundary so the header and the start of the payload never share a cache line, which avoids false sharing between the two regions:

# pythusa/src/pythusa/_shared_memory/layout.py

# Header layout, counted in uint64 words: a fixed prefix of static fields,
# followed by one fixed-size slot per reader.
HEADER_STATIC_FIELDS = 6   # size, pressure, dropped, write_pos, max_writable, num_readers
READER_FIELDS = 3           # position, alive, last_seen -- per reader slot
UINT64_BYTES = 8

def header_u64_length(num_readers: int) -> int:
    """Return the header length in uint64 words for ``num_readers`` readers.

    That is the static fields plus ``READER_FIELDS`` words per reader slot.
    """
    return HEADER_STATIC_FIELDS + (num_readers * READER_FIELDS)

def compute_header_size(num_readers, *, cache_align=False, cache_size=64):
    """Return the header size in bytes for ``num_readers`` reader slots.

    Args:
        num_readers: Number of reader slots reserved in the header.
        cache_align: When true, pass the raw size through ``align_size``.
        cache_size: Alignment unit handed to ``align_size`` (64 by default).

    ``align_size`` is not shown here. It presumably rounds up to a multiple
    of ``cache_size``, so that the payload following the header starts on a
    fresh cache line -- TODO confirm against ``align_size``.
    """
    header_size = UINT64_BYTES * header_u64_length(num_readers)
    if not cache_align:
        return header_size
    return align_size(header_size, cache_size)

def reader_slot(reader: int) -> int:
    """Return the header index of the first word of ``reader``'s slot.

    That word is the reader's position. The alive flag and the last_seen
    timestamp occupy the next two words.
    """
    return HEADER_STATIC_FIELDS + (reader * READER_FIELDS)

One shared-memory segment. One memoryview. No serialization, no pickling.


Zero-copy writer path

When a producer wants to write a frame, it asks the ring for a direct memoryview into the payload region. The ring computes how much space is available and returns either one contiguous view or a split pair for wrap-around:

# pythusa/src/pythusa/_buffers/ring.py

def expose_writer_mem_view(self, size: int) -> RingView:
    """Return writable view(s) into the ring at the current write position.

    Args:
        size: Number of bytes the caller would like to write.

    Returns:
        A ``(mv1, mv2, size_writeable, wrap_around)`` tuple.
        ``size_writeable`` is ``size`` clamped to the free space reported by
        ``compute_max_amount_writable``. When the region fits before the end
        of the ring, ``mv1`` covers it and ``mv2`` is ``None``. Otherwise
        ``mv1`` is the tail of the ring and ``mv2`` is the head that
        receives the remainder.

    Nothing is published here: the header's write position is only read.
    The caller advances it afterwards (e.g. ``inc_writer_pos``).
    """
    self.compute_max_amount_writable()
    # Clamp the request to the space the backpressure check allows.
    if self.max_amount_writable >= size:
        size_writeable = size
    else:
        size_writeable = self.max_amount_writable

    # The header stores an unwrapped byte counter (positions are subtracted
    # directly when computing used/readable space). int_to_pos, not shown,
    # maps it to an offset inside the ring.
    write_pos = self.int_to_pos(int(self.header[self.write_pos_index]))

    if write_pos + size_writeable <= self.ring_buffer_size:
        # Contiguous -- single memoryview slice, no copy needed.
        mv1 = memoryview(self.ring_buffer[write_pos: write_pos + size_writeable])
        mv2 = None
        wrap_around = False
    else:
        # Wrap-around -- two views: tail of ring + head of ring.
        mv1 = memoryview(self.ring_buffer[write_pos:])
        # The head view holds whatever did not fit in the tail.
        mv2 = memoryview(
            self.ring_buffer[0:(size_writeable - (self.ring_buffer_size - write_pos))]
        )
        wrap_around = True

    return (mv1, mv2, size_writeable, wrap_around)

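The write_array convenience method writes a numpy array in one call. The bytes are copied straight from the array into the ring's shared memory (split across both views when the write wraps), with no intermediate staging buffer: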

def write_array(self, arr: np.ndarray) -> int:
    """Write all bytes of ``arr`` into the ring, or nothing at all.

    Returns:
        ``arr.nbytes`` on success. ``0`` when the ring does not currently
        have room for the whole array; the write position is then left
        unchanged.

    Raises:
        TypeError: from ``memoryview.cast`` if ``arr`` is not C-contiguous.
    """
    # Byte-level view of the array's own memory (no copy yet).
    src = memoryview(arr).cast("B")
    nbytes = src.nbytes
    mv1, mv2, size_writeable, wrap_around = self.expose_writer_mem_view(nbytes)
    # All-or-nothing: a partial array is never published.
    if size_writeable < nbytes:
        return 0
    # Presumably copies src into mv1 (and mv2 when wrapping); simple_write
    # is not shown here.
    self.simple_write((mv1, mv2, size_writeable, wrap_around), src)
    # Advancing the write position is what lets readers see the new bytes.
    self.inc_writer_pos(nbytes)
    return nbytes

The user-facing API is even simpler. StreamWriter.look() returns a writable memoryview directly into shared memory, or None when a whole frame does not currently fit without wrapping; increment() publishes it:

# pythusa/src/pythusa/_pipeline/_stream_io.py

class StreamWriter:
    """Frame-at-a-time writer over the ring held in ``self.raw``."""

    def look(self) -> memoryview | None:
        """Return a writable view for the next whole frame, or ``None``.

        ``None`` means one of two things:
        - fewer than ``frame_nbytes`` bytes are free, or
        - the frame would straddle the end of the ring.
        Only contiguous frames are handed out. Fill the view, then call
        ``increment()`` to publish it.
        """
        mv1, mv2, size_writeable, wrap_around = self.raw.expose_writer_mem_view(
            self.frame_nbytes
        )
        # A straddling frame is rejected on every call at that position, so
        # the ring size should be a whole multiple of frame_nbytes.
        if size_writeable < self.frame_nbytes or wrap_around or mv2 is not None:
            return None
        return mv1

    def increment(self) -> None:
        """Publish the frame filled via ``look()`` by advancing the write position."""
        self.raw.inc_writer_pos(self.frame_nbytes)

Fill the view, call increment. That's the entire writer hot path.


Zero-copy reader path

The reader side mirrors the writer. expose_reader_mem_view returns a view into the payload at the reader's current position:

# pythusa/src/pythusa/_buffers/ring.py

def expose_reader_mem_view(self, size: int) -> RingView:
    """Return readable view(s) into the ring at this reader's position.

    This mirrors ``expose_writer_mem_view``. The result is
    ``(mv1, mv2, size_readable, wrap_around)``, and ``mv2`` is set only when
    the readable region wraps past the end of the ring. Apart from the
    resync case below, the reader position is not advanced here; callers
    do that afterwards (e.g. ``inc_reader_pos``).
    """
    # Both header values are unwrapped byte counters, so their difference
    # is the number of bytes published but not yet consumed by this reader.
    write_pos = int(self.header[self.write_pos_index])
    read_pos = int(self.header[self.reader_pos_index])
    max_amount_readable = write_pos - read_pos

    if max_amount_readable > self.ring_buffer_size:
        # Reader fell behind beyond one full ring. Resync to writer.
        # Whatever was unread is skipped: readable is reset to 0.
        self.jump_to_writer()
        read_pos = int(self.header[self.reader_pos_index])
        max_amount_readable = 0

    # Elided: presumably clamps `size` to max_amount_readable to produce
    # size_readable -- not shown in this excerpt.
    ...

    reader_pos = self.int_to_pos(read_pos)
    if reader_pos + size_readable <= self.ring_buffer_size:
        mv1 = memoryview(self.ring_buffer[reader_pos: reader_pos + size_readable])
        mv2 = None
        wrap_around = False
    else:
        # Tail of the ring first, then the remainder from the head.
        mv1 = memoryview(self.ring_buffer[reader_pos:])
        remaining = size_readable - (self.ring_buffer_size - reader_pos)
        mv2 = memoryview(self.ring_buffer[0:remaining])
        wrap_around = True

    return (mv1, mv2, size_readable, wrap_around)

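On the non-wrapped path, read_array uses np.frombuffer -- a view, not a copy -- directly into the shared-memory ring. Because read_array advances the reader position before returning, the writer is then free to reuse those bytes. Call .copy() on the result if you need to keep the data: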

def read_array(self, nbytes: int, dtype: np.dtype) -> np.ndarray:
    """Consume ``nbytes`` from the ring as a 1-D array of ``dtype``.

    Returns:
        An empty array of ``dtype`` when fewer than ``nbytes`` bytes are
        readable; the reader position is then left unchanged. Otherwise, an
        array over the ``size_readable`` bytes just consumed.

    Raises:
        ValueError: from ``np.frombuffer`` if the byte count is not a
            multiple of ``dtype``'s item size.

    Caution: on the non-wrapped path the returned array is a view into ring
    memory, but the reader position is advanced before returning. Once the
    writer's backpressure check sees the new position, it may overwrite
    those bytes, so the array's contents can change under the caller. Copy
    it if it must be kept. The wrapped path returns a private copy.
    """
    mv1, mv2, size_readable, wrap_around = self.expose_reader_mem_view(nbytes)
    if size_readable < nbytes:
        return np.empty(0, dtype=dtype)
    if not wrap_around:
        arr = np.frombuffer(mv1, dtype=dtype)   # <-- view into ring memory
    else:
        # Data wraps: stitch both halves into one private, contiguous buffer.
        buf = bytearray(size_readable)
        self.simple_read((mv1, mv2, size_readable, wrap_around), memoryview(buf))
        arr = np.frombuffer(buf, dtype=dtype)
    # Releases these bytes to the writer (see the caution above).
    self.inc_reader_pos(size_readable)
    return arr

At the stream level, StreamReader.look() gives you the same direct view, or None when a whole frame is not yet available without wrapping:

# pythusa/src/pythusa/_pipeline/_stream_io.py

class StreamReader:
    """Frame-at-a-time reader over the ring held in ``self.raw``."""

    def look(self) -> memoryview | None:
        """Return a view of the next whole frame, or ``None``.

        ``None`` means one of two things:
        - fewer than ``frame_nbytes`` bytes are readable, or
        - the frame wraps past the end of the ring.
        The view stays valid only until ``increment()`` is called.
        """
        mv1, mv2, size_readable, wrap_around = self.raw.expose_reader_mem_view(
            self.frame_nbytes
        )
        if size_readable < self.frame_nbytes or wrap_around or mv2 is not None:
            return None
        return mv1

    def increment(self) -> None:
        """Consume the current frame; after this the writer may overwrite it."""
        self.raw.inc_reader_pos(self.frame_nbytes)

Inspect the view, then call increment. The view is only valid until increment(): after that, the writer may overwrite those bytes. Data never leaves shared memory unless you explicitly ask for a copy.


Backpressure without locks on the data path

The write path needs to know how much ring space is available. That requires knowing the position of the slowest reader. Scanning all reader slots on every write would be O(readers) per frame -- too expensive at 140,000 frames/s.

PYTHUSA uses a cached min-reader position that amortizes the scan:

# pythusa/src/pythusa/_buffers/ring.py

def _scan_min_reader_pos(self) -> int:
    """Return the smallest position among live readers.

    The scan starts from the current write position. With no live readers
    the result is therefore the write position itself, so no bytes count
    as unread. Readers whose alive flag is 0 are skipped and exert no
    backpressure.
    """
    min_reader_pos = int(self.header[self.write_pos_index])
    # 6 == HEADER_STATIC_FIELDS and 3 == READER_FIELDS (layout.py). Each
    # step lands on a reader slot's position word; the alive flag follows.
    for i in range(6, len(self.header), 3):
        reader_pos = int(self.header[i])
        reader_alive = int(self.header[i + 1])
        if reader_pos < min_reader_pos and reader_alive:
            min_reader_pos = reader_pos
    return min_reader_pos

def compute_max_amount_writable(self, force_rescan=False) -> int:
    """Return how many bytes the writer may write without overtaking a live reader.

    The result is also stored in ``self.max_amount_writable``. The O(readers)
    header scan runs only when one of these holds:
    - it is forced,
    - there is no cached value yet,
    - reader positions are flagged dirty,
    - enough writes have happened since the last scan, or
    - enough time has passed since the last scan.
    Otherwise the cached minimum reader position is reused.

    Args:
        force_rescan: Always rescan the reader slots when true.
    """
    write_pos = int(self.header[self.write_pos_index])

    if (
        force_rescan
        or self._min_reader_pos_cache is None
        or self._reader_positions_dirty
        or self._writes_since_min_scan >= self._min_reader_pos_refresh_interval
        # Note: this clock read happens on every call, even when the
        # cached value is then reused.
        or (time.perf_counter() - self._last_min_scan_t)
            >= self._min_reader_pos_refresh_s
    ):
        min_reader_pos = self._scan_min_reader_pos()
        self._min_reader_pos_cache = min_reader_pos
        # _writes_since_min_scan is presumably incremented on each write
        # elsewhere (not shown); it is only reset here.
        self._writes_since_min_scan = 0
        self._reader_positions_dirty = False
        self._last_min_scan_t = time.perf_counter()
    else:
        # Stale cache: if reader positions only move forward, this value
        # lags the true minimum, so free space is under-reported, never
        # over-reported -- assumption, confirm against the reader side.
        min_reader_pos = self._min_reader_pos_cache

    # Bytes written but not yet consumed by the slowest live reader.
    used = write_pos - min_reader_pos
    self.max_amount_writable = self.ring_buffer_size - used
    return self.max_amount_writable

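By default, the cache refreshes every 64 writes or every 5 ms, whichever comes first; both thresholds are configurable through RingSpec(...) and Pipeline.add_stream(...). Between refreshes the cached value is conservative. Reader positions only move forward, so a stale minimum can lag the true one but never lead it. As a result, the writer can only under-report writable space, never over-report it. There are no locks, no atomics, and no reader scan on the fast path -- just a few counter comparisons and a time.perf_counter() clock read.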


Worker bootstrap

Each PYTHUSA worker is a separate OS process spawned via multiprocessing. The bootstrap overhead is minimal: re-attach shared-memory rings by name, install the per-process context, run the user function.

# pythusa/src/pythusa/_workers/bootstrap.py

@dataclass(slots=True)
class TaskBootstrap:
    """Callable that sets up and runs one task inside a worker process.

    Calling it does three things:
    - re-attaches the task's rings from their constructor kwargs,
    - installs them (plus events) into the process-local context registry,
    - runs the task via ``Worker``.
    """

    name: str
    fn: Callable[..., Any]
    # stream name -> SharedRingBuffer constructor kwargs
    reading_ring_kwargs: dict[str, dict[str, Any]]
    writing_ring_kwargs: dict[str, dict[str, Any]]
    # event name -> event object, passed through to context._install
    events: dict[str, WorkerEvent]
    args: tuple[Any, ...]
    kwargs: dict[str, Any]

    def __call__(self) -> None:
        # NOTE: the rings are constructed before the ExitStack is entered.
        # If constructing a later ring raises, rings already attached here
        # are not exited through the stack.
        reading_rings = {
            name: SharedRingBuffer(**kw)
            for name, kw in self.reading_ring_kwargs.items()
        }
        writing_rings = {
            name: SharedRingBuffer(**kw)
            for name, kw in self.writing_ring_kwargs.items()
        }

        with ExitStack() as stack:
            # NOTE: if a stream name appears in both mappings, this merge
            # keeps only the writing ring. The reading ring's __exit__
            # (defined on SharedRingBuffer, not shown) is then never called.
            for ring in {**reading_rings, **writing_rings}.values():
                stack.enter_context(ring)
            context._install(reading_rings, writing_rings, self.events)
            # _run_task is not shown; presumably it invokes
            # self.fn(*self.args, **self.kwargs).
            Worker(fn=self._run_task)()

context._install populates the process-local registry that get_reader, get_writer, and get_event look up at runtime:

# pythusa/src/pythusa/_core/context.py

# Process-local registries, filled by _install() during worker bootstrap.
# Each worker process has its own copy of these module globals, so the
# dicts themselves are never shared across processes.
_reading_rings: dict[str, SharedRingBuffer] = {}
_writing_rings: dict[str, SharedRingBuffer] = {}
_events: dict[str, WorkerEvent] = {}

def get_reader(name: str) -> SharedRingBuffer:
    """Return the reading ring installed under ``name`` in this process.

    Raises:
        KeyError: if no reading ring named ``name`` has been installed.
    """
    return _reading_rings[name]

def get_writer(name: str) -> SharedRingBuffer:
    """Return the writing ring installed under ``name`` in this process.

    Raises:
        KeyError: if no writing ring named ``name`` has been installed.
    """
    return _writing_rings[name]

def _install(reading_rings, writing_rings, events) -> None:
    """Merge rings and events into this process's registries.

    Existing entries with the same name are overwritten; nothing is cleared.
    """
    _reading_rings.update(reading_rings)
    _writing_rings.update(writing_rings)
    _events.update(events)

Module-level state is safe here because each worker is its own process. No shared mutable state, no locks, no coordination beyond the ring header.


Putting it together

A user-facing PYTHUSA pipeline compiles down to:

  1. Shared-memory segments -- one per stream, each holding a header followed by a payload of frame_bytes * ring_depth bytes.
  2. Header arrays -- uint64 metadata living at the start of each segment, optionally cache-aligned.
  3. memoryview slices -- writers and readers get direct pointers into the payload region.
  4. Cached backpressure -- writers amortize the min-reader scan, staying lock-free on the data path.
  5. Process-local registries -- each child re-attaches rings by name and runs with zero per-frame coordination overhead.

The result: 73 Gbit/s of FFT signal payload across 49 signals on a MacBook Air M2, with Python doing the orchestration and NumPy doing the math.

See the Showcase Demos for end-to-end benchmark results, or the Pipeline API to start building your own pipeline.