Analytics Policy
The analyticsLog policy emits one structured JSON entry per request, designed for aggregation in DuckDB, ClickHouse, or any columnar analytics engine. It runs at priority 0 (alongside requestLog) and wraps the entire policy pipeline to measure end-to-end latency.
```typescript
import { analyticsLog } from "@vivero/stoma-analytics/policy";
```

Quick start
Add the policy to your gateway — zero configuration required:
```typescript
import { createGateway } from "@vivero/stoma";
import { analyticsLog } from "@vivero/stoma-analytics/policy";

export default createGateway({
  name: "my-api",
  policies: [analyticsLog()],
  routes: [
    // ...
  ],
});
```

Every request now emits a JSON line to console.log:
{ "_type": "stoma_analytics", "timestamp": "2026-02-15T14:23:01.042Z", "gatewayName": "my-api", "routePath": "/users/*", "method": "GET", "statusCode": 200, "durationMs": 12, "responseSize": 4096, "traceId": "a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4"}These lines are captured by your log aggregation service — Cloudflare Logpush (via Workers Trace Events), Fluent Bit, Vector, stdout piping, or any other log transport — and delivered to your storage destination (S3-compatible object storage, local filesystem, etc.).
Installation
```bash
npm install @vivero/stoma-analytics
# or
yarn add @vivero/stoma-analytics
```

Peer dependencies: @vivero/stoma (>=0.1.0) and hono (>=4.0.0).
Configuration
```typescript
interface AnalyticsLogConfig {
  /** Static dimensions added to every entry. */
  dimensions?: Record<string, string | number | boolean>;

  /** Dynamic dimension extractor — called per-request after the response. */
  extractDimensions?: (c: {
    req: { method: string; url: string; header: (name: string) => string | undefined };
    res: { status: number; headers: Headers };
    get: (key: string) => unknown;
  }) => Record<string, string | number | boolean>;

  /** Custom sink function. Default: console.log(JSON.stringify(entry)). */
  sink?: (entry: AnalyticsEntry) => void;

  /** Standard policy skip condition. */
  skip?: (c: Context) => boolean | Promise<boolean>;
}
```

All fields are optional. With no configuration, analyticsLog() emits JSON to console.log with the core fields listed below.
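As a sketch of a fully configured instance (the header name, dimension keys, sink behavior, and health-check path are illustrative, not defaults of the package):

```typescript
import { analyticsLog } from "@vivero/stoma-analytics/policy";

const analytics = analyticsLog({
  // Static facets attached to every entry.
  dimensions: { env: "production", region: "eu-west-1" },

  // Per-request facets computed after the response.
  extractDimensions: (c) => ({
    country: c.req.header("x-geo-country") ?? "unknown",      // illustrative header
    cached: c.res.headers.has("cf-cache-status"),
  }),

  // Route entries somewhere other than console.log.
  sink: (entry) => console.error(JSON.stringify(entry)),

  // Skip health checks to keep the dataset clean (illustrative path).
  skip: (c) => new URL(c.req.url).pathname === "/healthz",
});
```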
Entry fields
Every analytics entry contains these fields:
| Field | Type | Description |
|---|---|---|
| _type | "stoma_analytics" | Discriminator for filtering in mixed log streams. |
| timestamp | string | ISO 8601 timestamp when the entry was emitted. |
| gatewayName | string | Gateway name from config. Low cardinality. |
| routePath | string | Matched route pattern, e.g. "/users/*". Low cardinality. |
| method | string | HTTP method (GET, POST, etc.). |
| statusCode | number | HTTP response status code. |
| durationMs | number | End-to-end latency in milliseconds. |
| responseSize | number | Response body size from Content-Length, or 0. |
| traceId | string? | W3C trace ID for correlating with request logs. |
| dimensions | object? | User-defined key/value metadata (see below). |
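Expressed as a TypeScript shape, this is roughly the following (a sketch reconstructed from the table above; the authoritative AnalyticsEntry type is exported by the package, see the end of this page):

```typescript
// Sketch of the entry shape, derived from the field table above.
// The package's exported AnalyticsEntry type is the source of truth.
interface AnalyticsEntry {
  _type: "stoma_analytics";
  timestamp: string;       // ISO 8601
  gatewayName: string;
  routePath: string;       // matched pattern, e.g. "/users/*"
  method: string;
  statusCode: number;
  durationMs: number;
  responseSize: number;    // from Content-Length, or 0
  traceId?: string;        // W3C trace ID
  dimensions?: Record<string, string | number | boolean>;
}
```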
Dimensions
Dimensions are extensible low-cardinality key/value pairs attached to every entry. Use them to slice your analytics by environment, region, subscription plan, API version, or any other facet relevant to your business.
Static dimensions
Set once at gateway construction time:
analyticsLog({ dimensions: { env: "production", region: "eu-west-1", apiVersion: "v2", },})Dynamic dimensions
Computed per-request from headers, response status, or values set by upstream policies:
```typescript
analyticsLog({
  extractDimensions: (c) => ({
    country: c.req.header("x-geo-country") ?? "unknown",
    cacheTier: c.res.status === 304 ? "hit" : "miss",
  }),
})
```

Reading from the Stoma context
The c.get(key) method reads values set on the Hono context by earlier policies — JWT claims, RBAC roles, custom attributes from assignAttributes, etc. This is how you derive dimensions from upstream policy state:
```typescript
analyticsLog({
  extractDimensions: (c) => ({
    // Read a claim forwarded by jwtAuth
    plan: String(c.get("plan") ?? "free"),
    // Read an attribute set by assignAttributes
    tenant: String(c.get("tenantId") ?? "unknown"),
  }),
})
```

Merging static and dynamic
When both dimensions and extractDimensions are provided, they are shallow-merged. Dynamic dimensions override static ones with the same key:
analyticsLog({ dimensions: { env: "staging", version: "v2" }, extractDimensions: (c) => ({ version: c.req.header("x-api-version") ?? "v2", }),})// If the header is "v3", dimensions = { env: "staging", version: "v3" }Custom sinks
By default, entries are serialized to JSON and written to console.log. Override the sink to route entries elsewhere:
```typescript
// Useful for testing
const entries: AnalyticsEntry[] = [];

analyticsLog({
  sink: (entry) => entries.push(entry),
})
```

```typescript
analyticsLog({
  sink: (entry) => {
    // Fire-and-forget to your ingest endpoint
    fetch("https://ingest.example.com/analytics", {
      method: "POST",
      body: JSON.stringify(entry),
    });
  },
})
```

```typescript
analyticsLog({
  sink: (entry) => {
    // Batch entries in a Durable Object for periodic flush
    const id = env.ANALYTICS_BUFFER.idFromName("default");
    const stub = env.ANALYTICS_BUFFER.get(id);
    stub.fetch("https://internal/buffer", {
      method: "POST",
      body: JSON.stringify(entry),
    });
  },
})
```

Data boundary: analytics vs request logs
The analytics policy and the gateway’s requestLog policy serve different purposes and deliberately carry different fields. They can (and should) run side by side.
| Field | Analytics | Request Log | Why |
|---|---|---|---|
| timestamp | ✓ | ✓ | Time-series bucketing / grep by time |
| gatewayName | ✓ | ✓ | GROUP BY gateway in multi-gateway setups |
| routePath | ✓ | ✓ | GROUP BY route pattern (low cardinality) |
| method | ✓ | ✓ | GROUP BY HTTP method |
| statusCode | ✓ | ✓ | GROUP BY status, error rate dashboards |
| durationMs | ✓ | ✓ | AVG/P99 latency, SLA monitoring |
| responseSize | ✓ | | SUM bandwidth, detect payload bloat |
| traceId | ✓ | ✓ | Drill down from dashboard anomaly to logs |
| dimensions | ✓ | | Extensible low-cardinality facets |
| requestId | | ✓ | Unique per request — grep, not GROUP BY |
| path | | ✓ | Actual URL, e.g. /users/42 (high cardinality) |
| clientIp | | ✓ | PII, high cardinality — debug/abuse only |
| userAgent | | ✓ | High cardinality — debug specific clients |
| spanId | | ✓ | Distributed tracing span correlation |
| requestBody | | ✓ | Deep debugging (opt-in, redactable) |
| responseBody | | ✓ | Deep debugging (opt-in, redactable) |
The traceId is the bridge between the two systems. When an analytics dashboard shows a latency spike on /users/*, query the request logs for that traceId to find the specific request, its full URL, client IP, and body.
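As a minimal sketch of that drill-down over raw captured NDJSON (the file path and the shape of request-log lines are assumptions for illustration, not part of this package):

```typescript
import { createReadStream } from "node:fs";
import { createInterface } from "node:readline";

// Hypothetical helper: scan a captured NDJSON log file and collect every
// entry (analytics or request log) that shares the given trace ID.
async function findByTraceId(file: string, traceId: string): Promise<unknown[]> {
  const matches: unknown[] = [];
  const lines = createInterface({ input: createReadStream(file) });
  for await (const line of lines) {
    try {
      const entry = JSON.parse(line);
      if (entry.traceId === traceId) matches.push(entry);
    } catch {
      // Skip non-JSON lines from other log sources.
    }
  }
  return matches;
}

// Illustrative usage — the path is hypothetical:
// await findByTraceId("logs/2026-02-15.ndjson", "a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4");
```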
Using with requestLog
Both policies run at priority 0 and can coexist in the same pipeline:
```typescript
import { createGateway, requestLog } from "@vivero/stoma";
import { analyticsLog } from "@vivero/stoma-analytics/policy";

export default createGateway({
  name: "my-api",
  policies: [
    requestLog(),   // Operational debugging — high-cardinality fields
    analyticsLog(), // Aggregation pipeline — low-cardinality metrics
  ],
  routes: [
    // ...
  ],
});
```

Both policies write to console.log. Your log aggregation service captures all log lines and delivers them to storage. The downstream processor separates them by _type — only lines with _type: "stoma_analytics" are extracted into Parquet.
Downstream pipeline
The analytics entry emitted by this policy is the starting point of a broader pipeline:
```
analyticsLog (this policy)
  → console.log
  → log aggregation
  → raw NDJSON in object storage
  → createProcessor()  → Parquet fragment files
  → createCompactor()  → compacted partition files
  → DuckDB queries Parquet directly
```

See the architecture overview for details on the processor, compactor, storage adapters, and DuckDB integration.
Example queries
Once your analytics are in Parquet, query them with DuckDB:
```sql
-- Error rate by route (last 24 hours)
SELECT
  routePath,
  COUNT(*) AS total,
  COUNT(*) FILTER (statusCode >= 500) AS errors,
  ROUND(100.0 * COUNT(*) FILTER (statusCode >= 500) / COUNT(*), 2) AS error_pct
FROM read_parquet('analytics/2026/02/15/*/*.parquet')
GROUP BY routePath
ORDER BY error_pct DESC;
```

```sql
-- P99 latency by gateway
SELECT
  gatewayName,
  APPROX_QUANTILE(durationMs, 0.99) AS p99_ms
FROM read_parquet('analytics/**/*.parquet')
GROUP BY gatewayName;
```

```sql
-- Bandwidth by route and day
SELECT
  routePath,
  DATE_TRUNC('day', timestamp::TIMESTAMP) AS day,
  SUM(responseSize) / (1024 * 1024) AS mb_transferred
FROM read_parquet('analytics/**/*.parquet')
GROUP BY routePath, day
ORDER BY day DESC;
```

```sql
-- Breakdown by custom dimension
SELECT
  json_extract_string(dimensions, '$.plan') AS plan,
  COUNT(*) AS requests,
  AVG(durationMs) AS avg_latency_ms
FROM read_parquet('analytics/**/*.parquet')
WHERE dimensions IS NOT NULL
GROUP BY plan;
```

AnalyticsEntry type
For TypeScript consumers who need to work with the entry type directly:
```typescript
import { ANALYTICS_TYPE, type AnalyticsEntry } from "@vivero/stoma-analytics";

// ANALYTICS_TYPE = "stoma_analytics"
```

The AnalyticsEntry interface is the contract between the policy (producer), the processor (consumer), and the Parquet schema. All three are kept in sync via the shared type.