Error Handling
Error Types
Catch TraciaError and branch on the error code to handle specific failure modes:
from tracia import Tracia, TraciaError, TraciaErrorCode

client = Tracia(api_key="tr_your_api_key")

try:
    result = client.run_local(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello!"}],
    )
except TraciaError as error:
    if error.code == TraciaErrorCode.MISSING_PROVIDER_API_KEY:
        print("Set OPENAI_API_KEY environment variable")
    elif error.code == TraciaErrorCode.UNSUPPORTED_MODEL:
        print("Specify provider explicitly for custom models")
    elif error.code == TraciaErrorCode.PROVIDER_ERROR:
        print(f"Provider error: {error.message}")
    elif error.code == TraciaErrorCode.INVALID_REQUEST:
        print(f"Invalid request: {error.message}")
    else:
        print(f"Unexpected error: {error.message}")
Retry Logic
Implement retry logic for transient errors:
import time

from tracia import Tracia, TraciaError, TraciaErrorCode, RunLocalResult

NON_RETRYABLE = {
    TraciaErrorCode.MISSING_PROVIDER_API_KEY,
    TraciaErrorCode.UNSUPPORTED_MODEL,
    TraciaErrorCode.INVALID_REQUEST,
}

def run_with_retry(client: Tracia, max_retries: int = 3, **kwargs) -> RunLocalResult:
    for attempt in range(1, max_retries + 1):
        try:
            return client.run_local(**kwargs)
        except TraciaError as error:
            if error.code in NON_RETRYABLE:
                raise
            if attempt < max_retries:
                delay = 2**attempt  # exponential backoff: 2s, then 4s
                print(f"Attempt {attempt} failed, retrying in {delay}s...")
                time.sleep(delay)
                continue
            raise
    raise RuntimeError("Unreachable")
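Usage mirrors run_local; keyword arguments are forwarded unchanged (a minimal sketch reusing the request from the example above):
result = run_with_retry(
    client,
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
)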
Error Recovery with Fallback
Use a different model or provider as fallback:
def run_with_fallback(messages):
    models = ["gpt-4o", "claude-sonnet-4-20250514", "gemini-2.0-flash"]
    for model in models:
        try:
            return client.run_local(model=model, messages=messages)
        except TraciaError as error:
            if error.code == TraciaErrorCode.PROVIDER_ERROR:
                print(f"{model} failed, trying next provider...")
                continue
            raise
    raise RuntimeError("All providers failed")
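Call it with the same messages list you would pass to run_local, for example:
result = run_with_fallback([{"role": "user", "content": "Hello!"}])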
Concurrent Requests
Parallel Execution with ThreadPoolExecutor
Run multiple requests in parallel:
import concurrent.futures

prompts = [
    "Explain recursion",
    "Explain closures",
    "Explain generators",
]

def run_prompt(content: str):
    return client.run_local(
        model="gpt-4o",
        messages=[{"role": "user", "content": content}],
        tags=["batch"],
    )

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = {executor.submit(run_prompt, p): p for p in prompts}
    for future in concurrent.futures.as_completed(futures):
        prompt = futures[future]
        result = future.result()
        print(f"{prompt}: {result.usage.total_tokens} tokens")
Async Parallel Execution
Use the async client method to run the same requests concurrently with asyncio:
import asyncio

async def run_parallel():
    prompts = [
        "Explain recursion",
        "Explain closures",
        "Explain generators",
    ]
    tasks = [
        client.arun_local(
            model="gpt-4o",
            messages=[{"role": "user", "content": content}],
            tags=["batch"],
        )
        for content in prompts
    ]
    results = await asyncio.gather(*tasks)
    for prompt, result in zip(prompts, results):
        print(f"{prompt}: {result.usage.total_tokens} tokens")
Rate Limiting
Control concurrency to avoid rate limits:
import asyncio

async def run_with_concurrency_limit(tasks, limit: int):
    semaphore = asyncio.Semaphore(limit)

    async def limited_task(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(*(limited_task(t) for t in tasks))

# Run with max 3 concurrent requests
tasks = [
    client.arun_local(
        model="gpt-4o",
        messages=[{"role": "user", "content": content}],
    )
    for content in prompts
]
results = await run_with_concurrency_limit(tasks, limit=3)
Batch Processing with Progress
Process items sequentially and report progress after each completed request:
def process_batch(items: list[str], on_progress=None):
    results = []
    total = len(items)
    for i, item in enumerate(items):
        result = client.run_local(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": item}],
            tags=["batch-processing"],
        )
        results.append(result)
        if on_progress:
            on_progress(i + 1, total)
    return results

# Usage
process_batch(
    items,
    on_progress=lambda done, total: print(f"Progress: {done}/{total} ({done*100//total}%)"),
)
Timeout Handling
Set request timeouts to prevent hanging:
result = client.run_local(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Complex analysis..."}],
    timeout_ms=30000,  # 30 seconds
)
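With the async client, a client-side deadline can also be enforced with asyncio.wait_for. This is a sketch that assumes arun_local accepts the same arguments as run_local:
import asyncio

async def run_with_deadline():
    try:
        # Cancel the call if it does not finish within 30 seconds
        return await asyncio.wait_for(
            client.arun_local(
                model="gpt-4o",
                messages=[{"role": "user", "content": "Complex analysis..."}],
            ),
            timeout=30,
        )
    except asyncio.TimeoutError:
        print("Request timed out")
        return None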
Timeout with Fallback
Fall back to a faster model if the primary request times out:
def run_with_timeout_fallback(messages):
    try:
        return client.run_local(
            model="gpt-4o",
            messages=messages,
            timeout_ms=10000,
        )
    except Exception:
        # Try faster model on timeout
        return client.run_local(
            model="gpt-4o-mini",
            messages=messages,
            timeout_ms=30000,
        )
Multi-Tenant Applications
Handle multiple API keys for different customers:
class TenantLLMService:
    def __init__(self, tracia_api_key: str):
        self.client = Tracia(api_key=tracia_api_key)
        self.api_keys: dict[str, dict[str, str]] = {}

    def set_tenant_keys(self, tenant_id: str, keys: dict[str, str]):
        self.api_keys[tenant_id] = keys

    def run_for_tenant(self, tenant_id: str, **kwargs):
        keys = self.api_keys.get(tenant_id)
        if not keys:
            raise ValueError("Tenant not found")
        model = kwargs.get("model", "")
        provider = "anthropic" if model.startswith("claude") else "openai"
        provider_api_key = keys.get(provider)
        return self.client.run_local(
            provider_api_key=provider_api_key,
            user_id=tenant_id,
            **kwargs,
        )
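A minimal usage sketch; the tenant IDs and provider key values below are placeholders:
service = TenantLLMService(tracia_api_key="tr_your_api_key")

# Register each tenant's provider keys (placeholder values)
service.set_tenant_keys("tenant-a", {"openai": "sk-tenant-a-openai-key"})
service.set_tenant_keys("tenant-b", {"anthropic": "sk-ant-tenant-b-key"})

result = service.run_for_tenant(
    "tenant-a",
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
)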
Logging and Monitoring
Structured Logging
Log span submission failures and per-request metadata as structured JSON:
import json
import logging
from datetime import datetime

logger = logging.getLogger(__name__)

client = Tracia(
    api_key="tr_your_api_key",
    on_span_error=lambda error, span_id: logger.error(
        json.dumps({
            "type": "span_submission_failed",
            "span_id": span_id,
            "error": str(error),
            "timestamp": datetime.now().isoformat(),
        })
    ),
)

def run_with_logging(messages):
    result = client.run_local(
        model="gpt-4o",
        messages=messages,
        tags=["monitored"],
    )
    logger.info(json.dumps({
        "type": "llm_request",
        "span_id": result.span_id,
        "model": result.model,
        "provider": result.provider,
        "latency_ms": result.latency_ms,
        "input_tokens": result.usage.input_tokens,
        "output_tokens": result.usage.output_tokens,
        "timestamp": datetime.now().isoformat(),
    }))
    return result
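The logger above only emits records once logging is configured; a minimal setup (adjust to your own logging stack):
logging.basicConfig(level=logging.INFO, format="%(message)s")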

