Freight Contract Architecture & Rate Mapping

The Freight Contract Architecture & Rate Mapping framework is the deterministic core of a freight audit pipeline. It converts unstructured carrier agreements, EDI 210 invoices, and negotiated rate schedules into auditable, machine-readable rate matrices. For logistics analysts, freight auditors, and Python ETL developers, this architecture replaces manual rate lookups, enforces compliance thresholds, and routes discrepancies automatically before invoices reach the AP ledger. The pipeline runs in five sequential stages: Architecture → Ingestion → Validation → Dispute → Audit. Each stage depends on a shared schema, idempotent processing, and version-controlled configuration.

1. Architecture: Schema Design & Versioning

A resilient rate mapping architecture begins with a normalized contract schema that decouples carrier identifiers, lane definitions, and pricing tiers. The core data model must support effective dating, geographic zoning, weight/quantity breakpoints, and mode-specific pricing rules. Without deterministic versioning, overlapping contract periods and mid-cycle renegotiations introduce silent audit failures that compound across thousands of monthly invoices.

Production systems implement Contract Version Control to maintain a strict lineage of rate amendments. Each contract record is stamped with effective_start, effective_end, version_hash, and amendment_type (e.g., GENERAL_RATE_INCREASE, LANE_SPECIFIC_ADJUSTMENT). This enables point-in-time rate resolution during historical invoice reconciliation and prevents temporal bleed, where a newer rate is incorrectly applied to a shipment tendered under an earlier contract version.
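
Overlapping effective windows are the usual source of ambiguous point-in-time resolution, so a lineage check is worth running whenever an amendment is loaded. The following is a minimal sketch, assuming each version is a (label, effective_start, effective_end) tuple and that an effective_end of None means open-ended. The name find_overlaps and the sample versions are illustrative only.

```python
from datetime import date
from typing import List, Optional, Tuple

# (label, effective_start, effective_end); effective_end=None means open-ended.
Version = Tuple[str, date, Optional[date]]

def find_overlaps(versions: List[Version]) -> List[Tuple[str, str]]:
    """Return pairs of version labels whose effective windows overlap (inclusive)."""
    ordered = sorted(versions, key=lambda v: v[1])
    overlaps = []
    for i, (label_a, _start_a, end_a) in enumerate(ordered):
        for label_b, start_b, _end_b in ordered[i + 1:]:
            # b starts on or after a (sorted), so the windows overlap when a is
            # open-ended or a ends on or after b's start date.
            if end_a is None or end_a >= start_b:
                overlaps.append((label_a, label_b))
    return overlaps

versions = [
    ("v1", date(2024, 1, 1), date(2024, 6, 30)),
    ("v2", date(2024, 6, 15), date(2024, 12, 31)),  # starts before v1 ends
    ("v3", date(2025, 1, 1), None),
]
print(find_overlaps(versions))  # [('v1', 'v2')]
```

A non-empty result could block the amendment from activating until the earlier version's effective_end is corrected.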

To support regulatory compliance and retrospective dispute resolution, the architecture must integrate Historical Rate Sheet Archiving as an immutable data lake layer. Archived rate sheets are stored in partitioned Parquet or Delta Lake formats, indexed by carrier_scac, contract_id, and effective_date. This ensures that when an EDI 210 invoice references a rate from a superseded agreement, the audit engine can reconstruct the exact pricing environment at the time of shipment tender.
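
One way to make the archive addressable by carrier_scac, contract_id, and effective_date is a Hive-style directory layout, which Parquet and Delta Lake readers commonly understand. The sketch below only builds the partition path; the function name archive_partition_path and the root directory name are assumptions, and writing the actual file is left to whichever Parquet or Delta library the pipeline uses.

```python
from datetime import date
from pathlib import PurePosixPath

def archive_partition_path(root: str, carrier_scac: str, contract_id: str,
                           effective_date: date) -> PurePosixPath:
    """Build a Hive-style partition path (key=value directories) for one rate sheet."""
    return (
        PurePosixPath(root)
        / f"carrier_scac={carrier_scac.upper()}"
        / f"contract_id={contract_id}"
        / f"effective_date={effective_date.isoformat()}"
    )

path = archive_partition_path("rate_archive", "abcd", "C-1001", date(2024, 1, 1))
print(path)  # rate_archive/carrier_scac=ABCD/contract_id=C-1001/effective_date=2024-01-01
```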

# models/contract_schema.py
from pydantic import BaseModel, Field, field_validator, model_validator
from datetime import date
from typing import Optional, Dict, Any
from enum import Enum
import hashlib
import json

class ContractStatus(str, Enum):
    ACTIVE = "ACTIVE"
    EXPIRED = "EXPIRED"
    SUSPENDED = "SUSPENDED"

class AmendmentType(str, Enum):
    GRI = "GENERAL_RATE_INCREASE"
    LANE_ADJ = "LANE_SPECIFIC_ADJUSTMENT"
    ACCESSORIAL_UPDATE = "ACCESSORIAL_UPDATE"

class RateContract(BaseModel):
    contract_id: str
    carrier_scac: str
    mode: str  # LTL, FTL, INTERMODAL
    effective_start: date
    effective_end: Optional[date] = None
    status: ContractStatus = ContractStatus.ACTIVE
    rate_matrix: Dict[str, Any] = Field(default_factory=dict)
    amendment_type: Optional[AmendmentType] = None
    # Declared as a field so the computed hash is kept on the model
    # (Pydantic ignores undeclared keys by default).
    version_hash: Optional[str] = None

    @field_validator("effective_end")
    @classmethod
    def validate_date_range(cls, v: Optional[date], info) -> Optional[date]:
        # effective_start is missing from info.data if it failed its own validation.
        start = info.data.get("effective_start")
        if v is not None and start is not None and v <= start:
            raise ValueError("effective_end must be strictly after effective_start")
        return v

    @model_validator(mode="after")
    def compute_version_hash(self) -> "RateContract":
        # Hash the validated values, including the full rate matrix, so that a
        # changed rate (not only a new lane key) produces a new version hash.
        payload = json.dumps({
            "contract_id": self.contract_id,
            "carrier_scac": self.carrier_scac,
            "effective_start": self.effective_start.isoformat(),
            "rate_matrix": self.rate_matrix,
        }, sort_keys=True, default=str)
        self.version_hash = hashlib.sha256(payload.encode()).hexdigest()
        return self

2. Ingestion: EDI 210 Parsing & Carrier Data Normalization

The ingestion layer accepts heterogeneous data sources: ANSI ASC X12 EDI 210 Motor Carrier Freight Details and Invoice transactions, carrier portal exports, and digitized PDF rate sheets. Production pipelines must normalize these inputs into a unified ShipmentInvoice schema before any validation occurs. EDI 210 parsing requires strict segment mapping: B3 for the invoice header (including the carrier SCAC), N1 loops for the shipper, consignee, and bill-to parties, L11 for reference numbers (PRO, BOL), and L3 for total weight and charges.
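
The segment mapping can be illustrated with a small standard-library parser. This is a sketch under stated assumptions: the sample fragment is hypothetical and incomplete, the delimiters are hard-coded rather than read from the ISA envelope, and the element positions used (invoice number and net amount in B3, total charge in L3) should be confirmed against each trading partner's implementation guide. The helper name parse_segments is illustrative.

```python
from decimal import Decimal
from typing import Dict, List

def parse_segments(raw: str, seg_term: str = "~",
                   elem_sep: str = "*") -> Dict[str, List[List[str]]]:
    """Group segments by ID. Each occurrence is its element list (index 0 is the ID)."""
    segments: Dict[str, List[List[str]]] = {}
    for chunk in raw.split(seg_term):
        chunk = chunk.strip()
        if not chunk:
            continue
        elements = chunk.split(elem_sep)
        segments.setdefault(elements[0], []).append(elements)
    return segments

# Hypothetical fragment, not a complete or validated 210 transaction set.
SAMPLE = (
    "B3**INV10045*PRO778899*PP**20240315*125050~"
    "N1*SH*ACME WIDGETS~"
    "N1*CN*NORTHSIDE DC~"
    "L3*1200*G***125050~"
)

segs = parse_segments(SAMPLE)
b3 = segs["B3"][0]
invoice_number = b3[2]                    # assumed position of the invoice number
net_amount = Decimal(b3[7]) / 100         # assumed implied two-decimal amount
parties = {n1[1]: n1[2] for n1 in segs["N1"]}  # entity code -> party name
total_charge = Decimal(segs["L3"][0][5]) / 100

print(invoice_number, net_amount, total_charge, parties)
```

Comparing the B3 net amount with the L3 total charge at parse time is a cheap structural check before any rate validation runs.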

For less-than-truckload (LTL) agreements, LTL Rate Sheet Digitization requires parsing class-based pricing tables, discount percentages, and minimum charge floors. The ETL process must flatten nested tariff structures into lookup-ready matrices keyed by origin/destination ZIP3, freight class, and weight breaks.
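
A minimal sketch of the flattening step, assuming a nested tariff shaped as lane, then freight class, then a list of (minimum weight, rate per hundredweight) breaks. The tariff values, the discount, and the function names flatten_tariff, lookup_rate, and linehaul_charge are all hypothetical.

```python
from bisect import bisect_right
from decimal import Decimal
from typing import Dict, List, Tuple

# Hypothetical nested tariff: (origin ZIP3, dest ZIP3) -> class -> [(min lb, rate per cwt)]
NESTED_TARIFF = {
    ("606", "300"): {
        "70": [(0, "42.10"), (500, "36.75"), (1000, "29.40")],
        "100": [(0, "55.20"), (500, "48.90"), (1000, "39.15")],
    },
}

Key = Tuple[str, str, str]  # (origin_zip3, dest_zip3, freight_class)
Breaks = List[Tuple[int, Decimal]]

def flatten_tariff(nested: dict) -> Dict[Key, Breaks]:
    """Flatten lane -> class -> breaks into one dict keyed for direct lookup."""
    flat: Dict[Key, Breaks] = {}
    for (origin, dest), classes in nested.items():
        for freight_class, breaks in classes.items():
            flat[(origin, dest, freight_class)] = sorted(
                (min_wt, Decimal(rate)) for min_wt, rate in breaks
            )
    return flat

def lookup_rate(flat: Dict[Key, Breaks], origin: str, dest: str,
                freight_class: str, weight_lb: int) -> Decimal:
    """Return the rate for the highest weight break not exceeding weight_lb."""
    breaks = flat[(origin, dest, freight_class)]
    idx = bisect_right([min_wt for min_wt, _ in breaks], weight_lb) - 1
    if idx < 0:
        raise ValueError(f"No weight break covers {weight_lb} lb")
    return breaks[idx][1]

def linehaul_charge(rate_per_cwt: Decimal, weight_lb: int,
                    discount: Decimal, minimum_charge: Decimal) -> Decimal:
    """Apply the discount to the gross charge, then enforce the minimum charge floor."""
    gross = rate_per_cwt * Decimal(weight_lb) / Decimal(100)
    net = gross * (Decimal(1) - discount)
    return max(net, minimum_charge).quantize(Decimal("0.01"))

flat = flatten_tariff(NESTED_TARIFF)
rate = lookup_rate(flat, "606", "300", "70", 750)
print(rate)                                                          # 36.75
print(linehaul_charge(rate, 750, Decimal("0.60"), Decimal("95.00")))  # 110.25
```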

For full-truckload (FTL) contracts, FTL Base Rate Extraction focuses on lane-specific flat rates, mileage-based calculations, and equipment type modifiers. The ingestion pipeline applies idempotent processing by generating deterministic ingestion keys (f"{carrier_scac}_{invoice_number}_{pro_number}"). Duplicate submissions are routed to a quarantine table rather than silently dropped, preserving audit trails for carrier reconciliation.
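
The idempotency rule can be sketched as follows. The in-memory dict stands in for whatever persistent key store a real pipeline would use, and the field names on the invoice dict and the DUPLICATE_SUBMISSION reason code are assumptions.

```python
from typing import Dict, List, Tuple

def ingestion_key(invoice: dict) -> str:
    """Deterministic key: the same invoice always yields the same key."""
    return f"{invoice['carrier_scac']}_{invoice['invoice_number']}_{invoice['pro_number']}"

def ingest(batch: List[dict], seen: Dict[str, dict]) -> Tuple[List[dict], List[dict]]:
    """Accept first-time invoices; quarantine repeats instead of dropping them."""
    accepted, quarantined = [], []
    for invoice in batch:
        key = ingestion_key(invoice)
        if key in seen:
            quarantined.append({
                "ingestion_key": key,
                "invoice": invoice,
                "reason": "DUPLICATE_SUBMISSION",  # hypothetical reason code
            })
        else:
            seen[key] = invoice
            accepted.append(invoice)
    return accepted, quarantined

seen_keys: Dict[str, dict] = {}
batch = [
    {"carrier_scac": "ABCD", "invoice_number": "INV1", "pro_number": "P100"},
    {"carrier_scac": "ABCD", "invoice_number": "INV2", "pro_number": "P101"},
    {"carrier_scac": "ABCD", "invoice_number": "INV1", "pro_number": "P100"},  # resubmission
]
accepted, quarantined = ingest(batch, seen_keys)
print(len(accepted), len(quarantined))  # 2 1
```

Because seen_keys persists across calls, replaying the same batch quarantines every record rather than double-counting it.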

3. Validation: Rate Resolution & Compliance Thresholds

Once ingested, invoices enter the validation stage where the pipeline cross-references shipment attributes against the active rate matrix. The engine performs vectorized lookups using DuckDB or Polars to match origin/destination pairs, weight, and service level. Tolerance thresholds (typically ±0.5% to ±2.0%) are applied to account for rounding differences between carrier billing systems and internal ERP calculations.

A critical validation component is Accessorial Charge Taxonomy Mapping. Carriers frequently apply non-standard codes for detention, liftgate, residential delivery, or inside pickup. The pipeline maps carrier-specific codes to a canonical taxonomy, validates them against the contracted accessorial schedule, and flags unauthorized charges for dispute.
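
A minimal sketch of the mapping and authorization check, assuming a lookup keyed by (SCAC, carrier code) and a contracted schedule that caps each canonical accessorial. Every code, amount, and flag name below is hypothetical.

```python
from decimal import Decimal
from typing import Dict, List, Tuple

# Hypothetical carrier-specific codes mapped to a canonical taxonomy.
CODE_MAP: Dict[Tuple[str, str], str] = {
    ("ABCD", "LFT"): "LIFTGATE",
    ("ABCD", "RES"): "RESIDENTIAL_DELIVERY",
    ("WXYZ", "LG"): "LIFTGATE",
    ("WXYZ", "DET"): "DETENTION",
}

# Hypothetical contracted schedule: SCAC -> canonical code -> maximum charge.
CONTRACT_SCHEDULE: Dict[str, Dict[str, Decimal]] = {
    "ABCD": {"LIFTGATE": Decimal("75.00")},
}

def audit_accessorials(scac: str, charges: List[Tuple[str, Decimal]]) -> List[dict]:
    """Return one flag per charge that is unmapped, uncontracted, or over the cap."""
    flags = []
    for code, amount in charges:
        canonical = CODE_MAP.get((scac, code))
        if canonical is None:
            flags.append({"code": code, "reason": "UNMAPPED_CODE"})
            continue
        allowed = CONTRACT_SCHEDULE.get(scac, {}).get(canonical)
        if allowed is None:
            flags.append({"code": code, "canonical": canonical,
                          "reason": "UNAUTHORIZED_ACCESSORIAL"})
        elif amount > allowed:
            flags.append({"code": code, "canonical": canonical,
                          "reason": "EXCEEDS_CONTRACTED_AMOUNT"})
    return flags

flags = audit_accessorials("ABCD", [
    ("LFT", Decimal("75.00")),   # contracted and within the cap
    ("RES", Decimal("40.00")),   # mapped, but not in the contract schedule
    ("ZZZ", Decimal("10.00")),   # no mapping exists for this carrier code
])
print([f["reason"] for f in flags])  # ['UNAUTHORIZED_ACCESSORIAL', 'UNMAPPED_CODE']
```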

Fuel surcharges require precise mathematical validation. Fuel Surcharge Formula Implementation must align with the DOE national diesel average or a contracted index. The ETL engine calculates the expected surcharge using the exact formula defined in the contract (e.g., (Current Index - Base Index) / Base Index * Linehaul * Multiplier) and compares it against the fuel surcharge billed on the EDI 210, which normally appears as its own charge line (L1 segment) rather than in the L3 totals. Deviations outside the tolerance threshold trigger automated exception routing.
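
As a worked instance of the example formula, the sketch below computes the expected surcharge with Decimal arithmetic and checks a billed amount against a tolerance. The index values, linehaul, multiplier, and 1.5% tolerance are made-up inputs, and rounding to cents with ROUND_HALF_UP is an assumption; a real contract dictates its own rounding rule.

```python
from decimal import Decimal, ROUND_HALF_UP

def expected_fuel_surcharge(current_index: Decimal, base_index: Decimal,
                            linehaul: Decimal,
                            multiplier: Decimal = Decimal("1")) -> Decimal:
    """(Current Index - Base Index) / Base Index * Linehaul * Multiplier, rounded to cents."""
    if base_index <= 0:
        raise ValueError("base_index must be positive")
    raw = (current_index - base_index) / base_index * linehaul * multiplier
    return raw.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)

def within_tolerance(expected: Decimal, billed: Decimal, tolerance: Decimal) -> bool:
    """Relative variance check; a zero expectation only matches a zero bill."""
    if expected == 0:
        return billed == 0
    return abs(billed - expected) / abs(expected) <= tolerance

expected = expected_fuel_surcharge(
    current_index=Decimal("4.00"),   # hypothetical current diesel index ($/gal)
    base_index=Decimal("3.20"),      # hypothetical contract base index
    linehaul=Decimal("1000.00"),
    multiplier=Decimal("0.5"),
)
print(expected)                                                        # 125.00
print(within_tolerance(expected, Decimal("126.00"), Decimal("0.015")))  # True
print(within_tolerance(expected, Decimal("130.00"), Decimal("0.015")))  # False
```

Here the index rose 25% over base, so the surcharge is 0.25 * 1000.00 * 0.5 = 125.00. A billed 126.00 is 0.8% high and passes; 130.00 is 4% high and would be routed as an exception.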

# validation/rate_engine.py
import logging
from datetime import date
from decimal import Decimal
from typing import Tuple

logger = logging.getLogger(__name__)

class RateValidationEngine:
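    def __init__(self, tolerance_pct: float = 0.015):
        # Despite the name, tolerance_pct is a fraction: 0.015 means 1.5%.
        self.tolerance = Decimal(str(tolerance_pct))

    def validate_linehaul(self, contracted_rate: Decimal, billed_rate: Decimal) -> Tuple[bool, Decimal]:
        if contracted_rate == 0:
            # No contracted rate to compare against: fail the check. The returned
            # variance of 0 is a placeholder, not a measured difference.
            return False, Decimal("0")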
        variance = abs(billed_rate - contracted_rate) / contracted_rate
        is_compliant = variance <= self.tolerance
        if not is_compliant:
            logger.warning(f"Rate variance {variance:.4%} exceeds tolerance. "
                           f"Contracted: {contracted_rate}, Billed: {billed_rate}")
        return is_compliant, variance

    def resolve_point_in_time(self, rate_matrix: dict, shipment_date: date) -> dict:
        """Return the most recently effective rate record active on shipment_date.

        Records are not filtered by lane here, so pass in only the candidate
        records for the lane being audited.
        """
        active_rates = [
            rate_data for rate_data in rate_matrix.values()
            if rate_data["effective_start"] <= shipment_date
            and (rate_data.get("effective_end") is None
                 or rate_data["effective_end"] >= shipment_date)
        ]
        if not active_rates:
            raise ValueError(f"No active contract found for shipment date {shipment_date}")
        return max(active_rates, key=lambda x: x["effective_start"])

4. Dispute: Exception Handling & Fallback Routing

When validation fails, the pipeline must not halt. Instead, it routes each discrepancy into a structured dispute queue based on severity and root cause. Variances within tolerance are auto-approved during validation and never reach a queue; unauthorized accessorials, expired contract references, and missing rate matches are escalated.

The system relies on Fallback Routing Configuration to handle edge cases where a primary contract is missing or ambiguous. Fallback logic typically cascades through: 1) Carrier default tariff rates, 2) Industry benchmark pricing (e.g., DAT, Truckstop), 3) Historical shipment averages, or 4) Manual auditor assignment. Each fallback step is logged with a confidence score and requires explicit configuration approval to prevent silent overpayment.
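
The cascade can be sketched as an ordered list of lookups that stops at the first hit and records which sources were tried. The source names, confidence scores, and rate values below are illustrative assumptions; real benchmark sources would be queried through their own interfaces rather than plain dicts.

```python
from dataclasses import dataclass
from decimal import Decimal
from typing import Callable, List, Optional, Tuple

Lane = Tuple[str, str]  # (origin ZIP3, destination ZIP3)
Lookup = Callable[[Lane], Optional[Decimal]]

@dataclass(frozen=True)
class FallbackResult:
    source: str
    rate: Optional[Decimal]
    confidence: float
    attempted: Tuple[str, ...]  # every source tried, in order, for the audit log

def resolve_with_fallback(lane: Lane,
                          steps: List[Tuple[str, float, Lookup]]) -> FallbackResult:
    """Try each configured source in order; fall through to manual assignment."""
    attempted = []
    for source, confidence, lookup in steps:
        attempted.append(source)
        rate = lookup(lane)
        if rate is not None:
            return FallbackResult(source, rate, confidence, tuple(attempted))
    return FallbackResult("MANUAL_AUDITOR_ASSIGNMENT", None, 0.0, tuple(attempted))

# Hypothetical in-memory stand-ins for the three automated sources.
carrier_tariff: dict = {}
benchmark = {("606", "300"): Decimal("2.85")}
history = {("606", "300"): Decimal("2.70"), ("100", "900"): Decimal("3.10")}

STEPS = [
    ("CARRIER_DEFAULT_TARIFF", 0.9, carrier_tariff.get),
    ("INDUSTRY_BENCHMARK", 0.7, benchmark.get),
    ("HISTORICAL_AVERAGE", 0.5, history.get),
]

result = resolve_with_fallback(("606", "300"), STEPS)
print(result.source, result.rate, result.confidence)  # INDUSTRY_BENCHMARK 2.85 0.7
```

Keeping the step list as data means the cascade order and confidence scores can be reviewed and approved as configuration rather than buried in code.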

Dispute records are serialized into a standardized JSON payload containing the invoice ID, failed validation rule, expected vs. actual values, and recommended resolution. These payloads are pushed to a message broker (RabbitMQ/Kafka) or directly into a freight audit SaaS API, ensuring that AP teams only receive pre-validated, actionable exceptions.
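
A possible shape for that payload is sketched below. The field names, rule identifier, and resolution code are assumptions, and monetary values are serialized as strings so that Decimal precision survives the JSON round trip. Publishing to a broker or API is omitted.

```python
import json
from decimal import Decimal

def serialize_dispute(invoice_id: str, failed_rule: str, expected: Decimal,
                      actual: Decimal, recommended_resolution: str) -> str:
    """Serialize one dispute record to a JSON string with stable key order."""
    payload = {
        "invoice_id": invoice_id,
        "failed_rule": failed_rule,
        "expected": str(expected),
        "actual": str(actual),
        "variance": str(actual - expected),
        "recommended_resolution": recommended_resolution,
    }
    return json.dumps(payload, sort_keys=True)

message = serialize_dispute(
    invoice_id="INV10045",
    failed_rule="LINEHAUL_TOLERANCE",          # hypothetical rule identifier
    expected=Decimal("1250.50"),
    actual=Decimal("1312.00"),
    recommended_resolution="SHORT_PAY_TO_CONTRACT",  # hypothetical resolution code
)
print(message)
```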

5. Audit: Ledger Reconciliation & Lifecycle Management

The final stage closes the loop by reconciling approved invoices with the enterprise AP ledger. The audit engine generates immutable settlement files, applies payment terms, and archives the complete transaction lineage. Every rate lookup, validation decision, and fallback application is hashed and stored for SOX compliance and carrier audit requests.
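
The text above does not say how the hashes are organized. One common option, shown here as an assumption rather than the prescribed design, is a hash chain in which each record's SHA-256 digest covers the previous record's digest, so that altering any earlier decision invalidates everything after it. The event fields are hypothetical.

```python
import hashlib
import json
from typing import List

GENESIS = "0" * 64  # placeholder previous-hash for the first record

def _digest(prev_hash: str, event: dict) -> str:
    body = json.dumps(event, sort_keys=True)
    return hashlib.sha256((prev_hash + body).encode()).hexdigest()

def append_audit_event(chain: List[dict], event: dict) -> dict:
    """Append an event whose hash covers both its content and the prior hash."""
    prev_hash = chain[-1]["hash"] if chain else GENESIS
    record = {"event": event, "prev_hash": prev_hash, "hash": _digest(prev_hash, event)}
    chain.append(record)
    return record

def verify_chain(chain: List[dict]) -> bool:
    """Recompute every hash in order; any edited or reordered record fails."""
    prev_hash = GENESIS
    for record in chain:
        if record["prev_hash"] != prev_hash:
            return False
        if record["hash"] != _digest(prev_hash, record["event"]):
            return False
        prev_hash = record["hash"]
    return True

chain: List[dict] = []
append_audit_event(chain, {"invoice_id": "INV10045", "step": "RATE_LOOKUP", "rate": "36.75"})
append_audit_event(chain, {"invoice_id": "INV10045", "step": "VALIDATION", "compliant": True})
append_audit_event(chain, {"invoice_id": "INV10045", "step": "SETTLEMENT", "amount": "110.25"})
print(verify_chain(chain))  # True

chain[0]["event"]["rate"] = "30.00"  # simulate tampering with a stored decision
print(verify_chain(chain))  # False
```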

Proactive lifecycle management prevents future audit failures. Contract Expiry & Renewal Tracking runs as a scheduled job that evaluates effective_end dates against upcoming shipment forecasts. Contracts expiring within 30 days trigger automated alerts to procurement teams. If a contract lapses without renewal, the pipeline automatically switches to fallback routing and flags all subsequent shipments for manual review until a new agreement is digitized.
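
The scheduled check can be sketched as a pure function that the orchestrator calls daily. Contracts are represented as plain dicts for brevity, and the report keys are assumptions; comparing against shipment forecasts and sending the alerts themselves are out of scope here.

```python
from datetime import date, timedelta
from typing import Dict, List

def classify_contracts(contracts: List[dict], today: date,
                       alert_window_days: int = 30) -> Dict[str, List[str]]:
    """Split contracts into those expiring within the window and those already lapsed."""
    horizon = today + timedelta(days=alert_window_days)
    report: Dict[str, List[str]] = {"expiring_soon": [], "lapsed": []}
    for contract in contracts:
        end = contract.get("effective_end")
        if end is None:
            continue  # open-ended contracts never expire
        if end < today:
            report["lapsed"].append(contract["contract_id"])
        elif end <= horizon:
            report["expiring_soon"].append(contract["contract_id"])
    return report

contracts = [
    {"contract_id": "C-1", "effective_end": date(2024, 6, 20)},
    {"contract_id": "C-2", "effective_end": date(2024, 5, 15)},
    {"contract_id": "C-3", "effective_end": None},
    {"contract_id": "C-4", "effective_end": date(2024, 12, 31)},
]
print(classify_contracts(contracts, today=date(2024, 6, 1)))
# {'expiring_soon': ['C-1'], 'lapsed': ['C-2']}
```

Contracts in the lapsed list are the ones whose shipments would be switched to fallback routing and flagged for manual review.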

Production Pipeline Orchestration

Deploying this architecture requires an orchestration layer that provides effectively-once processing, distributed tracing, and configuration-as-code. Schedulers such as Apache Airflow or Prefect manage DAG dependencies and retries; because a retried task can run more than once, the deterministic ingestion keys are what keep an invoice from being processed twice. DuckDB handles in-process analytical validation. Configuration (tolerances, fallback matrices, carrier mappings) is stored in a Git-backed configuration repository to enable environment promotion and rollback, while credentials belong in a centralized secrets manager.

# pipeline/etl_runner.py
import logging
from contextlib import contextmanager
from typing import Generator, List

logger = logging.getLogger(__name__)

@contextmanager
def audit_transaction(invoice_id: str) -> Generator[None, None, None]:
    """Logs auditable transaction boundaries. It does not persist or roll back state itself."""
    logger.info(f"BEGIN_AUDIT_TRANSACTION invoice={invoice_id}")
    try:
        yield
        logger.info(f"COMMIT_AUDIT_TRANSACTION invoice={invoice_id}")
    except Exception as e:
        logger.error(f"ROLLBACK_AUDIT_TRANSACTION invoice={invoice_id} error={e}")
        raise
    finally:
        logger.info(f"END_AUDIT_TRANSACTION invoice={invoice_id}")

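# normalize_edi_210, resolve_active_contract, validate_charges, and
# build_dispute_payload are assumed to be defined elsewhere in the pipeline.
def run_freight_audit_pipeline(invoices: List[dict], config: dict) -> dict:
    results = {"approved": [], "disputed": [], "errors": []}

    for inv in invoices:
        invoice_id = inv.get("invoice_id", "UNKNOWN")
        try:
            # The try wraps the context manager so a failure propagates through
            # audit_transaction (which logs ROLLBACK) before it is recorded here.
            with audit_transaction(invoice_id):
                # 1. Ingest & Normalize
                normalized = normalize_edi_210(inv)
                # 2. Resolve Contract
                contract = resolve_active_contract(normalized, config["rate_store"])
                # 3. Validate
                compliance, variance = validate_charges(normalized, contract)
                # 4. Route
                if compliance:
                    results["approved"].append(normalized)
                else:
                    results["disputed"].append(build_dispute_payload(normalized, variance))
        except Exception as e:
            # One bad invoice must not stop the batch; record it and continue.
            results["errors"].append({"invoice_id": invoice_id, "error": str(e)})

    return results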

By enforcing strict schema validation, deterministic versioning, and automated exception routing, this architecture transforms freight billing from a reactive, error-prone accounting function into a proactive, data-driven operational capability.