Skip to content

REDCap Pipeline Documentation

Overview

The REDCap Pipeline is an automated data extraction and transformation service that pulls data from multiple REDCap projects, transforms it according to configurable field mappings, and stages it in S3 for validation and loading.

Architecture

graph TB
    subgraph "REDCap Pipeline"
        SCHED[Scheduler/Trigger]
        CONFIG[Configuration Loader]

        subgraph "Extraction"
            API[REDCap API Client]
            INCR[Incremental Tracker]
        end

        subgraph "Transformation"
            MAP[Field Mapper]
            TRANS[Data Transformer]
            VAL[Basic Validator]
        end

        subgraph "Loading"
            FRAG[Fragment Generator]
            S3[S3 Uploader]
            META[Metadata Tracker]
        end
    end

    RC[REDCap Projects] -->|API| API
    API --> INCR
    INCR --> MAP
    MAP --> TRANS
    TRANS --> VAL
    VAL --> FRAG
    FRAG --> S3
    S3 --> META

    CONFIG -.->|Field Mappings| MAP
    CONFIG -.->|Project Config| API

    S3 -->|Upload| S3BUCKET[(S3 Bucket)]

    style RC fill:#FF9800
    style S3BUCKET fill:#2196F3

Features

  • Multi-Project Support: Extract from multiple REDCap projects
  • Incremental Extraction: Only fetch new/updated records
  • Field Mapping: Transform source fields to target schema
  • Batch Processing: Process records in configurable batches
  • Error Handling: Robust error handling and retry logic
  • Audit Trail: Complete logging of extraction and transformation

Configuration

Project Configuration

Projects are defined in config/projects.json:

{
  "projects": {
    "gap": {
      "name": "GAP",
      "redcap_project_id": "16894",
      "api_token_env": "REDCAP_API_TOKEN_GAP",
      "field_mappings": "gap_field_mappings.json",
      "schedule": "continuous",
      "batch_size": 50,
      "enabled": true,
      "description": "Main biobank project",
      "tables": ["lcl", "genotype", "sequence"]
    },
    "uc_demarc": {
      "name": "UC DEMARC",
      "redcap_project_id": "12345",
      "api_token_env": "REDCAP_API_TOKEN_UC_DEMARC",
      "field_mappings": "uc_demarc_field_mappings.json",
      "schedule": "daily",
      "batch_size": 100,
      "enabled": true,
      "description": "UC DEMARC study data",
      "tables": ["specimen", "genotype"]
    }
  }
}

Configuration Fields:

Field Type Required Description
name string Yes Human-readable project name
redcap_project_id string Yes REDCap project ID
api_token_env string Yes Environment variable name for API token
field_mappings string Yes Field mapping configuration file
schedule string No Extraction schedule (continuous, daily, weekly)
batch_size integer No Records per batch (default: 50)
enabled boolean No Enable/disable project (default: true)
tables array Yes Target tables for this project

Field Mapping Configuration

Field mappings define how source fields map to target schema:

{
  "lcl": {
    "field_mapping": {
      "knumber": "k_number",
      "niddk_no": "niddk_number",
      "passage_number": "passage_num",
      "cell_line_status": "status",
      "freeze_date": "date_frozen",
      "storage_location": "location",
      "notes": "comments"
    },
    "subject_id_candidates": ["consortium_id", "subject_id"],
    "center_id_field": "center",
    "default_center_id": 1,
    "exclude_from_load": ["record_id", "redcap_event_name"],
    "transformations": {
      "freeze_date": "date",
      "passage_number": "integer"
    }
  },
  "genotype": {
    "field_mapping": {
      "genotype_id": "genotyping_sample_id",
      "genotyping_project": "project_name",
      "genotyping_barcode": "barcode",
      "batch": "genotyping_batch"
    },
    "subject_id_candidates": ["consortium_id"],
    "center_id_field": "center",
    "default_center_id": 1,
    "exclude_from_load": ["record_id"],
    "transformations": {
    }
  }
}

Mapping Fields:

