Pipeline
The Pipeline is what executes your stages. This page is the reference for
building, running, and observing one.
Building
The shortest path is positional:

```python
import scriva
from scriva.preprocess import deskew
from scriva.detect import morphological_grid
from scriva.recognize import openai
from scriva.export import excel

pipeline = scriva.Pipeline(
    deskew(),
    morphological_grid(),
    openai(model="gpt-4o", cache=".scriva_cache"),
    excel("out.xlsx"),
    page_concurrency=4,
)
```

Pipeline(*stages, **options) slots each stage into its phase by sniffing
its Protocol — out-of-order arguments still produce the correct chain. The
builder validates capability compatibility between stages: if you wire a
post-processor that requires Capability.LANGUAGE_DETECTION to a recogniser
that lacks it, construction raises ConfigurationError before you ever load
a document.
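The fail-fast check described above might look roughly like the following. This is a minimal sketch, not scriva's implementation: StageSpec, validate, and the provides / requires fields are hypothetical, and Capability and ConfigurationError are local stand-ins that only borrow the names used in the text.

```python
from dataclasses import dataclass
from enum import Enum, auto


class Capability(Enum):
    # Local stand-in for the capability enum named in the text.
    LANGUAGE_DETECTION = auto()


class ConfigurationError(Exception):
    """Local stand-in: raised when stages cannot be wired together."""


@dataclass(frozen=True)
class StageSpec:
    # Hypothetical description of a stage: what it offers and what it needs.
    name: str
    provides: frozenset = frozenset()
    requires: frozenset = frozenset()


def validate(stages):
    """Raise if a stage requires a capability that no earlier stage provides."""
    available = set()
    for stage in stages:
        missing = stage.requires - available
        if missing:
            names = ", ".join(sorted(c.name for c in missing))
            raise ConfigurationError(f"{stage.name} requires {names}")
        available |= stage.provides


recognizer = StageSpec("recognize")  # provides no capabilities
post = StageSpec("language-fixer", requires=frozenset({Capability.LANGUAGE_DETECTION}))

try:
    validate([recognizer, post])
    error = None
except ConfigurationError as exc:
    error = str(exc)  # caught at construction time, before any document loads
```

The point of the design is that the mismatch surfaces when the chain is assembled, so a bad configuration never reaches the first (possibly paid) recognizer call.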
The fluent builder
Use the fluent builder when you want named phases, a stage you register more than once, or options threaded through:

```python
from scriva import Pipeline
from scriva.preprocess import deskew
from scriva.detect import morphological_grid
from scriva.classify import rule_based
from scriva.recognize import openai
from scriva.postprocess import dictionary
from scriva.export import excel

pipeline = (
    Pipeline.builder()
    .preprocess(deskew())                                      # 0..N
    .detect(morphological_grid())                              # exactly 1
    .classify(rule_based())                                    # 0..1
    .recognize(openai(model="gpt-4o"), cache=".scriva_cache")  # exactly 1
    .post_process(dictionary.from_yaml("corrections.yaml"))    # 0..N
    .export(excel("out.xlsx"))                                 # 0..N
    .options(page_concurrency=4, error_policy="continue")
    .build()
)
```

Builder methods
| Method | Args | Notes |
|---|---|---|
| preprocess(stage) | a Preprocessor | repeatable; page-level |
| detect(stage) | a LayoutDetector | exactly one |
| classify(stage) | a RegionClassifier | optional |
| region_preprocess(stage) | a RegionPreprocessor | repeatable; per-region transforms / slicers; see preprocessors.md › Region preprocessors |
| recognize(stage, *, cache=None) | a Recognizer | exactly one; cache is opt-in |
| post_process(stage) | a PostProcessor | repeatable; runs in order added |
| export(stage) | an Exporter | repeatable |
| stage(stage) | any Stage | generic slot for custom kinds |
| options(**kwargs) | global pipeline options | see below |
| build() | (none) | returns a Pipeline |
Options
| Option | Type | Default | Meaning |
|---|---|---|---|
| page_concurrency | int | 1 | Parallel pages within one document run |
| error_policy | str | "continue" | "continue", "page", or "abort" |
| default_language | str | None | ISO 639-1; passed to language-aware stages |
| event_buffer | int | 1024 | Max queued events before back-pressure |
| cancel_on_signal | bool | False | Install SIGINT handler tied to pipeline.cancel() |
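To illustrate what "back-pressure" means for event_buffer, here is a sketch using a bounded asyncio.Queue. It assumes nothing about how scriva queues events internally; the run helper and the producer / consumer names are made up for the example.

```python
import asyncio


async def run(buffer_size, n_events):
    queue = asyncio.Queue(maxsize=buffer_size)  # bounded: put() waits when full
    received, max_depth = [], 0

    async def producer():
        nonlocal max_depth
        for i in range(n_events):
            await queue.put(i)                  # suspends here under back-pressure
            max_depth = max(max_depth, queue.qsize())
        await queue.put(None)                   # sentinel: no more events

    async def consumer():
        while (event := await queue.get()) is not None:
            received.append(event)

    await asyncio.gather(producer(), consumer())
    return received, max_depth


events, max_depth = asyncio.run(run(buffer_size=2, n_events=5))
```

Every event still arrives in order, but the producer can never run more than buffer_size events ahead of a slow consumer.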
Error policies
- continue: log the error, attach an empty Recognition with error set, keep going. Default; right for batch runs.
- page: abort the current page, continue with the next. Right for multi-page PDFs where one bad page should not poison the rest.
- abort: raise and stop. Right for CLI / one-shot.
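The three policies differ only in how far a failure propagates. The sketch below models that with plain dictionaries; run_pages and fake_recognize are hypothetical helpers, and representing an aborted page as None is an assumption made for illustration, not scriva's result shape.

```python
def run_pages(pages, recognize, error_policy="continue"):
    """Apply `recognize` to every region of every page under an error policy."""
    results = []
    for page in pages:
        page_out = {}
        try:
            for region in page:
                try:
                    page_out[region] = recognize(region)
                except Exception as exc:
                    if error_policy != "continue":
                        raise
                    # "continue": record an empty result with the error attached
                    page_out[region] = {"text": None, "error": str(exc)}
        except Exception:
            if error_policy == "abort":
                raise
            page_out = None  # "page": give up on this page, move to the next
        results.append(page_out)
    return results


def fake_recognize(region):
    # Stand-in recognizer that fails on one specific region.
    if region == "bad":
        raise ValueError("boom")
    return {"text": region.upper(), "error": None}


pages = [["a", "bad"], ["c"]]
kept = run_pages(pages, fake_recognize, "continue")
skipped = run_pages(pages, fake_recognize, "page")
```

With "abort", the same call lets the ValueError escape to the caller.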
Running
A pipeline is callable. Sync is the default; async is one method away.

```python
# Sync
result: DocumentResult = pipeline(document)

# Async
result: DocumentResult = await pipeline.aio(document)

# Streaming per page
async for page_result in pipeline.stream(document):
    ...
```

document may be a Document, a path, or bytes; pipeline("scan.pdf")
loads internally. Pipeline is a description, not a session: run it on as
many documents as you like.
If you call pipeline(doc) from inside a running event loop, scriva raises
with a pointer to await pipeline.aio(doc) instead of silently hanging.
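One way a sync entry point can detect that it is being called from inside a running loop is asyncio.get_running_loop(), which raises RuntimeError when no loop is active. The sketch below shows that guard; run_sync and _run_async are hypothetical names, and this is not necessarily how scriva implements the check.

```python
import asyncio


async def _run_async(doc):
    await asyncio.sleep(0)  # placeholder for the real async work
    return f"result:{doc}"


def run_sync(doc):
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so it is safe to start one.
        return asyncio.run(_run_async(doc))
    # A loop is already running: blocking here would stall it, so refuse loudly.
    raise RuntimeError(
        "pipeline called from a running event loop; use 'await pipeline.aio(doc)' instead"
    )


async def caller():
    try:
        run_sync("scan.pdf")
    except RuntimeError as exc:
        return str(exc)


outside = run_sync("scan.pdf")   # plain script: works
inside = asyncio.run(caller())   # inside a loop: raises with a pointer to aio()
```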
Observing
Pick the path that fits your host:

Callback (recommended for most uses)

```python
pipeline(document, on_event=lambda e: print(e.stage, e.kind))
```

Async iterator of events + result

```python
async for item in pipeline.events(document):
    if item.kind == "event":
        print(item.event.stage, item.event.kind)
    elif item.kind == "result":
        result = item.result
```

pipeline.events(doc) owns the run: you get events and the final result
through one channel.

Server-Sent Events

```python
from scriva.events import to_sse

# sse_stream is a placeholder name for your handler's async generator
async def sse_stream(document):
    async for chunk in to_sse(pipeline.events(document)):
        yield chunk
```

Event shape is documented in architecture.md.
Cancelling
```python
import asyncio

task = asyncio.create_task(pipeline.aio(document))
...
pipeline.cancel()  # cooperative
await task         # raises CancelledError once stages yield
```

Standard adapters check the cancel flag between regions and around every
network call. Custom stages must check ctx.cancelled at any
long-running yield point.
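For a custom stage, "checking ctx.cancelled at a yield point" can be as simple as testing the flag at the top of each loop iteration. The sketch below uses a stand-in Context class with only a cancelled attribute; the real context object and how scriva sets the flag are not shown here.

```python
import asyncio


class Context:
    """Stand-in exposing only the `cancelled` flag mentioned in the text."""

    def __init__(self):
        self.cancelled = False


async def main():
    ctx = Context()
    processed = []

    async def stage():
        for region in range(100):
            if ctx.cancelled:             # cooperative check before each unit of work
                raise asyncio.CancelledError()
            await asyncio.sleep(0)        # yield point (e.g. a network call)
            processed.append(region)

    task = asyncio.create_task(stage())
    await asyncio.sleep(0)                # let the stage make some progress
    await asyncio.sleep(0)
    ctx.cancelled = True                  # what a cancel() call would flip
    try:
        await task
    except asyncio.CancelledError:
        return "cancelled", len(processed)
    return "finished", len(processed)


status, count = asyncio.run(main())
```

The stage stops at the next check rather than mid-region, which is what makes the cancellation cooperative.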
Replacing and re-using stages
```python
from scriva.recognize import anthropic
from scriva.postprocess import language_detector

pipeline.replace("recognize", anthropic(model="claude-opus-4-7"))
pipeline.insert_after("recognize", language_detector())
pipeline.remove("export")
```

The names are the Stage.name you set on each adapter, or the auto-derived
kebab-cased class name when you didn’t. Built-in adapters use predictable
names (detect_grid, recognize, export_excel, …) — see each adapter’s
page.
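As an illustration of what a kebab-cased class name looks like, the sketch below derives one with a regular expression. The exact rule scriva applies is not documented on this page, so kebab_name is a hypothetical helper and its handling of edge cases (acronyms, for instance) is an assumption.

```python
import re


def kebab_name(cls):
    # Insert a hyphen wherever a lowercase letter or digit is followed by a capital,
    # then lowercase the whole name.
    return re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "-", cls.__name__).lower()


class StripQuotes:
    pass


derived = kebab_name(StripQuotes)
```

Here derived is "strip-quotes", the string you would pass to replace, insert_after, or remove under this assumed rule.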
High-accuracy patterns
When the project’s bar is “wrong answers are unacceptable, not just inconvenient,” scriva exposes three independent accuracy levers. They compose; production runs typically stack two of three.
| Lever | What it is | When to reach for it |
|---|---|---|
| Human-in-the-loop | Pause the pipeline for a human to confirm layout / text / fields before commit | High-stakes documents, low volume, or first-pass quality unknown |
| Cross-check against the original data | Render the recognised text back into pixels and score the match; or run two recognizers and score agreement | High volume where humans can’t review every page; production confidence gating |
| Cross-check against ground-truth data | Score every run against a labelled corpus or known-good values; gate deploys on the score | CI for pipeline / prompt / model changes; drift monitoring |
Each lever has a canonical recipe below. They share the same primitives
— RecognitionHint, confidence_score, result.merge,
result.diff, scriva.eval — composed in different shapes.
Lever 1: Human-in-the-loop review
Some workflows need a human to inspect detected regions before the (expensive) recognizer runs: to toggle blank flags, fix merged spans, or drop spurious cells. Decompose the run into two pipelines and persist the layout between them:

```python
import scriva
from scriva.preprocess import orientation, deskew
from scriva.detect import morphological_grid, box_annotations
from scriva.classify import rule_based
from scriva.recognize import openai
from scriva.export import json_, excel

# Phase 1: detect only, write the layout to a sidecar
phase1 = scriva.Pipeline(
    orientation(),
    deskew(),
    morphological_grid(),
    rule_based(),
    json_("layout.json", select={"regions"}),
)
phase1("scan.png")
# … your UI opens layout.json, the user edits regions, saves …

# Phase 2: read the (possibly edited) layout and recognize
phase2 = scriva.Pipeline(
    box_annotations("layout.json"),
    openai(model="gpt-4o", cache=".scriva_cache"),
    excel("out.xlsx"),
)
result = phase2("scan.png")
```

box_annotations exists for exactly this case: it reads regions from a
JSON sidecar instead of computing them. The sidecar shape is the same as
the regions field of result.to_json(), so round-tripping is lossless.
For SSE-driven UIs, emit phase 1’s finished event, hand control to the
browser, and start phase 2 only after the client PUTs the edited
layout back. The pipeline runs are independent — different sessions, no
shared Context.
Lever 2: Cross-check against the original data
Two flavours, both cheap to wire in.
2a. Round-trip rendering check
postprocess.confidence_score.rendering()
takes each recognised string, renders it back as a glyph image, embeds
both the original crop and the rendering, and computes cosine
similarity. When the answer doesn’t visually match the source, the
confidence drops — independent of whatever the recognizer reported
about itself. This is the “the model has to be able to draw what it
read” check.
```python
import scriva
from scriva.detect import morphological_grid
from scriva.recognize import openai
from scriva.postprocess import confidence_score
from scriva.export import excel

pipeline = scriva.Pipeline(
    morphological_grid(),
    openai(model="gpt-4o"),
    confidence_score.rendering(),  # round-trip cross-check
    excel("out.xlsx", confidence_thresholds=(0.6, 0.8)),
)

result = pipeline("scan.png")
to_review = result.low_confidence(threshold=0.6)  # routes to lever 1 or 2c
```

This is the foundation of every other accuracy lever: the confidence
it produces is what low_confidence(...) reads, what
uncertainty_first sorts on, and what gates auto-post in
Cookbook › Invoices › Confidence-gated auto-post.
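The scoring step at the heart of the rendering check is ordinary cosine similarity between two embedding vectors. The sketch below shows only that step, with made-up three-element vectors standing in for real image embeddings; the embedding model and the rendering itself are outside its scope.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


crop_vec = [1.0, 0.0, 1.0]      # embedding of the original crop (illustrative values)
good_render = [1.0, 0.0, 1.0]   # rendering of a correct reading
bad_render = [0.0, 1.0, 0.0]    # rendering of a wrong reading

match = cosine_similarity(crop_vec, good_render)      # close to 1
mismatch = cosine_similarity(crop_vec, bad_render)    # 0
```

A reading whose rendering embeds far from the source crop scores low regardless of how confident the recognizer claimed to be.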
2b. Multi-recognizer agreement (consensus)
When the original “data” is more authoritative than any one model, cross-check by running two or more recognizers in parallel and flagging disagreements:

```python
from scriva.recognize import consensus, openai, anthropic, bedrock

recognize = consensus(
    openai(model="gpt-4o"),
    anthropic(model="claude-opus-4-7"),
    bedrock(model="qwen.qwen3-vl-235b-a22b"),
    on_disagreement="confidence",  # or "majority", "tiebreaker"
)
```

The pipeline records every member’s answer; the consensus wrapper
resolves disagreements by the strategy you pick. See
Recognizers › consensus for the full
shape and cost trade-off.
For a softer variant — only re-run the uncertain regions through a
stronger oracle — use
recognize.uncertainty_first(primary, oracle, ...).
That is the cheap shape that scales to large batches.
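To make the "majority" and "confidence" strategies concrete, here is a sketch of how a resolver could pick a winner from the members' answers. The resolve function and the (text, confidence) tuple shape are assumptions for illustration; the "tiebreaker" strategy is not modelled.

```python
from collections import Counter


def resolve(answers, on_disagreement="confidence"):
    """answers: list of (text, confidence), one per recognizer.

    Returns (winning_text, disagreed).
    """
    texts = [text for text, _ in answers]
    if len(set(texts)) == 1:
        return texts[0], False                      # unanimous: nothing to resolve
    if on_disagreement == "majority":
        winner, _ = Counter(texts).most_common(1)[0]
    elif on_disagreement == "confidence":
        winner, _ = max(answers, key=lambda a: a[1])
    else:
        raise ValueError(f"unsupported strategy: {on_disagreement}")
    return winner, True


answers = [("1,204.50", 0.71), ("1,204.50", 0.64), ("1,284.50", 0.90)]
by_majority = resolve(answers, "majority")
by_confidence = resolve(answers, "confidence")
```

The two strategies disagree on this input: two moderately confident members outvote one very confident member under "majority", while "confidence" takes the single highest score. That difference is the trade-off to weigh when choosing.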
2c. Confidence-driven re-OCR
After a run, re-OCR only the regions whose confidence falls below a
threshold, using the previous text as a hint:
```python
import scriva
from scriva import RecognitionHint
from scriva.detect import box_annotations, morphological_grid
from scriva.classify import rule_based
from scriva.recognize import openai
from scriva.postprocess import confidence_score
from scriva.export import excel

# First pass
first = scriva.Pipeline(
    morphological_grid(),
    rule_based(),
    openai(model="gpt-4o", cache=".scriva_cache"),
    confidence_score.rendering(),
    excel("out.xlsx"),
)
result = first("scan.png")

# Refinement: only the low-confidence regions, hint = previous text
refine = scriva.Pipeline(
    box_annotations.from_result(
        result,
        where=lambda r: (r.confidence or 0) < 0.6,
    ),
    openai(
        model="gpt-4o",
        prompt=scriva.prompts.Prompt.ocr_with_hint(),
        hints=RecognitionHint.from_result(result),
    ),
    confidence_score.rendering(),
    excel("out_refined.xlsx"),
)
refined = refine("scan.png")
combined = result.merge(refined)  # field-aware overlay; refined wins
```

RecognitionHint reaches the recognizer’s recognize(...) call through
the hint= keyword. Built-in VLM recognizers splice the hint into the
prompt; custom recognizers see it directly. See
Recognizers › RecognitionHint for the
full shape.
DocumentResult.merge
result.merge(other) is the symmetric counterpart of .diff(). It
aligns regions by region_id and produces a new DocumentResult where
each region’s Recognition is resolved by the chosen strategy:
```python
def merge(
    self,
    other: "DocumentResult",
    *,
    strategy: Literal["right_wins", "left_wins", "highest_confidence"]
    | Callable[[Recognition, Recognition], Recognition] = "right_wins",
) -> "DocumentResult": ...
```

| Strategy | Behaviour |
|---|---|
| "right_wins" | other’s recognition replaces self’s for every region in other |
| "left_wins" | self’s recognition is kept; other only contributes missing regions |
| "highest_confidence" | The recognition with the higher confidence wins; ties go to other |
| Callable | (left, right) -> Recognition: caller-supplied conflict resolution |
Layout is taken from self unchanged; only Recognition rows are
merged. Regions in other that are not present in self are appended,
which makes merge safe for partial refinement passes that only
recognise a subset of regions.
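The semantics above can be modelled on plain dictionaries. This is a sketch of the described behaviour, not scriva's code: the standalone merge function and the region_id -> (text, confidence) shape are assumptions chosen to keep the example self-contained.

```python
def merge(left, right, strategy="right_wins"):
    """Merge two {region_id: (text, confidence)} maps as the strategy table describes."""

    def pick(l, r):
        if callable(strategy):
            return strategy(l, r)
        if strategy == "right_wins":
            return r
        if strategy == "left_wins":
            return l
        if strategy == "highest_confidence":
            return r if (r[1] or 0) >= (l[1] or 0) else l  # ties go to right
        raise ValueError(f"unknown strategy: {strategy}")

    merged = dict(left)                       # start from the left result
    for region_id, rec in right.items():
        if region_id in left:
            merged[region_id] = pick(left[region_id], rec)
        else:
            merged[region_id] = rec           # regions only in right are appended
    return merged


first = {"r1": ("Acme", 0.9), "r2": ("T0tal", 0.4)}
refined = {"r2": ("Total", 0.8), "r3": ("42", 0.7)}

overlay = merge(first, refined)               # default: right_wins
keep_left = merge(first, refined, "left_wins")
```

Because untouched regions (r1 here) pass through unchanged, a refinement pass that covers only a few regions can be merged back without special handling.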
A confidence-aware callable strategy is the common shape:
```python
def prefer_high_conf(left, right):
    return right if (right.confidence or 0) >= (left.confidence or 0) + 0.1 else left

combined = result.merge(refined, strategy=prefer_high_conf)
```

Lever 3: Cross-check against ground-truth data

When you have a corpus of labelled “known-right” examples (even a small
one), score every run against them and gate downstream effects on the
score. scriva.eval is the offline scorer; the same
machinery is the right shape for production drift monitoring.
```python
import scriva
from scriva.schemas import Invoice

# 1. CI gate: fail the build if F1 regresses
report = scriva.eval(
    pipeline=scriva.presets.invoice.pipeline(),
    ground_truth="./annotations/invoices/",
)
assert report.f1 >= 0.92, report.to_markdown()

# 2. Per-run regression check (production)
def on_each(src, invoice):
    if golden := lookup_golden(src):
        diff = compare_fields(invoice, golden, schema=Invoice)
        if diff.has_changes:
            alert_drift(src, diff)

scriva.batch(sources, schema=Invoice, on_each=on_each)
```

Two cross-check shapes; pick by where the labelled data lives:

- Static corpus: ./annotations/ of golden JSONs paired with source files. Read in CI, scriva.eval prints precision / recall / F1 / calibration. See Evaluation › Ground-truth format and Evaluation › CI gates.
- Live golden values: known-right fields for a subset of incoming docs (e.g. recurring vendor invoices where you already know the expected totals). Compare per-run and alert on drift; the same EvalReport.field_metrics shape works here too.

The two-recognizer comparison shape, useful for picking a model or catching regressions when you swap one, is the same pattern run locally:

```python
import scriva
from scriva.detect import morphological_grid
from scriva.recognize import openai, bedrock

gpt = scriva.Pipeline(morphological_grid(), openai(model="gpt-4o"))
qwen = scriva.Pipeline(morphological_grid(), bedrock(model="qwen.qwen3-vl-235b-a22b"))

a = gpt("scan.png")
b = qwen("scan.png")

diff = a.diff(b)  # DocumentDiff: per-region disagreements on text/confidence/language
for d in diff.regions:
    print(d.region_id, d.left.text, "≠", d.right.text)
```

DocumentResult.diff(other) is field-aware: it compares text,
confidence, and language per region and ignores deterministic fields
like bbox. Useful both for active-learning sample selection and for
regression-testing a model swap.
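A field-aware diff of this kind can be pictured as comparing a fixed set of fields per region and skipping everything else. The sketch below works on plain dictionaries; diff_regions and the record shape are hypothetical and only mirror the behaviour described above.

```python
def diff_regions(left, right, fields=("text", "confidence", "language")):
    """Return [(region_id, [changed field names])] for regions present in both results."""
    changes = []
    for region_id in sorted(left.keys() & right.keys()):
        changed = [f for f in fields if left[region_id].get(f) != right[region_id].get(f)]
        if changed:
            changes.append((region_id, changed))
    return changes


a = {
    "r1": {"text": "Total", "confidence": 0.9, "language": "en", "bbox": (0, 0, 40, 12)},
    "r2": {"text": "1,204.50", "confidence": 0.8, "language": "en", "bbox": (50, 0, 90, 12)},
}
b = {
    "r1": {"text": "Total", "confidence": 0.9, "language": "en", "bbox": (1, 0, 41, 12)},
    "r2": {"text": "1,284.50", "confidence": 0.6, "language": "en", "bbox": (50, 0, 90, 12)},
}

changes = diff_regions(a, b)
```

Region r1 differs only in bbox, which is not among the compared fields, so it does not appear in the output; r2 is reported for text and confidence.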
For deterministic CI without paying for the recognizer on every build, cache the recognizer responses against your fixtures — see Evaluation › Regression fixtures.
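For readers who want to see what a field-level precision / recall / F1 score measures, here is the standard computation on two dictionaries. How scriva.eval matches and weights fields is not specified on this page, so field_f1 and the exact-match rule are assumptions.

```python
def field_f1(predicted, golden):
    """Exact-match precision, recall and F1 over extracted fields."""
    true_pos = sum(1 for k, v in predicted.items() if k in golden and golden[k] == v)
    precision = true_pos / len(predicted) if predicted else 0.0
    recall = true_pos / len(golden) if golden else 0.0
    total = precision + recall
    f1 = 2 * precision * recall / total if total else 0.0
    return precision, recall, f1


golden = {"vendor": "Acme", "total": "1204.50", "date": "2024-03-01", "currency": "EUR"}
predicted = {"vendor": "Acme", "total": "1284.50", "date": "2024-03-01"}

precision, recall, f1 = field_f1(predicted, golden)
```

Two of three predicted fields are right (precision 2/3) and two of four golden fields were recovered (recall 1/2), giving F1 = 4/7, about 0.57. A gate such as report.f1 >= 0.92 would fail this run.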
Combining the three levers
In production they stack. The typical full pipeline:

```python
import scriva
from scriva.detect import morphological_grid
from scriva.recognize import uncertainty_first, openai, anthropic
from scriva.postprocess import confidence_score, dictionary
from scriva.export import excel

pipeline = scriva.Pipeline(
    morphological_grid(),
    uncertainty_first(                            # lever 2b
        primary=openai(model="gpt-4o", cache=".scriva_cache"),
        oracle=anthropic(model="claude-opus-4-7"),
        k=20,
    ),
    dictionary.from_samples(".scriva_samples"),   # lever 3 (offline-learned corrections)
    confidence_score.rendering(),                 # lever 2a
    excel("out.xlsx"),
)

result = pipeline("scan.png")

if to_review := result.low_confidence(0.7):       # lever 1 (review queue)
    send_for_human_review(to_review)
else:
    auto_post(result.fields)
```

The offline CI run, in turn, uses lever 3 against ./annotations/ to gate
the deploy. That is the full shape: cross-check on every run,
gate-then-route by confidence, escalate to a human when the confidence
isn’t there.
Writing your own stage
Subclass for stateful adapters:

```python
import scriva
from scriva import PostProcessor

class StripQuotes(PostProcessor):
    async def process(self, page, layout, recognitions):
        # Strip surrounding double quotes; leave empty recognitions untouched
        return {
            rid: r.with_text(r.text.strip('"')) if r.text else r
            for rid, r in recognitions.items()
        }

pipeline = scriva.Pipeline(..., StripQuotes(), ...)
```

Decorate a function for stateless ones:

```python
import scriva
from scriva import postprocessor

@postprocessor
async def strip_quotes(page, layout, recognitions):
    return {
        rid: r.with_text(r.text.strip('"')) if r.text else r
        for rid, r in recognitions.items()
    }

pipeline = scriva.Pipeline(..., strip_quotes, ...)
```

@preprocessor, @detector, @recognizer, @postprocessor, and @exporter
all exist. Each accepts name= and capabilities= keyword args if you want
to override defaults. Custom stages are first-class — they can sit anywhere
in the chain.
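A decorator that works both bare and with name= / capabilities= keyword arguments is a common Python pattern. The sketch below shows one way to write it; this local postprocessor is a stand-in, and the stage_name attribute and the underscore-to-hyphen default are assumptions, not scriva's internals.

```python
import asyncio


def postprocessor(func=None, *, name=None, capabilities=()):
    """Usable as @postprocessor or @postprocessor(name=..., capabilities=...)."""

    def wrap(f):
        # Attach stage metadata to the function; defaults derive from its name.
        f.stage_name = name or f.__name__.replace("_", "-")
        f.capabilities = frozenset(capabilities)
        return f

    # Bare use passes the function directly; parameterised use returns the wrapper.
    return wrap(func) if func is not None else wrap


@postprocessor
async def strip_quotes(page, layout, recognitions):
    return {rid: text.strip('"') for rid, text in recognitions.items()}


@postprocessor(name="upper", capabilities={"text"})
async def upper_case(page, layout, recognitions):
    return {rid: text.upper() for rid, text in recognitions.items()}


cleaned = asyncio.run(strip_quotes(None, None, {"r1": '"hi"'}))
```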