Table Loader Documentation
Overview
The Table Loader is the final and most critical service in the IDhub data ingestion pipeline. Its primary responsibility is to safely and intelligently load validated data fragments from the staging area into the central PostgreSQL database. It operates on batches of data that have been processed by the Fragment Validator and queued for loading.
The service's core is the Universal Update Strategy, which uses natural keys to perform "upsert" operations (inserting new records and updating existing ones) while protecting immutable fields from being overwritten.
Architecture
```mermaid
graph TD
subgraph "Input"
VQ[(Validation Queue<br/>PostgreSQL)]
S3[(S3 Bucket<br/>Validated Fragments)]
end
subgraph "Table Loader Service"
MAIN[main.py - CLI Entrypoint]
CONFIG[Configuration Loader]
subgraph "Processing Engine"
BATCH[1. Batch Processor]
GROUP[2. Group by Table]
FETCH[3. Fetch Fragments from S3]
UPSERT[4. Upsert Logic Engine]
end
subgraph "Database Interaction"
LOOKUP[Natural Key Lookup]
INSERT[INSERT Statement]
UPDATE[UPDATE Statement]
AUDIT[Update Queue Status]
end
end
subgraph "Output"
DB[(IDhub Database)]
end
VQ -- "Read pending batch" --> MAIN
MAIN --> BATCH
CONFIG -- "table_configs.json" --> UPSERT
BATCH --> GROUP
GROUP --> FETCH
FETCH -- "Download fragment" --> S3
FETCH --> UPSERT
UPSERT --> LOOKUP
LOOKUP -- "Query record" --> DB
LOOKUP -- "Exists?" --> UPSERT
UPSERT -- "New" --> INSERT
UPSERT -- "Exists" --> UPDATE
INSERT -- "Write" --> DB
UPDATE -- "Write" --> DB
UPSERT -- "Success/Fail" --> AUDIT
AUDIT -- "Update status" --> VQ
style VQ fill:#2196F3
style S3 fill:#FF9800
style DB fill:#4CAF50
```
Key Features
- Batch Processing: Loads data in discrete, auditable batches for transactional integrity.
- Natural Key Upserts: Intelligently inserts or updates records based on business-defined natural keys (e.g., `global_subject_id` + `niddk_no`), not database IDs.
- Immutable Field Protection: Prevents critical, once-set data (like `created_at` or `global_subject_id`) from being modified.
- Transactional Loading: Processes records within a database transaction. If any record in a batch fails, the entire transaction can be rolled back to prevent partial loads.
- Auditing: Updates the status of each fragment in the `validation_queue` to `loaded` or `failed`, providing a clear record of what was loaded and when.
Universal Update Strategy
The power of the Table Loader comes from its update strategy, which is configured on a per-table basis.
- Natural Key Lookup: For each incoming record, the loader uses the `natural_key` defined in `table_configs.json` to check whether a corresponding record already exists in the target database table.
- Decision Point:
  - If Not Found: The loader performs an `INSERT` operation to create a new record.
  - If Found: The loader proceeds to the update path.
- Immutable Field Check: Before updating, the loader compares the values of `immutable_fields` in the incoming record with the existing database record. If any immutable field has changed, the update is rejected and the fragment is marked as `failed`.
- Value Comparison: If the immutable fields are unchanged, the loader compares the remaining fields. If there are no changes, the update is skipped to avoid unnecessary database writes.
- Perform Update: If there are changes to mutable fields, the loader performs an `UPDATE` operation on the existing record.
➡️ See the full Universal Update Strategy documentation for more details.
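For illustration, the decision flow above can be condensed into a small pure function. This is a minimal sketch, not the service's actual code; the function name `decide_action` and the dict-based record representation are assumptions.

```python
from typing import Optional

def decide_action(existing: Optional[dict], incoming: dict, immutable_fields: list) -> str:
    """Classify one incoming record as INSERT, REJECT, SKIP, or UPDATE (illustrative sketch)."""
    if existing is None:
        return "INSERT"      # no row found for the natural key
    for field in immutable_fields:
        if field in incoming and incoming[field] != existing.get(field):
            return "REJECT"  # immutable field changed; fragment should be marked failed
    if all(existing.get(k) == v for k, v in incoming.items()):
        return "SKIP"        # nothing new to write
    return "UPDATE"          # at least one mutable field changed

# Example: an unchanged record is skipped rather than rewritten.
# decide_action({"niddk_no": "N1", "status": "active"},
#               {"niddk_no": "N1", "status": "active"},
#               ["created_at", "global_subject_id"])  == "SKIP"
```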
Configuration
Table Configuration (`config/table_configs.json`)
This central configuration file dictates the loader's behavior for each table.
```json
{
"lcl": {
"natural_key": ["global_subject_id", "niddk_no"],
"immutable_fields": ["created_at", "global_subject_id"],
"update_strategy": "upsert"
},
"subjects": {
"natural_key": ["global_subject_id"],
"immutable_fields": ["created_at", "gsid"],
"update_strategy": "upsert"
}
}
```
- `natural_key`: An array of fields that together uniquely identify a record.
- `immutable_fields`: An array of fields that cannot be changed after a record is first created.
- `update_strategy`: The loading strategy to use (e.g., `upsert`, `insert_only`).
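As a sketch of how this file might be consumed at startup (the `load_table_configs` helper and its validation are assumptions, not the service's actual loader):

```python
import json
from pathlib import Path

REQUIRED_KEYS = {"natural_key", "immutable_fields", "update_strategy"}

def load_table_configs(path: str = "config/table_configs.json") -> dict:
    """Read the per-table configuration and verify each entry has the expected keys."""
    configs = json.loads(Path(path).read_text())
    for table, cfg in configs.items():
        missing = REQUIRED_KEYS - cfg.keys()
        if missing:
            raise ValueError(f"Table '{table}' is missing config keys: {sorted(missing)}")
    return configs
```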
Environment Variables
```bash
# Database Connection
DATABASE_URL=postgresql://user:password@host:port/database
# AWS S3 for fetching fragments
AWS_ACCESS_KEY_ID=test
AWS_SECRET_ACCESS_KEY=test
AWS_ENDPOINT_URL=http://localhost:4566 # For local development
S3_BUCKET=idhub-dev-fragments
# Logging
LOG_LEVEL=INFO
```
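The following sketch shows one way these variables could be consumed at startup; the use of `psycopg2` and `boto3` is an assumption based on the PostgreSQL/S3 stack described here, not something this document specifies.

```python
import os

import boto3
import psycopg2

def build_clients():
    """Create a database connection and an S3 client from the environment (illustrative)."""
    conn = psycopg2.connect(os.environ["DATABASE_URL"])
    s3 = boto3.client(
        "s3",
        endpoint_url=os.environ.get("AWS_ENDPOINT_URL"),  # e.g. LocalStack in local development
    )
    bucket = os.environ.get("S3_BUCKET", "idhub-dev-fragments")
    return conn, s3, bucket
```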
Workflow
The Table Loader is executed for a specific `batch_id` and proceeds through the following steps (a code sketch of the loop follows the list).
- Query Queue: The service queries the `validation_queue` for all pending fragments associated with the given `batch_id`.
- Group by Table: To optimize database operations, it groups the list of fragments by their target `table_name`.
- Process Tables Sequentially: The loader processes one table at a time.
- Process Fragments: For each fragment in the table group:
  a. Download from S3: The JSON fragment data is downloaded from the S3 bucket.
  b. Execute Upsert Logic: The Universal Update Strategy is applied to determine whether to `INSERT`, `UPDATE`, `SKIP`, or `REJECT` the record.
  c. Database Operation: The appropriate SQL command is executed against the database.
- Update Queue Status: After processing the fragment, its entry in the `validation_queue` is updated:
  - On success (`INSERT`, `UPDATE`, `SKIP`), the status is set to `loaded`.
  - On failure (`REJECT`), the status is set to `failed`, and an `error_message` is recorded.
- Generate Summary: Once the entire batch is processed, a summary report is generated detailing how many records were inserted, updated, skipped, or failed.
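The loop below is a compressed sketch of these steps. The field names (`table_name`, `s3_key`, `id`) and the injected callables are assumptions made for illustration; they are not the service's actual interfaces.

```python
from collections import defaultdict
from typing import Callable

def load_batch(fragments: list, table_configs: dict,
               download: Callable, upsert: Callable, mark_status: Callable) -> dict:
    """Group pending fragments by table, apply the upsert logic, audit each one, summarize."""
    by_table = defaultdict(list)
    for frag in fragments:                          # step 2: group by target table
        by_table[frag["table_name"]].append(frag)

    summary = defaultdict(int)
    for table, frags in by_table.items():           # step 3: one table at a time
        cfg = table_configs[table]
        for frag in frags:
            record = download(frag["s3_key"])       # step 4a: fetch the JSON fragment from S3
            action = upsert(table, record, cfg)     # steps 4b/4c: decide and execute
            status = "failed" if action == "REJECT" else "loaded"
            mark_status(frag["id"], status)         # step 5: update validation_queue
            summary[action] += 1
    return dict(summary)                            # step 6: e.g. {"INSERT": 12, "SKIP": 3}
```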
Error Handling and Auditing
The Table Loader is designed to be robust and auditable.
- Load Failures: If an individual record fails (e.g., an immutable field violation), only that fragment is marked as `failed`. The rest of the batch continues to load.
- Transactional Integrity: In non-dry-run mode, operations for a batch can be wrapped in a transaction, ensuring that the database is not left in a partially-loaded state in case of a critical service failure.
- Audit Trail: The `validation_queue` serves as a permanent audit log. By inspecting the queue, an administrator can see the status of every fragment for every batch, along with any error messages for failed loads.
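As an example of using that audit trail, an administrator could pull a per-fragment report for a batch with a query along these lines (the `fragment_id` and `batch_id` column names are assumptions about the queue schema):

```python
import psycopg2

def batch_report(database_url: str, batch_id: str) -> None:
    """Print the status and error message of every fragment in one batch (illustrative)."""
    with psycopg2.connect(database_url) as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT fragment_id, status, error_message "
                "FROM validation_queue WHERE batch_id = %s",
                (batch_id,),
            )
            for fragment_id, status, error_message in cur.fetchall():
                print(fragment_id, status, error_message or "")
```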
Usage
The Table Loader is run from the command line, typically as part of an automated workflow.
Command-Line Arguments
- `--batch-id`: Required. The unique identifier for the batch to be loaded.
- `--table`: Optional. If specified, only loads fragments for the given table within the batch.
- `--dry-run`: Optional. Executes the entire loading process, including database lookups and value comparisons, but does not write any data or update the queue. This is essential for previewing changes and debugging.
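These flags map naturally onto `argparse`. The parser below is a sketch consistent with the options listed above, not a copy of the actual `main.py`:

```python
import argparse

def parse_args(argv=None):
    """Build a CLI matching the documented flags (illustrative sketch)."""
    parser = argparse.ArgumentParser(description="Load validated fragments for one batch.")
    parser.add_argument("--batch-id", required=True, help="Identifier of the batch to load")
    parser.add_argument("--table", help="Only load fragments targeting this table")
    parser.add_argument("--dry-run", action="store_true",
                        help="Run lookups and comparisons without writing data or updating the queue")
    return parser.parse_args(argv)
```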
Examples
```bash
# Preview the load for a specific batch without making changes
python main.py --batch-id batch_20251205_103000 --dry-run
# Perform a live load of the entire batch
python main.py --batch-id batch_20251205_103000
# Load only the 'lcl' table from the batch
python main.py --batch-id batch_20251205_103000 --table lcl
```
Testing
The service includes unit and integration tests to validate its core logic.
```bash
# Navigate to the service directory
cd table-loader
# Install test dependencies
pip install -r requirements-test.txt
# Run all tests
pytest
# Run tests with coverage report
pytest --cov=. --cov-report=html
```
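As an illustration of the style of unit test involved, here is how the `decide_action` sketch shown earlier could be exercised, assuming it lived in a hypothetical `upsert_logic` module:

```python
# test_upsert_logic.py -- illustrative; assumes the decide_action sketch above
# is importable from a hypothetical upsert_logic module.
from upsert_logic import decide_action

def test_new_record_is_inserted():
    assert decide_action(None, {"global_subject_id": "G1"}, ["global_subject_id"]) == "INSERT"

def test_immutable_change_is_rejected():
    existing = {"global_subject_id": "G1", "status": "active"}
    incoming = {"global_subject_id": "G2", "status": "active"}
    assert decide_action(existing, incoming, ["global_subject_id"]) == "REJECT"

def test_identical_record_is_skipped():
    record = {"global_subject_id": "G1", "status": "active"}
    assert decide_action(record, dict(record), ["global_subject_id"]) == "SKIP"
```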