Field Type Description
field_mapping object Source → Target field mappings
subject_id_candidates array Fields to try for subject ID resolution
center_id_field string Field containing center ID
default_center_id integer Default center ID if not in data
exclude_from_load array Fields to exclude from output
transformations object Field type transformations

Environment Variables

# REDCap Configuration
REDCAP_API_URL=https://redcap.example.edu/api/
REDCAP_API_TOKEN_GAP=your_gap_token_here
REDCAP_API_TOKEN_UC_DEMARC=your_uc_demarc_token_here

# Database Configuration
DB_HOST=idhub_db
DB_NAME=idhub
DB_USER=idhub_user
DB_PASSWORD=your_secure_password
DB_PORT=5432

# S3 Configuration
S3_BUCKET=idhub-curated-fragments
AWS_ACCESS_KEY_ID=your_access_key
AWS_SECRET_ACCESS_KEY=your_secret_key
AWS_REGION=us-east-1

# GSID Service
GSID_SERVICE_URL=https://api.idhub.ibdgc.org
GSID_API_KEY=your_gsid_api_key

# Pipeline Configuration
BATCH_SIZE=50
DRY_RUN=false
LOG_LEVEL=INFO

Usage

Command Line

# Run pipeline for all enabled projects
python main.py

# Run for specific project
python main.py --project gap

# Run with custom batch size
python main.py --project gap --batch-size 100

# Dry run (no S3 upload)
python main.py --project gap --dry-run

# Specify date range
python main.py --project gap --start-date 2024-01-01 --end-date 2024-01-31

# Force full extraction (ignore incremental)
python main.py --project gap --full

# Verbose logging
python main.py --project gap --verbose

Programmatic Usage

from services.pipeline import REDCapPipeline
from core.config import settings

# Initialize pipeline
pipeline = REDCapPipeline(
    project_config={
        "name": "GAP",
        "redcap_project_id": "16894",
        "api_token": "your_token",
        "field_mappings": "gap_field_mappings.json"
    }
)

# Run extraction
results = await pipeline.run(
    batch_size=50,
    dry_run=False
)

# Check results
print(f"Extracted: {results['records_extracted']}")
print(f"Transformed: {results['records_transformed']}")
print(f"Uploaded: {results['fragments_uploaded']}")
print(f"Errors: {results['errors']}")

Docker Usage

# Build image
docker build -t redcap-pipeline:latest .

# Run container
docker run --rm \
  -e REDCAP_API_URL=https://redcap.example.edu/api/ \
  -e REDCAP_API_TOKEN_GAP=your_token \
  -e GSID_API_KEY=your_key \
  -e S3_BUCKET=idhub-curated-fragments \
  redcap-pipeline:latest \
  python main.py --project gap

# Run with docker-compose
docker-compose run --rm redcap-pipeline python main.py --project gap

Pipeline Workflow

1. Initialization

class REDCapPipeline:
    def __init__(self, project_config: dict):
        """Initialize pipeline with project configuration"""
        self.project_config = project_config
        self.redcap_client = REDCapClient(
            api_url=settings.REDCAP_API_URL,
            api_token=project_config['api_token']
        )
        self.s3_client = S3Client(bucket=settings.S3_BUCKET)
        self.gsid_client = GSIDClient(
            service_url=settings.GSID_SERVICE_URL,
            api_key=settings.GSID_API_KEY
        )
        self.field_mapper = FieldMapper(
            mapping_config=self.load_field_mappings()
        )

2. Incremental Extraction

async def extract_records(self, start_date: Optional[str] = None) -> list:
    """
    Extract records from REDCap

    Args:
        start_date: Optional start date for incremental extraction

    Returns:
        List of extracted records
    """
    # Get last successful run
    if not start_date:
        last_run = await self.get_last_run_date()
        start_date = last_run.isoformat() if last_run else None

    # Build API parameters
    params = {
        'content': 'record',
        'format': 'json',
        'type': 'flat',
        'rawOrLabel': 'raw'
    }

    if start_date:
        params['dateRangeBegin'] = start_date
        logger.info(f"Extracting records modified since {start_date}")
    else:
        logger.info("Extracting all records (full extraction)")

    # Call REDCap API
    records = await self.redcap_client.export_records(params)

    logger.info(f"Extracted {len(records)} records from REDCap")

    return records

