Enterprise Audit Logging and Monitoring: The Missing Piece of the Control Plane
Implementing tamper-proof audit logging and real-time monitoring for ephemeral workloads using Go middleware, ClickHouse, and the Decorator pattern.

Series: Secure Workload Orchestration
- Part 1: Building a Polymorphic Workload Orchestrator with Go and Kubernetes
- Part 2: Hardening the Control Plane: From Pods to MicroVMs
- Part 3: Zero-Trust Networking for AI Agents: Protecting Your Internal VPC
- Part 4: Enterprise Audit Logging and Monitoring: The Missing Piece of the Control Plane
- Part 5: Control Plane Resilience: Handling Cascading Failures in Distributed Orchestration
In the previous parts of this series, we built a robust engine for running untrusted AI agents. We have Polymorphism, Compute Isolation, and Network Lockdown.
But for an enterprise platform, running the workload safely isn't enough. You need to prove what happened and monitor what's happening right now.
Ephemeral workloads present a unique "Black Box" problem: by the time an incident is reported, the Pod, the logs, and the VM are often already deleted. You also need real-time visibility into workload health, resource usage, and security anomalies.
In this part, we will implement a unified Audit Logging and Monitoring system using ClickHouse. We'll use the Decorator Pattern to add transparent logging and metrics collection to our WorkloadRuntime without touching our core business logic. The same system that stores audit logs will power real-time dashboards and security alerting.
The Middleware Pattern
One of the huge benefits of defining our WorkloadRuntime as a Go interface is that we can wrap it.
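We don't need to clutter our FirecrackerRuntime code with log.Printf calls. Instead, we create a dedicated AuditMiddleware that implements the same interface, delegates every call to the runtime it wraps, and records what happened.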
```go
package middleware

import (
	"context"
	"time"

	"github.com/google/uuid"
	"github.com/my-org/orchestrator/runtime"
)

// AuditSink is where finished entries go (DB, Kafka, file).
type AuditSink interface {
	Write(entry AuditEntry)
}

// ResourceMetrics is a point-in-time snapshot of a workload's resource usage.
type ResourceMetrics struct {
	CPUPercent float64
	MemoryMB   int64
	NetworkIn  int64 // bytes
	NetworkOut int64 // bytes
}

// AuditMiddleware wraps any WorkloadRuntime (Decorator pattern).
type AuditMiddleware struct {
	Next runtime.WorkloadRuntime
	Sink AuditSink
}

func (a *AuditMiddleware) Provision(ctx context.Context, id string, spec runtime.Spec) error {
	start := time.Now()

	// 1. Call the underlying provider (Firecracker/K8s)
	err := a.Next.Provision(ctx, id, spec)

	// 2. Measure how long the provider call took and snapshot resource usage
	duration := time.Since(start)
	metrics := a.collectMetrics(ctx, id)

	entry := AuditEntry{
		EventID:         uuid.New().String(),
		WorkloadID:      id,
		Action:          "PROVISION",
		Status:          "SUCCESS",
		DurationMs:      duration.Milliseconds(),
		SpecSnapshot:    spec, // Save exactly what was requested
		Metadata:        spec.Metadata,
		Timestamp:       time.Now().UTC(),
		CPUUsagePercent: metrics.CPUPercent,
		MemoryUsageMB:   metrics.MemoryMB,
		NetworkBytesIn:  metrics.NetworkIn,
		NetworkBytesOut: metrics.NetworkOut,
		TenantID:        spec.Metadata["tenant_id"],
		WorkloadType:    spec.Type,
		ImageName:       spec.Image,
		CostEstimateUSD: a.calculateCost(duration, metrics), // pricing helper, not shown
	}
	if err != nil {
		entry.Status = "FAILURE"
		entry.Error = err.Error()
	}

	// Fire and forget: don't block the caller if the sink slows down
	go a.Sink.Write(entry)
	return err
}

// collectMetrics should query the Kubernetes Metrics API or cAdvisor before teardown.
// Placeholder: it returns fixed sample values.
func (a *AuditMiddleware) collectMetrics(ctx context.Context, workloadID string) ResourceMetrics {
	return ResourceMetrics{
		CPUPercent: 45.2,
		MemoryMB:   512,
		NetworkIn:  1024 * 1024, // 1 MB
		NetworkOut: 2048 * 1024, // 2 MB
	}
}
```

Figure: Logging Architecture
The Schema: What to Log and Monitor?
Free-text application logs are hard to search, aggregate, or alert on. We need structured events that serve two purposes at once: an audit trail and a feed for real-time monitoring.
```go
type AuditEntry struct {
	EventID    string    `json:"event_id"`
	WorkloadID string    `json:"workload_id"`
	Action     string    `json:"action"`          // PROVISION, TEARDOWN, EXECUTE
	Status     string    `json:"status"`          // SUCCESS, FAILURE
	Error      string    `json:"error,omitempty"` // Set when Status is FAILURE
	DurationMs int64     `json:"duration_ms"`
	Timestamp  time.Time `json:"timestamp"`

	// Critical for security forensics:
	SpecSnapshot runtime.Spec      `json:"spec_snapshot"`
	Metadata     map[string]string `json:"metadata"`

	// Resource metrics for monitoring:
	CPUUsagePercent float64 `json:"cpu_usage_percent"` // Average CPU during execution (assumed: % of one core)
	MemoryUsageMB   int64   `json:"memory_usage_mb"`   // Peak memory usage
	NetworkBytesIn  int64   `json:"network_bytes_in"`  // Total bytes received
	NetworkBytesOut int64   `json:"network_bytes_out"` // Total bytes sent
	DiskIOReadMB    int64   `json:"disk_io_read_mb"`   // Disk reads
	DiskIOWriteMB   int64   `json:"disk_io_write_mb"`  // Disk writes

	// Business metrics:
	TenantID        string  `json:"tenant_id"`         // For multi-tenant tracking
	WorkloadType    string  `json:"workload_type"`     // "trusted" or "untrusted"
	ImageName       string  `json:"image_name"`        // Container image
	CostEstimateUSD float64 `json:"cost_estimate_usd"` // Estimated cost
}
```

By storing the SpecSnapshot, we keep an exact record of what was requested: the image, environment variables, and parameters. The record is only as tamper-resistant as the storage behind it, so write access to the audit pipeline should be tightly restricted. Including resource metrics in the same event lets us monitor workload health and detect anomalies in near real time.
Asynchronous Sinks
Writing logs to the local disk is brittle (what if the node dies?). Writing synchronously to a database adds latency.
The best pattern is an Async Channel to a robust backend.
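```go
package middleware

import (
	"context"
	"encoding/json"
	"log"
	"sync/atomic"
	"time"

	"github.com/segmentio/kafka-go"
)

// KafkaSink ships audit entries to a Kafka topic without waiting for delivery.
type KafkaSink struct {
	writer   *kafka.Writer
	failures atomic.Int64 // encoding/delivery failures; export this as a metric
}

func NewKafkaSink(brokers []string, topic string) *KafkaSink {
	s := &KafkaSink{}
	s.writer = &kafka.Writer{
		Addr:  kafka.TCP(brokers...),
		Topic: topic,
		// Hash routes by message key, so all events for one workload land on the
		// same partition (LeastBytes would ignore the key).
		Balancer: &kafka.Hash{},
		// Audit events should survive a broker failure: wait for all in-sync replicas.
		RequiredAcks: kafka.RequireAll,
		// Batching improves throughput significantly
		BatchSize:    100,
		BatchTimeout: 10 * time.Millisecond,
		// Async: WriteMessages returns without waiting for delivery;
		// delivery results are reported to Completion instead.
		Async: true,
		Completion: func(msgs []kafka.Message, err error) {
			if err != nil {
				s.failures.Add(int64(len(msgs)))
				log.Printf("audit: failed to deliver %d events: %v", len(msgs), err)
			}
		},
	}
	return s
}

func (k *KafkaSink) Write(e AuditEntry) {
	payload, err := json.Marshal(e)
	if err != nil {
		k.failures.Add(1)
		log.Printf("audit: failed to encode event %s: %v", e.EventID, err)
		return
	}
	err = k.writer.WriteMessages(context.Background(), kafka.Message{
		Key:   []byte(e.WorkloadID), // Partition by WorkloadID
		Value: payload,
	})
	if err != nil {
		// The message was never queued (e.g. writer closed, message too large).
		k.failures.Add(1)
		log.Printf("audit: failed to queue event %s: %v", e.EventID, err)
	}
}

// Close flushes buffered messages. Call it on shutdown so pending events are not lost.
func (k *KafkaSink) Close() error { return k.writer.Close() }
```

Destination: ClickHouse for Unified Audit + Monitoring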
We'll use ClickHouse because it excels at both high-volume log ingestion and real-time analytical queries. Its Kafka table engine consumes the topic directly, so no separate consumer service is needed, and its columnar storage is well suited to time-series metrics.
```sql
-- 1. Create the destination table (where data lives)
CREATE TABLE workload_audit
(
    event_id          String,
    workload_id       String,
    action            LowCardinality(String),
    status            LowCardinality(String),
    error             String,
    duration_ms       Int64,
    timestamp         DateTime,
    spec_snapshot     String,  -- raw JSON of the requested spec
    metadata          Map(String, String),
    -- Resource metrics
    cpu_usage_percent Float64,
    memory_usage_mb   Int64,
    network_bytes_in  Int64,
    network_bytes_out Int64,
    disk_io_read_mb   Int64,
    disk_io_write_mb  Int64,
    -- Business metrics
    tenant_id         String,
    workload_type     LowCardinality(String),
    image_name        String,
    cost_estimate_usd Float64
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (timestamp, workload_id);

-- 2. Create the Kafka stream table (a consumer, not storage)
CREATE TABLE workload_audit_queue
(
    event_id          String,
    workload_id       String,
    action            String,
    status            String,
    error             String,  -- absent on success; defaults to ''
    duration_ms       Int64,
    timestamp         String,  -- Go encodes time.Time as RFC 3339; parsed in the view
    spec_snapshot     String,  -- arrives as a JSON object; relies on
                               -- input_format_json_read_objects_as_strings (on by default in recent releases)
    metadata          Map(String, String),
    cpu_usage_percent Float64,
    memory_usage_mb   Int64,
    network_bytes_in  Int64,
    network_bytes_out Int64,
    disk_io_read_mb   Int64,
    disk_io_write_mb  Int64,
    tenant_id         String,
    workload_type     String,
    image_name        String,
    cost_estimate_usd Float64
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',
         kafka_topic_list = 'audit-logs',
         kafka_group_name = 'clickhouse-consumer',
         kafka_format = 'JSONEachRow';

-- 3. Create the materialized view that moves rows from the queue into storage
CREATE MATERIALIZED VIEW workload_audit_mv TO workload_audit AS
SELECT
    event_id, workload_id, action, status, error, duration_ms,
    parseDateTimeBestEffort(timestamp) AS timestamp,
    spec_snapshot, metadata,
    cpu_usage_percent, memory_usage_mb, network_bytes_in, network_bytes_out,
    disk_io_read_mb, disk_io_write_mb,
    tenant_id, workload_type, image_name, cost_estimate_usd
FROM workload_audit_queue;
```

Now your data is queryable via SQL within seconds of reaching Kafka, for both audit and monitoring:
"Show me all agents that used the 'finance-scraper' image in the last 30 days and failed within 5 seconds."
"What's the average CPU usage across all workloads in the last hour?"
Real-Time Monitoring with Materialized Views
ClickHouse materialized views let us pre-aggregate metrics for real-time dashboards:
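One property of these views shapes everything in them. A materialized view runs once per block inserted into workload_audit. SummingMergeTree then collapses rows with the same sorting key during background merges by adding their numeric columns. Counts and sums survive that process, but averages do not. The Go sketch below mimics a merge of two partial rows for the same minute. The `bucket` type and the numbers are illustrative only. The sketch also includes the CPU-time conversion used for quota tracking, assuming `cpu_usage_percent` is a percentage of one core: CPU-seconds = (percent / 100) × (duration_ms / 1000).

```go
package main

// bucket is one partial aggregate row, e.g. what a single insert block
// contributes for one (minute, tenant, workload_type) key.
type bucket struct {
	Count           int64
	TotalDurationMs int64
	AvgDurationMs   float64 // NOT additive: included only to show the problem
	CPUSeconds      float64
}

// cpuSeconds converts an average CPU percentage (of one core) over a run into CPU-seconds.
func cpuSeconds(cpuPercent float64, durationMs int64) float64 {
	return cpuPercent / 100 * float64(durationMs) / 1000
}

// merge mimics SummingMergeTree: every numeric column of rows sharing a key is summed.
func merge(a, b bucket) bucket {
	return bucket{
		Count:           a.Count + b.Count,
		TotalDurationMs: a.TotalDurationMs + b.TotalDurationMs,
		AvgDurationMs:   a.AvgDurationMs + b.AvgDurationMs, // what happens to an avg column
		CPUSeconds:      a.CPUSeconds + b.CPUSeconds,
	}
}

// avgDuration recomputes the average correctly from the additive columns.
func avgDuration(b bucket) float64 {
	return float64(b.TotalDurationMs) / float64(b.Count)
}
```

For example, merging a row with 1 run of 1,000 ms and a row with 3 runs averaging 3,000 ms yields a summed "average" of 4,000 ms. The true average is 10,000 / 4 = 2,500 ms. In SQL terms: keep totals and counts in the view and compute averages when querying, for example sum(total_duration_ms) / sum(workload_count).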
```sql
-- Aggregated metrics per minute.
-- SummingMergeTree adds up numeric columns of rows with the same key, so store only
-- additive values (counts, sums) and derive averages at query time. Merges happen in
-- the background, so query with sum(...) ... GROUP BY rather than reading rows as-is.
CREATE MATERIALIZED VIEW workload_metrics_minute
ENGINE = SummingMergeTree()
ORDER BY (minute, tenant_id, workload_type)
AS SELECT
    toStartOfMinute(timestamp) AS minute,
    tenant_id,
    workload_type,
    count() AS workload_count,
    sum(duration_ms) AS total_duration_ms,
    sum(cpu_usage_percent) AS total_cpu_percent,
    sum(memory_usage_mb) AS total_memory_mb,
    sum(cost_estimate_usd) AS total_cost_usd,
    countIf(status = 'FAILURE') AS failure_count
FROM workload_audit
GROUP BY minute, tenant_id, workload_type;

-- Per-tenant resource usage per hour (for quota tracking).
-- cpu_usage_percent is a percentage of one core: CPU-seconds = pct / 100 * ms / 1000.
-- A view only sees newly inserted rows, so a now()-based filter does not belong here;
-- expire old rows with a TTL if needed.
CREATE MATERIALIZED VIEW tenant_resource_usage
ENGINE = SummingMergeTree()
ORDER BY (tenant_id, hour)
AS SELECT
    tenant_id,
    toStartOfHour(timestamp) AS hour,
    sum(cpu_usage_percent * duration_ms / 100000) AS cpu_seconds,
    sum(memory_usage_mb * duration_ms / 1000) AS memory_mb_seconds,
    sum(cost_estimate_usd) AS total_cost_usd,
    count() AS workload_count
FROM workload_audit
GROUP BY tenant_id, hour;
```

Security Alerting Queries
Use ClickHouse queries to detect anomalies in real-time:
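The first check compares each recent workload against the average duration for its own workload type, not against a global average. Stating that rule in Go first makes thresholds easy to unit-test before they are encoded in SQL. In this sketch, `run`, `baselines`, and `outliers` are illustrative names. The 5× factor is the same one the query uses.

```go
package main

// run is a hypothetical minimal view of one audit row.
type run struct {
	WorkloadID   string
	WorkloadType string
	DurationMs   int64
}

// baselines returns the average duration per workload type over a reference window.
func baselines(history []run) map[string]float64 {
	sum := map[string]int64{}
	n := map[string]int64{}
	for _, r := range history {
		sum[r.WorkloadType] += r.DurationMs
		n[r.WorkloadType]++
	}
	avg := make(map[string]float64, len(sum))
	for t, s := range sum {
		avg[t] = float64(s) / float64(n[t])
	}
	return avg
}

// outliers returns IDs of recent runs longer than factor × their type's baseline.
// Types with no history are skipped, like an INNER JOIN against the baseline.
func outliers(recent []run, base map[string]float64, factor float64) []string {
	var ids []string
	for _, r := range recent {
		avg, ok := base[r.WorkloadType]
		if ok && float64(r.DurationMs) > avg*factor {
			ids = append(ids, r.WorkloadID)
		}
	}
	return ids
}
```

The SQL versions of this check and two others follow.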
```sql
-- Detect potential crypto miners (workloads running 5x longer than the 24 h
-- average for their workload type). ClickHouse has historically not supported
-- correlated subqueries, so compute per-type baselines first and join them back.
WITH baseline AS
(
    SELECT workload_type, avg(duration_ms) AS avg_duration_ms
    FROM workload_audit
    WHERE timestamp >= now() - INTERVAL 24 HOUR
    GROUP BY workload_type
)
SELECT
    a.workload_id,
    a.image_name,
    a.duration_ms,
    a.cpu_usage_percent,
    a.timestamp
FROM workload_audit AS a
INNER JOIN baseline AS b ON a.workload_type = b.workload_type
WHERE a.timestamp >= now() - INTERVAL 1 HOUR
  AND a.duration_ms > b.avg_duration_ms * 5
ORDER BY a.duration_ms DESC
LIMIT 10;

-- Detect suspicious images (not in trusted list)
SELECT
    workload_id,
    image_name,
    tenant_id,
    timestamp
FROM workload_audit
WHERE timestamp >= now() - INTERVAL 1 HOUR
  AND image_name NOT IN (
    'trusted-image-1',
    'trusted-image-2',
    'trusted-image-3'
  );

-- Detect resource exhaustion attacks
-- (cpu_usage_percent is % of one core: CPU-seconds = pct / 100 * ms / 1000)
SELECT
    tenant_id,
    sum(cpu_usage_percent * duration_ms / 100000) AS total_cpu_seconds,
    sum(memory_usage_mb * duration_ms / 1000) AS total_memory_mb_seconds,
    count() AS workload_count
FROM workload_audit
WHERE timestamp >= now() - INTERVAL 1 HOUR
GROUP BY tenant_id
HAVING total_cpu_seconds > 3600 -- More than 1 CPU-hour in the last hour
ORDER BY total_cpu_seconds DESC;
```

Grafana Integration
Connect Grafana to ClickHouse for real-time dashboards:
```yaml
# grafana-datasource.yaml
apiVersion: 1
datasources:
  - name: ClickHouse
    type: grafana-clickhouse-datasource
    jsonData:
      host: clickhouse    # plugin v4+; v3.x releases call this field "server"
      port: 8123
      protocol: http      # 8123 is ClickHouse's HTTP port; the plugin otherwise assumes native (9000)
      defaultDatabase: default
      username: default
```

Example dashboard queries:
Control Plane Health:
```sql
-- Workloads provisioned per minute
SELECT
    toStartOfMinute(timestamp) AS time,
    count() AS workloads_provisioned
FROM workload_audit
WHERE timestamp >= now() - INTERVAL 1 HOUR
  AND action = 'PROVISION'
GROUP BY time
ORDER BY time;
```

Workload Performance:
```sql
-- P95 latency by workload type
SELECT
    workload_type,
    quantile(0.95)(duration_ms) AS p95_latency_ms,
    avg(duration_ms) AS avg_latency_ms
FROM workload_audit
WHERE timestamp >= now() - INTERVAL 1 HOUR
GROUP BY workload_type;
```

Security Overview:
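```sql
-- Anomalies detected in last hour (simple global 24 h baseline)
WITH
    (
        SELECT avg(duration_ms)
        FROM workload_audit
        WHERE timestamp >= now() - INTERVAL 24 HOUR
    ) AS avg_duration_ms
SELECT
    countIf(duration_ms > avg_duration_ms * 5) AS crypto_miner_suspicious,
    countIf(image_name NOT IN ('trusted-image-1', 'trusted-image-2', 'trusted-image-3')) AS untrusted_images,
    countIf(cpu_usage_percent > 90) AS high_cpu_workloads
FROM workload_audit
WHERE timestamp >= now() - INTERVAL 1 HOUR;
```

Usage in the Worker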
Wiring it up in our main loop matches the elegance of the rest of our architecture.
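```go
func (w *Worker) Start(ctx context.Context) {
	// 1. Create the base runtime
	baseRuntime := &kubernetes.FirecrackerRuntime{ /* ... */ }

	// 2. Create the audit sink (broker and topic match the ClickHouse Kafka table)
	kafkaSink := middleware.NewKafkaSink([]string{"kafka:9092"}, "audit-logs")

	// 3. Wrap the runtime with logging
	loggingRuntime := &middleware.AuditMiddleware{
		Next: baseRuntime,
		Sink: kafkaSink,
	}

	for {
		select {
		case <-ctx.Done():
			return // stop taking new tasks on shutdown
		default:
		}
		// ... get task ...
		// Use the logged version. The worker doesn't need to know!
		go w.handleTask(ctx, loggingRuntime, task)
	}
}
```

Automated Alerting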
Set up automated alerts using ClickHouse queries or integrate with AlertManager:
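```go
// AnomalyChecker runs security queries outside the request path (not in AuditMiddleware).
type AnomalyChecker struct {
	DB           *sql.DB                                                    // e.g. via the clickhouse-go database/sql driver
	AlertService interface{ Send(title string, workloadIDs []string) } // Slack/PagerDuty
}

func (c *AnomalyChecker) checkAnomalies(ctx context.Context) error {
	rows, err := c.DB.QueryContext(ctx, `
		SELECT workload_id FROM workload_audit
		WHERE timestamp >= now() - INTERVAL 1 HOUR
		  AND duration_ms > (SELECT avg(duration_ms) * 5 FROM workload_audit
		                     WHERE timestamp >= now() - INTERVAL 24 HOUR)`)
	if err != nil {
		return err
	}
	defer rows.Close()
	var ids []string
	for rows.Next() {
		var id string
		if err := rows.Scan(&id); err != nil {
			return err
		}
		ids = append(ids, id)
	}
	if len(ids) > 0 {
		c.AlertService.Send("Suspicious workloads detected", ids)
	}
	return rows.Err()
}
```

Conclusion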
We have built a platform that is:
- Polymorphic: Runs Pods and VMs.
- Secure: Hardened kernel and network isolation.
- Auditable: Every action is recorded and queryable.
- Observable: Real-time monitoring and alerting from the same data store.
By using ClickHouse for both audit logs and metrics, we've created a unified observability system. The same events that provide compliance audit trails also power real-time dashboards and security alerting, with no redundant infrastructure and no data silos.
This is the difference between a "script" and a "platform."