A Functional Streaming DSL embedded in Python

Jonny Law

May 5, 2026

Introduction

In a previous post I covered the basics of functional stream processing with higher-order functions, evaluated in a single Python process using generators. Because higher-order functions avoid shared mutable state, the same pipeline can easily scale to multiple processes. Ideally we don’t want to rewrite the existing pipeline in a new syntax; instead we want the flexibility to evaluate it with multiple interpreters.

In this post I will cover how to build a domain specific language (DSL) for functional stream processing, embedded in Python. A DSL is a language tailored to a specific domain, making it easier to use than a general purpose language for problems in that domain. An embedded DSL takes advantage of the host language’s features and syntax - making it much easier to integrate into existing codebases and to develop.

The Syntax

A DSL has two main components:

  1. The syntax
  2. The interpreter

We have already seen an example of the syntax we would like to use in a previous post.

pipeline = (
    Pipeline()
    .source(generator(10))
    .map(lambda x: x + 1)
    .flat_map(lambda x: [x, x + 1])
    .reduce(0, lambda x, y: x + y)
    .to_sink(sink)
)

In that previous post, the Pipeline class used a builder pattern: we chain the operations together to create a pipeline which is evaluated lazily using generators. Here, however, we would like to defer evaluation of the pipeline until we provide an interpreter. We know that each pipeline has a Source and a Sink, with zero or more Map, FlatMap and Reduce operators in between. The builder pattern is itself an example of using the features of the host language: autocomplete suggests the available methods, and the type signatures tell us which parameters each method expects and returns.

Let’s add base implementations for Source and Sink classes.

from abc import ABC, abstractmethod
from typing import Generator, Generic, TypeVar

A = TypeVar("A")

class Source(ABC, Generic[A]):
    @abstractmethod
    def __iter__(self) -> Generator[A, None, None]:
        pass

class Sink(ABC, Generic[A]):
    @abstractmethod
    def write(self, data: A) -> None:
        pass

Next we define the Operator classes, which are the building blocks of the pipeline. Each operator holds a reference to the previous operator and the function to apply to the data. They contain no logic for how each stage is evaluated; that is the responsibility of the interpreter.

from typing import Callable, Iterable, Optional, TypeVar

B = TypeVar("B")

class Operator:
    def __init__(self, prev: Optional['Operator'] = None):
        self.prev = prev

class SourceOperator(Operator):
    def __init__(self, source: Source):
        super().__init__(None)
        self.source = source

class Map(Operator):
    def __init__(self, prev: Operator, func: Callable[[A], B]):
        super().__init__(prev)
        self.func = func

class FlatMap(Operator):
    def __init__(self, prev: Operator, func: Callable[[A], Iterable[B]]):
        super().__init__(prev)
        self.func = func

class Reduce(Operator):
    def __init__(self, prev: Operator, initial_value: A, func: Callable[[A, B], A]):
        super().__init__(prev)
        self.initial_value = initial_value
        self.func = func

class SinkOperator(Operator):
    def __init__(self, prev: Operator, sink: Sink):
        super().__init__(prev)
        self.sink = sink

We’ve defined the same operators as in the previous post, but these are simply data: each records the previous operator and the function to apply. We can now define the Pipeline class using the builder pattern:

from typing import Generic

class Pipeline(Generic[A]):
    def __init__(self, tail: Operator):
        self.tail = tail

    @property
    def pipeline_operators(self) -> list[Operator]:
        operators = []
        current = self.tail
        while current is not None:
            operators.append(current)
            if isinstance(current, SourceOperator):
                break
            current = current.prev
        return operators[::-1]

    @classmethod
    def from_source(cls, source: Source) -> "Pipeline[A]":
        return cls(SourceOperator(source))

    def map(self, func: Callable[[A], B]) -> "Pipeline[B]":
        return Pipeline(Map(self.tail, func))

    def flat_map(self, func: Callable[[A], Iterable[B]]) -> "Pipeline[B]":
        return Pipeline(FlatMap(self.tail, func))

    def reduce(self, initial_value: A, func: Callable[[A, B], A]) -> "Pipeline[A]":
        return Pipeline(Reduce(self.tail, initial_value, func))

    def to_sink(self, sink: Sink) -> "Pipeline[None]":
        return Pipeline(SinkOperator(self.tail, sink))

Next, let’s define a straightforward FileSource class which reads a text file line by line.

class FileSource(Source):
    def __init__(self, uri: str):
        self.uri = uri

    def __iter__(self) -> Generator[str, None, None]:
        with open(self.uri, 'r') as file:
            for line in file:
                yield line

Now we can define a Pipeline using the syntax we saw earlier:

def split_words(line: str) -> list[str]:
    return line.split(" ")

word_count_pipeline = (
    Pipeline.from_source(FileSource("file.txt"))
    .flat_map(split_words)
    .map(lambda x: 1)
    .reduce(0, lambda x, y: x + y)
)

This pipeline reads from a file, splits each line into words, and counts the total number of words. It doesn’t yet write the result anywhere.

We need to define a Sink class to write the results of the pipeline to a destination. Let’s start with a simple ConsoleSink that writes to the console.

class ConsoleSink(Sink):
    def write(self, data) -> None:
        print(data)

We can chain the ConsoleSink to the pipeline using the to_sink method.

word_count_pipeline = word_count_pipeline.to_sink(ConsoleSink())

This defines a pipeline - but we can’t evaluate it yet. To do that, we need to define an interpreter.

Defining an interpreter

The interpreter navigates the list of operators and runs code depending on the type of each operator. Each operator stores its predecessor in the prev attribute, and the pipeline_operators property unwinds this chain into a list ordered from source to sink.
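The chain-walking logic can be sketched in isolation, with a hypothetical Node class standing in for Operator:

```python
# Each node points backwards to its predecessor, like Operator.prev.
class Node:
    def __init__(self, name: str, prev: "Node | None" = None):
        self.name = name
        self.prev = prev

# Built tail-first, exactly as the builder pattern constructs a pipeline.
tail = Node("sink", Node("reduce", Node("source")))

# Walk backwards from the tail, then reverse into source-to-sink order.
chain = []
current = tail
while current is not None:
    chain.append(current.name)
    current = current.prev
print(chain[::-1])  # ['source', 'reduce', 'sink']
```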

Mermaid Diagram Interpreter

First, we define an Interpreter base class for all interpreters, which simply has an evaluate method. The first concrete implementation is a Mermaid diagram interpreter that lets us visualize the pipeline.

from abc import ABC, abstractmethod
from typing import Any

class Interpreter(ABC):
    @abstractmethod
    def evaluate(self, pipeline: Pipeline[A]) -> Any:
        pass

class MermaidDiagramInterpreter(Interpreter):
    def __get_name_from_function(self, func: Callable) -> str:
        name = func.__name__ if hasattr(func, '__name__') else "lambda"
        return "λ" if name == "<lambda>" else name

    def evaluate(self, pipeline: Pipeline[A]) -> str:
        operators = pipeline.pipeline_operators
        lines = ["graph TB"]
        
        for i, op in enumerate(operators):
            node_id = f"node{i}"
            
            match op:
                case SourceOperator(source=source):
                    source_name = source.__class__.__name__
                    label = f"{node_id}[(Source: {source_name})]"
                case Map(func=func):
                    name = self.__get_name_from_function(func)
                    label = f"{node_id}{{Map: {name}}}"
                case FlatMap(func=func):
                    name = self.__get_name_from_function(func)
                    label = f"{node_id}{{FlatMap: {name}}}"
                case Reduce(initial_value=initial_value, func=func):
                    name = self.__get_name_from_function(func)
                    label = f"{node_id}(Reduce: {name} init={initial_value})"
                case SinkOperator(sink=sink):
                    sink_name = sink.__class__.__name__
                    label = f"{node_id}[/Sink: {sink_name}\\]"
                case _:
                    label = f"{node_id}[Unknown Operator]"

            lines.append(f"    {label}")
            
            if i < len(operators) - 1:
                lines.append(f"    node{i} --> node{i+1}")
        
        mermaid_code = "\n".join(lines)
        return mermaid_code

interpreter = MermaidDiagramInterpreter()
mermaid_code = interpreter.evaluate(word_count_pipeline)
print(mermaid_code)
graph TB
    node0[(Source: FileSource)]
    node0 --> node1
    node1{FlatMap: split_words}
    node1 --> node2
    node2{Map: λ}
    node2 --> node3
    node3(Reduce: λ init=0)
    node3 --> node4
    node4[/Sink: ConsoleSink\]

We used anonymous lambda functions in the pipeline, which makes the Mermaid diagram harder to read. We can improve readability by passing in named functions, or by defining a __str__ method on each operator.
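The difference is visible on the function objects themselves - Python names every lambda "&lt;lambda&gt;", while def gives a useful __name__ (add_one here is just an illustrative example):

```python
# A named function carries its own name into the diagram.
def add_one(x: int) -> int:
    return x + 1

print(add_one.__name__)            # add_one
print((lambda x: x + 1).__name__)  # <lambda>
```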

Local Interpreter

Next, we will define a LocalInterpreter class - this will behave in the same way as the pipeline defined in the previous post.

import itertools

class LocalInterpreter(Interpreter):
    def evaluate(self, pipeline: Pipeline[A]) -> Iterable[A]:
        operators = pipeline.pipeline_operators

        stream: Optional[Iterable[A]] = None
        for operator in operators:
            match operator:
                case SourceOperator(source=source):
                    stream = iter(source)
                case Map(func=f):
                    stream = map(f, stream)
                case FlatMap(func=f):
                    stream = itertools.chain.from_iterable(map(f, stream))
                case Reduce(initial_value=iv, func=f):
                    result = iv
                    for x in stream:
                        result = f(result, x)
                    stream = iter([result])
                case SinkOperator(sink=sink):
                    for item in stream:
                        sink.write(item)

        return stream
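The LocalInterpreter is just composing stdlib iterator tools. For the word-count pipeline, the equivalent hand-written chain (with a small in-memory sample standing in for the file contents) is:

```python
import itertools

# Two sample lines standing in for the file contents.
lines = iter(["hello world\n", "foo bar baz\n"])

# FlatMap: split each line into words and flatten.
words = itertools.chain.from_iterable(map(lambda l: l.split(" "), lines))

# Map: replace each word with 1.
ones = map(lambda w: 1, words)

# Reduce: sum the ones to count the words.
total = 0
for x in ones:
    total = total + x
print(total)  # 5
```

Each stage is lazy, so nothing is read or computed until the reduce loop pulls items through the whole chain.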

We can now evaluate the pipeline using the LocalInterpreter.

interpreter = LocalInterpreter()
interpreter.evaluate(word_count_pipeline)
13
<list_iterator at 0x10a6b14e0>

The pipeline counts the number of words in the file - here the ConsoleSink prints 13. The trailing list_iterator is the (now exhausted) final stream returned by evaluate, which the notebook displays.

Multiprocessing Interpreter

Now, as promised in the introduction, we can use the same pipeline definition and evaluate it using multiple processes. For simplicity we’ll use the multiprocess library (a fork of the standard library’s multiprocessing that uses dill for serialization, so it can pickle lambdas) to create a pool of worker processes.

from multiprocess import Pool

class MultiprocessingInterpreter(Interpreter):
    def __init__(self, num_processes: int = 4):
        self.num_processes = num_processes

    def evaluate(self, pipeline: Pipeline[A]) -> None:
        operators = pipeline.pipeline_operators

        stream: Optional[Iterable[A]] = None

        with Pool(self.num_processes) as pool:
            for operator in operators:
                match operator:
                    case SourceOperator(source=source):
                        stream = iter(source)
                    case Map(func=f):
                        stream = pool.imap(f, stream)
                    case FlatMap(func=f):
                        stream = itertools.chain.from_iterable(pool.imap(f, stream))
                    case Reduce(initial_value=iv, func=f):
                        result = iv
                        for x in stream:
                            result = f(result, x)
                        stream = iter([result])
                    case SinkOperator(sink=sink):
                        for item in stream:
                            sink.write(item)

We can now evaluate the pipeline using the MultiprocessingInterpreter.

interpreter = MultiprocessingInterpreter(num_processes=2)
interpreter.evaluate(word_count_pipeline)

I haven’t run the multiprocessing interpreter in this blog post - working with multiple processes inside a notebook is not straightforward. We need to move the code into a separate script and run it there.
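As a sketch, a standalone script using only the standard library’s multiprocessing would look like this (note the module-level function, since the stdlib pickler cannot serialize lambdas, and the __main__ guard required on spawn-based platforms; square is just an illustrative stand-in for a pipeline stage):

```python
from multiprocessing import Pool

# Module-level function so it can be pickled and sent to worker processes.
def square(x: int) -> int:
    return x * x

if __name__ == "__main__":
    with Pool(processes=2) as pool:
        # imap evaluates lazily and preserves input order,
        # like the pool.imap calls in the interpreter above.
        print(list(pool.imap(square, range(5))))  # [0, 1, 4, 9, 16]
```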

Conclusion

In this post we have seen how to build a functional stream DSL embedded in Python: we defined a pipeline using the builder pattern and evaluated it using a local interpreter and a multiprocessing interpreter. This separates the pipeline definition from the evaluation logic. Dispatching a pipeline to a remote worker is often expensive and slow (building a Docker image, deploying to the cloud, provisioning infrastructure, etc.), so testing locally with a LocalInterpreter is much faster. This is how Apache Beam works: the Beam SDK provides a DSL for building pipelines, and you then execute the pipeline using a runner such as Google Cloud Dataflow, Apache Flink, or locally using the Direct Runner.

In a future post I will consider how to effectively call remote procedure calls (RPCs) asynchronously from the Pipeline. This is useful for integrating with external services such as databases, LLM APIs and other systems.

Citation

BibTeX citation:
@online{law2026,
  author = {Law, Jonny},
  title = {A {Functional} {Streaming} {DSL} Embedded in {Python}},
  date = {2026-05-05},
  langid = {en}
}
For attribution, please cite this work as:
Law, Jonny. 2026. “A Functional Streaming DSL Embedded in Python.” May 5.