3. Field Mapping

class FieldMapper:
    def __init__(self, mapping_config: dict):
        self.mapping_config = mapping_config

    def transform_record(self, record: dict, table_name: str) -> dict:
        """
        Transform record according to field mappings

        Args:
            record: Source record
            table_name: Target table name

        Returns:
            Transformed record
        """
        if table_name not in self.mapping_config:
            raise ValueError(f"No mapping config for table: {table_name}")

        table_config = self.mapping_config[table_name]
        transformed = {}

        # Apply field mappings
        for target_field, source_field in table_config['field_mapping'].items():
            if source_field in record:
                value = record[source_field]

                # Apply transformations
                if target_field in table_config.get('transformations', {}):
                    transform_type = table_config['transformations'][target_field]
                    value = self.apply_transformation(value, transform_type)

                transformed[target_field] = value

        # Exclude fields
        for field in table_config.get('exclude_from_load', []):
            transformed.pop(field, None)

        return transformed

    def apply_transformation(self, value: Any, transform_type: str) -> Any:
        """Apply type transformation"""
        if value is None or value == '':
            return None

        transformations = {
            'integer': lambda v: int(float(v)),
            'float': lambda v: float(v),
            'date': lambda v: self.parse_date(v),
            'datetime': lambda v: self.parse_datetime(v),
            'boolean': lambda v: str(v).lower() in ('true', '1', 'yes'),
            'string': lambda v: str(v).strip()
        }

        transform_func = transformations.get(transform_type)
        if transform_func:
            try:
                return transform_func(value)
            except (ValueError, TypeError) as e:
                logger.warning(f"Transformation failed for {value}: {e}")
                return None

        return value

4. Subject ID Resolution

async def resolve_subject_id(self, record: dict, table_config: dict) -> Optional[str]:
    """
    Resolve subject ID from record

    Args:
        record: Source record
        table_config: Table configuration

    Returns:
        GSID if resolved, None otherwise
    """
    # Get center ID
    center_id = record.get(table_config.get('center_id_field'))
    if not center_id:
        center_id = table_config.get('default_center_id', 0)

    # Try each candidate field
    for candidate_field in table_config.get('subject_id_candidates', []):
        if candidate_field in record and record[candidate_field]:
            local_id = str(record[candidate_field]).strip()

            try:
                # Try to resolve existing GSID
                gsid = await self.gsid_client.resolve(
                    center_id=center_id,
                    local_subject_id=local_id
                )

                if gsid:
                    logger.debug(f"Resolved {local_id} -> {gsid}")
                    return gsid

            except Exception as e:
                logger.warning(f"Failed to resolve {local_id}: {e}")

    logger.warning(f"Could not resolve subject ID for record: {record}")
    return None

5. Fragment Generation

async def generate_fragment(
    self,
    record: dict,
    table_name: str,
    gsid: str,
    batch_id: str
) -> dict:
    """
    Generate fragment from transformed record

    Args:
        record: Transformed record
        table_name: Target table
        gsid: Global subject ID
        batch_id: Batch identifier

    Returns:
        Fragment dictionary
    """
    fragment = {
        "fragment_id": f"frag_{ulid.create()}",
        "table_name": table_name,
        "source_system": "redcap",
        "project_name": self.project_config['name'],
        "project_id": self.project_config['redcap_project_id'],
        "batch_id": batch_id,
        "extracted_at": datetime.utcnow().isoformat(),
        "global_subject_id": gsid,
        "data": record
    }

    return fragment

6. S3 Upload

