Build an Infrastructure Monitoring Agent
Learn how to build a fully autonomous AI agent that monitors your servers, detects issues, and sends intelligent alerts — powered by the OXLO Chat API.
What You'll Build
By the end of this tutorial, you'll have a Python agent that:
- Probes your server endpoints and checks health status
- Analyzes failures using OXLO's LLM to classify root causes
- Generates human-readable status reports
- Sends alerts with AI-powered explanations to Slack/Discord/email
- Runs autonomously on a schedule (every 2–5 minutes)
Architecture
Health Prober          OXLO Chat API           Alert Dispatcher
(checks endpoints) ──▶ (LLM analysis)      ──▶ (sends notifications)
        │                     │                        │
        ▼                     ▼                        ▼
Your Servers           Root Cause              Slack / Discord
(HTTP endpoints)       (AI classification)     (webhook alerts)
Prerequisites
- Python 3.10+ installed
- An OXLO API key (Free tier works — get one here)
- Basic Python knowledge (async/await, HTTP requests)
Step 1 — Project Setup
Create a new project directory and install dependencies:
mkdir oxlo-monitor-agent && cd oxlo-monitor-agent
pip install httpx openai python-dotenv apscheduler

Create a .env file with your OXLO API key:
# .env
OXLO_API_KEY=your_api_key_here
OXLO_BASE_URL=https://api.oxlo.ai/v1

Create the project structure:
oxlo-monitor-agent/
├── .env
├── monitor.py # Main agent file
├── probes.py # Health check functions
├── analyzer.py # AI-powered analysis
└── alerts.py     # Alert dispatcher

Step 2 — Build the Health Prober
The prober checks your endpoints and returns structured health data. Start with probes.py:
# probes.py
"""Health probes — checks your endpoints and returns status data."""
import asyncio
import time
from dataclasses import dataclass
from typing import Optional

import httpx


@dataclass
class ProbeResult:
    """Result of a single health probe."""
    endpoint: str
    status: str  # "healthy", "degraded", "down"
    response_time_ms: float
    status_code: Optional[int] = None
    error: Optional[str] = None


async def probe_endpoint(
    url: str,
    timeout: float = 10.0,
    expected_status: int = 200,
) -> ProbeResult:
    """
    Probe a single HTTP endpoint and return structured results.
    This is the core building block — your agent calls this for
    each endpoint it monitors.
    """
    start = time.perf_counter()
    try:
        async with httpx.AsyncClient(timeout=timeout) as client:
            resp = await client.get(url)
        elapsed_ms = (time.perf_counter() - start) * 1000
        if resp.status_code == expected_status:
            return ProbeResult(
                endpoint=url,
                status="healthy",
                response_time_ms=round(elapsed_ms, 1),
                status_code=resp.status_code,
            )
        else:
            return ProbeResult(
                endpoint=url,
                status="degraded",
                response_time_ms=round(elapsed_ms, 1),
                status_code=resp.status_code,
                error=f"Expected {expected_status}, got {resp.status_code}",
            )
    except httpx.TimeoutException:
        elapsed_ms = (time.perf_counter() - start) * 1000
        return ProbeResult(
            endpoint=url,
            status="down",
            response_time_ms=round(elapsed_ms, 1),
            error="Connection timed out",
        )
    except Exception as e:
        elapsed_ms = (time.perf_counter() - start) * 1000
        return ProbeResult(
            endpoint=url,
            status="down",
            response_time_ms=round(elapsed_ms, 1),
            error=str(e),
        )


async def probe_all(endpoints: list[str]) -> list[ProbeResult]:
    """Probe all endpoints concurrently and return results."""
    tasks = [probe_endpoint(url) for url in endpoints]
    return await asyncio.gather(*tasks)

Try it out:
# test_probes.py — Run this to verify your prober works
import asyncio

from probes import probe_all


async def main():
    results = await probe_all([
        "https://httpbin.org/status/200",  # Should be healthy
        "https://httpbin.org/status/500",  # Should be degraded
        "https://httpbin.org/delay/15",    # Should time out
    ])
    for r in results:
        emoji = {"healthy": "🟢", "degraded": "🟡", "down": "🔴"}[r.status]
        print(f"{emoji} {r.endpoint}")
        print(f"   Status: {r.status} | {r.response_time_ms}ms")
        if r.error:
            print(f"   Error: {r.error}")
        print()

asyncio.run(main())

Expected output:
🟢 https://httpbin.org/status/200
Status: healthy | 342.1ms
🟡 https://httpbin.org/status/500
Status: degraded | 285.7ms
Error: Expected 200, got 500
🔴 https://httpbin.org/delay/15
Status: down | 10001.3ms
Error: Connection timed out

Step 3 — Build the AI Analyzer
This is where OXLO's Chat API comes in. Instead of writing complex rule-based logic, we let the LLM analyze probe results and generate human-readable explanations. Create analyzer.py:
# analyzer.py
"""AI-powered analysis using the OXLO Chat API."""
import json
import os

from dotenv import load_dotenv
from openai import OpenAI

from probes import ProbeResult

load_dotenv()

# Initialize OXLO client — OpenAI-compatible!
client = OpenAI(
    api_key=os.getenv("OXLO_API_KEY"),
    base_url=os.getenv("OXLO_BASE_URL", "https://api.oxlo.ai/v1"),
)

# We use deepseek-v3.2 (free tier) — you can swap to any OXLO model
MODEL = "deepseek-v3.2"


def analyze_health_results(results: list[ProbeResult]) -> str:
    """
    Send probe results to OXLO's LLM and get an intelligent analysis.
    The LLM will:
    - Identify which services are failing and why
    - Suggest probable root causes
    - Recommend actions to take
    """
    # Format probe results as structured data for the LLM
    results_text = ""
    for r in results:
        results_text += f"- {r.endpoint}: {r.status.upper()}"
        results_text += f" ({r.response_time_ms}ms)"
        if r.error:
            results_text += f" — Error: {r.error}"
        results_text += "\n"

    response = client.chat.completions.create(
        model=MODEL,
        messages=[
            {
                "role": "system",
                "content": """You are an infrastructure monitoring assistant.
Analyze the health check results below and provide:
1. A brief status summary (one line)
2. For any failing services: probable root cause
3. Recommended actions
Be concise and technical. Use bullet points.""",
            },
            {
                "role": "user",
                "content": f"Health check results:\n{results_text}",
            },
        ],
        max_tokens=500,
        temperature=0.3,  # Low temperature for consistent, factual analysis
    )
    return response.choices[0].message.content


def classify_alert_severity(error_message: str) -> dict:
    """
    Use OXLO's LLM to classify an error into severity levels.
    Returns structured JSON with severity, category, and explanation.
    This shows how to get structured outputs from the LLM.
    """
    response = client.chat.completions.create(
        model=MODEL,
        messages=[
            {
                "role": "system",
                "content": """Classify this infrastructure error. Respond in JSON only:
{
  "severity": "critical" | "warning" | "info",
  "category": "network" | "application" | "database" | "timeout" | "auth",
  "explanation": "one-line explanation",
  "action": "recommended action"
}""",
            },
            {
                "role": "user",
                "content": f"Error: {error_message}",
            },
        ],
        max_tokens=200,
        temperature=0.1,
    )
    try:
        return json.loads(response.choices[0].message.content)
    except json.JSONDecodeError:
        return {
            "severity": "warning",
            "category": "unknown",
            "explanation": error_message,
            "action": "Investigate manually",
        }

Try it out:
# test_analyzer.py
import asyncio

from analyzer import analyze_health_results, classify_alert_severity
from probes import probe_all


async def main():
    # Run probes
    results = await probe_all([
        "https://httpbin.org/status/200",
        "https://httpbin.org/status/503",
    ])

    # Get AI analysis
    print("═══ AI Health Analysis ═══")
    analysis = analyze_health_results(results)
    print(analysis)

    # Classify an error
    print("\n═══ Alert Classification ═══")
    classification = classify_alert_severity(
        "HTTP 503: Service temporarily unavailable on api-gateway"
    )
    for key, val in classification.items():
        print(f"  {key}: {val}")

asyncio.run(main())

Expected output:
═══ AI Health Analysis ═══
**Status: 1 of 2 services degraded**
- ✅ httpbin.org/status/200 — Healthy (342ms response)
- ⚠️ httpbin.org/status/503 — Service returning 503
**Probable cause:** The 503 indicates the upstream server is temporarily
overloaded or under maintenance.
**Recommended actions:**
- Check if the service is undergoing planned maintenance
- Monitor for auto-recovery within the next 5 minutes
- If persistent, check server logs and scaling configuration
═══ Alert Classification ═══
severity: warning
category: application
explanation: Service is temporarily unavailable, likely due to overload
action: Monitor for recovery; scale up if the issue persists

Step 4 — Build the Alert Dispatcher
The dispatcher sends formatted alerts when issues are detected. Create alerts.py:
# alerts.py
"""Alert dispatcher — sends notifications when issues are detected."""
from datetime import datetime

import httpx

from probes import ProbeResult


def format_alert_message(
    result: ProbeResult,
    ai_analysis: str,
    severity: str = "warning",
) -> str:
    """Format a human-readable alert with AI context."""
    emoji = {
        "critical": "🔴 CRITICAL",
        "warning": "🟡 WARNING",
        "info": "ℹ️ INFO",
    }.get(severity, "⚠️ ALERT")
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    message = f"""
{emoji} — Infrastructure Alert
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Endpoint: {result.endpoint}
Status: {result.status.upper()}
Latency: {result.response_time_ms}ms
Error: {result.error or 'N/A'}
Time: {timestamp}
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
AI Analysis:
{ai_analysis}
"""
    return message.strip()


async def send_to_slack(webhook_url: str, message: str):
    """Send alert to a Slack webhook."""
    async with httpx.AsyncClient() as client:
        await client.post(webhook_url, json={"text": message})


async def send_to_discord(webhook_url: str, message: str):
    """Send alert to a Discord webhook."""
    async with httpx.AsyncClient() as client:
        await client.post(webhook_url, json={"content": message})


def print_alert(message: str):
    """Print alert to console (for development/testing)."""
    print(message)
    print()

Step 5 — Wire It All Together
Now combine everything into the main agent in monitor.py. This is the complete, runnable agent:
# monitor.py
"""
OXLO Infrastructure Monitoring Agent
====================================
A fully autonomous agent that monitors your endpoints,
analyzes failures with AI, and sends intelligent alerts.

Usage:
    python monitor.py              # Run once
    python monitor.py --schedule   # Run on a schedule (every 2 min)
"""
import asyncio
import sys
from datetime import datetime

from alerts import format_alert_message, print_alert, send_to_slack, send_to_discord
from analyzer import analyze_health_results, classify_alert_severity
from probes import probe_all

# ──────────────────────────────────────────────────────
# CONFIGURATION — Customize these for your infrastructure
# ──────────────────────────────────────────────────────
ENDPOINTS = [
    "https://your-api.example.com/health",
    "https://your-app.example.com",
    "https://your-db-proxy.example.com/ping",
    # Add as many endpoints as you need
]

# How often to check (in seconds) when running on schedule
CHECK_INTERVAL = 120  # 2 minutes

# Optional: webhook URLs for alerts
SLACK_WEBHOOK = None    # "https://hooks.slack.com/services/..."
DISCORD_WEBHOOK = None  # "https://discord.com/api/webhooks/..."

# ──────────────────────────────────────────────────────
# AGENT LOGIC
# ──────────────────────────────────────────────────────

async def run_check():
    """Run a single monitoring check cycle."""
    timestamp = datetime.now().strftime("%H:%M:%S")
    print(f"\n{'═' * 50}")
    print(f"  Monitoring Check — {timestamp}")
    print(f"{'═' * 50}")

    # Step 1: Probe all endpoints
    print(f"\n🔍 Probing {len(ENDPOINTS)} endpoints...")
    results = await probe_all(ENDPOINTS)

    # Step 2: Separate healthy from failing
    healthy = [r for r in results if r.status == "healthy"]
    failing = [r for r in results if r.status != "healthy"]

    # Print quick summary
    for r in results:
        emoji = {"healthy": "🟢", "degraded": "🟡", "down": "🔴"}[r.status]
        print(f"  {emoji} {r.endpoint} — {r.response_time_ms}ms")

    if not failing:
        print(f"\n✅ All {len(healthy)} endpoints healthy!")
        return

    # Step 3: Analyze failures with OXLO AI
    print(f"\n🤖 Analyzing {len(failing)} issues with OXLO AI...")
    analysis = analyze_health_results(results)
    print(f"\n{analysis}")

    # Step 4: Classify each failure and send alerts
    for result in failing:
        classification = classify_alert_severity(result.error or "Unknown error")
        alert = format_alert_message(
            result=result,
            ai_analysis=analysis,
            severity=classification.get("severity", "warning"),
        )
        # Send alerts (console + optional webhooks)
        print_alert(alert)
        # Uncomment to enable webhook alerts:
        # if SLACK_WEBHOOK:
        #     await send_to_slack(SLACK_WEBHOOK, alert)
        # if DISCORD_WEBHOOK:
        #     await send_to_discord(DISCORD_WEBHOOK, alert)

    print(f"\n📊 Summary: {len(healthy)} healthy, {len(failing)} failing")


async def run_scheduled():
    """Run the agent on a recurring schedule."""
    print(f"🚀 Starting monitoring agent (checking every {CHECK_INTERVAL}s)")
    print(f"   Monitoring {len(ENDPOINTS)} endpoints")
    print("   Press Ctrl+C to stop\n")
    while True:
        await run_check()
        await asyncio.sleep(CHECK_INTERVAL)


if __name__ == "__main__":
    try:
        if "--schedule" in sys.argv:
            asyncio.run(run_scheduled())
        else:
            asyncio.run(run_check())
    except KeyboardInterrupt:
        # Ctrl+C propagates out of asyncio.run(), so catch it here
        # rather than inside the coroutine.
        print("\n👋 Agent stopped.")

Step 6 — Run and Test
Run a single check to make sure everything works:
python monitor.py

Expected output:
══════════════════════════════════════════════════
Monitoring Check — 14:32:15
══════════════════════════════════════════════════
🔍 Probing 3 endpoints...
🟢 https://your-api.example.com/health — 124.3ms
🟢 https://your-app.example.com — 89.1ms
🔴 https://your-db-proxy.example.com/ping — 10001.2ms
🤖 Analyzing 1 issues with OXLO AI...
**Status: 2 of 3 services healthy, 1 down**
- ⚠️ your-db-proxy.example.com — Connection timed out after 10s
- Probable cause: Database proxy is unreachable, possibly due to
network partition or the proxy service has crashed
- Action: Check if the proxy container is running; restart if needed
🔴 CRITICAL — Infrastructure Alert
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Endpoint: https://your-db-proxy.example.com/ping
Status: DOWN
Latency: 10001.2ms
Error: Connection timed out
Time: 2025-01-15 14:32:26
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
AI Analysis:
Database proxy is unreachable...
📊 Summary: 2 healthy, 1 failing

To run on a continuous schedule:
python monitor.py --schedule

Extending Your Agent
The agent above is a solid foundation. Here are ideas for extending it based on your application's needs:
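Add Scheduled Runs with APScheduler

Step 1 installed apscheduler, but the agent above schedules itself with a plain asyncio loop. If you want cron-style control (for example, checks only during business hours), here is a sketch using APScheduler's AsyncIOScheduler; the file name and interval are illustrative:

```python
# scheduler.py — alternative to run_scheduled(), using APScheduler
import asyncio

from apscheduler.schedulers.asyncio import AsyncIOScheduler

from monitor import run_check  # the check cycle defined above


async def main():
    scheduler = AsyncIOScheduler()
    # Every 2 minutes; for business hours only, swap in a cron trigger:
    # scheduler.add_job(run_check, "cron", hour="9-17", minute="*/5")
    scheduler.add_job(run_check, "interval", seconds=120)
    scheduler.start()
    await asyncio.Event().wait()  # Block forever so the loop keeps running

asyncio.run(main())
```

The upside over the plain loop is that checks fire on a fixed grid rather than drifting by the duration of each check.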
Add Model-Specific Probes
If you're running AI services, you can probe them by sending test requests through the OXLO API itself:
# In probes.py — add a model probe function
import os

from openai import AsyncOpenAI


async def probe_model(model_id: str) -> ProbeResult:
    """
    Probe an OXLO model by sending a minimal chat request.
    Verifies the model is responding correctly.
    """
    client = AsyncOpenAI(
        api_key=os.getenv("OXLO_API_KEY"),
        base_url=os.getenv("OXLO_BASE_URL"),
    )
    start = time.perf_counter()  # `time` is already imported at the top of probes.py
    try:
        resp = await client.chat.completions.create(
            model=model_id,
            messages=[{"role": "user", "content": "Say 'ok'"}],
            max_tokens=5,
        )
        elapsed_ms = (time.perf_counter() - start) * 1000
        if resp.choices and resp.choices[0].message.content:
            return ProbeResult(
                endpoint=f"model:{model_id}",
                status="healthy",
                response_time_ms=round(elapsed_ms, 1),
            )
        return ProbeResult(
            endpoint=f"model:{model_id}",
            status="degraded",
            response_time_ms=round(elapsed_ms, 1),
            error="Model returned empty response",
        )
    except Exception as e:
        elapsed_ms = (time.perf_counter() - start) * 1000
        return ProbeResult(
            endpoint=f"model:{model_id}",
            status="down",
            response_time_ms=round(elapsed_ms, 1),
            error=str(e),
        )

Add Conversation Memory
Make the agent remember previous issues so it can detect recurring patterns:
# In analyzer.py — add history-aware analysis
from datetime import datetime


class MonitorHistory:
    """Track alert history for pattern detection."""

    def __init__(self, max_history: int = 50):
        self.events: list[dict] = []
        self.max_history = max_history

    def add_event(self, endpoint: str, status: str, error: str | None = None):
        self.events.append({
            "time": datetime.now().isoformat(),
            "endpoint": endpoint,
            "status": status,
            "error": error,
        })
        # Keep only recent events
        if len(self.events) > self.max_history:
            self.events = self.events[-self.max_history:]

    def get_context(self) -> str:
        """Format recent history as context for the LLM."""
        if not self.events:
            return "No previous events."
        recent = self.events[-10:]  # Last 10 events
        lines = [f"- {e['time']}: {e['endpoint']} → {e['status']}" for e in recent]
        return "\n".join(lines)


# Then in your analysis call, add history as context:
# messages=[
#     {"role": "system", "content": system_prompt},
#     {"role": "user", "content": f"Recent history:\n{history.get_context()}\n\nCurrent results:\n{results_text}"},
# ]

Add Webhook-Based Alerting
Connect to Slack, Discord, Telegram, or any webhook service:
# In monitor.py — enable webhook alerts
# Set your webhook URL
SLACK_WEBHOOK = "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"

# Uncomment these lines in the run_check() function:
if SLACK_WEBHOOK:
    await send_to_slack(SLACK_WEBHOOK, alert)

Key Concepts
🔗 OpenAI-Compatible API
OXLO uses the OpenAI SDK format — just change the base_url to https://api.oxlo.ai/v1. Any code written for OpenAI works with OXLO by changing two lines.
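For instance, code that already uses the OpenAI SDK only needs its client constructor changed; the key value below is a placeholder:

```python
from openai import OpenAI

# Before: client = OpenAI(api_key=OPENAI_API_KEY)
# After — same SDK, two changed lines:
client = OpenAI(
    api_key="your-oxlo-key",            # line 1: use your OXLO key
    base_url="https://api.oxlo.ai/v1",  # line 2: point the SDK at OXLO
)

# Every other call stays identical, e.g.:
# client.chat.completions.create(model="deepseek-v3.2", messages=[...])
```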
🌡️ Temperature for Monitoring
Use low temperature (0.1–0.3) for analysis tasks. This makes the LLM produce consistent, factual responses rather than creative ones — exactly what you want for infrastructure monitoring.
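To build intuition for why this works: temperature divides the model's logits before sampling, so lower values concentrate probability on the top token. A toy stdlib illustration (not part of the agent, with made-up logits):

```python
import math


def softmax(logits: list[float], temperature: float = 1.0) -> list[float]:
    """Convert logits to probabilities; lower temperature sharpens the distribution."""
    scaled = [x / temperature for x in logits]
    m = max(scaled)  # subtract the max for numerical stability
    exps = [math.exp(x - m) for x in scaled]
    total = sum(exps)
    return [e / total for e in exps]


logits = [2.0, 1.0, 0.5]
print(round(softmax(logits, temperature=1.0)[0], 3))  # top token gets ~0.63
print(round(softmax(logits, temperature=0.2)[0], 3))  # top token gets ~0.99 — near-deterministic
```

At temperature 0.2 the top token dominates almost completely, which is why low-temperature analyses come back consistent from run to run.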
💰 Cost Optimization
Only call the LLM when issues are detected. Healthy checks don't need AI analysis. Use max_tokens: 500 to keep responses concise and costs low.
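🧩 Defensive JSON Parsing
Models occasionally wrap JSON in markdown fences even when told not to. classify_alert_severity already falls back to a default dict on parse failure; a slightly more tolerant parser (a sketch, not in the tutorial files — the function name is ours) first extracts the outermost {...} span before decoding:

```python
import json
import re


def parse_llm_json(text: str) -> dict:
    """Pull a JSON object out of an LLM reply that may be wrapped in ``` fences or prose."""
    match = re.search(r"\{.*\}", text, re.DOTALL)  # greedy: first { to last }
    if match is None:
        raise ValueError("no JSON object in reply")
    return json.loads(match.group(0))


print(parse_llm_json('```json\n{"severity": "warning", "category": "timeout"}\n```'))
```

Note the greedy match assumes the reply contains one JSON object; nested braces inside it are fine, but multiple separate objects are not.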
Full Code Reference
The complete source code for this tutorial is available on GitHub:
| File | Purpose | Lines |
|---|---|---|
| probes.py | HTTP health check functions | ~70 |
| analyzer.py | OXLO Chat API for analysis | ~100 |
| alerts.py | Slack/Discord alert dispatch | ~50 |
| monitor.py | Main agent (wires everything) | ~95 |