
Error Handling

Error Types

Errors from run_local raise TraciaError; check the code attribute to distinguish failure modes:
from tracia import Tracia, TraciaError, TraciaErrorCode

client = Tracia(api_key="tr_your_api_key")

try:
    result = client.run_local(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello!"}],
    )
except TraciaError as error:
    if error.code == TraciaErrorCode.MISSING_PROVIDER_API_KEY:
        print("Set OPENAI_API_KEY environment variable")
    elif error.code == TraciaErrorCode.UNSUPPORTED_MODEL:
        print("Specify provider explicitly for custom models")
    elif error.code == TraciaErrorCode.PROVIDER_ERROR:
        print(f"Provider error: {error.message}")
    elif error.code == TraciaErrorCode.INVALID_REQUEST:
        print(f"Invalid request: {error.message}")
    else:
        print(f"Unexpected error: {error.message}")

Retry Logic

Implement retry logic for transient errors:
import time
from tracia import Tracia, TraciaError, TraciaErrorCode, RunLocalResult

NON_RETRYABLE = {
    TraciaErrorCode.MISSING_PROVIDER_API_KEY,
    TraciaErrorCode.UNSUPPORTED_MODEL,
    TraciaErrorCode.INVALID_REQUEST,
}


def run_with_retry(client: Tracia, max_retries: int = 3, **kwargs) -> RunLocalResult:
    for attempt in range(1, max_retries + 1):
        try:
            return client.run_local(**kwargs)
        except TraciaError as error:
            if error.code in NON_RETRYABLE:
                raise
            if attempt < max_retries:
                delay = 2**attempt
                print(f"Attempt {attempt} failed, retrying in {delay}s...")
                time.sleep(delay)
                continue
            raise
    raise RuntimeError("Unreachable")
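
Call the helper with the same keyword arguments you would pass to run_local; a minimal usage sketch, reusing the client from above:
result = run_with_retry(
    client,
    max_retries=3,
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
)
print(f"{result.usage.total_tokens} tokens")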

Error Recovery with Fallback

Use a different model or provider as fallback:
def run_with_fallback(messages):
    models = ["gpt-4o", "claude-sonnet-4-20250514", "gemini-2.0-flash"]

    for model in models:
        try:
            return client.run_local(model=model, messages=messages)
        except TraciaError as error:
            if error.code == TraciaErrorCode.PROVIDER_ERROR:
                print(f"{model} failed, trying next provider...")
                continue
            raise

    raise RuntimeError("All providers failed")
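
Call the helper like run_local itself; each model in the chain needs its provider API key configured (a minimal usage sketch):
result = run_with_fallback([{"role": "user", "content": "Summarize the quarterly report."}])
print(f"Answered by {result.provider}/{result.model}")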

Concurrent Requests

Parallel Execution with ThreadPoolExecutor

Run multiple requests in parallel:
import concurrent.futures

prompts = [
    "Explain recursion",
    "Explain closures",
    "Explain generators",
]


def run_prompt(content: str):
    return client.run_local(
        model="gpt-4o",
        messages=[{"role": "user", "content": content}],
        tags=["batch"],
    )


with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = {executor.submit(run_prompt, p): p for p in prompts}
    for future in concurrent.futures.as_completed(futures):
        prompt = futures[future]
        result = future.result()
        print(f"{prompt}: {result.usage.total_tokens} tokens")

Async Parallel Execution

import asyncio


async def run_parallel():
    prompts = [
        "Explain recursion",
        "Explain closures",
        "Explain generators",
    ]

    tasks = [
        client.arun_local(
            model="gpt-4o",
            messages=[{"role": "user", "content": content}],
            tags=["batch"],
        )
        for content in prompts
    ]

    results = await asyncio.gather(*tasks)

    for prompt, result in zip(prompts, results):
        print(f"{prompt}: {result.usage.total_tokens} tokens")
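
Run it from synchronous code with asyncio.run (or await run_parallel() directly inside an existing event loop):
asyncio.run(run_parallel())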

Rate Limiting

Control concurrency to avoid rate limits:
import asyncio


async def run_with_concurrency_limit(tasks, limit: int):
    semaphore = asyncio.Semaphore(limit)

    async def limited_task(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(*(limited_task(t) for t in tasks))


# Run with at most 3 concurrent requests (awaiting requires an async context)
async def run_batch():
    tasks = [
        client.arun_local(
            model="gpt-4o",
            messages=[{"role": "user", "content": content}],
        )
        for content in prompts
    ]

    return await run_with_concurrency_limit(tasks, limit=3)


results = asyncio.run(run_batch())

Batch Processing with Progress

def process_batch(items: list[str], on_progress=None):
    results = []
    total = len(items)

    for i, item in enumerate(items):
        result = client.run_local(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": item}],
            tags=["batch-processing"],
        )
        results.append(result)
        if on_progress:
            on_progress(i + 1, total)

    return results


# Usage
items = ["Summarize document A", "Summarize document B", "Summarize document C"]
process_batch(
    items,
    on_progress=lambda done, total: print(f"Progress: {done}/{total} ({done * 100 // total}%)"),
)

Timeout Handling

Set request timeouts to prevent hanging:
result = client.run_local(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Complex analysis..."}],
    timeout_ms=30000,  # 30 seconds
)

Timeout with Fallback

def run_with_timeout_fallback(messages):
    try:
        return client.run_local(
            model="gpt-4o",
            messages=messages,
            timeout_ms=10000,
        )
    except Exception:
        # Fall back to a faster model (with a longer timeout) on any failure, including timeouts
        return client.run_local(
            model="gpt-4o-mini",
            messages=messages,
            timeout_ms=30000,
        )

Multi-Tenant Applications

Handle multiple API keys for different customers:
class TenantLLMService:
    def __init__(self, tracia_api_key: str):
        self.client = Tracia(api_key=tracia_api_key)
        self.api_keys: dict[str, dict[str, str]] = {}

    def set_tenant_keys(self, tenant_id: str, keys: dict[str, str]):
        self.api_keys[tenant_id] = keys

    def run_for_tenant(self, tenant_id: str, **kwargs):
        keys = self.api_keys.get(tenant_id)
        if not keys:
            raise ValueError("Tenant not found")

        model = kwargs.get("model", "")
        # Simplified provider lookup; extend this mapping for other providers as needed
        provider = "anthropic" if model.startswith("claude") else "openai"
        provider_api_key = keys.get(provider)

        return self.client.run_local(
            provider_api_key=provider_api_key,
            user_id=tenant_id,
            **kwargs,
        )
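
A usage sketch; the tenant ID and provider keys below are placeholder values:
service = TenantLLMService(tracia_api_key="tr_your_api_key")

# Placeholder provider keys for a hypothetical tenant
service.set_tenant_keys("tenant-123", {
    "openai": "sk-tenant-openai-key",
    "anthropic": "sk-ant-tenant-key",
})

result = service.run_for_tenant(
    "tenant-123",
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
)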

Logging and Monitoring

Structured Logging

import json
import logging
from datetime import datetime

logger = logging.getLogger(__name__)

client = Tracia(
    api_key="tr_your_api_key",
    on_span_error=lambda error, span_id: logger.error(
        json.dumps({
            "type": "span_submission_failed",
            "span_id": span_id,
            "error": str(error),
            "timestamp": datetime.now().isoformat(),
        })
    ),
)


def run_with_logging(messages):
    result = client.run_local(
        model="gpt-4o",
        messages=messages,
        tags=["monitored"],
    )

    logger.info(json.dumps({
        "type": "llm_request",
        "span_id": result.span_id,
        "model": result.model,
        "provider": result.provider,
        "latency_ms": result.latency_ms,
        "input_tokens": result.usage.input_tokens,
        "output_tokens": result.usage.output_tokens,
        "timestamp": datetime.now().isoformat(),
    }))

    return result