async def upload_fragment(self, fragment: dict) -> str:
    """
    Upload fragment to S3

    Args:
        fragment: Fragment dictionary

    Returns:
        S3 key
    """
    # Generate S3 key
    s3_key = (
        f"staging/{fragment['batch_id']}/"
        f"{fragment['table_name']}/"
        f"{fragment['fragment_id']}.json"
    )

    # Upload to S3
    await self.s3_client.upload_json(
        key=s3_key,
        data=fragment
    )

    logger.debug(f"Uploaded fragment to s3://{settings.S3_BUCKET}/{s3_key}")

    return s3_key

7. Queue Entry

async def create_queue_entry(self, fragment: dict, s3_key: str) -> int:
    """
    Create validation queue entry

    Args:
        fragment: Fragment dictionary
        s3_key: S3 key where fragment is stored

    Returns:
        Queue entry ID
    """
    query = """
        INSERT INTO validation_queue (
            batch_id,
            table_name,
            fragment_id,
            s3_key,
            source_system,
            project_name,
            status,
            created_at
        ) VALUES ($1, $2, $3, $4, $5, $6, 'pending', NOW())
        RETURNING id
    """

    queue_id = await self.db.fetchval(
        query,
        fragment['batch_id'],
        fragment['table_name'],
        fragment['fragment_id'],
        s3_key,
        fragment['source_system'],
        fragment['project_name']
    )

    return queue_id

8. Complete Pipeline

async def run(
    self,
    batch_size: int = 50,
    dry_run: bool = False,
    start_date: Optional[str] = None
) -> dict:
    """
    Run complete pipeline

    Args:
        batch_size: Records per batch
        dry_run: If True, don't upload to S3
        start_date: Optional start date for extraction

    Returns:
        Pipeline execution results
    """
    start_time = time.time()
    batch_id = f"batch_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}"

    results = {
        'batch_id': batch_id,
        'records_extracted': 0,
        'records_transformed': 0,
        'fragments_uploaded': 0,
        'errors': 0,
        'duration_seconds': 0
    }

    try:
        # 1. Extract records
        logger.info(f"Starting extraction for project: {self.project_config['name']}")
        records = await self.extract_records(start_date)
        results['records_extracted'] = len(records)

        if not records:
            logger.info("No records to process")
            return results

        # 2. Process records by table
        for table_name in self.project_config['tables']:
            logger.info(f"Processing table: {table_name}")

            for i in range(0, len(records), batch_size):
                batch = records[i:i + batch_size]

                for record in batch:
                    try:
                        # Transform record
                        transformed = self.field_mapper.transform_record(
                            record,
                            table_name
                        )
                        results['records_transformed'] += 1

                        # Resolve subject ID
                        table_config = self.mapping_config[table_name]
                        gsid = await self.resolve_subject_id(record, table_config)

                        if not gsid:
                            logger.warning(f"Skipping record - no GSID: {record}")
                            results['errors'] += 1
                            continue

                        # Generate fragment
                        fragment = await self.generate_fragment(
                            transformed,
                            table_name,
                            gsid,
                            batch_id
                        )

                        if not dry_run:
                            # Upload to S3
                            s3_key = await self.upload_fragment(fragment)

                            # Create queue entry
                            await self.create_queue_entry(fragment, s3_key)

                            results['fragments_uploaded'] += 1

                    except Exception as e:
                        logger.error(f"Error processing record: {e}", exc_info=True)
                        results['errors'] += 1

        # 3. Update metadata
        if not dry_run:
            await self.update_last_run(datetime.utcnow())

        results['duration_seconds'] = time.time() - start_time

        logger.info(
            f"Pipeline complete: {results['fragments_uploaded']} fragments uploaded, "
            f"{results['errors']} errors in {results['duration_seconds']:.2f}s"
        )

    except Exception as e:
        logger.error(f"Pipeline failed: {e}", exc_info=True)
        raise

    return results

REDCap API Integration

API Client

