Skip to content

API Reference

processors

A collection of processors and their pipelines

ChainedPipeline

ChainedPipeline(*args: Any, **kwargs: Any)

Default pipeline (Pipeline alias). Sequentially processes data through multiple stages.

Example
# Process data through multiple stages
pipeline = (
    (ChainedPipeline[int]())
    .pipe(lambda payload: payload + 1)
    .pipe(lambda payload: payload * 2)
    .pipe(lambda payload: payload + 1)
)

# Assert result
assert await pipeline.process(1) == 5

ChainedProcessor

Default processor. Sequentially processes data through multiple stages.

Example
# Create processor
processor = ChainedProcessor[int]()

# Stages
result = await processor.process(
    payload=5,
    stages=(
        lambda payload: payload + 1,
        lambda payload: payload * 2,
    ),
)

# Assert result
assert result == 12

InterruptiblePipeline

InterruptiblePipeline(
    check: CheckCallable, *args: Any, **kwargs: Any
)

Pipeline with conditional interruption.

Parameters:

  • check

    (CheckCallable) –

    Callable used to interrupt processing.

Example
# Interrupts when payload value exceeds 100
pipeline = (
    InterruptiblePipeline[int](lambda payload: payload > 100)
    .pipe(lambda payload: payload + 2)
    .pipe(lambda payload: payload * 10)
    .pipe(lambda payload: payload * 10)
)

# Process payload - will stop if value exceeds 100
assert await pipeline.process(5) == 70

InterruptibleProcessor

InterruptibleProcessor(check: CheckCallable[T_in])

Processor with conditional interruption.

Parameters:

  • check

    (CheckCallable[T_in]) –

    Callable for processing interruption.

Example
# Interrupts when payload value exceeds 100
def check_value(payload: int) -> bool:
    return payload > 100

# Create processor with the check
processor = InterruptibleProcessor(check_value)

# Process payload - will stop if value exceeds 100
result = await processor.process(initial_payload, stages)

check instance-attribute

check: CheckCallable[T_in] = check

Callable for processing interruption. Useful for declarative subclassing.

Example
class MaxValueProcessor(InterruptibleProcessor[int, int]):
    # interrupt if value exceeds 100
    check = lambda x: x > 100