A spectroscopy-based diagnostic test is not a model. It is a data pipeline. The model sits in the middle, but the infrastructure around it - acquisition, conversion, quality control, transport, storage, inference, and result delivery - determines whether the test works reliably in a clinical environment or fails silently on a Tuesday afternoon because a network cable got unplugged.
This article is the reference architecture for that pipeline. It covers all six layers, from the instrument driver that reads photons off a detector to the HL7 message that delivers a classification result to a patient's medical record. Each layer has specific engineering requirements that differ from general-purpose data infrastructure, because spectral data has properties that most data engineers have never encountered:
- Fixed-length floating-point vectors with strict ordering constraints
- Sub-second latency budgets imposed by a clinician standing at the instrument
- Regulatory retention rules that span the lifetime of a medical device
- Quality metrics that are physics-based rather than statistical
If you are building a production spectroscopy data system - whether for clinical diagnostics, pharmaceutical quality control, or industrial process monitoring - this is the blueprint. We assume familiarity with Python, cloud infrastructure, and basic spectroscopy concepts. For the ML pipeline specifically (preprocessing, model training, validation), see Building AI Pipelines for Spectral Classification. For the clinical workflow that wraps this pipeline, see Building Clinical Workflow Software for Spectroscopy-Based Diagnostics. This article focuses on the data infrastructure that connects everything.
Reference Architecture
Here is the end-to-end pipeline. Each numbered layer is covered in detail below.
┌─────────────────────────────────────────────────────────────────────┐
│ SPECTROSCOPY DATA PIPELINE │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ │
│ │ 1. INSTRUMENT │ USB/Serial/TCP ──► Driver ──► Raw Binary │
│ │ ACQUISITION │ Health monitor, auto-calibration trigger │
│ └──────┬───────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ 2. LOCAL │ Proprietary ──► JCAMP-DX / internal repr. │
│ │ PROCESSING │ QC: SNR, saturation, cosmic ray, baseline │
│ │ │ Metadata: patient ID, timestamp, operator │
│ └──────┬───────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ 3. CLOUD │ TLS upload ──► queue ──► acknowledgment │
│ │ INGESTION │ Compression, retry, offline buffering │
│ └──────┬───────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ 4. STORAGE │ Object store (raw) + metadata DB (indexed) │
│ │ & RETENTION │ 21 CFR Part 11 audit trail, versioned blobs │
│ └──────┬───────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ 5. ML │ Model registry ──► inference service │
│ │ INFERENCE │ Real-time + batch, A/B, feature store │
│ └──────┬───────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ 6. RESULT │ Classification ──► HL7v2/FHIR ──► LIS/EHR │
│ │ DELIVERY │ Audit trail, amendment workflow, alerting │
│ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
Layer 1: Instrument Acquisition
The pipeline starts at the detector. A spectrometer acquires a signal - an interferogram (FTIR), a scattered photon count (Raman), or an absorption measurement (NIR/UV-Vis) - and the acquisition layer is responsible for getting that data out of the instrument and into your software.
Communication Protocols
Every major spectrometer manufacturer uses a different communication approach. The three physical layers you will encounter:
| Protocol | Instruments | Typical Use | Latency |
|---|---|---|---|
| USB (USBTMC) | Bruker Alpha II, Ocean Insight spectrometers, Avantes | Benchtop instruments, portable analyzers | < 10 ms transfer |
| RS-232 Serial | Older Thermo Nicolet, PerkinElmer, some process analyzers | Legacy instruments, process spectroscopy | 50-200 ms at 115200 baud |
| TCP/IP (Ethernet) | Bruker Vertex/Tensor, Horiba LabRAM, Renishaw inVia | Research-grade instruments, remote instruments | < 5 ms on LAN |
USB instruments typically expose a USBTMC (USB Test & Measurement Class) interface or a virtual COM port. USBTMC gives you SCPI-like command/response semantics over USB bulk transfers. Virtual COM ports emulate serial communication - simpler to program but with the overhead of serial protocol framing.
TCP instruments use either a proprietary binary protocol or, increasingly, a REST API. Bruker's OPUS HTTP Server exposes REST endpoints for acquisition control. Horiba's ICL (Instrument Control Language) runs over TCP sockets. Renishaw's WiRE automation uses COM/DCOM on Windows.
Instrument SDKs and Drivers
The practical reality is that you rarely write raw USB or serial code. Each vendor provides an SDK, and the quality varies enormously:
| Vendor | SDK / Interface | Language | Platform | Notes |
|---|---|---|---|---|
| Bruker | OPUS HTTP Server | REST (any) | Windows | Requires OPUS running; HTTP commands trigger acquisition |
| Bruker | OPUS DDE | DDE (Python via pywin32) | Windows | Legacy but stable; OPUS must be running |
| Thermo Fisher | OMNIC SDK / Thermo Connect | COM/.NET | Windows | .NET SDK for Nicolet instruments |
| Horiba | LabSpec ICL | TCP socket (Python) | Windows/Linux | Text-based command protocol over TCP |
| Renishaw | WiRE Automation | COM/DCOM (Python via comtypes) | Windows | Requires WiRE license |
| Ocean Insight | OceanDirect / SeaBreeze | C/Python | Cross-platform | Open-source SeaBreeze for older models |
| Avantes | Avasoft SDK / AvaSoft-DLL | C/Python (ctypes) | Windows/Linux | DLL-based, good Python bindings |
| Wasatch Photonics | Wasatch.PY | Python | Cross-platform | Native Python, open-source |
The critical design decision: wrap every vendor SDK behind a uniform interface. Your pipeline should not care whether a spectrum came from a Bruker or a Thermo. The instrument adapter pattern from our spectral data formats article applies here at the acquisition level:
```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime

import numpy as np


@dataclass
class AcquisitionResult:
    wavenumbers: np.ndarray
    intensities: np.ndarray
    instrument_id: str
    acquired_at: datetime
    acquisition_params: dict = field(default_factory=dict)
    raw_path: str | None = None


class InstrumentDriver(ABC):
    @abstractmethod
    def connect(self) -> None: ...

    @abstractmethod
    def acquire(self, params: dict) -> AcquisitionResult: ...

    @abstractmethod
    def health_check(self) -> dict: ...

    @abstractmethod
    def disconnect(self) -> None: ...
```
Real-Time vs. Batch Acquisition
Two acquisition patterns exist, and you need to support both:
Real-time (single-sample clinical). A clinician places a sample, presses a button, and waits. Acquisition takes 5-30 seconds depending on modality (FTIR ATR is fast, Raman with long integration times is slower). The pipeline must process and return a result within 1-2 seconds of acquisition completing. This is the latency-critical path.
Batch (multi-sample or process monitoring). A set of samples is loaded on an autosampler, or a process analyzer runs continuously. Spectra arrive at regular intervals - every 30 seconds for a process NIR probe, every 2 minutes for an autosampler cycle. Throughput matters more than single-sample latency.
Design the acquisition layer to handle both. A message-based architecture works: the instrument driver publishes an AcquisitionResult to a local queue (even just an in-process asyncio.Queue), and downstream processing subscribes to it. This decouples acquisition timing from processing timing.
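A minimal sketch of that decoupling, assuming a single in-process asyncio.Queue. The `fake_acquire` producer, the dict payload, and the `None` sentinel are hypothetical stand-ins for a real driver publishing AcquisitionResult objects:

```python
import asyncio


async def fake_acquire(queue: asyncio.Queue, n_spectra: int) -> None:
    """Hypothetical producer standing in for an instrument driver."""
    for i in range(n_spectra):
        await asyncio.sleep(0)  # placeholder for the real acquisition wait
        await queue.put({"spectrum_index": i})
    await queue.put(None)  # sentinel: no more spectra


async def process_spectra(queue: asyncio.Queue) -> list[int]:
    """Consumer: drains the queue at its own pace."""
    processed = []
    while True:
        item = await queue.get()
        if item is None:
            break
        processed.append(item["spectrum_index"])
    return processed


async def run_pipeline(n_spectra: int = 3) -> list[int]:
    # A bounded queue applies back-pressure if processing falls behind.
    queue: asyncio.Queue = asyncio.Queue(maxsize=100)
    producer = asyncio.create_task(fake_acquire(queue, n_spectra))
    processed = await process_spectra(queue)
    await producer
    return processed
```

The same shape serves both patterns: a clinical button press puts one item on the queue, while an autosampler or process probe keeps putting items at its own cadence.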
Instrument Health Monitoring
Instruments drift. Lasers age, detectors degrade, ATR crystals get scratched, optical alignment shifts. If your pipeline does not monitor instrument health, you will discover degradation when a clinician calls to say "the results look wrong."
Monitor these metrics continuously:
- Background/reference spectrum stability. Compare each new background to the baseline reference. A drift in background intensity or shape beyond a threshold triggers a recalibration alert.
- Laser wavelength (Raman). Raman peak positions depend on laser wavelength. A small drift in the excitation laser shifts all peaks. Monitor using a reference material (silicon at 520.7 cm-1 or polystyrene).
- Detector response. Track the noise floor over time. Rising noise indicates detector aging or thermal issues.
- Environmental sensors. Temperature and humidity affect spectral measurements. Log these alongside every acquisition.
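The laser wavelength check could be sketched as follows, assuming a silicon reference spectrum is acquired periodically. The function name, the 1.0 cm-1 tolerance, and the search window are illustrative assumptions rather than recommended values:

```python
import numpy as np

SILICON_PEAK_CM1 = 520.7  # reference position quoted above


def check_raman_shift_drift(
    raman_shift: np.ndarray,
    intensities: np.ndarray,
    tolerance_cm1: float = 1.0,       # assumed tolerance; tune per instrument
    search_window_cm1: float = 15.0,  # assumed search window around the peak
) -> tuple[float, bool]:
    """Return (offset from reference in cm-1, within_tolerance)."""
    mask = np.abs(raman_shift - SILICON_PEAK_CM1) <= search_window_cm1
    x, y = raman_shift[mask], intensities[mask]
    # Intensity-weighted centroid above the local minimum gives a
    # sub-pixel peak position without a full curve fit.
    weights = y - y.min()
    if weights.sum() <= 0:
        raise ValueError("no peak found in the search window")
    peak_position = float(np.sum(x * weights) / np.sum(weights))
    offset = peak_position - SILICON_PEAK_CM1
    return offset, abs(offset) <= tolerance_cm1
```

A centroid is used here only because it needs no fitting library. A Lorentzian or Gaussian fit would be a reasonable alternative when the peak sits on a sloping background.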
```python
@dataclass
class InstrumentHealthReport:
    instrument_id: str
    timestamp: datetime
    background_drift_pct: float
    noise_floor_rms: float
    laser_wavelength_nm: float | None
    temperature_c: float | None
    humidity_pct: float | None
    status: str  # "ok", "warning", "critical"


def check_background_drift(
    current_bg: np.ndarray,
    reference_bg: np.ndarray,
    threshold_pct: float = 5.0
) -> tuple[float, bool]:
    # Mean absolute deviation from the reference, as a percentage of the
    # reference's mean absolute intensity.
    drift = np.mean(np.abs(current_bg - reference_bg)) / np.mean(np.abs(reference_bg)) * 100
    # Second element is True when the drift is within tolerance.
    return drift, drift < threshold_pct
```
Layer 2: Local Processing
Raw data from the instrument is in a vendor-specific binary format and may contain artifacts. The local processing layer converts it to a standardized representation, validates quality, and attaches clinical metadata before anything leaves the instrument PC.
Format Conversion
Every instrument outputs a proprietary format: Bruker OPUS (.0), Thermo SPC (.spc), Renishaw WDF (.wdf), Horiba LabSpec (.l6s). Your pipeline needs to read all of them. We cover the parsing code for each format in detail in Instrument-Agnostic Spectral Data Formats - do not reinvent those parsers.
The conversion target is your internal normalized representation - not another file format, but an in-memory data structure with a defined schema:
```python
@dataclass
class ProcessedSpectrum:
    spectrum_id: str
    wavenumbers: np.ndarray
    intensities: np.ndarray
    source_format: str
    source_path: str
    instrument_id: str
    acquired_at: datetime
    quality_metrics: dict
    clinical_metadata: dict
    preprocessing_version: str
```
Store the raw vendor file alongside the processed representation. You will need the raw file for regulatory compliance, model retraining, and debugging. The processed representation is what moves through the rest of the pipeline.
Spectral Quality Checks
Not every acquisition produces a usable spectrum. The quality check layer acts as a gate - spectra that fail quality criteria are flagged for repeat measurement instead of being sent to the ML model. This is your first line of defense against garbage-in, garbage-out.
SNR (signal-to-noise ratio). The most universal quality metric. Calculated as the ratio of the signal amplitude in a diagnostically relevant region to the RMS noise in a region with no expected signal:
```python
def calculate_snr(
    wavenumbers: np.ndarray,
    spectrum: np.ndarray,
    signal_region: tuple[float, float] = (1600, 1700),
    noise_region: tuple[float, float] = (1900, 2100)
) -> float:
    signal_mask = (wavenumbers >= signal_region[0]) & (wavenumbers <= signal_region[1])
    noise_mask = (wavenumbers >= noise_region[0]) & (wavenumbers <= noise_region[1])
    # Peak-to-peak amplitude in the diagnostically relevant region.
    signal_amplitude = np.max(spectrum[signal_mask]) - np.min(spectrum[signal_mask])
    noise_segment = spectrum[noise_mask]
    # Remove a linear trend from the quiet region before measuring noise,
    # so a sloping baseline is not counted as noise.
    noise_rms = np.std(noise_segment - np.polyval(np.polyfit(
        wavenumbers[noise_mask], noise_segment, 1
    ), wavenumbers[noise_mask]))
    return signal_amplitude / noise_rms if noise_rms > 0 else 0.0
```
For clinical FTIR, an SNR below 100 typically indicates insufficient sample contact or a contaminated ATR crystal. For Raman, the threshold depends on integration time - 20-50 is typical for 1-second exposures.
Saturation detection. When the detector signal exceeds its dynamic range, intensity values clip at a maximum. Saturated spectra produce distorted peak shapes and unreliable classification results:
```python
def detect_saturation(
    spectrum: np.ndarray,
    saturation_threshold: float = 0.95
) -> dict:
    # Heuristic: the reference level is the spectrum's own maximum, so this
    # flags flat-topped (clipped) regions rather than comparing against the
    # detector's known full-scale value.
    max_val = np.max(np.abs(spectrum))
    n_saturated = np.sum(np.abs(spectrum) > saturation_threshold * max_val)
    # Cast to a plain bool so the report stays JSON-serializable.
    is_saturated = bool(n_saturated > len(spectrum) * 0.01)
    return {
        "is_saturated": is_saturated,
        "saturated_points": int(n_saturated),
        "max_value": float(max_val)
    }
```
Cosmic ray removal (Raman-specific). Raman detectors (CCDs) are occasionally hit by cosmic rays, producing sharp, narrow spikes that are 10-100x taller than real peaks. These must be detected and removed before classification. The standard approach is a median-based filter:
```python
from scipy.ndimage import median_filter


def remove_cosmic_rays(
    spectrum: np.ndarray,
    threshold_sigma: float = 5.0,
    window: int = 5
) -> tuple[np.ndarray, list[int]]:
    # A short median filter follows real peaks but ignores one- or
    # two-pixel spikes, so spikes show up as large residuals.
    median_spectrum = median_filter(spectrum, size=window)
    residuals = spectrum - median_spectrum
    sigma = np.std(residuals)
    spike_mask = np.abs(residuals) > threshold_sigma * sigma
    cleaned = spectrum.copy()
    cleaned[spike_mask] = median_spectrum[spike_mask]
    # tolist() yields plain Python ints, which serialize cleanly to JSON.
    spike_indices = np.where(spike_mask)[0].tolist()
    return cleaned, spike_indices
```
Baseline anomaly detection. A spectrum with an abnormally high or distorted baseline - caused by fluorescence (Raman), scattering (FTIR), or sample preparation errors - may pass SNR checks but still produce unreliable results. Fit a polynomial baseline and check whether its amplitude relative to the signal is within normal bounds.
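One way to sketch that check, assuming a low-order polynomial fitted to the whole spectrum is an acceptable baseline estimate. The function name and the default order are illustrative. A plain least-squares fit also absorbs a little peak signal, so production systems often use an asymmetric or iterative baseline method instead:

```python
import numpy as np


def baseline_ratio(
    wavenumbers: np.ndarray,
    spectrum: np.ndarray,
    poly_order: int = 3,  # assumed order; tune per modality
) -> float:
    """Ratio of the fitted baseline's span to the residual (signal) span."""
    # Centre and scale x so the polynomial fit stays well conditioned.
    x = (wavenumbers - wavenumbers.mean()) / wavenumbers.std()
    baseline = np.polyval(np.polyfit(x, spectrum, poly_order), x)
    residual = spectrum - baseline
    baseline_span = float(baseline.max() - baseline.min())
    signal_span = float(residual.max() - residual.min())
    return baseline_span / signal_span if signal_span > 0 else float("inf")
```

The returned ratio is the kind of quantity a per-modality limit such as `max_baseline_ratio` would be compared against.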
Quality Gate Decision
Combine all checks into a single pass/fail decision with a detailed report:
```python
def quality_gate(
    wavenumbers: np.ndarray,
    spectrum: np.ndarray,
    modality: str,
    thresholds: dict | None = None
) -> dict:
    defaults = {
        "ftir": {"min_snr": 100, "max_baseline_ratio": 3.0},
        "raman": {"min_snr": 20, "max_baseline_ratio": 5.0, "cosmic_ray_sigma": 5.0},
        "nir": {"min_snr": 50, "max_baseline_ratio": 2.0},
    }
    # Unknown modalities fall back to the FTIR thresholds.
    t = thresholds or defaults.get(modality, defaults["ftir"])
    snr = calculate_snr(wavenumbers, spectrum)
    sat = detect_saturation(spectrum)
    result = {
        "passed": True,
        "snr": snr,
        "saturation": sat,
        "failures": []
    }
    if snr < t["min_snr"]:
        result["passed"] = False
        result["failures"].append(f"SNR {snr:.1f} below minimum {t['min_snr']}")
    if sat["is_saturated"]:
        result["passed"] = False
        result["failures"].append("Detector saturation detected")
    if modality == "raman":
        # Only the spike count is reported here; the cleaned spectrum is
        # discarded, so apply remove_cosmic_rays again before classification.
        _, spikes = remove_cosmic_rays(spectrum, t.get("cosmic_ray_sigma", 5.0))
        result["cosmic_rays_removed"] = len(spikes)
    # Note: max_baseline_ratio is defined above but not yet enforced here.
    return result
```
Metadata Attachment
Every spectrum must carry clinical metadata that links it to a patient encounter. This metadata is attached at the local processing layer - before the spectrum leaves the instrument PC - because the instrument PC is where the clinician interaction happens.
Required metadata fields for a clinical deployment:
| Field | Source | Example |
|---|---|---|
| patient_id | Barcode scan / manual entry | MRN-0047291 |
| accession_number | LIS order | ACC-2026-183742 |
| operator_id | Session login | tech-jsmith |
| acquired_at | System clock (NTP-synced) | 2026-10-24T14:23:07.412Z |
| instrument_id | Config file | BRUKER-ALPHA-SN4821 |
| site_id | Config file | SITE-MAYO-ROCHESTER |
| test_code | Workflow selection | FTIR-STREP-V2 |
| specimen_type | Workflow config | throat-swab |
The acquired_at timestamp must come from an NTP-synchronized clock. Clinical audit trails require traceable timestamps. Do not use the instrument's internal clock - they drift.
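A minimal validation sketch for these fields, assuming the metadata travels as a plain dict with acquired_at as an ISO 8601 string. The function name is hypothetical. It checks only presence and timestamp format, and cannot confirm that the clock was actually NTP-synchronized:

```python
from datetime import datetime, timedelta

REQUIRED_FIELDS = (
    "patient_id", "accession_number", "operator_id", "acquired_at",
    "instrument_id", "site_id", "test_code", "specimen_type",
)


def validate_clinical_metadata(metadata: dict) -> list[str]:
    """Return a list of problems; an empty list means the record is usable."""
    problems = [
        f"missing field: {name}"
        for name in REQUIRED_FIELDS
        if not metadata.get(name)
    ]
    acquired_at = metadata.get("acquired_at")
    if isinstance(acquired_at, str) and acquired_at:
        try:
            # fromisoformat() before Python 3.11 rejects a trailing "Z".
            parsed = datetime.fromisoformat(acquired_at.replace("Z", "+00:00"))
        except ValueError:
            problems.append("acquired_at is not ISO 8601")
        else:
            if parsed.tzinfo is None:
                problems.append("acquired_at has no timezone")
            elif parsed.utcoffset() != timedelta(0):
                problems.append("acquired_at is not UTC")
    return problems
```

Running this before the spectrum is queued for upload means a missing accession number is caught while the operator is still at the instrument.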
Layer 3: Cloud Ingestion
The processed spectrum with quality metrics and clinical metadata now needs to reach the cloud for inference, storage, and result delivery. This layer handles the transport.
Data Size Considerations
Spectral data is small. Surprisingly small, compared to most clinical data types:
| Data Type | Typical Size | Spectra per MB |
|---|---|---|
| Single FTIR spectrum (4000 points, float32) | ~16 KB | 64 |
| Single Raman spectrum (1024 points, float32) | ~4 KB | 256 |
| FTIR with metadata + raw file | ~50-100 KB | 10-20 |
| Raman spectral map (100x100, 1024 pts each) | ~40 MB | 0.025 |
| Hyperspectral image (512x512, 200 bands) | ~100-200 MB | - |
A busy clinical site running 200 FTIR tests per day generates about 10-20 MB of spectral data. This is negligible bandwidth - a 3G cellular connection can handle it. The exception is hyperspectral imaging, which produces data volumes comparable to medical imaging (CT, MRI) and requires different transport strategies.
For single-spectrum clinical workflows, the upload protocol is straightforward. For hyperspectral or spectral mapping applications, consider compression (wavelet-based methods can reach roughly 10-30x on spatially correlated spectral images, though ratios in that range are usually lossy or near-lossless rather than strictly lossless, so validate the effect on classification first) or edge inference (run the model locally and upload only the result plus a compressed representation).
Upload Protocol
Use HTTPS POST with structured payloads. Do not invent a custom binary protocol - the complexity is not justified for the data volumes involved.
```python
import base64
import gzip
import json

import httpx
import numpy as np


async def upload_spectrum(
    spectrum: ProcessedSpectrum,
    endpoint: str,
    api_key: str,
    timeout: float = 30.0
) -> dict:
    payload = {
        "spectrum_id": spectrum.spectrum_id,
        "instrument_id": spectrum.instrument_id,
        "acquired_at": spectrum.acquired_at.isoformat(),
        "source_format": spectrum.source_format,
        "preprocessing_version": spectrum.preprocessing_version,
        "quality_metrics": spectrum.quality_metrics,
        "clinical_metadata": spectrum.clinical_metadata,
        "spectral_data": {
            # "<f4" forces little-endian float32 so the bytes match the
            # declared encoding regardless of the host's byte order.
            "wavenumbers": base64.b64encode(
                spectrum.wavenumbers.astype("<f4").tobytes()
            ).decode(),
            "intensities": base64.b64encode(
                spectrum.intensities.astype("<f4").tobytes()
            ).decode(),
            "n_points": len(spectrum.wavenumbers),
            "encoding": "base64_float32_le"
        }
    }
    body = gzip.compress(json.dumps(payload).encode())
    async with httpx.AsyncClient() as client:
        response = await client.post(
            endpoint,
            content=body,
            headers={
                "Content-Type": "application/json",
                "Content-Encoding": "gzip",
                "Authorization": f"Bearer {api_key}",
                "X-Spectrum-ID": spectrum.spectrum_id
            },
            timeout=timeout
        )
        response.raise_for_status()
        return response.json()
```
Base64-encoding the spectral arrays inside JSON is intentional. It adds ~33% overhead (a 16 KB spectrum becomes ~21 KB) but keeps the payload self-describing and parseable by any JSON tool. For a 16 KB spectrum, this overhead is irrelevant. For hyperspectral images, use multipart upload with the binary data as a separate part.
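On the receiving side, the arrays can be recovered with the inverse operation. The sketch below assumes the "base64_float32_le" encoding named in the payload. The helper names `encode_array` and `decode_array` are illustrative and not part of any library:

```python
import base64

import numpy as np


def encode_array(values: np.ndarray) -> str:
    # "<f4" pins the bytes to little-endian float32 regardless of host CPU.
    return base64.b64encode(values.astype("<f4").tobytes()).decode()


def decode_array(encoded: str, n_points: int) -> np.ndarray:
    values = np.frombuffer(base64.b64decode(encoded), dtype="<f4")
    # n_points acts as a cheap truncation check on the payload.
    if values.size != n_points:
        raise ValueError(f"expected {n_points} points, got {values.size}")
    return values
```

For a 4000-point spectrum the encoded string is 21,336 characters, which is where the ~21 KB figure comes from.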
Offline Buffering and Retry
The instrument PC will lose network connectivity. Count on it. Hospital networks go down for maintenance. VPN tunnels drop. The pipeline must handle this gracefully:
- Write every processed spectrum to a local SQLite database before attempting upload
- Attempt upload immediately. If successful, mark the record as uploaded
- If upload fails, leave the record in the queue. A background worker retries with exponential backoff, starting at 30 seconds
- Edge inference can run locally so the clinician gets a result even when offline. The cloud upload carries the result for storage, analytics, and regulatory record-keeping
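The retry schedule could be computed from the number of failed attempts recorded for each spectrum. This is a sketch only: the one-hour ceiling and the jitter fraction are assumptions, not values from a specific deployment:

```python
import random


def retry_delay_seconds(
    attempts: int,
    base_delay: float = 30.0,
    max_delay: float = 3600.0,  # assumed one-hour ceiling
    jitter: float = 0.1,        # assumed +/-10% jitter
) -> float:
    """Delay before the next upload attempt, given prior failed attempts."""
    # Double the delay on each failure, capped at max_delay.
    delay = min(base_delay * (2 ** attempts), max_delay)
    # Jitter keeps many instruments from retrying in lockstep after an outage.
    return delay * (1 + random.uniform(-jitter, jitter))
```

With jitter disabled, the sequence is 30 s, 60 s, 120 s, 240 s, and so on until it reaches the ceiling.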
```python
import sqlite3


class UploadBuffer:
    def __init__(self, db_path: str = "upload_buffer.db"):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS pending_uploads (
                spectrum_id TEXT PRIMARY KEY,
                payload BLOB NOT NULL,
                created_at TEXT NOT NULL,
                attempts INTEGER DEFAULT 0,
                last_attempt TEXT,
                status TEXT DEFAULT 'pending'
            )
        """)
        self.conn.commit()

    def enqueue(self, spectrum_id: str, payload: bytes) -> None:
        self.conn.execute(
            "INSERT OR REPLACE INTO pending_uploads "
            "(spectrum_id, payload, created_at) VALUES (?, ?, datetime('now'))",
            (spectrum_id, payload)
        )
        self.conn.commit()

    def get_pending(self, limit: int = 10) -> list[tuple[str, bytes]]:
        # Oldest first, so spectra are uploaded in acquisition order.
        rows = self.conn.execute(
            "SELECT spectrum_id, payload FROM pending_uploads "
            "WHERE status = 'pending' ORDER BY created_at LIMIT ?",
            (limit,)
        ).fetchall()
        return rows

    def mark_uploaded(self, spectrum_id: str) -> None:
        self.conn.execute(
            "UPDATE pending_uploads SET status = 'uploaded' WHERE spectrum_id = ?",
            (spectrum_id,)
        )
        self.conn.commit()

    def mark_failed(self, spectrum_id: str) -> None:
        # The record stays 'pending'; only the attempt counter advances.
        self.conn.execute(
            "UPDATE pending_uploads SET attempts = attempts + 1, "
            "last_attempt = datetime('now') WHERE spectrum_id = ?",
            (spectrum_id,)
        )
        self.conn.commit()
```
Queue-Based vs. Streaming Ingestion
For single-spectrum clinical workflows (the common case), a simple request-response pattern works. The instrument PC uploads a spectrum, the cloud acknowledges receipt and returns an inference result. There is no need for Kafka, RabbitMQ, or any message broker at this scale.
When you do need a message queue:
- Multi-site aggregation. Dozens of sites uploading spectra to a central platform. A queue (SQS, Cloud Tasks, or RabbitMQ) decouples ingestion from processing and handles burst traffic from sites coming back online after outages.
- Batch reprocessing. When you deploy a new model version and need to re-classify historical spectra, a queue distributes the workload across inference workers.
- Stream processing. Process NIR probes monitoring a bioreactor produce spectra continuously, every few seconds. A streaming pipeline (Kafka, Kinesis) handles the throughput and provides a durable log for replay.
For most clinical spectroscopy deployments - single instruments at tens of sites - a queue is unnecessary overhead. Start with synchronous HTTPS. Add a queue when the architecture demands it.
Layer 4: Storage and Retention
Spectral data is a regulated medical record. How you store it, how long you keep it, and how you prove it has not been tampered with are not engineering preferences - they are regulatory requirements.
Spectral Data Lake Design
A two-tier architecture separates raw spectral files from queryable metadata:
┌──────────────────────────────────────────────┐
│ METADATA DATABASE │
│ (PostgreSQL / managed SQL) │
│ │
│ spectrum_id │ patient_id │ acquired_at │
│ instrument_id │ site_id │ test_code │
│ quality_snr │ model_ver │ result │
│ confidence │ raw_blob_key │ status │
│ operator_id │ amendment_of │ audit_hash │
├───────────────────────┬──────────────────────┤
│ │ │
│ ┌───────────▼──────────┐ │
│ │ OBJECT STORAGE │ │
│ │ (S3 / Azure Blob │ │
│ │ / GCS) │ │
│ │ │ │
│ │ /raw/{date}/{id}/ │ │
│ │ spectrum.opus │ │
│ │ spectrum.jdx │ │
│ │ processed.npz │ │
│ │ quality.json │ │
│ │ audit.json │ │
│ └──────────────────────┘ │
└──────────────────────────────────────────────┘
Object storage holds the immutable spectral data: the raw vendor file (OPUS, SPC, WDF), the JCAMP-DX conversion, the processed NumPy array, quality metrics, and audit records. Objects are write-once - never modified after creation. Use object versioning (S3 versioning, Azure Blob snapshots) as a belt-and-suspenders measure.
Metadata database holds everything you need to query without downloading the spectral data: patient identifiers, instrument IDs, timestamps, quality scores, classification results, model versions. This is where your API queries run. The raw_blob_key column links each metadata row to its spectral data in object storage.
Object Storage Key Structure
Use a date-partitioned key structure that supports both single-spectrum lookups and batch retrieval:
```
s3://spectradx-data-{env}/
  raw/
    2026/10/24/
      {spectrum_id}/
        source.opus            # original vendor file
        converted.jdx          # JCAMP-DX conversion
        processed.npz          # preprocessed array (wavenumbers + intensities)
        quality.json           # quality gate results
        metadata.json          # clinical metadata
        audit.json             # creation audit record
  results/
    2026/10/24/
      {spectrum_id}/
        classification.json    # model output
        hl7_message.json       # generated HL7/FHIR message
        delivery_receipt.json  # LIS acknowledgment
```
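A small helper can keep key construction in one place. This sketch assumes keys are partitioned on the UTC acquisition date and that acquired_at is timezone-aware. The function name and the returned dict shape are hypothetical:

```python
from datetime import datetime, timezone


def build_object_keys(spectrum_id: str, acquired_at: datetime) -> dict[str, str]:
    """Build the raw/ and results/ prefixes for one spectrum."""
    # Partition on the UTC date so every site lands in the same day folder.
    day = acquired_at.astimezone(timezone.utc).strftime("%Y/%m/%d")
    return {
        "raw_prefix": f"raw/{day}/{spectrum_id}/",
        "results_prefix": f"results/{day}/{spectrum_id}/",
    }
```

Because the date comes before the spectrum ID, a single-spectrum lookup needs the acquisition date, which is why the metadata row stores the full key in raw_blob_key. A batch retrieval for one day is a simple prefix listing.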
21 CFR Part 11 Compliance
If your spectroscopy system produces results that contribute to clinical decisions - and if you are marketing it as a diagnostic device in the US - you must comply with 21 CFR Part 11 for electronic records.
Part 11 does not prescribe specific retention periods. It requires that electronic records be "protected to enable their accurate and ready retrieval throughout the records retention period." The retention period itself is defined by the predicate rule - for medical devices under 21 CFR Part 820 (now QMSR), records must be retained for the design and expected life of the device, but not less than 2 years from commercial distribution.
In practice, the requirements break down to:
| Requirement | Implementation |
|---|---|
| Audit trail | Append-only log of every create, read, modify, and delete event. Every entry includes who, what, when, and why. |
| Electronic signatures | Operator authentication (username + password at minimum) linked to every signed record. Signing manifests are cryptographically bound to the record content. |
| Record integrity | Hash each spectral record (SHA-256) at creation. Store the hash separately from the record. Verify periodically. |
| Access controls | Role-based access: operators can create and view, supervisors can release and amend, administrators can configure. No one can delete. |
| Record retention | Write-once storage (S3 Object Lock, Azure Immutable Blob). Retention period configurable per site (typically 7-10 years for clinical labs). |
| System validation | IQ/OQ/PQ documentation for the storage system. Annual revalidation. |
```python
import hashlib
import json
from datetime import datetime, timezone
from uuid import uuid4


def create_audit_record(
    action: str,
    spectrum_id: str,
    operator_id: str,
    details: dict
) -> dict:
    record = {
        "audit_id": str(uuid4()),
        # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated.
        "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "action": action,
        "spectrum_id": spectrum_id,
        "operator_id": operator_id,
        "details": details,
    }
    # Hash the canonical (sorted-key) JSON form, then attach the digest.
    record_bytes = json.dumps(record, sort_keys=True).encode()
    record["integrity_hash"] = hashlib.sha256(record_bytes).hexdigest()
    return record


def verify_spectrum_integrity(
    stored_hash: str,
    spectrum_path: str
) -> bool:
    with open(spectrum_path, "rb") as f:
        computed = hashlib.sha256(f.read()).hexdigest()
    return computed == stored_hash
```
Version Management for Reprocessing
When you deploy a new preprocessing algorithm or a new classification model, you may need to reprocess historical spectra. The storage layer must support this without destroying the original records:
- Raw files are immutable. The original vendor file and its initial JCAMP-DX conversion are never modified.
- Processed outputs are versioned. Each reprocessing run produces a new processed.npz tagged with the preprocessing pipeline version. The old version remains in storage.
- Classification results are versioned. A result from model v1.2 and a result from model v1.3 can coexist for the same spectrum. The metadata database tracks which model version produced which result.
- The "current" result is explicit. A current_result_version pointer in the metadata database identifies which classification result is the active one for clinical purposes. Changing this pointer is an audited action.
Layer 5: ML Inference
The classification model is the core of the diagnostic. The inference layer serves this model in production with the reliability, traceability, and performance that clinical use demands.
For model architecture choices (PLS-DA, SVM, CNN), training methodology, and validation strategy, see Building AI Pipelines for Spectral Classification. For confidence scoring design, see Confidence Scoring for Spectral Classification. This section covers the infrastructure around the model - how you serve it, version it, test updates, and manage the feature pipeline.
Model Serving: Batch vs. Real-Time
Two inference patterns, matching the two acquisition patterns from Layer 1:
Real-time inference for clinical single-sample workflows. Latency budget: < 500 ms from preprocessed spectrum to classification result. This is a synchronous call - the clinician is waiting.
```python
import onnxruntime as ort
import numpy as np
from functools import lru_cache


@lru_cache(maxsize=4)
def load_model(model_path: str) -> ort.InferenceSession:
    # Cached so each model file is loaded once per process.
    opts = ort.SessionOptions()
    opts.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
    opts.intra_op_num_threads = 2
    return ort.InferenceSession(model_path, opts)


def infer_realtime(
    spectrum: np.ndarray,
    model_path: str,
    class_names: list[str]
) -> dict:
    session = load_model(model_path)
    # Shape (batch=1, channels=1, n_points) for a 1D CNN input.
    input_data = spectrum.reshape(1, 1, -1).astype(np.float32)
    logits = session.run(None, {"spectrum": input_data})[0]
    # Subtract the row maximum before exponentiating to avoid overflow.
    shifted = logits - np.max(logits, axis=1, keepdims=True)
    probs = np.exp(shifted) / np.sum(np.exp(shifted), axis=1, keepdims=True)
    predicted_idx = int(np.argmax(probs[0]))
    return {
        "predicted_class": class_names[predicted_idx],
        "confidence": float(probs[0][predicted_idx]),
        "all_probabilities": {
            name: float(probs[0][i]) for i, name in enumerate(class_names)
        }
    }
```
ONNX Runtime on CPU handles spectral classification models in < 50 ms. The preprocessing pipeline (baseline correction, normalization, region selection) typically takes longer - 20-100 ms. Total end-to-end inference is well within the 500 ms budget.
Batch inference for reprocessing, research, and analytics. When you deploy a new model version and want to re-classify a historical dataset of 50,000 spectra, you need a batch pipeline. This is not latency-sensitive - run it overnight.
```python
async def batch_inference(
    spectrum_ids: list[str],
    model_path: str,
    storage_client,
    batch_size: int = 256
) -> list[dict]:
    session = load_model(model_path)  # load_model as defined above
    results = []
    for i in range(0, len(spectrum_ids), batch_size):
        batch_ids = spectrum_ids[i:i + batch_size]
        spectra = np.stack([
            await storage_client.load_processed(sid)
            for sid in batch_ids
        ])
        input_data = spectra.reshape(-1, 1, spectra.shape[1]).astype(np.float32)
        logits = session.run(None, {"spectrum": input_data})[0]
        # Numerically stable softmax, one row per spectrum.
        shifted = logits - np.max(logits, axis=1, keepdims=True)
        probs = np.exp(shifted) / np.sum(np.exp(shifted), axis=1, keepdims=True)
        for j, sid in enumerate(batch_ids):
            results.append({
                "spectrum_id": sid,
                "predicted_idx": int(np.argmax(probs[j])),
                "confidence": float(np.max(probs[j]))
            })
    return results
```
Model Versioning and Registry
Every classification result must be traceable to the exact model that produced it. Use a model registry (MLflow Model Registry, or a custom implementation if MLflow is too heavy for your deployment):
| Field | Example | Purpose |
|---|---|---|
| model_id | ftir-strep-cnn | Identifies the model family |
| model_version | v2.1.0 | Semantic version |
| training_dataset_hash | sha256:a3f8... | Which data trained this model |
| preprocessing_version | preproc-v3.0 | Which preprocessing pipeline |
| validation_metrics | {"sensitivity": 0.973, "specificity": 0.961, "auc": 0.994} | Performance at deployment |
| validated_instruments | ["BRUKER-ALPHA-SN4821", "BRUKER-ALPHA-SN5102"] | Which hardware this model is cleared for |
| deployed_at | 2026-10-24T00:00:00Z | When this version went live |
| status | production | staging, production, retired, recalled |
The registry enforces a promotion workflow: a model cannot move from staging to production without documented validation metrics that meet minimum thresholds and sign-off from a clinical officer.
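A promotion gate along these lines might look like the sketch below. The threshold values and the `clinical_signoff_by` field are illustrative assumptions. Real minimums come from your validation plan, not from this example:

```python
# Illustrative minimums only; set real values in the validation plan.
MIN_THRESHOLDS = {"sensitivity": 0.95, "specificity": 0.95, "auc": 0.98}


def can_promote(
    entry: dict,
    thresholds: dict[str, float] = MIN_THRESHOLDS,
) -> tuple[bool, list[str]]:
    """Check a registry entry against the staging -> production rules."""
    reasons = []
    if entry.get("status") != "staging":
        reasons.append(f"status is {entry.get('status')!r}, expected 'staging'")
    metrics = entry.get("validation_metrics") or {}
    for name, minimum in thresholds.items():
        value = metrics.get(name)
        if value is None:
            reasons.append(f"{name} not documented")
        elif value < minimum:
            reasons.append(f"{name} {value:.3f} below minimum {minimum:.3f}")
    # "clinical_signoff_by" is a hypothetical field for the sign-off record.
    if not entry.get("clinical_signoff_by"):
        reasons.append("missing clinical officer sign-off")
    return (not reasons, reasons)
```

Returning the list of reasons, instead of a bare boolean, gives the audit trail something concrete to record when a promotion is refused.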
A/B Testing in Clinical Settings
A/B testing ML models in clinical diagnostics is not the same as A/B testing a recommendation algorithm. You cannot randomly assign patients to different model versions and compare outcomes, because:
- Regulatory constraints. Each model version is a validated medical device configuration. Running an unvalidated model on clinical samples may violate your FDA clearance or CE marking.
- Clinical consequence. If model B gives wrong results for 5% of the A/B test patients, those are real patients who received wrong results.
The compliant approach is shadow mode: run the new model in parallel with the production model. Both models process every spectrum. The production model's result goes to the clinician. The shadow model's result is logged but not displayed. After sufficient shadow data accumulates (typically 200-500 spectra), compare the two models' performance statistically. If the new model meets or exceeds the production model's performance, promote it through the validation process.
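For the statistical comparison, one common choice for paired classifiers is an exact McNemar test on the discordant cases. The sketch below assumes reference labels are available for the shadow-period spectra, which the shadow log alone does not provide. The function name and return shape are illustrative:

```python
from math import comb


def compare_shadow_to_production(
    truth: list[str],
    production: list[str],
    shadow: list[str],
) -> dict:
    """Exact (binomial) McNemar test on paired predictions."""
    if not (len(truth) == len(production) == len(shadow)):
        raise ValueError("all three lists must have the same length")
    triples = list(zip(truth, production, shadow))
    # Discordant pairs: exactly one of the two models is correct.
    prod_only = sum(p == t and s != t for t, p, s in triples)
    shadow_only = sum(p != t and s == t for t, p, s in triples)
    n = prod_only + shadow_only
    if n == 0:
        p_value = 1.0
    else:
        tail = sum(comb(n, k) for k in range(min(prod_only, shadow_only) + 1)) / 2 ** n
        p_value = min(1.0, 2 * tail)  # two-sided
    agreement = sum(p == s for _, p, s in triples) / len(triples) if triples else 1.0
    return {
        "production_only_correct": prod_only,
        "shadow_only_correct": shadow_only,
        "agreement_rate": agreement,
        "p_value": p_value,
    }
```

A small p-value says only that the models differ. Check which discordant count is larger before concluding that the shadow model is the better one.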
```python
async def inference_with_shadow(
    spectrum: np.ndarray,
    production_model: str,
    shadow_model: str | None,
    class_names: list[str]
) -> dict:
    # Only the production result is ever shown to the clinician.
    prod_result = infer_realtime(spectrum, production_model, class_names)
    result = {"production": prod_result}
    if shadow_model:
        # The shadow result is logged for later comparison, never displayed.
        shadow_result = infer_realtime(spectrum, shadow_model, class_names)
        result["shadow"] = shadow_result
        result["agreement"] = (
            prod_result["predicted_class"] == shadow_result["predicted_class"]
        )
    return result
```
Feature Store for Spectral Features
A feature store ensures that the features used for training are identical to the features used for inference. This eliminates training-serving skew, a common cause of silent model failure in production ML systems.
For spectral classification, the "features" are the preprocessed spectral values themselves (for CNN models) or derived features like PCA scores, peak ratios, and derivative values (for classical models). The feature store records:
- The preprocessing pipeline version and parameters (baseline correction method, normalization type, spectral region selection)
- The feature extraction parameters (PCA model fitted on training data, peak positions for ratio calculation)
- The exact output for every spectrum, keyed by spectrum ID and pipeline version
This is simpler than a general-purpose feature store because spectral features are computed from a single input (the spectrum) with no joins across data sources. A versioned preprocessing pipeline with deterministic output is sufficient - you do not need Feast or Tecton unless your features combine spectral data with external data sources (patient demographics, prior test results).
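A versioned, deterministic preprocessing module can be sketched in a few lines. This is an illustration under stated assumptions: the `PARAMS` values, the fingerprint scheme, and the in-memory `_store` are hypothetical stand-ins. A production system would persist outputs in the storage layer, and SNV here uses the population standard deviation for simplicity.

```python
import hashlib
import json
from statistics import fmean, pstdev

PIPELINE_VERSION = "preproc-v3.0"
# Hypothetical parameters: normalization type and region as index bounds.
PARAMS = {"normalization": "snv", "region_index": [2, 8]}


def pipeline_fingerprint() -> str:
    """Stable hash of version + parameters; any change yields a new key."""
    payload = json.dumps({"version": PIPELINE_VERSION, "params": PARAMS}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()[:16]


def preprocess(spectrum: list[float]) -> list[float]:
    """Region selection followed by standard normal variate (SNV) scaling."""
    start, stop = PARAMS["region_index"]
    region = spectrum[start:stop]
    mean, std = fmean(region), pstdev(region)
    if std == 0:
        raise ValueError("flat spectrum: SNV is undefined")
    return [(x - mean) / std for x in region]


# In-memory stand-in for the persisted feature records.
_store: dict[tuple[str, str], list[float]] = {}


def get_features(spectrum_id: str, spectrum: list[float]) -> list[float]:
    """Return the features for this spectrum under the current pipeline version."""
    key = (spectrum_id, pipeline_fingerprint())
    if key not in _store:
        _store[key] = preprocess(spectrum)
    return _store[key]
```

Because the key includes the pipeline fingerprint, training and inference that use the same version read the same values, and a parameter change cannot silently reuse old features.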
Layer 6: Result Delivery
The classification result must reach the clinician and the patient's medical record. This is the integration layer where spectroscopy meets healthcare IT.
Classification Result Structure
The ML model outputs probabilities. The result delivery layer transforms these into a structured clinical result:
@dataclass
class ClassificationResult:
    spectrum_id: str
    model_version: str
    predicted_class: str
    confidence: float
    confidence_category: str  # "high", "moderate", "low"
    all_probabilities: dict[str, float]
    quality_metrics: dict
    clinical_interpretation: str  # "Positive", "Negative", "Indeterminate"
    requires_review: bool
    created_at: datetime
    created_by: str  # "system" for automated, operator ID for manual
The clinical_interpretation field is the result the clinician sees. It maps from the model's probabilistic output through configurable thresholds to one of three categories: Positive, Negative, or Indeterminate. The thresholds are set during clinical validation and are part of the validated device configuration - changing them is a regulatory event.
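The threshold mapping might look like the sketch below. The numeric thresholds, the `strep_a` class name, and the review rule are assumptions for illustration only; the validated values belong to the device configuration and are not given in this article.

```python
# Hypothetical thresholds; the real ones are fixed during clinical validation.
POSITIVE_THRESHOLD = 0.90
NEGATIVE_THRESHOLD = 0.10
HIGH_CONFIDENCE = 0.95
MODERATE_CONFIDENCE = 0.80


def interpret(probabilities: dict[str, float], positive_class: str = "strep_a") -> dict:
    """Map class probabilities to the fields a clinician-facing result needs."""
    p_positive = probabilities[positive_class]
    if p_positive >= POSITIVE_THRESHOLD:
        interpretation = "Positive"
    elif p_positive <= NEGATIVE_THRESHOLD:
        interpretation = "Negative"
    else:
        interpretation = "Indeterminate"

    predicted_class = max(probabilities, key=probabilities.get)
    confidence = probabilities[predicted_class]
    if confidence >= HIGH_CONFIDENCE:
        category = "high"
    elif confidence >= MODERATE_CONFIDENCE:
        category = "moderate"
    else:
        category = "low"

    return {
        "predicted_class": predicted_class,
        "confidence": confidence,
        "confidence_category": category,
        "clinical_interpretation": interpretation,
        # Assumed rule: anything indeterminate or low-confidence goes to a human.
        "requires_review": interpretation == "Indeterminate" or category == "low",
    }
```

Keeping the thresholds as named constants in one place makes a threshold change visible in version control, which matters when that change is a regulatory event.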
HL7v2 Result Delivery
Most laboratory information systems still receive results via HL7v2 ORU messages over MLLP (Minimal Lower Layer Protocol) TCP connections. The spectral classification result must be encoded as an HL7v2 ORU^R01 message.
We cover the full HL7v2 encoding in HL7v2 Result Delivery for Spectroscopy Diagnostics. The key mapping:
MSH|^~\&|SPECTRADX|SITE-MAYO|LABLIS|MAYO|20261024142307||ORU^R01|MSG00001|P|2.5.1
PID|1||MRN-0047291^^^MAYO^MR||DOE^JANE^M||19850315|F
OBR|1|ACC-2026-183742||FTIR-STREP^FTIR Strep A Screen^L|||20261024142107
OBX|1|CWE|FTIR-STREP^FTIR Strep A Screen^L||10828004^Positive^SCT||||||F
OBX|2|NM|FTIR-CONF^Classification Confidence^L||97.3|%|||||F
OBX|3|NM|FTIR-SNR^Spectral SNR^L||142.8||||||F
NTE|1||Model: ftir-strep-cnn v2.1.0
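As a rough sketch of how such a message could be assembled and framed for MLLP, the code below builds the segments with plain string formatting. The function names and parameters are hypothetical, the facility identifiers are copied from the example above, and HL7 escaping of delimiter characters inside field values is deliberately omitted. A real integration would more likely use an HL7 library and handle the ACK.

```python
# SNOMED CT qualifier values for the three clinical interpretations.
SNOMED_CODES = {
    "Positive": "10828004",
    "Negative": "260385009",
    "Indeterminate": "82334004",
}

MLLP_START = b"\x0b"    # start block (vertical tab)
MLLP_END = b"\x1c\x0d"  # end block (file separator) + carriage return


def build_oru(
    mrn: str,
    accession: str,
    interpretation: str,
    confidence_pct: float,
    observed_at: str,
    sent_at: str,
    control_id: str,
) -> str:
    """Assemble a minimal ORU^R01 message; segments end with carriage returns."""
    code = SNOMED_CODES[interpretation]
    test = "FTIR-STREP^FTIR Strep A Screen^L"
    segments = [
        f"MSH|^~\\&|SPECTRADX|SITE-MAYO|LABLIS|MAYO|{sent_at}||ORU^R01|{control_id}|P|2.5.1",
        f"PID|1||{mrn}^^^MAYO^MR",
        f"OBR|1|{accession}||{test}|||{observed_at}",
        f"OBX|1|CWE|{test}||{code}^{interpretation}^SCT||||||F",
        f"OBX|2|NM|FTIR-CONF^Classification Confidence^L||{confidence_pct:.1f}|%|||||F",
    ]
    return "\r".join(segments) + "\r"


def mllp_frame(message: str) -> bytes:
    """Wrap an HL7 message in MLLP start and end bytes for TCP transport."""
    return MLLP_START + message.encode("utf-8") + MLLP_END
```

The framed bytes are what gets written to the TCP socket; the receiver replies with an ACK message framed the same way.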
FHIR R4 Result Delivery
For EHR systems that support FHIR R4 (Epic, Oracle Health, MEDITECH Expanse), the result maps to a DiagnosticReport with linked Observation resources. See FHIR R4 for Diagnostic Spectroscopy for the complete resource mapping and authentication flow.
For systems that need both protocols - and most clinical deployments do - see Connecting Spectroscopy Instruments to LIMS for the middleware architecture that routes results to the right destination.
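For orientation, the FHIR side of the mapping can be sketched as two plain dictionaries. This is a simplified illustration, not the complete mapping from the linked article: the `to_fhir` function, the local code system URL, and the choice to carry only the interpretation are assumptions, and authentication and the HTTP call are left out.

```python
import json
from uuid import uuid4

SNOMED_SYSTEM = "http://snomed.info/sct"
# Hypothetical local code system for the test identifier.
LOCAL_SYSTEM = "https://example.org/fhir/CodeSystem/spectradx"


def to_fhir(patient_id: str, snomed_code: str, display: str, issued: str) -> tuple[dict, dict]:
    """Build an Observation and a DiagnosticReport that references it."""
    test_code = {
        "coding": [
            {"system": LOCAL_SYSTEM, "code": "FTIR-STREP", "display": "FTIR Strep A Screen"}
        ]
    }
    subject = {"reference": f"Patient/{patient_id}"}
    observation = {
        "resourceType": "Observation",
        "id": str(uuid4()),
        "status": "final",
        "code": test_code,
        "subject": subject,
        "effectiveDateTime": issued,
        "valueCodeableConcept": {
            "coding": [{"system": SNOMED_SYSTEM, "code": snomed_code, "display": display}]
        },
    }
    report = {
        "resourceType": "DiagnosticReport",
        "status": "final",
        "code": test_code,
        "subject": subject,
        "issued": issued,
        "result": [{"reference": f"Observation/{observation['id']}"}],
    }
    return observation, report


observation, report = to_fhir("12345", "10828004", "Positive", "2026-10-24T14:23:07Z")
payload = json.dumps(report, indent=2)  # JSON body ready to send to the FHIR server
```

The confidence and SNR values would typically become additional Observation resources with `valueQuantity`, listed alongside the first in `result`.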
Audit Trail
Every result delivery event must be logged with enough detail to reconstruct the complete chain of custody:
from datetime import datetime, timezone

def log_result_delivery(
    spectrum_id: str,
    result: ClassificationResult,
    delivery_method: str,
    destination: str,
    ack_status: str,
    ack_message: str | None = None
) -> dict:
    return create_audit_record(
        action="result_delivered",
        spectrum_id=spectrum_id,
        operator_id="system",
        details={
            "model_version": result.model_version,
            "predicted_class": result.predicted_class,
            "confidence": result.confidence,
            "clinical_interpretation": result.clinical_interpretation,
            "delivery_method": delivery_method,
            "destination": destination,
            "ack_status": ack_status,
            "ack_message": ack_message,
            # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated
            "delivered_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        }
    )
The audit trail must answer these questions for any result, at any point in the future:
- Which spectrum produced this result?
- Which model version classified it?
- What were the quality metrics at acquisition?
- Who was the operator?
- When was the result delivered to the LIS?
- Did the LIS acknowledge receipt?
- Was the result ever amended, and if so, by whom and why?
Result Amendment Workflow
Results sometimes need to be corrected. An operator may realize that a sample was mislabeled. A supervisor may override an indeterminate result after manual review. A model update may reclassify a historical spectrum differently.
Amendments never overwrite the original result. The original result is preserved with an amended status, and a new result is created with a reference to the original:
from datetime import datetime, timezone
from uuid import uuid4

def amend_result(
    original_spectrum_id: str,
    new_interpretation: str,
    reason: str,
    operator_id: str,
    supervisor_id: str
) -> dict:
    # The amendment is a new record; the original result is never overwritten.
    amendment = {
        "amendment_id": str(uuid4()),
        "original_spectrum_id": original_spectrum_id,
        "new_interpretation": new_interpretation,
        "reason": reason,
        "requested_by": operator_id,
        "approved_by": supervisor_id,
        "created_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    }
    # Send corrected HL7 ORU with amended result
    # The corrected message carries result status "C" (corrected) in place of "F" (final)
    return amendment
Amendments require dual authorization - an operator requests the amendment and a supervisor approves it. This supports the record-change controls of 21 CFR Part 11 (audit trail, authority checks) and is also a clinical safety measure.
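The dual-authorization rule can be enforced before any amendment record is created. A minimal sketch, assuming a hypothetical `validate_amendment` helper and a set of supervisor IDs supplied by whatever identity system the deployment uses:

```python
def validate_amendment(
    operator_id: str,
    supervisor_id: str,
    reason: str,
    supervisor_ids: set[str],
) -> None:
    """Raise if the amendment request does not satisfy dual authorization."""
    if not reason.strip():
        raise ValueError("an amendment must state a reason")
    if supervisor_id not in supervisor_ids:
        raise PermissionError(f"{supervisor_id!r} is not an authorized supervisor")
    # The requester and approver must be two different people.
    if operator_id == supervisor_id:
        raise PermissionError("requester and approver must be different users")
```

Calling this check first means a self-approved or unexplained amendment never reaches the audit log or the LIS.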
Technology Stack Recommendation
The table below recommends tools for each layer. It reflects what works in production spectroscopy deployments - not what is trendy.
| Layer | Component | Recommendation | Rationale |
|---|---|---|---|
| Acquisition | Instrument drivers | Vendor SDK + custom adapter | No alternative; each vendor is different |
| Acquisition | Local message bus | asyncio.Queue or ZeroMQ | In-process for single instrument; ZeroMQ for multi-instrument |
| Processing | Format parsing | brukeropus, spc-spectra, jcamp | See spectral data formats |
| Processing | Signal processing | scipy.signal, numpy | Industry standard; no reason to use anything else |
| Processing | Quality checks | Custom (see code above) | Domain-specific; no off-the-shelf solution exists |
| Ingestion | Upload protocol | HTTPS + gzip | Sufficient for single-spectrum clinical workflows |
| Ingestion | Message queue (if needed) | SQS / Cloud Tasks | Managed, no infrastructure to maintain |
| Ingestion | Offline buffer | SQLite | Embedded, zero-config, battle-tested |
| Storage | Spectral objects | S3 / Azure Blob / GCS | Object lock for Part 11 retention |
| Storage | Metadata DB | PostgreSQL (managed) | Relational queries on clinical metadata |
| Storage | Audit log | Append-only table + object store backup | Immutable audit trail |
| Inference | Model format | ONNX | Portable, fast, framework-independent |
| Inference | Inference runtime | ONNX Runtime (CPU) | < 50 ms for spectral CNN; GPU unnecessary |
| Inference | Model registry | MLflow or custom | Versioning, lineage, promotion workflow |
| Inference | Feature pipeline | Versioned preprocessing module | Deterministic; no external feature store needed |
| Delivery | HL7v2 | python-hl7 + MLLP client | Most LIS integrations |
| Delivery | FHIR R4 | fhirclient or httpx + JSON | Modern EHR integrations |
| Delivery | Integration engine (if needed) | Mirth Connect / Rhapsody | Multi-destination routing, message transformation |
What You Probably Do Not Need
Some technologies are popular in general ML infrastructure but overkill for spectroscopy data pipelines:
- Kafka. Your throughput is tens to hundreds of spectra per day, not millions of events per second. SQS or a simple HTTP queue is sufficient.
- GPU inference. Spectral classification models are small (< 10 MB) and process 1D input. CPU inference takes < 50 ms. GPUs add cost, complexity, and a driver dependency for no latency benefit.
- Data lake formats (Delta Lake, Iceberg, Hudi). You are storing individual spectral files and metadata rows, not petabytes of analytical data. PostgreSQL + S3 is the right scale.
- Kubernetes. If you are deploying to 5-20 clinical sites, a managed service (ECS, Cloud Run, Azure Container Apps) with auto-scaling is simpler to operate. Kubernetes makes sense at 100+ sites with a dedicated platform team.
- Real-time feature stores (Feast, Tecton). Spectral features are computed from a single input with no cross-entity joins. A versioned Python module is sufficient.
Putting It All Together
The complete flow for a single clinical test - from sample placement to result in the medical record:
t=0s      Clinician places sample, presses "Run Test"
          │
t=0.1s    Instrument driver sends acquisition command via SDK
          │
t=15s     Instrument completes acquisition (FTIR ATR, 32 scans)
          Driver reads raw OPUS file from instrument
          │
t=15.1s   Format conversion: OPUS → internal representation
          Quality gate: SNR=142, no saturation → PASS
          Metadata attached: patient MRN, operator, timestamp
          │
t=15.2s   Spectrum written to local SQLite buffer
          │
t=15.3s   Preprocessing: baseline correction, SNV, region selection
          ML inference (ONNX Runtime): 47ms
          Result: "Positive", confidence 97.3% → "High confidence"
          │
t=15.4s   Result displayed to clinician on screen
          │
t=15.5s   HTTPS upload to cloud (background)
          Cloud stores raw + processed + result
          Audit trail created
          │
t=16.0s   HL7v2 ORU^R01 sent to LIS via MLLP
          LIS ACK received
          │
t=16.1s   Result available in patient's medical record
Total time from acquisition complete to clinical result: ~0.4 seconds. Total time from acquisition complete to LIS: ~1 second. Total time including acquisition: ~16 seconds.
The clinician experiences a 16-second test with a one-button interface. Behind that button, six architectural layers executed in sequence - each one doing exactly its job and nothing more. This is the pipeline that the SpectraDx platform implements end-to-end.
Further Reading
- Building AI Pipelines for Spectral Classification - the ML pipeline that sits inside Layer 5
- Confidence Scoring for Spectral Classification - threshold design and indeterminate result handling
- Instrument-Agnostic Spectral Data Formats - parsing every major spectral format
- Building Clinical Workflow Software for Spectroscopy-Based Diagnostics - the clinical application layer that wraps this pipeline
- Connecting Spectroscopy Instruments to LIMS - middleware patterns for LIS integration
- FHIR R4 for Diagnostic Spectroscopy - modern EHR integration
- HL7v2 Result Delivery for Spectroscopy Diagnostics - legacy LIS integration