class REDCapClient:
    def __init__(self, api_url: str, api_token: str):
        self.api_url = api_url
        self.api_token = api_token
        self.session = aiohttp.ClientSession()

    async def export_records(
        self,
        params: Optional[dict] = None
    ) -> list:
        """
        Export records from REDCap

        Args:
            params: Optional API parameters

        Returns:
            List of records
        """
        default_params = {
            'token': self.api_token,
            'content': 'record',
            'format': 'json',
            'type': 'flat',
            'rawOrLabel': 'raw'
        }

        if params:
            default_params.update(params)

        try:
            async with self.session.post(
                self.api_url,
                data=default_params,
                timeout=aiohttp.ClientTimeout(total=300)
            ) as response:
                response.raise_for_status()
                records = await response.json()

                return records

        except aiohttp.ClientError as e:
            logger.error(f"REDCap API error: {e}")
            raise

    async def export_metadata(self) -> list:
        """Export project metadata (data dictionary)"""
        params = {
            'token': self.api_token,
            'content': 'metadata',
            'format': 'json'
        }

        async with self.session.post(self.api_url, data=params) as response:
            response.raise_for_status()
            return await response.json()

    async def export_instruments(self) -> list:
        """Export list of instruments"""
        params = {
            'token': self.api_token,
            'content': 'instrument',
            'format': 'json'
        }

        async with self.session.post(self.api_url, data=params) as response:
            response.raise_for_status()
            return await response.json()

    async def close(self):
        """Close HTTP session"""
        await self.session.close()

Error Handling

class REDCapAPIError(Exception):
    """REDCap API error"""
    pass

class REDCapAuthError(REDCapAPIError):
    """Authentication error"""
    pass

class REDCapRateLimitError(REDCapAPIError):
    """Rate limit exceeded"""
    pass

# Retry logic
async def export_records_with_retry(
    self,
    params: dict,
    max_retries: int = 3
) -> list:
    """Export records with retry logic"""
    for attempt in range(max_retries):
        try:
            return await self.export_records(params)

        except aiohttp.ClientResponseError as e:
            if e.status == 401:
                raise REDCapAuthError("Invalid API token")
            elif e.status == 429:
                wait_time = 2 ** attempt
                logger.warning(f"Rate limited, waiting {wait_time}s")
                await asyncio.sleep(wait_time)
            else:
                raise

        except aiohttp.ClientError as e:
            if attempt == max_retries - 1:
                raise REDCapAPIError(f"Failed after {max_retries} attempts: {e}")

            wait_time = 2 ** attempt
            logger.warning(f"Request failed, retrying in {wait_time}s")
            await asyncio.sleep(wait_time)

    raise REDCapAPIError("Max retries exceeded")

Monitoring and Logging

Structured Logging

import structlog

logger = structlog.get_logger()

# Log with context
logger.info(
    "pipeline_started",
    project=project_name,
    batch_id=batch_id,
    batch_size=batch_size
)

logger.info(
    "records_extracted",
    project=project_name,
    count=len(records),
    duration_ms=duration
)

logger.error(
    "transformation_failed",
    project=project_name,
    record_id=record_id,
    error=str(e)
)

Metrics Collection

class PipelineMetrics:
    def __init__(self):
        self.metrics = {
            'records_extracted': 0,
            'records_transformed': 0,
            'fragments_uploaded': 0,
            'errors': 0,
            'api_calls': 0,
            'api_duration_ms': 0
        }

    def increment(self, metric: str, value: int = 1):
        """Increment metric"""
        self.metrics[metric] += value

    def record_duration(self, metric: str, duration_ms: float):
        """Record duration metric"""
        self.metrics[metric] = duration_ms

    def get_summary(self) -> dict:
        """Get metrics summary"""
        return {
            **self.metrics,
            'success_rate': (
                self.metrics['fragments_uploaded'] /
                self.metrics['records_extracted']
                if self.metrics['records_extracted'] > 0
                else 0
            )
        }

Health Checks

