Introduction
In a previous post I covered the basics of functional stream processing with higher-order functions, evaluated in a single Python process using Generators. When we develop with higher-order functions we have no shared mutable state, which means we can scale to multiple processes easily. Ideally we don’t want to re-write the existing pipeline in a new syntax; instead we want the flexibility to evaluate the same pipeline using multiple interpreters.
In this post I will cover how to build a functional stream domain-specific language (DSL) embedded in Python. A DSL is a language tailored to a specific domain, making it easier to use than a general-purpose language for that domain; in this case the domain is functional stream processing. An embedded DSL takes advantage of the host language’s features and syntax, making it much easier to develop and to integrate into existing codebases.
The Syntax
A DSL has two main components:
- The syntax
- The interpreter
We have already seen an example of the syntax we would like to use in a previous post.
pipeline = (
    Pipeline()
    .source(generator(10))
    .map(lambda x: x + 1)
    .flat_map(lambda x: [x, x + 1])
    .reduce(0, lambda x, y: x + y)
    .to_sink(sink)
)
In that previous post, the Pipeline class used a builder pattern: we chain the operations together to create a pipeline which is evaluated lazily using Generators. This time we would like to defer the evaluation of the pipeline until we provide an interpreter. We know that each pipeline has a Source and a Sink, and then zero or more Map, FlatMap and Reduce operators. The builder pattern is an example of using the features of the host language: autocomplete can suggest the available methods, and we know which parameters each method expects and returns.
Let’s add abstract base classes for Source and Sink.
from abc import ABC, abstractmethod
from typing import Generator, TypeVar

A = TypeVar("A")

class Source(ABC):
    @abstractmethod
    def __iter__(self) -> Generator[A]:
        pass

class Sink(ABC):
    @abstractmethod
    def write(self, data: A) -> None:
        pass
Next we define the Operator classes, which are the building blocks of the pipeline. Each operator contains a reference to the previous operator and the function to apply to the data. The operators don’t include any logic on how to evaluate each stage; that is the responsibility of the interpreter.
from typing import Callable, Iterable, Optional, TypeVar

B = TypeVar("B")

class Operator:
    def __init__(self, prev: Optional['Operator'] = None):
        self.prev = prev

class SourceOperator(Operator):
    def __init__(self, source: Source):
        super().__init__(None)
        self.source = source

class Map(Operator):
    def __init__(self, prev: Operator, func: Callable[[A], B]):
        super().__init__(prev)
        self.func = func

class FlatMap(Operator):
    def __init__(self, prev: Operator, func: Callable[[A], Iterable[B]]):
        super().__init__(prev)
        self.func = func

class Reduce(Operator):
    def __init__(self, prev: Operator, initial_value: A, func: Callable[[A, B], A]):
        super().__init__(prev)
        self.initial_value = initial_value
        self.func = func

class SinkOperator(Operator):
    def __init__(self, prev: Operator, sink: Sink):
        super().__init__(prev)
        self.sink = sink
We’ve defined the same operators as in the previous post, but here they are simply data: each one holds a reference to the previous operator and the function to apply. We can now define the Pipeline class using the builder pattern:
from typing import Generic

class Pipeline(Generic[A]):
    def __init__(self, tail: Operator):
        self.tail = tail

    @property
    def pipeline_operators(self) -> list[Operator]:
        # Walk backwards from the tail to the source, then reverse into
        # source-to-sink order
        operators = []
        current = self.tail
        while current is not None:
            operators.append(current)
            if isinstance(current, SourceOperator):
                break
            current = current.prev
        return operators[::-1]

    @classmethod
    def from_source(cls, source: Source) -> "Pipeline[A]":
        return cls(SourceOperator(source))

    def map(self, func: Callable[[A], B]) -> "Pipeline[B]":
        return Pipeline(Map(self.tail, func))

    def flat_map(self, func: Callable[[A], Iterable[B]]) -> "Pipeline[B]":
        return Pipeline(FlatMap(self.tail, func))

    def reduce(self, initial_value: A, func: Callable[[A, B], A]) -> "Pipeline[A]":
        return Pipeline(Reduce(self.tail, initial_value, func))

    def to_sink(self, sink: Sink) -> "Pipeline[None]":
        return Pipeline(SinkOperator(self.tail, sink))
Now we can define a straightforward FileSource class which reads from a text file line-by-line.
class FileSource(Source):
    def __init__(self, uri: str):
        self.uri = uri

    def __iter__(self) -> Generator[str]:
        with open(self.uri, 'r') as file:
            for line in file:
                yield line
Now we can define a Pipeline using the syntax we saw earlier:
def split_words(line: str) -> list[str]:
    return line.split(" ")

word_count_pipeline = (
    Pipeline.from_source(FileSource("file.txt"))
    .flat_map(split_words)
    .map(lambda x: 1)
    .reduce(0, lambda x, y: x + y)
)
This pipeline reads from a file, splits each line into words, and counts the number of words. It doesn’t yet write the result anywhere.
We need to define a Sink class to write the results of the pipeline to a destination. Let’s start with a simple ConsoleSink that writes to the console.
class ConsoleSink(Sink):
    def write(self, data: A) -> None:
        print(data)
We can chain the ConsoleSink to the pipeline using the to_sink method.
word_count_pipeline = word_count_pipeline.to_sink(ConsoleSink())
This defines a pipeline, but we can’t evaluate it yet. To do that, we need to define an interpreter.
Defining an Interpreter
An interpreter walks the list of operators and runs code depending on the type of each operator. Since every operator stores a reference to the previous operator in its prev attribute, the pipeline_operators property can recover the operators in order by following the chain back to the source. The first interpreter we will implement is a Mermaid diagram interpreter, which allows us to visualize the pipeline.
Mermaid Diagram Interpreter
First, we define an Interpreter abstract base class, which simply has an evaluate method, followed by the MermaidDiagramInterpreter itself.
from abc import ABC, abstractmethod

class Interpreter(ABC):
    @abstractmethod
    def evaluate(self, pipeline: Pipeline[A]) -> None:
        pass

class MermaidDiagramInterpreter(Interpreter):
    def __get_name_from_function(self, func: Callable) -> str:
        name = func.__name__ if hasattr(func, '__name__') else "lambda"
        return "λ" if name == "<lambda>" else name

    def evaluate(self, pipeline: Pipeline[A]) -> str:
        operators = pipeline.pipeline_operators
        lines = ["graph TB"]
        for i, op in enumerate(operators):
            node_id = f"node{i}"
            match op:
                case SourceOperator(source=source):
                    source_name = source.__class__.__name__
                    label = f"{node_id}[(Source: {source_name})]"
                case Map(func=func):
                    name = self.__get_name_from_function(func)
                    label = f"{node_id}{{Map: {name}}}"
                case FlatMap(func=func):
                    name = self.__get_name_from_function(func)
                    label = f"{node_id}{{FlatMap: {name}}}"
                case Reduce(initial_value=initial_value, func=func):
                    name = self.__get_name_from_function(func)
                    label = f"{node_id}(Reduce: {name} init={initial_value})"
                case SinkOperator(sink=sink):
                    sink_name = sink.__class__.__name__
                    label = f"{node_id}[/Sink: {sink_name}\\]"
                case _:
                    label = f"{node_id}[Unknown Operator]"
            lines.append(f" {label}")
            if i < len(operators) - 1:
                lines.append(f" node{i} --> node{i+1}")
        mermaid_code = "\n".join(lines)
        return mermaid_code

interpreter = MermaidDiagramInterpreter()
mermaid_code = interpreter.evaluate(word_count_pipeline)
print(mermaid_code)
graph TB
node0[(Source: FileSource)]
node0 --> node1
node1{FlatMap: split_words}
node1 --> node2
node2{Map: λ}
node2 --> node3
node3(Reduce: λ init=0)
node3 --> node4
node4[/Sink: ConsoleSink\]
We have used anonymous lambda functions in the pipeline, which makes the Mermaid diagram difficult to read. We can improve the readability by passing in named functions, or by defining a __str__ method for each operator, as sketched below.
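As a minimal sketch of the first option, we can give names to the two lambdas in the word count pipeline. The function names one and add below are my own illustrative choices; they don’t appear in the pipeline defined earlier.
# Named replacements for the anonymous lambdas in the word count pipeline.
def one(_word: str) -> int:
    return 1

def add(x: int, y: int) -> int:
    return x + y

named_pipeline = (
    Pipeline.from_source(FileSource("file.txt"))
    .flat_map(split_words)
    .map(one)
    .reduce(0, add)
    .to_sink(ConsoleSink())
)

# The Mermaid labels now read "Map: one" and "Reduce: add init=0"
# instead of "Map: λ" and "Reduce: λ init=0".
print(MermaidDiagramInterpreter().evaluate(named_pipeline))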
Local Interpreter
Next, we will define a LocalInterpreter class, which behaves in the same way as the pipeline defined in the previous post: everything is evaluated lazily in a single process using Generators.
import itertools

class LocalInterpreter(Interpreter):
    def evaluate(self, pipeline: Pipeline[A]) -> None:
        operators = pipeline.pipeline_operators
        stream: Generator[A] = None
        for operator in operators:
            match operator:
                case SourceOperator(source=source):
                    # The source starts the stream
                    stream = source.__iter__()
                case Map(func=f):
                    # Lazily apply f to each element
                    stream = map(f, stream)
                case FlatMap(func=f):
                    # Apply f to each element and flatten the resulting iterables
                    stream = itertools.chain.from_iterable(map(f, stream))
                case Reduce(initial_value=iv, func=f):
                    # Reducing forces evaluation of the upstream operators
                    result = iv
                    for x in stream:
                        result = f(result, x)
                    stream = iter([result])
                case SinkOperator(sink=sink):
                    # Writing to the sink also forces evaluation
                    for item in stream:
                        sink.write(item)
        return stream
We can now evaluate the pipeline using the LocalInterpreter.
interpreter = LocalInterpreter()
interpreter.evaluate(word_count_pipeline)
13
<list_iterator at 0x10a6b14e0>
The pipeline counts the number of words in the file; for the example file used here the ConsoleSink prints 13. The second line of output is the (now exhausted) stream returned by evaluate, which the notebook echoes as a list_iterator.
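One benefit of separating the pipeline definition from its evaluation is that we can test pipelines against in-memory data. Here is a minimal sketch; the IterableSource class is a small helper I’m adding for illustration, it isn’t part of the code above.
class IterableSource(Source):
    # A source backed by an in-memory iterable, handy for testing pipelines.
    def __init__(self, items: Iterable[A]):
        self.items = items

    def __iter__(self) -> Generator[A]:
        yield from self.items

# The same word count pipeline, fed from a list instead of a file.
test_pipeline = (
    Pipeline.from_source(IterableSource(["the quick brown fox", "jumps over"]))
    .flat_map(split_words)
    .map(lambda x: 1)
    .reduce(0, lambda x, y: x + y)
    .to_sink(ConsoleSink())
)

LocalInterpreter().evaluate(test_pipeline)  # prints 6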
Multiprocessing Interpreter
Now, as promised in the introduction, we can take the same pipeline definition and evaluate it using multiple processes. For simplicity, we’ll create a pool of worker processes; note that the import below uses the multiprocess library, a fork of the standard library’s multiprocessing that serializes with dill and so can handle the lambdas in our pipeline.
from multiprocess import Pool

class MultiprocessingInterpreter(Interpreter):
    def __init__(self, num_processes: int = 4):
        self.num_processes = num_processes

    def evaluate(self, pipeline: Pipeline[A]) -> None:
        operators = pipeline.pipeline_operators
        stream: Generator[A] = None
        with Pool(self.num_processes) as pool:
            for operator in operators:
                match operator:
                    case SourceOperator(source=source):
                        stream = source.__iter__()
                    case Map(func=f):
                        # imap distributes f across the worker processes lazily
                        stream = pool.imap(f, stream)
                    case FlatMap(func=f):
                        stream = itertools.chain.from_iterable(pool.imap(f, stream))
                    case Reduce(initial_value=iv, func=f):
                        # The reduction still runs sequentially in the parent process
                        result = iv
                        for x in stream:
                            result = f(result, x)
                        stream = iter([result])
                    case SinkOperator(sink=sink):
                        for item in stream:
                            sink.write(item)
We can now evaluate the pipeline using the MultiprocessingInterpreter.
interpreter = MultiprocessingInterpreter(num_processes=2)
interpreter.evaluate(word_count_pipeline)
I haven’t run the multiprocessing interpreter in this blog post, since working with multiple processes within a notebook is not straightforward. We need to move it over to a separate script and run it there.
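A minimal sketch of such a script follows, assuming the classes above live in a module called streaming_dsl (that module name and the file names are assumptions for illustration). Defining the pipeline’s functions at module level, rather than as lambdas, also makes them easier to serialize, and the __main__ guard is needed on platforms where worker processes are started by re-importing the module.
# run_pipeline.py -- an assumed standalone script, not part of the notebook above.
# It reuses the Pipeline, FileSource, ConsoleSink and MultiprocessingInterpreter
# definitions, imagined here to live in a module called streaming_dsl.
from streaming_dsl import (
    Pipeline, FileSource, ConsoleSink, MultiprocessingInterpreter
)

def split_words(line: str) -> list[str]:
    return line.split(" ")

def one(_word: str) -> int:
    return 1

def add(x: int, y: int) -> int:
    return x + y

word_count_pipeline = (
    Pipeline.from_source(FileSource("file.txt"))
    .flat_map(split_words)
    .map(one)
    .reduce(0, add)
    .to_sink(ConsoleSink())
)

if __name__ == "__main__":
    # The guard prevents the pipeline being dispatched again when worker
    # processes re-import this module.
    MultiprocessingInterpreter(num_processes=2).evaluate(word_count_pipeline)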
Conclusion
In this post we have seen how to build a functional stream DSL embedded in Python. We defined a pipeline using the builder pattern and evaluated it with a local interpreter and a multiprocessing interpreter, separating the pipeline definition from the evaluation logic. It is often expensive and slow to dispatch a pipeline to a remote worker (build a Docker image, deploy to the cloud, provision infrastructure, etc.), so testing locally using a LocalInterpreter is much faster. This is how Apache Beam works: the Beam SDK provides a DSL for building pipelines, and you then execute the pipeline using a runner such as Google Cloud Dataflow, Apache Flink, or locally using the Direct Runner.
In a future post I will consider how to make remote procedure calls (RPCs) asynchronously from the Pipeline. This is useful for integrating with external services such as databases, LLM APIs and other systems.
Citation
@online{law2026,
  author = {Law, Jonny},
  title = {A {Functional} {Streaming} {DSL} Embedded in {Python}},
  date = {2026-05-05},
  langid = {en}
}