API Reference
processors
A collection of processors and their pipelines
ChainedPipeline
ChainedPipeline(*args: Any, **kwargs: Any)
Default pipeline (Pipeline
alias). Sequentially processes data through multiple stages.
Example
# Process data through multiple stages
pipeline = (
(ChainedPipeline[int]())
.pipe(lambda payload: payload + 1)
.pipe(lambda payload: payload * 2)
.pipe(lambda payload: payload + 1)
)
# Assert result
assert await pipeline.process(1) == 5
ChainedProcessor
Default processor. Sequentially processes data through multiple stages.
Example
# Create processor
processor = ChainedProcessor[int]()
# Stages
result = await processor.process(
payload=5,
stages=(
lambda payload: payload + 1,
lambda payload: payload * 2,
),
)
# Assert result
assert result == 12
InterruptiblePipeline
InterruptiblePipeline(
check: CheckCallable, *args: Any, **kwargs: Any
)
Pipeline with conditional interruption.
Parameters:
-
check
CheckCallable
) –Callable used to interrupt processing.
Example
# Interrupts when payload value exceeds 100
pipeline = (
InterruptiblePipeline[int](lambda payload: payload > 100)
.pipe(lambda payload: payload + 2)
.pipe(lambda payload: payload * 10)
.pipe(lambda payload: payload * 10)
)
# Process payload - will stop if value exceeds 100
assert await pipeline.process(5) == 70
InterruptibleProcessor
Processor with conditional interruption.
Parameters:
Example
# Interrupts when payload value exceeds 100
def check_value(payload: int) -> bool:
return payload > 100
# Create processor with the check
processor = InterruptibleProcessor(check_value)
# Process payload - will stop if value exceeds 100
result = await processor.process(initial_payload, stages)
check
instance-attribute
Callable for processing interruption. Useful for declarative subclassing.
Example
class MaxValueProcessor(InterruptibleProcessor[int, int]):
# interrupt if value exceeds 100
check = lambda x: x > 100