Writing Custom Adapters¶
Extend OpenAICE with your own telemetry and runtime adapters.
Step 1: Choose the Adapter Type¶
| Type | Base Class | Purpose |
|---|---|---|
| Telemetry | TelemetryAdapter |
Ingest metrics/logs from monitoring systems |
| Runtime State | RuntimeStateAdapter |
Read orchestrator/scheduler state |
Step 2: Implement the Interface¶
Telemetry Adapter Example¶
from openaice.adapters.base import TelemetryAdapter
class MyCustomAdapter(TelemetryAdapter):
"""Adapter for my custom monitoring system."""
def initialize(self, config: dict) -> None:
self.endpoint = config.get("endpoint", "http://localhost:8080")
self._client = None # Initialize your client
def collect(self) -> list[dict]:
"""Collect raw records from the monitoring system."""
# Fetch data from your source
raw_data = self._fetch_metrics()
# Return as list of dicts with required fields
return [
{
"source_type": "my_system",
"entity_type": "service",
"entity_id": item["service_name"],
"observed_at": item["timestamp"],
"latency_p95_ms": item["p95"],
"queue_depth": item["queue_size"],
"throughput": item["rps"],
"confidence_score": 0.85,
}
for item in raw_data
]
def health_check(self) -> bool:
"""Check if the monitoring system is reachable."""
try:
resp = httpx.get(f"{self.endpoint}/health")
return resp.status_code == 200
except Exception:
return False
Step 3: Required Fields¶
Every raw record must include:
| Field | Type | Description |
|---|---|---|
source_type |
str |
Identifier for your adapter |
entity_type |
str |
One of the 12 canonical types |
entity_id |
str |
Unique entity identifier |
observed_at |
str/datetime |
Observation timestamp |
All other fields are optional and map to the canonical entity schema.
Step 4: Register the Adapter¶
Add your adapter to the pipeline in the CLI or API:
from my_adapters import MyCustomAdapter
adapter = MyCustomAdapter()
adapter.initialize({"endpoint": "http://my-system:8080"})
raw_records = adapter.collect()
# Feed into the standard pipeline
fragments = normalizer.normalize(raw_records)
state_bus.put_fragments(fragments)
Tips¶
Start with Replay
Before building a live adapter, create a replay scenario with sample data from your system. This lets you validate your entity mapping and policy rules without a live connection.
Confidence Scores
Always set a meaningful confidence_score. The policy engine uses this to gate recommendations — a low confidence score will block high-risk actions.