async def health_check() -> dict:
    """Check pipeline health"""
    checks = {
        'redcap_api': False,
        's3': False,
        'gsid_service': False,
        'database': False
    }

    # Check REDCap API
    try:
        await redcap_client.export_metadata()
        checks['redcap_api'] = True
    except Exception as e:
        logger.error(f"REDCap API check failed: {e}")

    # Check S3
    try:
        await s3_client.list_objects(prefix='staging/', max_keys=1)
        checks['s3'] = True
    except Exception as e:
        logger.error(f"S3 check failed: {e}")

    # Check GSID service
    try:
        await gsid_client.health_check()
        checks['gsid_service'] = True
    except Exception as e:
        logger.error(f"GSID service check failed: {e}")

    # Check database
    try:
        await db.fetchval("SELECT 1")
        checks['database'] = True
    except Exception as e:
        logger.error(f"Database check failed: {e}")

    return {
        'status': 'healthy' if all(checks.values()) else 'unhealthy',
        'checks': checks
    }

Testing

Unit Tests

import pytest
from unittest.mock import Mock, AsyncMock

@pytest.mark.asyncio
async def test_extract_records():
    """Test record extraction"""
    # Mock REDCap client
    mock_client = AsyncMock()
    mock_client.export_records.return_value = [
        {'record_id': '1', 'field1': 'value1'},
        {'record_id': '2', 'field1': 'value2'}
    ]

    pipeline = REDCapPipeline(project_config)
    pipeline.redcap_client = mock_client

    records = await pipeline.extract_records()

    assert len(records) == 2
    assert records[0]['record_id'] == '1'

@pytest.mark.asyncio
async def test_field_mapping():
    """Test field mapping transformation"""
    mapper = FieldMapper(mapping_config)

    source_record = {
        'k_number': 'K001',
        'niddk_number': '12345',
        'passage_num': '8'
    }

    transformed = mapper.transform_record(source_record, 'lcl')

    assert transformed['knumber'] == 'K001'
    assert transformed['niddk_no'] == '12345'
    assert transformed['passage_number'] == 8  # Transformed to int

@pytest.mark.asyncio
async def test_subject_id_resolution():
    """Test subject ID resolution"""
    mock_gsid_client = AsyncMock()
    mock_gsid_client.resolve.return_value = '01HQXYZ123'

    pipeline = REDCapPipeline(project_config)
    pipeline.gsid_client = mock_gsid_client

    record = {'consortium_id': 'GAP-001', 'center_id': 1}
    table_config = {
        'subject_id_candidates': ['consortium_id'],
        'center_id_field': 'center_id'
    }

    gsid = await pipeline.resolve_subject_id(record, table_config)

    assert gsid == '01HQXYZ123'
    mock_gsid_client.resolve.assert_called_once_with(
        center_id=1,
        local_subject_id='GAP-001'
    )

Integration Tests

@pytest.mark.integration
@pytest.mark.asyncio
async def test_full_pipeline():
    """Test complete pipeline execution"""
    pipeline = REDCapPipeline(project_config)

    results = await pipeline.run(
        batch_size=10,
        dry_run=True
    )

    assert results['records_extracted'] > 0
    assert results['errors'] == 0
    assert results['duration_seconds'] > 0

Troubleshooting

Common Issues

1. Authentication Failures

Symptom: 401 Unauthorized from REDCap API

Solution:

# Verify API token
echo $REDCAP_API_TOKEN_GAP

# Test API access
curl -X POST https://redcap.example.edu/api/ \
  -d "token=$REDCAP_API_TOKEN_GAP" \
  -d "content=metadata" \
  -d "format=json"

# Check token permissions in REDCap
# User Rights > API Export

2. Field Mapping Errors

Symptom: Missing fields in transformed records

Solution:

# Validate field mappings
def validate_field_mappings(source_record, mapping_config):
    """Check if all mapped fields exist"""
    missing_fields = []

    for target_field, source_field in mapping_config['field_mapping'].items():
        if source_field not in source_record:
            missing_fields.append(source_field)

    if missing_fields:
        logger.warning(f"Missing source fields: {missing_fields}")

    return missing_fields

# Export REDCap data dictionary
metadata = await redcap_client.export_metadata()
field_names = [field['field_name'] for field in metadata]

# Compare with mapping config
for source_field in mapping_config['field_mapping'].values():
    if source_field not in field_names:
        logger.error(f"Field not in