Skip to main content

HAPI FHIR Server Event-Driven Materialization

FHIR4DS can keep patient-level DQM results materialized from a HAPI FHIR JPA Server PostgreSQL backend.

The design separates change capture from measure calculation:

HAPI PostgreSQL trigger
-> fhir4ds_patient_change_queue
-> pg_notify('fhir4ds_patient_changed', ...)
-> FHIR4DS worker
-> fhir4ds_measure_result / fhir4ds_measure_audit

PostgreSQL triggers only enqueue changed patients. They do not run CQL or DQM logic inside the HAPI write transaction.

Install Tables and Triggers

Install the FHIR4DS-owned schema objects into the HAPI PostgreSQL database:

fhir4ds dqm hapi install \
--connection postgresql://hapi:hapi@localhost:15432/hapi

If your config customizes HAPI table or column names, install from that config so the trigger SQL is rendered against the same mapping:

fhir4ds dqm hapi install --config hapi-dqm.yaml

This creates:

  • fhir4ds_patient_change_queue
  • fhir4ds_measure_config
  • fhir4ds_measure_run
  • fhir4ds_measure_result
  • fhir4ds_measure_audit
  • fhir4ds_hapi_current_resources
  • trigger functions on hfj_resource and hfj_res_ver

Configure Measures

Use a YAML or JSON materialization config:

postgres:
connection_string: postgresql://hapi:hapi@localhost:15432/hapi
hapi_schema:
schema: public
resource_table: hfj_resource
version_table: hfj_res_ver
text_lob_column: res_text
decoded_view: fhir4ds_hapi_current_resources

worker:
batch_size: 100
max_attempts: 3
retry_backoff_seconds: 60
processing_timeout_seconds: 900

period:
start: "2026-01-01"
end: "2026-12-31"

defaults:
audit_mode: population

results:
persist_audit: true

measures:
- id: CMS122
enabled: true
path: /data/ecqm/Measure-CMS122.json
cql: /data/ecqm/CMS122.cql
version: "2025"

Sync the config into PostgreSQL:

fhir4ds dqm hapi sync-config --config hapi-dqm.yaml

The worker reads enabled rows from fhir4ds_measure_config. A config file may also carry measure definitions directly for local one-off runs.

postgres.hapi_schema is optional. The defaults match the current HAPI JPA PostgreSQL table layout, and the fields can be overridden for older HAPI versions or local table/column customizations. decoded_view tells HapiPostgresSource to read the PostgreSQL-side decoded view created by hapi install, which supports both inline JSON in res_text_vc and uncompressed JSON stored in the res_text large-object column.

Process Changes

Process one batch:

fhir4ds dqm hapi process-queue --config hapi-dqm.yaml --limit 100

Run continuously with LISTEN/NOTIFY and polling fallback:

fhir4ds dqm hapi listen --config hapi-dqm.yaml

The durable queue is the source of truth. Notifications are wake-up messages only; if the worker is offline, pending rows remain in the queue.

Failed rows are retried with linear backoff until worker.max_attempts is reached. Rows left in processing longer than worker.processing_timeout_seconds are returned to pending or marked failed if they have already exhausted their attempts. The listen worker handles SIGINT/SIGTERM and logs structured JSON events at the configured log level:

fhir4ds dqm hapi listen --config hapi-dqm.yaml --log-level INFO

The continuous listen worker keeps one DuckDB/evaluator runtime open and compiles each configured measure once per static configuration. It executes each claimed patient batch through a temporary target patient table. Patient IDs are supplied at execution time, so a new batch does not force SQL regeneration. CQL parameters, included libraries, audit mode, and other result-shaping options remain part of the compile cache key.

Result Storage

fhir4ds_measure_result is the indexed current/history table. Recalculation deactivates the previous active row for (patient_id, measure_id) and inserts a new row.

Full audit is stored separately in fhir4ds_measure_audit when persist_audit is enabled. This keeps current-result queries small while retaining evidence for later review.

fhir4ds_measure_run records the batch size, measure count, status, error text, compiled SQL cache hits/misses, compile time, execution time, prepared statement counts, and metrics_json. The top-level run columns are per-batch deltas; for the continuous worker, metrics_json.cumulative also carries the lifetime worker counters.

Apply retention windows from the config with:

fhir4ds dqm hapi prune --config hapi-dqm.yaml

retention.audit_days removes old audit rows, retention.inactive_result_days removes old inactive result rows, and retention.run_days removes old run rows that are no longer referenced by retained results. Active result rows are never removed by the prune command.

Worker Container

Build and run the worker image with the compose profile:

cd docker/hapi-postgres
docker compose --profile worker up --build worker

The worker image installs fhir4ds-v2[hapi], so psycopg is available inside the container. The compose profile mounts the repository read-only at /workspace and reads /config/hapi-materialization.yaml.

Current Scope

This integration supports HAPI current resources where res_encoding = 'JSON'. It can read inline JSON from hfj_res_ver.res_text_vc, and the installed decoded view can decode uncompressed JSON from the PostgreSQL large-object column hfj_res_ver.res_text. Compressed JSONC resource bodies are detected and remain outside the v1 scope.

2025 eCQM Smoke Testing

Use the repository's 2025 conformance fixture to load selected test patients into the local HAPI server:

python3 scripts/hapi/load_2025_measure.py \
--measure CMS122 \
--base-url http://localhost:18080/fhir \
--limit-patients 1

The script prints a suggested measures[] config entry with the discovered Measure bundle, CQL file, include paths, and ValueSet paths. Start with one measure and one patient, then expand the patient limit and measure set after the queue/result workflow is behaving as expected.

For an automated end-to-end smoke against the local compose stack:

python3 scripts/hapi/smoke_2025_materialization.py \
--measure CMS122 \
--limit-patients 1