
Common Workflows

Quick reference for "How do I..." tasks.

Each workflow includes: context, steps, verification, and troubleshooting.


Table of Contents

  • Adding a New Exchange
  • Adding a New Field
  • Fixing a Bug
  • Investigating Data Quality Issues
  • Correcting Bad Data
  • Deploying to Production
  • Rolling Back a Deployment
  • Optimizing Query Performance

Adding a New Exchange

When: Expanding the platform to support a new exchange (Bybit, Coinbase, etc.).

Time: 4-6 hours

Prerequisites

  • Exchange API documentation
  • Sample API responses
  • Exchange account (for testing rate limits)

Steps

1. Research Exchange API (30 min)

# Test API manually
curl https://api.bybit.com/v5/market/instruments-info?category=spot | jq . > bybit_sample.json

# Analyze response structure
jq '.result.list[0]' bybit_sample.json

Document:

  • API endpoint URL
  • Rate limits (requests/second)
  • Authentication required? (yes/no)
  • Response format differences

2. Create Exchange Client (1 hour)

# Create client file
touch src/refdata/ingestion/sources/bybit.py
# src/refdata/ingestion/sources/bybit.py

"""Bybit exchange client for instrument ingestion."""

import logging
from typing import Any

from refdata.ingestion.sources.base import BaseExchangeClient, ExchangeAPIError  # ExchangeAPIError assumed to live with the base client; adjust the import if it is defined elsewhere

logger = logging.getLogger(__name__)


class BybitClient(BaseExchangeClient):
    """Bybit REST API client."""

    def __init__(self):
        """Initialize Bybit client."""
        super().__init__(
            exchange_name="bybit",
            base_url="https://api.bybit.com",
            instruments_endpoint="/v5/market/instruments-info",
            rate_limit_rps=10,  # Check docs for actual limit
        )

    def fetch_instruments(self) -> dict[str, Any]:
        """Fetch all instruments from Bybit.

        Returns:
            Raw API response

        Raises:
            ExchangeAPIError: If API call fails
        """
        params = {"category": "spot"}  # Start with spot only

        response = self._make_request("GET", self.instruments_endpoint, params=params)
        response_json = response.json()

        # Bybit-specific validation
        if response_json.get("retCode") != 0:
            raise ExchangeAPIError(
                f"Bybit API error: {response_json.get('retMsg', 'Unknown error')}"
            )

        return response_json

    def parse_instruments(self, api_response: dict[str, Any]) -> list[dict[str, Any]]:
        """Parse Bybit API response to normalized format.

        Args:
            api_response: Raw Bybit API response

        Returns:
            List of parsed instrument dictionaries
        """
        instruments = []

        for item in api_response.get("result", {}).get("list", []):
            # Map Bybit fields to our standard format
            parsed = {
                "exchange": "bybit",
                "symbol": item.get("symbol"),  # e.g., "BTCUSDT"
                "base_asset": item.get("baseCoin"),
                "quote_asset": item.get("quoteCoin"),
                "status": "active" if item.get("status") == "Trading" else "suspended",
                "instrument_type": "spot",  # Bybit uses different endpoints for derivatives
                "tick_size": item.get("priceFilter", {}).get("tickSize"),
                "lot_size": item.get("lotSizeFilter", {}).get("basePrecision"),
                "min_notional": item.get("lotSizeFilter", {}).get("minOrderQty"),
                "max_leverage": None,  # Spot doesn't have leverage
                "funding_interval_hours": None,
                "settlement_asset": None,
            }

            instruments.append(parsed)

        logger.info("Parsed Bybit instruments", count=len(instruments))
        return instruments

3. Write Unit Tests (1 hour)

# tests/unit/ingestion/test_bybit_client.py

import pytest
from unittest.mock import patch, Mock
from refdata.ingestion.sources.bybit import BybitClient


@pytest.fixture
def bybit_sample_response():
    """Sample Bybit API response."""
    return {
        "retCode": 0,
        "retMsg": "OK",
        "result": {
            "list": [
                {
                    "symbol": "BTCUSDT",
                    "baseCoin": "BTC",
                    "quoteCoin": "USDT",
                    "status": "Trading",
                    "priceFilter": {"tickSize": "0.01"},
                    "lotSizeFilter": {
                        "basePrecision": "0.00001",
                        "minOrderQty": "0.001"
                    }
                }
            ]
        }
    }


@pytest.mark.unit
def test_fetch_instruments_success(bybit_sample_response):
    """Test successful fetch from Bybit API."""
    with patch('httpx.Client.request') as mock_request:
        mock_response = Mock()
        mock_response.status_code = 200
        mock_response.json.return_value = bybit_sample_response
        mock_response.raise_for_status = Mock()
        mock_request.return_value = mock_response

        client = BybitClient()
        result = client.fetch_instruments()

        assert result == bybit_sample_response
        assert result["retCode"] == 0
        client.close()


@pytest.mark.unit
def test_parse_instruments(bybit_sample_response):
    """Test parsing Bybit instruments."""
    client = BybitClient()
    parsed = client.parse_instruments(bybit_sample_response)

    assert len(parsed) == 1
    assert parsed[0]["exchange"] == "bybit"
    assert parsed[0]["symbol"] == "BTCUSDT"
    assert parsed[0]["tick_size"] == "0.01"

    client.close()

Run tests:

pytest tests/unit/ingestion/test_bybit_client.py -v

4. Create Avro Schema (15 min)

cp config/schemas/binance_instrument_raw.avsc config/schemas/bybit_instrument_raw.avsc

Edit to match Bybit's response structure (usually the same shape as Binance's).

5. Register Schema (5 min)

# Add to scripts/register_schemas.py

# Bybit schema
with open('config/schemas/bybit_instrument_raw.avsc') as f:
    bybit_schema = Schema(f.read(), schema_type='AVRO')
    sr_client.register_schema(
        subject_name='refdata-bybit-instrument-raw-value',
        schema=bybit_schema
    )

Run:

python scripts/register_schemas.py

6. Create Bronze Table (5 min)

# Add to scripts/init_iceberg_catalog.py

catalog.create_table(
    "refdata.bronze_instruments_bybit",
    schema=bronze_schema,  # Same schema as other exchanges
    partition_spec=PartitionSpec(
        PartitionField(source_id=2, transform=DayTransform(), name="ingestion_date")
    )
)

Run:

python scripts/init_iceberg_catalog.py

7. Add to CLI (15 min)

# Edit src/refdata/cli/ingest.py

from refdata.ingestion.sources.bybit import BybitClient

# In ingest_exchange():
elif exchange == "bybit":
    client = BybitClient()

8. Update DBT Sources (15 min)

# dbt/models/bronze/sources.yml

tables:
  - name: bronze_instruments_bybit
    description: Raw Bybit /v5/market/instruments-info responses
    identifier: bronze_instruments_bybit
    # ... same structure as other exchanges

9. Update Silver Model (30 min)

-- dbt/models/silver/silver_instruments.sql

bronze_bybit AS (
    SELECT
        ingestion_id,
        ingestion_timestamp,
        api_response_raw
    FROM {{ source('bronze', 'bronze_instruments_bybit') }}

    {% if is_incremental() %}
    WHERE ingestion_timestamp > (
        SELECT COALESCE(MAX(record_created_at), '1900-01-01'::TIMESTAMP)
        FROM {{ this }}
        WHERE source_system = 'bybit'
    )
    {% endif %}
),

parsed_bybit AS (
    SELECT
        'bybit' AS exchange,
        ingestion_id,
        ingestion_timestamp,

        json_extract_string(item_data, '$.symbol') AS symbol,
        json_extract_string(item_data, '$.baseCoin') AS base_asset,
        json_extract_string(item_data, '$.quoteCoin') AS quote_asset,
        -- ... rest of fields

        ingestion_timestamp AS valid_from,
        CAST(item_data AS STRING) AS raw_metadata

    FROM bronze_bybit,
    LATERAL (
        SELECT UNNEST(json_extract(api_response_raw, '$.result.list')) AS item_data
    )
),

-- Add to union
all_instruments AS (
    SELECT * FROM parsed_binance
    UNION ALL
    SELECT * FROM parsed_kraken
    UNION ALL
    SELECT * FROM parsed_bybit  -- NEW
),

10. Update Gold Symbology (15 min)

-- dbt/models/gold/gold_symbology_master.sql

-- In pivoted CTE:
MAX(CASE WHEN exchange = 'bybit' THEN symbol END) AS bybit_symbol,

11. Test End-to-End (30 min)

# Ingest
make ingest-now  # Should now include Bybit

# Verify Bronze
duckdb -c "
SELECT COUNT(*) FROM iceberg_scan('s3://refdata-warehouse/bronze/instruments/bybit')
"

# Run DBT
make dbt-run
make dbt-test

# Verify Silver
duckdb -c "
SELECT exchange, COUNT(*)
FROM iceberg_scan('s3://refdata-warehouse/silver/instruments')
GROUP BY exchange
"
# Should show: binance, kraken, bybit

# Test API
curl "http://localhost:8001/v1/instruments?exchange=bybit&limit=5"

# Test symbology
curl "http://localhost:8001/v1/symbology/resolve?exchange=bybit&symbol=BTCUSDT"

Verification Checklist

  • Unit tests pass (make test-unit)
  • Integration tests pass (make test-integration)
  • DBT tests pass (make dbt-test)
  • Bronze table populated (check count)
  • Silver table has Bybit records
  • Gold symbology includes Bybit mappings
  • API returns Bybit instruments
  • Documentation updated

Common Issues

Issue: API returns 403 Forbidden

  • Fix: Check if API key required, add authentication

Issue: DBT parsing fails

  • Fix: Check JSON structure matches expectations
    duckdb -c "SELECT api_response_raw FROM iceberg_scan('...') LIMIT 1" | jq .

Issue: No records in Silver

  • Fix: Check Bronze has data, verify DBT incremental logic
    dbt run --select silver_instruments --full-refresh

Adding a New Field

When: An exchange adds a new field to its API, or we need to track additional data.

Time: 1-2 hours

Steps

1. Identify Field Source (15 min)

# Check exchange API response
curl <exchange_api_url> | jq . > response.json

# Find new field
jq '.symbols[0].newFieldName' response.json

2. Update Avro Schema (optional, 10 min)

Only if field needs schema validation:

// config/schemas/binance_instrument_raw.avsc

{
  "name": "new_field_name",
  "type": ["null", "string"],
  "default": null,
  "doc": "Description of what this field means"
}

Increment schema version and re-register.
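Backward compatibility hinges on the new nullable field carrying a `default`. A quick stdlib sanity check, sketched here with the schema inlined (in practice you would `json.load` the `.avsc` file):

```python
import json

# Illustrative schema text; the real file is config/schemas/binance_instrument_raw.avsc
schema_text = """
{
  "type": "record",
  "name": "binance_instrument_raw",
  "fields": [
    {"name": "symbol", "type": "string"},
    {"name": "new_field_name", "type": ["null", "string"], "default": null,
     "doc": "Description of what this field means"}
  ]
}
"""

schema = json.loads(schema_text)

# Every nullable (union-with-null) field should declare a default so that
# records written by old producers can still be read with the new schema.
missing_defaults = [
    f["name"]
    for f in schema["fields"]
    if isinstance(f["type"], list) and "null" in f["type"] and "default" not in f
]
print(missing_defaults)  # [] means the evolution is backward compatible
```

If the list is non-empty, fix the schema before re-registering; the registry's compatibility check will reject it otherwise.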

3. Update Silver Model (20 min)

-- dbt/models/silver/silver_instruments.sql

-- In parsed_binance (or relevant exchange):
json_extract_string(symbol_data, '$.newFieldName') AS new_field_name,

-- In final CTE:
new_field_name,

4. Add Column to Schema Doc (10 min)

# dbt/models/silver/silver_instruments.yml

- name: new_field_name
  description: What this field represents and why we track it
  tests:
    - not_null  # If required
    - accepted_values:  # If enum
        values: ['value1', 'value2']

5. Update API Model (15 min)

# src/refdata/api/models.py

class InstrumentResponse(BaseModel):
    # ... existing fields ...

    new_field_name: str | None = Field(
        None,
        description="Field description",
        examples=["example_value"]
    )

6. Test (20 min)

# Run DBT
make dbt-run
make dbt-test

# Query new field
duckdb -c "
SELECT new_field_name, COUNT(*)
FROM iceberg_scan('s3://refdata-warehouse/silver/instruments')
GROUP BY new_field_name
"

# Test API
make api-dev
curl "http://localhost:8001/v1/instruments?limit=1" | jq .new_field_name

7. Update Documentation (10 min)

# Update docs/architecture/SCHEMA.md

## Silver Instruments

### new_field_name
- **Type**: STRING
- **Source**: Exchange API field `newFieldName`
- **Purpose**: Explanation
- **Added**: 2024-01-23 (ticket #123)

Verification

  • Field appears in Silver table
  • DBT tests pass
  • API returns field
  • Documentation updated
  • No null values (if required)

Fixing a Bug

When: Something's broken, need to fix it.

Time: Varies

Steps

1. Reproduce Bug (30 min)

# Get exact error message
# Copy from logs, issue, or user report

# Try to reproduce locally
make docker-up
make ingest-binance  # Or whatever triggers the bug

# Capture full stack trace
python -m pdb <failing_command>  # If Python

Document:

  • Exact error message
  • Steps to reproduce
  • Expected vs actual behavior
  • Environment (local/staging/prod)

2. Write Failing Test (20 min)

Test-Driven Fix: Write test that fails, then fix it.

# tests/unit/test_bug_fix.py

@pytest.mark.unit
def test_handles_missing_field_gracefully():
    """
    Bug: Crashes when exchange doesn't include optional field.

    Given: API response missing optional field
    When: We parse the response
    Then: Should use default value, not crash
    """
    # Arrange
    response = {"symbols": [{"symbol": "BTCUSDT"}]}  # Item is missing filters/tickSize

    # Act
    result = parse_instruments(response)

    # Assert
    assert result[0]["tick_size"] is None  # Should default to None, not crash

Run test to confirm it fails:

pytest tests/unit/test_bug_fix.py -v
# Should FAIL initially

3. Fix the Code (30-60 min)

# src/refdata/ingestion/sources/binance.py

# Before (buggy):
tick_size = item["filters"][0]["tickSize"]  # Crashes if missing

# After (fixed):
filters = item.get("filters") or [{}]   # Handles a missing key AND an empty list
tick_size = filters[0].get("tickSize")  # Returns None if absent
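The same defensive pattern generalizes. A small helper, offered as a sketch only (the name `dig` is not part of the codebase), that walks an arbitrary path of dict keys and list indices and returns None at the first gap:

```python
from typing import Any


def dig(obj: Any, *path: Any) -> Any:
    """Follow a path of dict keys / list indices, returning None at the first miss."""
    for step in path:
        try:
            obj = obj[step]
        except (KeyError, IndexError, TypeError):
            return None
    return obj


item = {"symbol": "BTCUSDT"}  # no "filters" key at all
print(dig(item, "filters", 0, "tickSize"))  # None

item2 = {"filters": [{"tickSize": "0.01"}]}
print(dig(item2, "filters", 0, "tickSize"))  # 0.01
```

Unlike chained `.get()` calls, this also survives an empty `filters` list or a value that is unexpectedly not a dict.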

4. Verify Fix (15 min)

# Run test again
pytest tests/unit/test_bug_fix.py -v
# Should PASS now

# Run all tests
make test-all

5. Add Regression Test (if not done in step 2)

Ensure bug doesn't come back.

6. Update Documentation (10 min)

# Add comment explaining the fix
def parse_instruments(self, api_response: dict) -> list[dict]:
    """Parse Binance instruments.

    Note: filters array is optional in Binance API.
    When missing, default to None for all filter fields.
    See: bug #456 (2024-01-23)
    """

Verification

  • Test passes
  • All other tests still pass
  • Bug can't be reproduced
  • Code review approved
  • Deployed to staging (if applicable)

Investigating Data Quality Issues

When: Data looks wrong, tests are failing, or a user reports an issue.

Time: 30 min - 2 hours

Symptoms

  • DBT test failures
  • Unexpected row counts
  • Null values where there shouldn't be any
  • Duplicates
  • Missing data

Investigation Steps

1. Check Recent Changes (10 min)

# Git history
git log --oneline --since="3 days ago" -- dbt/models/

# Recent deployments
kubectl get pods -n refdata-platform --sort-by=.status.startTime

# Recent ingestion runs
kubectl get jobs -n refdata-platform | grep ingestion | tail -5

Question: Did the quality issue start after a recent change?

2. Check Data at Each Layer (30 min)

Bronze:

# Row count
duckdb -c "
SELECT COUNT(*) as bronze_count
FROM iceberg_scan('s3://refdata-warehouse/bronze/instruments/binance')
"

# Latest ingestion
duckdb -c "
SELECT MAX(ingestion_timestamp) as latest_ingestion
FROM iceberg_scan('s3://refdata-warehouse/bronze/instruments/binance')
"

# Sample record
duckdb -c "
SELECT api_response_raw
FROM iceberg_scan('s3://refdata-warehouse/bronze/instruments/binance')
ORDER BY ingestion_timestamp DESC
LIMIT 1
" | jq .

Silver:

# Row count by exchange
duckdb -c "
SELECT exchange, COUNT(*), MAX(record_created_at) as latest
FROM iceberg_scan('s3://refdata-warehouse/silver/instruments')
GROUP BY exchange
"

# Check for nulls in required fields
duckdb -c "
SELECT COUNT(*) as null_tick_size
FROM iceberg_scan('s3://refdata-warehouse/silver/instruments')
WHERE tick_size IS NULL
"

# Check for duplicates
duckdb -c "
SELECT instrument_sk, COUNT(*)
FROM iceberg_scan('s3://refdata-warehouse/silver/instruments')
WHERE valid_to IS NULL
GROUP BY instrument_sk
HAVING COUNT(*) > 1
"
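The duplicate check above can also be run on rows already pulled into memory. A sketch using `collections.Counter`, with column names matching the SQL (the sample rows are fabricated for illustration):

```python
from collections import Counter

# Current rows (valid_to IS NULL), e.g. fetched via DuckDB into dicts
rows = [
    {"instrument_sk": "binance:BTCUSDT", "valid_to": None},
    {"instrument_sk": "binance:BTCUSDT", "valid_to": None},  # duplicate!
    {"instrument_sk": "kraken:XBTUSD", "valid_to": None},
]

# Mirror of: GROUP BY instrument_sk HAVING COUNT(*) > 1
counts = Counter(r["instrument_sk"] for r in rows if r["valid_to"] is None)
dupes = {sk: n for sk, n in counts.items() if n > 1}
print(dupes)  # {'binance:BTCUSDT': 2}
```

Any key appearing here means SCD Type 2 close-out logic failed to set `valid_to` on the superseded record.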

Gold:

# Row count
duckdb -c "
SELECT COUNT(*) as gold_count
FROM iceberg_scan('s3://refdata-warehouse/gold/symbology')
"

# Check for orphans (no exchange symbols)
duckdb -c "
SELECT *
FROM iceberg_scan('s3://refdata-warehouse/gold/symbology')
WHERE binance_symbol IS NULL
  AND kraken_symbol IS NULL
  AND bybit_symbol IS NULL
"

3. Run DBT Tests (10 min)

cd dbt

# All tests
dbt test

# Specific model
dbt test --select silver_instruments

# View failed test SQL
cat target/compiled/k2_refdata/models/silver/silver_instruments.yml/<test_name>.sql

4. Check Logs (20 min)

# Ingestion logs
kubectl logs -n refdata-platform -l job-name=refdata-ingestion --tail=500

# DBT logs
kubectl logs -n refdata-platform -l job-name=refdata-dbt --tail=500

# API logs
kubectl logs -n refdata-platform -l app=refdata-api --tail=500

# Search for errors
kubectl logs -n refdata-platform -l app=refdata-api | grep ERROR

5. Compare with Known Good State (20 min)

# Check Iceberg snapshot history
# Find timestamp when data was good

# Query at that snapshot
# (Iceberg time-travel query)

Common Issues & Fixes

Issue: Duplicate records in Silver

  • Cause: SCD Type 2 logic not closing old records
  • Fix: Check UPDATE statement in DBT model

Issue: Missing data after DBT run

  • Cause: Incremental logic filtering too aggressively
  • Fix: Full refresh: dbt run --full-refresh --select silver_instruments

Issue: Null values in required fields

  • Cause: Exchange API changed response structure
  • Fix: Update JSON parsing in DBT model

Correcting Bad Data

When: Data corruption is detected and needs a manual fix.

Time: 30 min - 1 hour

IMPORTANT: Follow MANUAL-OVERRIDE.md for detailed procedures.

Quick Reference

1. Verify Current State

duckdb -c "
SELECT symbol, tick_size, valid_from, valid_to, record_created_at
FROM iceberg_scan('s3://refdata-warehouse/silver/instruments')
WHERE exchange = 'binance' AND symbol = 'BTCUSDT'
ORDER BY valid_from DESC
"

2. Determine Correction Type

  • Type A: Update current record (simple field change)
  • Type B: Historical correction (SCD Type 2)
  • Type C: Close invalid record

3. Execute Correction

See MANUAL-OVERRIDE.md for SQL examples.

4. Verify via API

curl "http://localhost:8001/v1/instruments?exchange=binance&symbol=BTCUSDT"
curl "http://localhost:8001/v1/instruments/binance/BTCUSDT/history"

5. Document

# Create correction report
cat > docs/corrections/$(date +%Y-%m-%d)_BTCUSDT_correction.md << EOF
# Correction: BTCUSDT tick_size

**Date**: $(date +%Y-%m-%d)
**Operator**: your_name
**Reason**: Explanation

## Changes
- Old: tick_size=0.05
- New: tick_size=0.01

## Verification
- [x] API returns corrected value
- [x] History preserved
EOF

Deploying to Production

When: Shipping new feature or fix.

Time: 2-4 hours (including verification)

IMPORTANT: Follow DEPLOYMENT-CHECKLIST.md for complete process.

Quick Steps

1. Pre-Deployment

# All tests pass
make test-all

# Code quality
make quality

# Create release tag
git tag -a v1.1.0 -m "Release 1.1.0: Add Bybit exchange"
git push origin v1.1.0

2. Build & Push Docker Image

# Build
docker build -t k2-refdata-api:v1.1.0 .

# Push to registry
docker push k2-refdata-api:v1.1.0

3. Update Kubernetes

# Update deployment
kubectl set image deployment/refdata-api \
  refdata-api=k2-refdata-api:v1.1.0 \
  -n refdata-platform

# Watch rollout
kubectl rollout status deployment/refdata-api -n refdata-platform

4. Verify

# Health check
curl https://refdata-api.k2.com/health

# Smoke test
curl "https://refdata-api.k2.com/v1/instruments?limit=1"

# Check logs
kubectl logs -n refdata-platform -l app=refdata-api --tail=100

Rolling Back a Deployment

When: Production issue after deployment.

Time: 5-10 minutes

Steps

1. Identify Problem

# Check error rate
# Check logs
kubectl logs -n refdata-platform -l app=refdata-api | grep ERROR

# Check metrics (Grafana)

2. Rollback

# Rollback to previous version
kubectl rollout undo deployment/refdata-api -n refdata-platform

# Verify rollback
kubectl rollout status deployment/refdata-api -n refdata-platform

# Check health
curl https://refdata-api.k2.com/health

3. Communicate

# Post in Slack
"🔥 Rolled back refdata-api deployment due to [issue]. Investigating root cause."

4. Root Cause Analysis

  • Create incident report
  • Schedule post-mortem
  • Fix issue locally
  • Add tests
  • Re-deploy when fixed

Optimizing Query Performance

When: API latency high, DBT runs slow.

Time: 1-3 hours

API Query Optimization

1. Profile Query

# Add timing to query
import time

start = time.time()
result = conn.execute(query).fetchall()
duration = time.time() - start

logger.info("Query executed", duration_ms=duration * 1000)
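The ad-hoc timing above can be packaged as a context manager so every query path logs consistently. A stdlib sketch (the logger name is an assumption, not the project's):

```python
import logging
import time
from contextlib import contextmanager

logger = logging.getLogger("refdata.api")


@contextmanager
def timed(label: str):
    """Log the wall-clock duration of the wrapped block in milliseconds."""
    start = time.perf_counter()
    try:
        yield
    finally:
        duration_ms = (time.perf_counter() - start) * 1000
        logger.info("%s took %.1f ms", label, duration_ms)


with timed("instrument query"):
    total = sum(range(1_000_000))  # stand-in for conn.execute(query).fetchall()
print(total)  # 499999500000
```

`time.perf_counter()` is preferred over `time.time()` for intervals because it is monotonic and higher resolution.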

2. Check Execution Plan

duckdb -c "
EXPLAIN
SELECT *
FROM iceberg_scan('s3://refdata-warehouse/silver/instruments')
WHERE exchange = 'binance' AND symbol = 'BTCUSDT'
"

Look for:

  • Full table scans
  • Missing filters
  • Large intermediate results

3. Optimize

Add Filters Early:

-- Bad: filter applied after the join (relies on the optimizer to push it down)
SELECT * FROM big_table JOIN small_table USING (id)
WHERE exchange = 'binance'

-- Good: filter explicitly before the join so less data reaches it
SELECT * FROM (
  SELECT * FROM big_table WHERE exchange = 'binance'
) JOIN small_table USING (id)

Use Partition Pruning:

-- Iceberg partitioned by exchange, months(valid_from)
-- This query prunes to single partition:
WHERE exchange = 'binance'
  AND valid_from >= '2024-01-01'
  AND valid_from < '2024-02-01'
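When building these predicates programmatically, a small helper can compute the half-open month interval used above (a sketch; the function name is ours, not the codebase's):

```python
from datetime import date


def month_bounds(year: int, month: int) -> tuple[str, str]:
    """Return [start, end) ISO dates covering one month, for partition pruning."""
    start = date(year, month, 1)
    # December rolls over into January of the next year
    end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
    return start.isoformat(), end.isoformat()


lo, hi = month_bounds(2024, 1)
print(lo, hi)  # 2024-01-01 2024-02-01
```

Using `>= start AND < end` rather than `BETWEEN` avoids off-by-one errors at month boundaries and keeps the predicate aligned with the `months(valid_from)` partition transform.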

Increase Connection Pool:

# src/refdata/common/duckdb_pool.py
DuckDBConnectionPool(
    min_connections=10,  # Increase from 5
    max_connections=100,  # Increase from 50
)

DBT Optimization

1. Profile Model Run

dbt run --select silver_instruments --debug
# Check execution time in output

2. Optimize Incremental Logic

-- Avoid re-scanning the target inside the filter
{% if is_incremental() %}
WHERE ingestion_timestamp > (
    -- Scalar subquery forces a scan of {{ this }} at query time
    SELECT MAX(record_created_at) FROM {{ this }}
)
{% endif %}

-- Better: resolve the watermark once at compile time with run_query
{% if is_incremental() %}
{% set max_timestamp = run_query("SELECT MAX(record_created_at) FROM " ~ this).columns[0][0] %}
WHERE ingestion_timestamp > '{{ max_timestamp }}'
{% endif %}

3. Use --threads for Parallelism

# Run models in parallel
dbt run --threads 8

Need help with a workflow not listed here?

Check:

  1. GETTING-STARTED.md - Initial setup
  2. DBT-GUIDE.md - DBT-specific tasks
  3. API-GUIDE.md - API usage
  4. Ask in #k2-refdata-platform Slack