Pipeline

The Pipeline is what executes your stages. This page is the reference for building, running, and observing one.

The shortest path is positional:

import scriva
from scriva.preprocess import deskew
from scriva.detect import morphological_grid
from scriva.recognize import openai
from scriva.export import excel
pipeline = scriva.Pipeline(
    deskew(),
    morphological_grid(),
    openai(model="gpt-4o", cache=".scriva_cache"),
    excel("out.xlsx"),
    page_concurrency=4,
)

Pipeline(*stages, **options) slots each stage into its phase by sniffing its Protocol — out-of-order arguments still produce the correct chain. The builder validates capability compatibility between stages: if you wire a post-processor that requires Capability.LANGUAGE_DETECTION to a recogniser that lacks it, construction raises ConfigurationError before you ever load a document.
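
To make the slotting and the construction-time check concrete, here is a minimal standalone sketch. It is not scriva's implementation: the three Protocol classes, the `slot_stages` helper, and the `capabilities` / `requires` attributes are assumptions made up for illustration. Only the names `Capability.LANGUAGE_DETECTION` and `ConfigurationError` come from the text above.

```python
from enum import Enum, auto
from typing import Protocol, runtime_checkable


class Capability(Enum):
    LANGUAGE_DETECTION = auto()


class ConfigurationError(Exception):
    """Raised when stages cannot be wired together."""


@runtime_checkable
class Detector(Protocol):
    def detect(self, page): ...


@runtime_checkable
class Recognizer(Protocol):
    def recognize(self, region): ...


@runtime_checkable
class PostProcessor(Protocol):
    def process(self, recognitions): ...


# Phase order is fixed here; argument order is not.
PHASES = [("detect", Detector), ("recognize", Recognizer), ("post_process", PostProcessor)]


def slot_stages(*stages):
    """Sort stages into phases by the protocol each one satisfies (hypothetical helper)."""
    chain = {name: [] for name, _ in PHASES}
    for stage in stages:
        for name, proto in PHASES:
            if isinstance(stage, proto):
                chain[name].append(stage)
                break
        else:
            raise ConfigurationError(f"{stage!r} matches no known phase")

    # Capability check: whatever a post-processor requires must be provided by a recognizer.
    provided = set()
    for rec in chain["recognize"]:
        provided |= getattr(rec, "capabilities", set())
    for post in chain["post_process"]:
        missing = getattr(post, "requires", set()) - provided
        if missing:
            names = sorted(c.name for c in missing)
            raise ConfigurationError(f"{type(post).__name__} needs {names}")
    return chain


class Grid:
    def detect(self, page):
        return []


class PlainOcr:
    capabilities = set()  # assumption: offers no optional capabilities

    def recognize(self, region):
        return ""


class LanguageFixer:
    requires = {Capability.LANGUAGE_DETECTION}

    def process(self, recognitions):
        return recognitions
```

Calling `slot_stages(PlainOcr(), Grid())` places the detector before the recognizer regardless of argument order, while adding `LanguageFixer()` raises `ConfigurationError` because no recognizer in the chain provides language detection.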

Use the builder when you want named phases, a stage registered more than once, or options threaded through:

from scriva import Pipeline
from scriva.classify import rule_based
from scriva.postprocess import dictionary
pipeline = (
    Pipeline.builder()
    .preprocess(deskew())                                       # 0..N
    .detect(morphological_grid())                               # exactly 1
    .classify(rule_based())                                     # 0..1
    .recognize(openai(model="gpt-4o"), cache=".scriva_cache")   # exactly 1
    .post_process(dictionary.from_yaml("corrections.yaml"))     # 0..N
    .export(excel("out.xlsx"))                                  # 0..N
    .options(page_concurrency=4, error_policy="continue")
    .build()
)

| Method | Args | Notes |
|---|---|---|
| `preprocess(stage)` | a Preprocessor | repeatable; page-level |
| `detect(stage)` | a LayoutDetector | exactly one |
| `classify(stage)` | a RegionClassifier | optional |
| `region_preprocess(stage)` | a RegionPreprocessor | repeatable; per-region transforms / slicers; see preprocessors.md › Region preprocessors |
| `recognize(stage, *, cache=None)` | a Recognizer | exactly one; cache is opt-in |
| `post_process(stage)` | a PostProcessor | repeatable; runs in order added |
| `export(stage)` | an Exporter | repeatable |
| `stage(stage)` | any Stage | generic slot for custom kinds |
| `options(**kwargs)` | global pipeline options | see below |
| `build()` | | returns a Pipeline |

| Option | Type | Default | Meaning |
|---|---|---|---|
| `page_concurrency` | int | 1 | Parallel pages within one document run |
| `error_policy` | str | "continue" | "continue", "page", or "abort" |
| `default_language` | str | None | ISO 639-1; passed to language-aware stages |
| `event_buffer` | int | 1024 | Max queued events before back-pressure |
| `cancel_on_signal` | bool | False | Install SIGINT handler tied to pipeline.cancel() |

  • continue — log the error, attach an empty Recognition with error set, keep going. Default; right for batch runs.
  • page — abort the current page, continue with the next. Right for multi-page PDFs where one bad page should not poison the rest.
  • abort — raise and stop. Right for CLI / one-shot.
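
The three policies differ only in what happens inside the `except` branch. The sketch below is a simplified model under stated assumptions, not scriva's runner: `run_pages`, `flaky`, and this `Recognition` dataclass are hypothetical, and "abort the current page" is modelled as dropping that page's partial results.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Recognition:
    text: Optional[str] = None
    error: Optional[str] = None


def run_pages(pages, recognize: Callable[[str], str], error_policy: str = "continue"):
    """pages: list of lists of region ids. Returns one dict per page, or None for a dropped page."""
    results = []
    for regions in pages:
        page_out = {}
        for region in regions:
            try:
                page_out[region] = Recognition(text=recognize(region))
            except Exception as exc:
                if error_policy == "abort":
                    raise                       # stop the whole run
                if error_policy == "page":
                    page_out = None             # give up on this page, move to the next
                    break
                # "continue": keep an empty Recognition that carries the error
                page_out[region] = Recognition(error=str(exc))
        results.append(page_out)
    return results


def flaky(region):
    """Toy recognizer that fails on one region."""
    if region == "bad":
        raise ValueError("unreadable")
    return region.upper()
```

With `pages = [["a", "bad", "b"], ["c"]]`, the "continue" policy still recognises `"b"`, "page" yields `None` for the first page and a full second page, and "abort" re-raises the `ValueError`.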

A pipeline is callable. Sync is the default; async is one method away.

result: DocumentResult = pipeline(document)             # sync
result: DocumentResult = await pipeline.aio(document)   # async
# Streaming per page
async for page_result in pipeline.stream(document):
    ...

document may be a Document, a path, or bytes — pipeline("scan.pdf") loads internally. Pipeline is a description, not a session: run it on as many documents as you like.

If you call pipeline(doc) from inside a running event loop, scriva raises with a pointer to await pipeline.aio(doc) instead of silently hanging.
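
The guard itself is a standard asyncio pattern. A minimal sketch, assuming nothing about scriva's internals (`run_sync`, `work`, and `misuse` are illustrative names): `asyncio.get_running_loop()` raises `RuntimeError` when no loop is running in the current thread, which is exactly the case where blocking is safe.

```python
import asyncio


def run_sync(coro_factory):
    """Run a coroutine to completion, refusing to block an already-running loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop in this thread: safe to start one and block on it.
        return asyncio.run(coro_factory())
    raise RuntimeError("called from a running event loop; await the async entry point instead")


async def work():
    await asyncio.sleep(0)
    return "done"


async def misuse():
    """Calls the sync entry point from inside a loop and reports the error message."""
    try:
        run_sync(work)
    except RuntimeError as exc:
        return str(exc)
```

`run_sync(work)` returns `"done"` from plain synchronous code; `asyncio.run(misuse())` returns the error message instead of deadlocking.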

Pick the path that fits your host:

# Callback: sync hosts
pipeline(document, on_event=lambda e: print(e.stage, e.kind))
# Async iterator: async hosts
async for item in pipeline.events(document):
    if item.kind == "event":
        print(item.event.stage, item.event.kind)
    elif item.kind == "result":
        result = item.result

pipeline.events(doc) owns the run — you get events and the final result through one channel.
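
The single-channel idea can be modelled with a plain async generator that yields tagged items and ends with the result. This is a sketch of the pattern only; the `Item` dataclass, its `payload` field, and the `(name, fn)` stage tuples are assumptions and do not mirror scriva's event types.

```python
import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class Item:
    kind: str      # "event" or "result"
    payload: Any


async def events(stages, document):
    """Run (name, fn) stages in order, yielding progress events and then the final result."""
    value = document
    for name, fn in stages:
        yield Item("event", (name, "started"))
        value = fn(value)
        await asyncio.sleep(0)   # give other tasks a chance to run between stages
        yield Item("event", (name, "finished"))
    yield Item("result", value)


async def collect():
    return [item async for item in events([("upper", str.upper)], "scan")]
```

Because the generator drives the run, a consumer that stops iterating also stops the work, and the result can never arrive before the events that led to it.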

from scriva.events import to_sse
# Inside an async generator, for example a streaming HTTP handler
async for chunk in to_sse(pipeline.events(document)):
    yield chunk

Event shape is documented in architecture.md.

import asyncio
task = asyncio.create_task(pipeline.aio(document))
...
pipeline.cancel()   # cooperative
await task          # raises CancelledError once stages yield

Standard adapters check the cancel flag between regions and around every network call. Custom stages must check ctx.cancelled at any long-running yield point.
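
For a custom stage, "check at any long-running yield point" amounts to testing the flag at the top of each loop iteration. A minimal sketch follows; the `Context` class here is a hypothetical stand-in with only the `cancelled` attribute named above, and `asyncio.sleep(0)` stands in for a network call.

```python
import asyncio


class Context:
    """Hypothetical stand-in for the ctx object handed to a stage."""

    def __init__(self):
        self.cancelled = False


async def recognize_regions(ctx, regions):
    done = []
    for region in regions:
        if ctx.cancelled:                  # check between regions
            raise asyncio.CancelledError()
        await asyncio.sleep(0)             # stands in for a network call
        done.append(region)
    return done


async def demo():
    ctx = Context()
    task = asyncio.create_task(recognize_regions(ctx, ["r1", "r2", "r3"]))
    await asyncio.sleep(0)                 # let the task start working
    ctx.cancelled = True                   # the flag a cancel() call would flip
    try:
        await task
    except asyncio.CancelledError:
        return "cancelled"
    return "finished"
```

The stage stops at the next region boundary rather than mid-call, which is what makes the cancellation cooperative.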

from scriva.recognize import anthropic
from scriva.postprocess import language_detector
pipeline.replace("recognize", anthropic(model="claude-opus-4-7"))
pipeline.insert_after("recognize", language_detector())
pipeline.remove("export")

The names are the Stage.name you set on each adapter, or the auto-derived snake-cased class name when you didn’t. Built-in adapters use predictable names (detect_grid, recognize, export_excel, …); see each adapter’s page.

When the project’s bar is “wrong answers are unacceptable, not just inconvenient,” scriva exposes three independent accuracy levers. They compose; production runs typically stack two of three.

| Lever | What it is | When to reach for it |
|---|---|---|
| Human-in-the-loop | Pause the pipeline for a human to confirm layout / text / fields before commit | High-stakes documents, low volume, or first-pass quality unknown |
| Cross-check against the original data | Render the recognised text back into pixels and score the match; or run two recognizers and score agreement | High volume where humans can’t review every page; production confidence gating |
| Cross-check against ground-truth data | Score every run against a labelled corpus or known-good values; gate deploys on the score | CI for pipeline / prompt / model changes; drift monitoring |

Each lever has a canonical recipe below. They share the same primitives — RecognitionHint, confidence_score, result.merge, result.diff, scriva.eval — composed in different shapes.

Some workflows need a human to inspect detected regions before the (expensive) recognizer runs: toggle blank flags, fix merged spans, drop spurious cells. Decompose the run into two pipelines and persist the layout between them:

import scriva
from scriva.preprocess import orientation, deskew
from scriva.detect import morphological_grid, box_annotations
from scriva.classify import rule_based
from scriva.recognize import openai
from scriva.export import json_, excel
# Phase 1: detect only, write the layout to a sidecar
phase1 = scriva.Pipeline(
    orientation(),
    deskew(),
    morphological_grid(),
    rule_based(),
    json_("layout.json", select={"regions"}),
)
phase1("scan.png")
# … your UI opens layout.json, the user edits regions, saves …
# Phase 2: read the (possibly edited) layout and recognize
phase2 = scriva.Pipeline(
    box_annotations("layout.json"),
    openai(model="gpt-4o", cache=".scriva_cache"),
    excel("out.xlsx"),
)
result = phase2("scan.png")

box_annotations exists for exactly this case: it reads regions from a JSON sidecar instead of computing them. The sidecar shape is the same as the regions field of result.to_json(), so round-tripping is lossless.

For SSE-driven UIs, emit phase 1’s finished event, hand control to the browser, and start phase 2 only after the client PUTs the edited layout back. The pipeline runs are independent — different sessions, no shared Context.

Lever 2: Cross-check against the original data

Two flavours, both cheap to wire in.

postprocess.confidence_score.rendering() takes each recognised string, renders it back as a glyph image, embeds both the original crop and the rendering, and computes cosine similarity. When the answer doesn’t visually match the source, the confidence drops — independent of whatever the recognizer reported about itself. This is the “the model has to be able to draw what it read” check.

import scriva
from scriva.detect import morphological_grid
from scriva.recognize import openai
from scriva.postprocess import confidence_score
from scriva.export import excel
pipeline = scriva.Pipeline(
    morphological_grid(),
    openai(model="gpt-4o"),
    confidence_score.rendering(),   # round-trip cross-check
    excel("out.xlsx", confidence_thresholds=(0.6, 0.8)),
)
result = pipeline("scan.png")
to_review = result.low_confidence(threshold=0.6)   # routes to lever 1 or 2c

This is the foundation of every other accuracy lever — the confidence it produces is what low_confidence(...) reads, what uncertainty_first sorts on, and what gates auto-post in Cookbook › Invoices › Confidence-gated auto-post.
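
The scoring step reduces to cosine similarity between two embedding vectors followed by a threshold. The sketch below uses toy 3-d vectors in place of real image embeddings; the function names and the sample data are illustrative assumptions, not what `confidence_score.rendering()` does internally.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def low_confidence(scores, threshold):
    """Region ids whose score falls below the threshold."""
    return [rid for rid, score in scores.items() if score < threshold]


# Toy "embeddings" of (original crop, re-rendered text); real ones would come from an image encoder.
pairs = {
    "r1": ([1.0, 0.0, 0.0], [1.0, 0.0, 0.0]),   # rendering matches the crop
    "r2": ([1.0, 0.0, 0.0], [0.0, 1.0, 0.0]),   # rendering looks nothing like it
}
scores = {rid: cosine_similarity(crop, rendered) for rid, (crop, rendered) in pairs.items()}
```

Here `low_confidence(scores, 0.6)` selects only `"r2"`, the region whose rendering diverges from its source crop.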

2b. Multi-recognizer agreement (consensus)

When the original “data” is more authoritative than any one model, cross-check by running two or more recognizers in parallel and flagging disagreements:

from scriva.recognize import consensus, openai, anthropic, bedrock
recognize = consensus(
    openai(model="gpt-4o"),
    anthropic(model="claude-opus-4-7"),
    bedrock(model="qwen.qwen3-vl-235b-a22b"),
    on_disagreement="confidence",   # or "majority", "tiebreaker"
)

The pipeline records every member’s answer; the consensus wrapper resolves disagreements by the strategy you pick. See Recognizers › consensus for the full shape and cost trade-off.
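
As a rough model of how two of the strategies could resolve one region, consider the sketch below. It is an assumption-laden illustration: `resolve` is a hypothetical function, answers are plain `(text, confidence)` tuples, and the "tiebreaker" strategy is left out because its behaviour is not described here.

```python
from collections import Counter


def resolve(answers, on_disagreement="confidence"):
    """answers: list of (text, confidence) pairs, one per recognizer.

    Returns (winning_text, disagreed).
    """
    texts = [text for text, _ in answers]
    if len(set(texts)) == 1:
        return texts[0], False                       # unanimous, nothing to flag
    if on_disagreement == "majority":
        winner, _count = Counter(texts).most_common(1)[0]
    elif on_disagreement == "confidence":
        winner, _conf = max(answers, key=lambda a: a[1])
    else:
        raise ValueError(f"unsupported strategy: {on_disagreement}")
    return winner, True                              # flagged as a disagreement
```

For `[("1O0", 0.95), ("100", 0.70), ("100", 0.80)]`, "majority" picks `"100"` while "confidence" picks `"1O0"`, which shows why the choice of strategy matters when one member is confidently wrong.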

For a softer variant — only re-run the uncertain regions through a stronger oracle — use recognize.uncertainty_first(primary, oracle, ...). That is the cheap shape that scales to large batches.
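
The cost saving comes from calling the oracle on a small subset only. A minimal sketch of that selection, assuming a budget `k` means "the k least confident regions" (an assumption about the parameter, and `escalate_uncertain` is a hypothetical name, not scriva's API):

```python
def escalate_uncertain(primary, oracle, k):
    """primary: dict of region_id -> (text, confidence).

    oracle: callable(region_id) -> (text, confidence), invoked for at most k regions.
    """
    worst = sorted(primary, key=lambda rid: primary[rid][1])[:k]   # k least confident ids
    refined = dict(primary)
    for rid in worst:
        refined[rid] = oracle(rid)
    return refined
```

With three regions and `k=1`, the oracle is called once, on the lowest-confidence region, and every other answer is kept from the primary pass.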

After a run, re-OCR only the regions whose confidence falls below a threshold, using the previous text as a hint:

import scriva
from scriva import RecognitionHint
from scriva.detect import morphological_grid, box_annotations
from scriva.classify import rule_based
from scriva.recognize import openai
from scriva.postprocess import confidence_score
from scriva.export import excel
# First pass
first = scriva.Pipeline(
    morphological_grid(),
    rule_based(),
    openai(model="gpt-4o", cache=".scriva_cache"),
    confidence_score.rendering(),
    excel("out.xlsx"),
)
result = first("scan.png")
# Refinement: only the low-confidence regions, hint = previous text
refine = scriva.Pipeline(
    box_annotations.from_result(
        result,
        where=lambda r: (r.confidence or 0) < 0.6,
    ),
    openai(
        model="gpt-4o",
        prompt=scriva.prompts.Prompt.ocr_with_hint(),
        hints=RecognitionHint.from_result(result),
    ),
    confidence_score.rendering(),
    excel("out_refined.xlsx"),
)
refined = refine("scan.png")
combined = result.merge(refined)   # field-aware overlay; refined wins

RecognitionHint reaches the recognizer’s recognize(...) call through the hint= keyword. Built-in VLM recognizers splice the hint into the prompt; custom recognizers see it directly. See Recognizers › RecognitionHint for the full shape.

result.merge(other) is the symmetric counterpart of .diff(). It aligns regions by region_id and produces a new DocumentResult where each region’s Recognition is resolved by the chosen strategy:

def merge(
    self,
    other: "DocumentResult",
    *,
    strategy: Literal["right_wins", "left_wins", "highest_confidence"]
    | Callable[[Recognition, Recognition], Recognition] = "right_wins",
) -> "DocumentResult": ...

| Strategy | Behaviour |
|---|---|
| "right_wins" | other’s recognition replaces self’s for every region in other |
| "left_wins" | self’s recognition is kept; other only contributes missing regions |
| "highest_confidence" | The recognition with the higher confidence wins; ties go to other |
| Callable | (left, right) -> Recognition; caller-supplied conflict resolution |

Layout is taken from self unchanged; only Recognition rows are merged. Regions in other that are not present in self are appended, which makes merge safe for partial refinement passes that only recognise a subset of regions.

A confidence-aware callable strategy is the common shape:

def prefer_high_conf(left, right):
    return right if (right.confidence or 0) >= (left.confidence or 0) + 0.1 else left
combined = result.merge(refined, strategy=prefer_high_conf)
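
The merge rules can be checked against a small model in which a result is just a dict of `region_id -> Recognition`. This is a sketch of the documented semantics, not the library's code; the free function `merge` and this two-field `Recognition` are simplifications introduced for the example.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Recognition:
    text: str
    confidence: Optional[float] = None


def merge(left, right, strategy="right_wins"):
    """left/right: dicts of region_id -> Recognition. Returns a new dict."""
    def pick(a, b):
        if callable(strategy):
            return strategy(a, b)
        if strategy == "right_wins":
            return b
        if strategy == "left_wins":
            return a
        if strategy == "highest_confidence":
            # ties go to the right-hand side
            return b if (b.confidence or 0) >= (a.confidence or 0) else a
        raise ValueError(f"unknown strategy: {strategy}")

    merged = dict(left)                        # left's regions, in left's order
    for rid, rec in right.items():
        # regions only present on the right are appended unchanged
        merged[rid] = pick(left[rid], rec) if rid in left else rec
    return merged
```

A partial refinement pass that only covers region `"b"` and a new region `"c"` leaves `"a"` untouched, overlays `"b"` according to the strategy, and appends `"c"` at the end.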

Lever 3: Cross-check against ground-truth data

When you have a corpus of labelled “known-right” examples — even small — score every run against them and gate downstream effects on the score. scriva.eval is the offline scorer; the same machinery is the right shape for production drift monitoring.

import scriva
from scriva.schemas import Invoice
# 1. CI gate: fail the build if F1 regresses
report = scriva.eval(
    pipeline=scriva.presets.invoice.pipeline(),
    ground_truth="./annotations/invoices/",
)
assert report.f1 >= 0.92, report.to_markdown()
# 2. Per-run regression check (production)
def on_each(src, invoice):
    if golden := lookup_golden(src):
        diff = compare_fields(invoice, golden, schema=Invoice)
        if diff.has_changes:
            alert_drift(src, diff)
scriva.batch(sources, schema=Invoice, on_each=on_each)

Two cross-check shapes — pick by where the labelled data lives:

  • Static corpus: ./annotations/ of golden JSONs paired with source files. Read in CI; scriva.eval prints precision / recall / F1 / calibration. See Evaluation › Ground-truth format and Evaluation › CI gates.
  • Live golden values: known-right fields for a subset of incoming docs (e.g. recurring vendor invoices where you already know the expected totals). Compare per-run and alert on drift; the same EvalReport.field_metrics shape works here too.
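
For intuition about the score being gated on, here is one common way to compute precision, recall, and F1 over extracted fields: exact match per field, micro-averaged over one document. How `scriva.eval` actually matches and aggregates is not specified here, so treat `field_f1` and its matching rule as assumptions.

```python
def field_f1(predicted, golden):
    """Precision / recall / F1 over exact-match fields of two flat dicts."""
    true_pos = sum(1 for key, value in predicted.items()
                   if key in golden and golden[key] == value)
    precision = true_pos / len(predicted) if predicted else 0.0
    recall = true_pos / len(golden) if golden else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return precision, recall, f1


predicted = {"total": "120.00", "vendor": "Acme", "date": "2024-01-02"}
golden = {"total": "120.00", "vendor": "ACME Ltd", "date": "2024-01-02", "currency": "EUR"}
precision, recall, f1 = field_f1(predicted, golden)
```

Two of three predicted fields are right (precision 2/3) and two of four golden fields were found (recall 1/2), giving F1 = 4/7, which would fail a 0.92 gate.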

The two-recognizer comparison shape (useful for picking a model or catching regressions when you swap one) is the same pattern run locally:

import scriva
from scriva.detect import morphological_grid
from scriva.recognize import openai, bedrock
gpt = scriva.Pipeline(morphological_grid(), openai(model="gpt-4o"))
qwen = scriva.Pipeline(morphological_grid(), bedrock(model="qwen.qwen3-vl-235b-a22b"))
a = gpt("scan.png")
b = qwen("scan.png")
diff = a.diff(b)   # DocumentDiff: per-region disagreements on text/confidence/language
for d in diff.regions:
    print(d.region_id, d.left.text, "", d.right.text)

DocumentResult.diff(other) is field-aware: it compares text, confidence, and language per region and ignores deterministic fields like bbox. Useful both for active-learning sample selection and for regression-testing a model swap.
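
A field-aware diff boils down to comparing a fixed set of fields per shared region and skipping the rest. The sketch below models regions as plain dicts; `diff_regions` and its return shape are illustrative assumptions rather than the `DocumentDiff` structure.

```python
def diff_regions(left, right, fields=("text", "confidence", "language")):
    """left/right: dicts of region_id -> dict of fields. bbox is deliberately not compared."""
    changes = {}
    for rid in left.keys() & right.keys():
        changed = {f: (left[rid].get(f), right[rid].get(f))
                   for f in fields
                   if left[rid].get(f) != right[rid].get(f)}
        if changed:
            changes[rid] = changed
    return changes


a = {"r1": {"text": "100", "confidence": 0.9, "bbox": (0, 0, 10, 10)},
     "r2": {"text": "abc", "confidence": 0.8, "bbox": (0, 10, 10, 20)}}
b = {"r1": {"text": "1O0", "confidence": 0.9, "bbox": (1, 0, 10, 10)},
     "r2": {"text": "abc", "confidence": 0.8, "bbox": (0, 10, 10, 20)}}
```

`diff_regions(a, b)` reports only the text change on `"r1"`; the one-pixel bbox shift is ignored, so layout jitter does not drown out real recognition disagreements.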

For deterministic CI without paying for the recognizer on every build, cache the recognizer responses against your fixtures — see Evaluation › Regression fixtures.

In production the three levers stack. The typical full pipeline:

import scriva
from scriva.detect import morphological_grid
from scriva.recognize import uncertainty_first, openai, anthropic
from scriva.postprocess import confidence_score, dictionary
from scriva.export import excel
pipeline = scriva.Pipeline(
    morphological_grid(),
    uncertainty_first(                                # lever 2b
        primary=openai(model="gpt-4o", cache=".scriva_cache"),
        oracle=anthropic(model="claude-opus-4-7"),
        k=20,
    ),
    dictionary.from_samples(".scriva_samples"),       # lever 3 (offline-learned corrections)
    confidence_score.rendering(),                     # lever 2a
    excel("out.xlsx"),
)
result = pipeline("scan.png")
if to_review := result.low_confidence(0.7):           # lever 1 (review queue)
    send_for_human_review(to_review)
else:
    auto_post(result.fields)

The offline CI run then uses lever 3 against ./annotations/ to gate the deploy. That is the full shape: cross-check on every run, gate-then-route by confidence, escalate to a human when the confidence isn’t there.

Subclass for stateful adapters:

import scriva
from scriva import PostProcessor
class StripQuotes(PostProcessor):
    async def process(self, page, layout, recognitions):
        # Strip surrounding double quotes; leave empty recognitions untouched
        return {rid: r.with_text(r.text.strip('"')) if r.text else r
                for rid, r in recognitions.items()}
pipeline = scriva.Pipeline(..., StripQuotes(), ...)

Decorate a function for stateless ones:

import scriva
from scriva import postprocessor
@postprocessor
async def strip_quotes(page, layout, recognitions):
    # Same transform as a plain function; the decorator turns it into a stage
    return {rid: r.with_text(r.text.strip('"')) if r.text else r
            for rid, r in recognitions.items()}
pipeline = scriva.Pipeline(..., strip_quotes, ...)

@preprocessor, @detector, @recognizer, @postprocessor, and @exporter all exist. Each accepts name= and capabilities= keyword args if you want to override defaults. Custom stages are first-class — they can sit anywhere in the chain.