Helix API: reference and usage patterns for Teleon Helix
Helix is Teleon’s production runtime system for AI agents.
Helix manages the agent lifecycle in production: process management, health checking, auto-scaling, and runtime configuration (CPU/memory limits, timeouts). It also includes LLM-oriented runtime features like token tracking and cost / budget management.
Run agents reliably in production with health checks and controlled resource limits.
Scale up and down with min_instances / max_instances.
Track LLM usage (tokens, costs) and apply budgets for long-running workloads.
from teleon import TeleonClient
client = TeleonClient( api_key = "tlk_live_xxx" )
@client.agent (
name = "support" ,
helix = {
"min_instances" : 1 ,
"max_instances" : 10 ,
"memory_limit_mb" : 512 ,
},
)
async def support_agent (query: str ):
return "ok"
from teleon.helix import AgentRuntime, RuntimeConfig, ResourceConfig
config = RuntimeConfig(
environment = "production" ,
hot_reload = False ,
max_workers = 20 ,
)
runtime = AgentRuntime(config)
await runtime.register_agent(
agent_id = "my-agent" ,
agent_callable = my_agent_function,
resources = ResourceConfig( min_instances = 2 , max_instances = 10 ),
)
await runtime.start()
await runtime.start_agent( "my-agent" )
| Component | Purpose |
| --- | --- |
| AgentRuntime | Main runtime for agent lifecycle management |
| ProcessManager | Process spawning and monitoring |
| HealthChecker | Health and readiness checks |
| Scaler | Automatic scaling based on metrics |
| FileWatcher | Hot reload for development |
from teleon.helix import RuntimeConfig, ResourceConfig
config = RuntimeConfig(
environment = "development" ,
debug = False ,
hot_reload = True ,
watch_paths = [ "teleon" , "agents" ],
max_workers = 10 ,
log_level = "INFO" ,
default_resources = ResourceConfig( min_instances = 1 , max_instances = 5 ),
)
from teleon.helix import ResourceConfig
resources = ResourceConfig(
cpu_limit = 2.0 ,
memory_limit_mb = 512 ,
min_instances = 1 ,
max_instances = 10 ,
startup_timeout = 30 ,
shutdown_timeout = 30 ,
health_check_enabled = True ,
health_check_interval = 30 ,
)
| Option | Type | Default | Description |
| --- | --- | --- | --- |
| environment | str | "development" | Runtime environment |
| debug | bool | False | Enable debug mode |
| hot_reload | bool | True | Enable hot reload (dev only) |
| watch_paths | list | ["teleon", "agents"] | Paths to watch for changes |
| max_workers | int | 10 | Maximum worker processes |
| log_level | str | "INFO" | Log level |
Main runtime for managing agent processes.
await runtime.register_agent(
agent_id = "support-agent" ,
agent_callable = support_handler,
resources = ResourceConfig( min_instances = 2 ),
health_check = HealthCheck( name = "support-health" ),
)
Parameters:
agent_id (str): Unique agent identifier
agent_callable (Callable): Agent function
resources (ResourceConfig, optional): Resource configuration
health_check (HealthCheck, optional): Health check configuration
await runtime.register_llm_agent(
agent_id = "chat-agent" ,
agent_callable = chat_handler,
model = "gpt-4" ,
max_tokens = 2000 ,
cost_budget = 10.0 ,
resources = ResourceConfig( min_instances = 3 ),
)
Parameters:
agent_id (str): Unique agent identifier
agent_callable (Callable): Agent function
model (str): LLM model name
max_tokens (int): Max tokens per request (default: 2000)
cost_budget (float, optional): Cost budget per hour
resources (ResourceConfig, optional): Resource configuration
process_ids = await runtime.start_agent( "my-agent" )
print ( f "Started {len (process_ids) } instances" )
Returns: list of process IDs
await runtime.stop_agent( "my-agent" )
await runtime.stop_agent( "my-agent" , force = True )
Parameters:
agent_id (str): Agent identifier
force (bool): Force kill if True (default: False)
await runtime.scale_agent( "my-agent" , instances = 5 )
status = await runtime.get_agent_status( "my-agent" )
print ( f "Status: { status[ 'status' ] } " )
print ( f "Instances: { status[ 'instances' ] } " )
print ( f "Health: { status[ 'health' ] } " )
Returns: dict with keys status, instances, resources, processes, health
agents = await runtime.list_agents()
for agent in agents:
print ( f " { agent[ 'agent_id' ] } : { agent[ 'status' ] } " )
from teleon.helix import ProcessStatus
ProcessStatus. STARTING
ProcessStatus. RUNNING
ProcessStatus. STOPPING
ProcessStatus. STOPPED
ProcessStatus. FAILED
ProcessStatus. CRASHED
from teleon.helix import ProcessInfo
process = await manager.get_process(process_id)
print ( f "PID: { process.pid } " )
print ( f "Status: { process.status } " )
print ( f "CPU: { process.cpu_percent } %" )
print ( f "Memory: { process.memory_mb } MB" )
print ( f "Restarts: { process.restart_count } " )
from teleon.helix import HealthStatus
HealthStatus. HEALTHY
HealthStatus. UNHEALTHY
HealthStatus. DEGRADED
HealthStatus. UNKNOWN
from teleon.helix import CheckType
CheckType. READINESS
CheckType. LIVENESS
CheckType. CUSTOM
from teleon.helix import HealthCheck, CheckType
async def custom_check ():
return await database.ping()
health_check = HealthCheck(
name = "database-check" ,
check_type = CheckType. CUSTOM ,
check_fn = custom_check,
interval = 30 ,
timeout = 10 ,
failure_threshold = 3 ,
success_threshold = 1 ,
initial_delay = 5 ,
)
from teleon.helix import HealthChecker
checker = HealthChecker()
await checker.register_check( "my-agent" , health_check)
result = await checker.check_health( "my-agent" )
overall = await checker.get_overall_health()
result = await checker.check_llm_health(
target_id = "chat-agent" ,
llm_metrics = metrics,
cost_budget = 10.0 ,
)
from teleon.helix import ScalingPolicy
policy = ScalingPolicy(
min_instances = 1 ,
max_instances = 10 ,
target_cpu_percent = 70.0 ,
target_memory_percent = 80.0 ,
scale_up_cooldown = 60 ,
scale_down_cooldown = 300 ,
scale_up_step = 1 ,
scale_down_step = 1 ,
)
from teleon.helix import Scaler, ScalingMetrics
scaler = Scaler()
await scaler.register_policy( "my-agent" , policy)
metrics = ScalingMetrics( cpu_percent = 85.0 , memory_percent = 60.0 )
desired = await scaler.evaluate_scaling(
target_id = "my-agent" ,
metrics = metrics,
current_instances = 2 ,
)
from teleon.helix import WatcherConfig
config = WatcherConfig(
watch_paths = [ "teleon" , "agents" , "handlers" ],
ignore_patterns = [ "__pycache__" , "*.pyc" , ".git" , "venv" ],
debounce_seconds = 1.0 ,
)
from teleon.helix import FileWatcher
watcher = FileWatcher(config, runtime)
await watcher.start()
from teleon.helix import LLMMetrics
metrics = await tracker.get_metrics()
print ( f "Tokens/sec: { metrics.tokens_per_second } " )
print ( f "Requests/min: { metrics.requests_per_minute } " )
print ( f "P95 latency: { metrics.p95_latency_ms } ms" )
print ( f "Cost/hour: $ { metrics.cost_per_hour :.2f } " )
print ( f "Utilization: { metrics.get_utilization() * 100 :.0f } %" )
from teleon.helix import TokenCounter
counter = TokenCounter()
tokens = counter.count_tokens( "Hello" , model = "gpt-4" )
tokens = counter.count_messages_tokens(
[
{ "role" : "system" , "content" : "You are a helpful assistant." },
{ "role" : "user" , "content" : "What is the weather?" },
],
model = "gpt-4" ,
)
from teleon.helix import LLMResourceTracker
tracker = LLMResourceTracker( agent_id = "chat-agent" , model = "gpt-4" , window_size = 300 )
await tracker.record_request(
input_tokens = 150 ,
output_tokens = 200 ,
latency_ms = 1500.0 ,
ttft_ms = 250.0 ,
cost = 0.0045 ,
wait_time_ms = 100.0 ,
)
await tracker.increment_queue()
await tracker.decrement_queue()
await tracker.increment_concurrent()
await tracker.decrement_concurrent()
metrics = await tracker.get_metrics()
stats = await tracker.get_statistics()
from teleon.helix import get_throughput_monitor
monitor = get_throughput_monitor()
await monitor.register_tracker( "agent-1" , tracker1)
await monitor.register_tracker( "agent-2" , tracker2)
aggregate = await monitor.get_aggregate_metrics()
from teleon.helix import get_token_tracker, TokenPeriod
tracker = get_token_tracker()
await tracker.record_tokens(
agent_id = "chat-agent" ,
model = "gpt-4" ,
input_tokens = 100 ,
output_tokens = 150 ,
operation = "completion" ,
metadata = { "user_id" : "123" },
)
agent_tokens = await tracker.get_tokens( agent_id = "chat-agent" )
hourly = await tracker.get_tokens( period = TokenPeriod. HOURLY , period_key = "2024-01-15-14" )
from teleon.helix import get_token_budget_manager, TokenPeriod
budget_manager = get_token_budget_manager()
await budget_manager.set_budget( amount = 1_000_000 , period = TokenPeriod. DAILY )
await budget_manager.set_budget(
amount = 100_000 ,
period = TokenPeriod. HOURLY ,
agent_id = "chat-agent" ,
)
status = await budget_manager.check_budget()
async def on_budget_alert (alert):
if alert.level == "critical" :
await notify_admin(alert.message)
budget_manager.register_alert_callback(on_budget_alert)
alerts = await budget_manager.get_alerts( level = "critical" , limit = 10 )
from teleon.helix import TokenAwareScalingPolicy
policy = TokenAwareScalingPolicy(
min_instances = 1 ,
max_instances = 20 ,
target_tokens_per_second = 1000.0 ,
tokens_per_second_buffer = 0.2 ,
max_queue_depth = 10 ,
queue_depth_threshold = 5 ,
target_p95_latency_ms = 2000.0 ,
target_p99_latency_ms = 5000.0 ,
scale_up_cooldown = 60 ,
scale_down_cooldown = 300 ,
)
from teleon.helix import CostAwareScalingPolicy
policy = CostAwareScalingPolicy(
min_instances = 1 ,
max_instances = 10 ,
max_cost_per_hour = 50.0 ,
max_cost_per_day = 500.0 ,
cost_warning_threshold = 0.8 ,
cost_critical_threshold = 0.95 ,
base_scaling_policy = token_policy,
)
from teleon.helix import create_llm_scaler, get_llm_scaler
scaler = create_llm_scaler(
target_tokens_per_second = 1000.0 ,
max_cost_per_hour = 50.0 ,
min_instances = 2 ,
max_instances = 20 ,
)
scaler = get_llm_scaler()
desired = await scaler.evaluate_scaling(
agent_id = "chat-agent" ,
llm_metrics = metrics,
current_instances = 5 ,
)
from teleon.helix import ModelVariant
ModelVariant. SMALL
ModelVariant. MEDIUM
ModelVariant. LARGE
ModelVariant. XLARGE
from teleon.helix import ContextWindowManager
manager = ContextWindowManager()
await manager.register_instance(
instance_id = "inst-1" ,
agent_id = "chat-agent" ,
model = "gpt-4" ,
max_context_tokens = 8192 ,
)
instance = await manager.get_best_instance(
required_context_tokens = 50000 ,
prefer_model = "gpt-4" ,
prefer_cost = True ,
)
from teleon.helix import get_request_router
router = get_request_router()
decision = await router.route(
messages = [{ "role" : "user" , "content" : "Summarize this document..." }],
model = "gpt-4" ,
max_tokens = 500 ,
prefer_cost = True ,
prefer_speed = False ,
)
from teleon.helix import BatchConfig, BatchProcessor
async def process_batch (requests):
results = []
for req in requests:
result = await llm.complete(req.messages, model = req.model)
results.append(result)
return results
processor = BatchProcessor( config = BatchConfig( max_batch_size = 10 ), llm_caller = process_batch)
await processor.start()
result = await processor.process_request(
messages = [{ "role" : "user" , "content" : "Hello" }],
model = "gpt-4" ,
priority = 0 ,
)
await processor.stop()
from teleon.helix import create_cache, get_response_cache, CacheStrategy
cache = create_cache(
max_size = 1000 ,
strategy = CacheStrategy. EXACT ,
eviction_policy = CacheEvictionPolicy. LRU ,
default_ttl = 3600 ,
)
cache = get_response_cache()
messages = [{ "role" : "user" , "content" : "What is 2+2?" }]
cached = await cache.get(messages, model = "gpt-4" )
| Variable | Default | Description |
| --- | --- | --- |
| TELEON_DEPLOYMENT_ID | - | Deployment identifier |
| TELEON_API_KEY | - | API key for authentication |
| TELEON_PLATFORM_URL | https://api.teleon.ai | Platform API URL |
| TELEON_METRICS_INTERVAL | 10 | Reporting interval (seconds) |
| TELEON_METRICS_BATCH_SIZE | 100 | Max batch size |
from teleon.helix import (
init_agent_reporter,
get_agent_reporter,
shutdown_agent_reporter,
)
reporter = await init_agent_reporter(
deployment_id = "deploy-123" ,
api_key = "tlk_xxx" ,
flush_interval = 10.0 ,
batch_size = 100 ,
)
reporter = get_agent_reporter()
await reporter.start()
await reporter.report_request(
input_tokens = 100 ,
output_tokens = 50 ,
latency_ms = 250.5 ,
model = "gpt-4" ,
success = True ,
cost = 0.002 ,
error_type = None ,
)
await reporter.report_health(
status = "healthy" ,
active_requests = 5 ,
queue_depth = 2 ,
)
await reporter.flush()
stats = reporter.get_local_stats()
await shutdown_agent_reporter()
from fastapi import FastAPI
from teleon.helix import setup_health_endpoints, get_health_manager
app = FastAPI()
health_manager = setup_health_endpoints(
app,
service_name = "my-agent" ,
version = "1.0.0" ,
prefix = "" ,
)
health_manager = get_health_manager()
async def check_database ():
try :
await db.ping()
return True , "Database connected"
except Exception as e:
return False , str (e)
health_manager.add_check( "database" , check_database, critical = True )
health_manager.set_ready( True , reason = "Initialization complete" )
health_manager.record_request( success = True )
health_manager.mount(app, prefix = "/api" )
| Endpoint | Purpose | Response |
| --- | --- | --- |
| /health | Overall health | 200 if healthy, 503 if unhealthy |
| /ready | Readiness probe | 200 if ready, 503 if not |
| /live | Liveness probe | 200 if alive |
| /metrics | Prometheus metrics | Text format metrics |
from teleon import TeleonClient
client = TeleonClient( api_key = "tlk_xxx" )
@client.agent (
name = "support-agent" ,
helix = {
"min_instances" : 2 ,
"max_instances" : 10 ,
"cpu_limit" : 2.0 ,
"memory_limit_mb" : 512 ,
"health_check_enabled" : True ,
"health_check_interval" : 30 ,
"startup_timeout" : 30 ,
"shutdown_timeout" : 30 ,
"auto_start" : False ,
},
)
async def support_agent (query: str , customer_id: str ):
return await process_query(query, customer_id)
@client.agent (
name = "my-agent" ,
helix = {
"min" : 2 ,
"max" : 10 ,
"cpu" : 2.0 ,
"memory" : 512 ,
"health_interval" : 30 ,
},
)
from teleon.helix import (
register_agent_with_helix,
scale_agent,
get_agent_status,
restart_agent,
stop_agent,
)
wrapper = await register_agent_with_helix(
agent_id = "my-agent" ,
agent_func = my_function,
helix_config = { "min_instances" : 2 , "max_instances" : 10 },
)
result = await wrapper(input_data)
await scale_agent( "my-agent" , instances = 5 )
status = await get_agent_status( "my-agent" )
await restart_agent( "my-agent" )
await stop_agent( "my-agent" , force = False )
Set appropriate resource limits (especially memory_limit_mb)
Use health checks for production deployments
Configure scaling (min_instances for baseline, max_instances for budget)
Use token budgets to prevent cost overruns
Enable caching for repeated queries
Batch when possible for throughput
Monitor metrics via the metrics reporter
Set cooldowns to prevent scaling oscillation
Check max_instances > min_instances
Verify scaling policy is registered
Check cooldown periods
Review scaling history
Verify check function returns (bool, str)
Check timeout configuration
Review failure_threshold
Check for dependency/network issues
Enable response caching
Set token budgets
Use cost-aware scaling policy
Review context sizes with routing
Check queue depth metrics
Enable batch processing
Review context routing for model selection
Consider increasing max_instances
See the full Helix documentation in helix_user_docs.md for runtime APIs and production features.