Filter-and-distribute: per-destination views from one tool result
Every Beach application with multi-destination delivery used to rebuild the same primitive: take a routed tool result, apply per-destination filtering, dispatch to N destinations with per-destination views. The third time was the limit. From @cool-ai/beach-core@0.6.0, the primitive ships in core.
This guide describes what the primitive does, how to wire it, and the architectural law it enforces. The surface is generic; the trust model is what makes it Beach-shaped.
The shape
A generalist tool's handler returns an AnnotatedRecord<T> — every field carries its own trust metadata declared at the data source. The framework dispatches that record through FilterAndDistribute. Six canonical destinations receive their own filtered view in parallel:
                            +-- llm-session    (filtered for the LLM's tool_result)
                            |
                            +-- ui-streaming   (full record over SSE/WebSocket)
                            |
generalist tool returns     +-- cache          (full record persisted)
AnnotatedRecord<T>     ---> |
                            +-- formatter:*    (one per channel format — email-html,
                            |                   sms, voice, push, PDF, ...)
                            |
                            +-- audit          (consumer-registered redaction rules)
                            |
                            +-- peer-response  (federation inbound; via A2A)
The orchestrator's actor loop awaits the llm-session delivery (the LLM needs its tool_result synchronously); other destinations dispatch in parallel via Promise.all. Failures in non-llm-session destinations log via the observer hook and do not propagate.
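The await-one, fan-out-the-rest semantics can be sketched as follows (illustrative types and names, not beach-core's internals):

```typescript
// Illustrative sketch of the dispatch semantics (hypothetical Delivery type).
type Delivery = { id: string; deliver: () => Promise<unknown> };

async function dispatch(llm: Delivery, others: Delivery[]): Promise<unknown> {
  // Non-llm-session destinations fan out in parallel; each failure is
  // reported (here: logged) and swallowed, never propagated.
  const background = Promise.all(
    others.map((d) =>
      d.deliver().catch((err) => console.error(`[fnd] ${d.id} failed:`, err)),
    ),
  );
  // The llm-session delivery is awaited; if it throws, the error surfaces
  // to the actor loop as the tool_result.
  const llmResult = await llm.deliver();
  await background; // already error-safe; nothing to catch here
  return llmResult;
}
```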
The wire shape
AnnotatedRecord<T> is a per-field map of values plus annotations. Annotations declare per-field trust metadata — composition tier, temporal-render hint, and any consumer-registered namespaced slots.
import { type AnnotatedRecord } from '@cool-ai/beach-core';
const result: AnnotatedRecord<{
taskId: number;
status: string;
description: string;
internalId: string;
}> = {
taskId: { value: 42, composition: 'anchored' },
status: { value: 'AwaitingReply', composition: 'anchored' },
description: { value: 'The Heritage Fund', composition: 'narrative' },
internalId: { value: 'INT-99', composition: 'anchored' },
};
Three composition tiers:
- anchored — fact-bearing values read from authoritative source data (a database row, an API response, a confirmed booking reference). Rendered by token substitution at the Composer; the LLM never paraphrases an anchored value.
- validated — deterministically derived from anchored facts, with the derivation flagged in a code comment. Rendered against an invariant declared at the field's schema. Use for aggregates, summaries, classifications.
- narrative — LLM-authored end-to-end. No structural constraint beyond the channel format. Use for prose descriptions, framing, context.
Plus optional:
- temporalRender — 'absolute' | 'relative' | 'either' for date/time fields. Lets the rendering boundary decide between "2026-05-13T17:00Z", "in three days", or either.
- Namespaced consumer slots — keys with a colon prefix (legal:provenance, po:confidence, etc.). Beach reserves no canonical names; consumers register their own domain vocabulary.
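Illustratively, a date/time field carrying both optional annotation kinds might look like this (the slot name and value below are hypothetical, not canonical Beach vocabulary):

```typescript
// Hypothetical field value combining a composition tier, a temporal-render
// hint, and a consumer-registered namespaced slot.
const dueDate = {
  value: '2026-05-13T17:00:00Z',
  composition: 'anchored',
  temporalRender: 'either',                 // renderer may pick absolute or relative
  'legal:provenance': 'contract-clause-4',  // namespaced slot (illustrative name)
};
```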
The canonical filter DSL
Filter rules at each destination are declarative, not function-based. The DSL is JSON-shaped — version-controlled as data, auditable from outside the running application, transferable to a future port of Beach to another language.
import {
applyFilter,
passThrough,
pickFields,
omitFields,
redactField,
renameFields,
truncate,
mapList,
compose,
} from '@cool-ai/beach-core';
const llmFilter = compose(
pickFields(['products']),
mapList('products', pickFields(['ref', 'price', 'summary'])),
);
const cacheFilter = passThrough();
const auditFilter = redactField('contactEmail', '[PII]');
The eight canonical operations cover the 90%+ case:
| Operation | Effect |
|---|---|
| passThrough() | Returns input unchanged. |
| pickFields(fields) | Keeps named fields, drops the rest. |
| omitFields(fields) | Drops named fields, keeps the rest. The destination doesn't know they ever existed. |
| redactField(field, placeholder?) | Preserves the field's presence and tier annotation; replaces the value with a placeholder (default '[redacted]'). |
| renameFields({ from: to }) | Renames at the destination. The framework reverse-renames LLM tool-call args automatically (see "Reverse-rename" below). |
| truncate(field, maxChars) | Truncates a string field to maxChars characters. |
| mapList(field, rule) | Applies rule to each element of a list-of-records field. |
| compose(...rules) | Chains rules left-to-right. |
The custom escape hatch is available for rules genuinely beyond the DSL. The framework strips and re-applies annotations on custom output — custom-rule authors cannot mutate the trust contract.
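The strip-and-re-apply guard can be sketched like this (a simplified model with hypothetical types; not beach-core's actual implementation):

```typescript
// Simplified model: a custom rule sees and returns plain values; annotations
// are copied back from the input, so the trust contract cannot be mutated.
type Annotated = Record<string, { value: unknown; composition: string }>;

function runCustom(
  input: Annotated,
  fn: (values: Record<string, unknown>) => Record<string, unknown>,
): Annotated {
  // Strip annotations before handing values to the custom rule.
  const values = Object.fromEntries(
    Object.entries(input).map(([k, v]) => [k, v.value]),
  );
  const out = fn(values);
  const result: Annotated = {};
  for (const [k, v] of Object.entries(out)) {
    if (!(k in input)) continue; // custom rules cannot introduce annotated fields
    // Re-apply the input's annotation verbatim; only the value may change.
    result[k] = { ...input[k], value: v };
  }
  return result;
}
```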
Architectural law: filter rules are pure projection
This is the rule that distinguishes Beach's primitive from a generic transform pipeline:
Filter rules can strip fields, reshape values, or transform views per destination, but cannot demote tier annotations on the fields that survive.
If a field is composition: 'anchored' at the data source, no filter rule can route it through LLM-rendered narrative downstream; its annotation travels intact. Filters never relabel a field's composition or rewrite a value's namespaced slots.
This is enforced at two architectural seams:
- At dispatch — the canonical filter operations are pure projection by construction. The custom escape hatch's output has annotations stripped and re-applied from the input; custom rules cannot expand the trust contract.
- At rendering (the Composer primitive) — anchored fields are token-substituted, validated fields invariant-checked, narrative fields LLM-authored. The Composer never elevates trust beyond the field's declared tier.
Wiring the primitive
Construct one FilterAndDistribute per Beach application. Register destinations at startup.
import {
FilterAndDistribute,
type Destination,
type DestinationTransport,
} from '@cool-ai/beach-core';
const llmTransport: DestinationTransport = {
async deliver(_filteredRecord, _ctx) {
// The actor loop owns the LLM tool_result hand-off; this transport
// is a no-op. The framework returns the filtered record from
// FilterAndDistribute.dispatch() as outcome.llmResult.
},
};
const cacheTransport: DestinationTransport = {
async deliver(filteredRecord, ctx) {
await missiveStore.write({
sessionId: ctx.sessionId!,
turnId: ctx.turnId!,
payload: filteredRecord,
capturedAt: new Date().toISOString(),
});
},
};
const fnd = new FilterAndDistribute({
destinations: [
{ id: 'llm-session', type: 'llm-session', filter: passThrough(), transport: llmTransport },
{ id: 'cache', type: 'cache', filter: passThrough(), transport: cacheTransport, retryPolicy: { attempts: 3, backoff: 'exponential' } },
{ id: 'ui-streaming', type: 'ui-streaming', filter: omitFields(['internalId']), transport: uiTransport },
{ id: 'audit', type: 'audit', filter: passThrough(), transport: auditTransport },
],
});
Pass the instance to callActor:
import { callActor } from '@cool-ai/beach-llm';
const result = await callActor({
config: actorConfig,
messages: [{ role: 'user', content: userInput }],
sessionId,
slotKey: 'main',
registry: toolRegistry,
provider: anthropicProvider,
filterAndDistribute: fnd,
});
When a generalist tool's handler returns an AnnotatedRecord, the framework dispatches inline. The llm-session destination's filtered record (annotations stripped for the LLM's JSON-shaped tool_result) becomes the LLM's tool_result content; the other destinations receive their per-destination filtered views in parallel.
Specialist tools and non-AnnotatedRecord returns continue to use the legacy JSON-stringify path — fully backwards compatible.
Filter precedence: type > class > destination default
The destination's default filter is the floor. Two override mechanisms layer on top:
Class-level overrides (per event class) on the destination:
{
id: 'llm-session',
type: 'llm-session',
filter: passThrough(),
classOverrides: {
flight_results: compose(
pickFields(['products']),
mapList('products', pickFields(['summary', 'highlights'])),
),
hotel_results: pickFields(['summary', 'starRating', 'pricePerNight']),
},
transport,
}
Type-level overrides (per data-part type) on PartTypeRegistry:
import { PartTypeRegistry } from '@cool-ai/beach-core';
PartTypeRegistry.register({
name: 'package-holiday',
description: 'A complete travel package',
filterOverrides: {
'llm-session': pickFields(['packagePrice', 'description', 'startDate', 'endDate']),
// The LLM never sees per-component pricing for a package holiday,
// even though the cache and UI destinations get the full record.
},
});
When dispatching, the framework resolves per destination + context (partType, eventClass):
- If ctx.partType is registered with a filterOverrides[dest.id] entry, that wins.
- Else if ctx.eventClass matches a key in dest.classOverrides, that wins.
- Else the destination's default filter applies.
Pass dispatchContext through callActor:
await callActor({
// ...
filterAndDistribute: fnd,
dispatchContext: { partType: 'package-holiday', eventClass: 'flight_results' },
});
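The precedence resolution itself is small enough to sketch (illustrative names; `FilterRule` here stands in for the real DSL rule type):

```typescript
// Sketch of type > class > destination-default precedence resolution.
type FilterRule = { op: string };
interface Dest {
  id: string;
  filter: FilterRule;                           // destination default (the floor)
  classOverrides?: Record<string, FilterRule>;  // per event class
}
type TypeOverrides = Record<string, Record<string, FilterRule>>; // partType -> destId -> rule

function resolveFilter(
  dest: Dest,
  typeOverrides: TypeOverrides,
  ctx: { partType?: string; eventClass?: string },
): FilterRule {
  // 1. A registered part-type override wins outright.
  const byType = ctx.partType ? typeOverrides[ctx.partType]?.[dest.id] : undefined;
  if (byType) return byType;
  // 2. Then the destination's class-level override.
  const byClass = ctx.eventClass ? dest.classOverrides?.[ctx.eventClass] : undefined;
  if (byClass) return byClass;
  // 3. Finally the destination default.
  return dest.filter;
}
```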
Reverse-rename: tool-call round trips
When the llm-session destination's filter renames canonical fields to aliases for the LLM's view (renameFields({ totalPrice: 'priceLabel' })), the LLM may write a tool call referencing the alias:
{ "tool": "book_flight", "input": { "priceLabel": 94, "route": "LHR-JFK" } }
Beach reverse-renames the args before invoking the handler. The handler sees the canonical names:
async function bookFlightHandler(input: { totalPrice: number; route: string }) {
// input.totalPrice is 94; input.route is 'LHR-JFK'.
// The handler has no awareness that the LLM saw a renamed view.
}
The reverse-rename is automatic for renameFields rules registered on the llm-session destination's default filter. Beach rejects conflicting rename targets at registration: a destination cannot have two distinct canonical sources mapping to the same alias.
Other destinations (ui-streaming, cache, formatter, audit, peer-response) don't generate tool calls, so reverse-rename is meaningless for them; the rename is purely directional in those cases.
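A minimal sketch of the reverse-rename step (illustrative; the framework performs this automatically for llm-session renameFields rules):

```typescript
// Invert a canonical -> alias rename map and apply it to LLM tool-call args,
// so handlers always see canonical field names.
function reverseRename(
  args: Record<string, unknown>,
  renames: Record<string, string>, // e.g. { totalPrice: 'priceLabel' }
): Record<string, unknown> {
  const aliasToCanonical = Object.fromEntries(
    Object.entries(renames).map(([canonical, alias]) => [alias, canonical]),
  );
  return Object.fromEntries(
    Object.entries(args).map(([k, v]) => [aliasToCanonical[k] ?? k, v]),
  );
}
```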
The six canonical destinations
| Destination | Purpose | Default filter | Awaited at dispatch? |
|---|---|---|---|
| llm-session | The orchestrating LLM's tool_result view | Default unfiltered (the LLM is allowed to see the data; the trust mechanism lives in tier annotations + Composer token-substitution) | Yes |
| ui-streaming | Browser / native UI rendering authoritative data via SSE / WebSocket | Default full pass-through | No |
| cache | Persistent store (Redis, DB) for replay and re-rendering | Default full pass-through | No |
| formatter | Composer input for batched channels (one instance per channel format the consumer uses) | Default unfiltered; tier annotations protect content-rendering trust at the Composer | No |
| audit | Trust-audit log; per-event records for compliance, drift detection, replay | Default full pass-through with consumer-registered redaction rules | No |
| peer-response | Federation inbound: results arriving from a peer via A2A flow into this destination and fan out from there | Default full pass-through; tier annotations cross the wire intact | No |
Multiple formatter:* destinations are the canonical pattern, one per channel format:
{ id: 'formatter:email-html', type: 'formatter', filter: emailHtmlFilter, transport: emailHtmlComposer },
{ id: 'formatter:sms', type: 'formatter', filter: smsFilter, transport: smsComposer },
{ id: 'formatter:voice', type: 'formatter', filter: voiceFilter, transport: voiceComposer },
Each formatter:* destination's filter shapes what its Composer sees. Channels share the destination type (which Beach owns) but never the filter rules (which the consumer authors per channel). This separation keeps formatter channel-blind in core while letting consumers shape per-channel behaviour explicitly.
Failure semantics
- The llm-session delivery is awaited; if it throws, the actor's tool_result is the error and the actor handles it as it would any other tool failure.
- Non-llm-session destinations are dispatched in parallel via Promise.all. Each delivery's outcome is reported to the observer hook (success / failure / latency / attempts). Failures do not propagate.
- Per-destination retry policy is opt-in: declare retryPolicy: { attempts: 3, backoff: 'exponential' } on a destination that needs retry semantics. Default: no retry.
- No cross-destination guarantees. A failure in cache does not affect the success of ui-streaming. A consumer that needs "cache must succeed before UI sees the data" should serialise via their own wiring; the framework does not implement transactional cross-destination delivery (planned for a future minor release).
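The opt-in retry semantics can be sketched as (a simplified model, not beach-core's scheduler; the delay base of 100ms is an assumption):

```typescript
// Sketch: retry a delivery up to `attempts` times with exponential backoff.
async function deliverWithRetry(
  deliver: () => Promise<void>,
  policy?: { attempts: number; backoff: 'exponential' },
): Promise<void> {
  const attempts = policy?.attempts ?? 1; // default: no retry
  for (let i = 0; i < attempts; i++) {
    try {
      return await deliver();
    } catch (err) {
      if (i === attempts - 1) throw err; // exhausted: surface to the observer
      // Exponential backoff: 100ms, 200ms, 400ms, ... (base is illustrative).
      await new Promise((r) => setTimeout(r, 100 * 2 ** i));
    }
  }
}
```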
Wire the observer hook for OpenTelemetry span emission, audit logging, or whatever the consumer's observability stack expects:
const fnd = new FilterAndDistribute({
destinations: [...],
observer: {
onDispatch(result) {
// result: { destinationId, destinationType, outcome, attempts, latencyMs, error? }
otelSpan.addEvent('beach.fnd.dispatch', { ...result });
},
},
});
Observer errors are caught and logged to console.error; they never propagate.
The annotateRecord helper
Annotating every field of every record manually is verbose for tools returning lists. The annotateRecord helper takes a per-type schema and applies it uniformly:
import { annotateRecord, type AnnotationSchema } from '@cool-ai/beach-core';
interface FlightProduct {
ref: string;
price: number;
summary: string;
highlights: string;
internalRef: string;
}
const flightProductSchema = {
ref: { composition: 'anchored' },
price: { composition: 'anchored' },
summary: { composition: 'narrative' },
highlights: { composition: 'narrative' },
internalRef: { composition: 'anchored' },
} as const satisfies AnnotationSchema<FlightProduct>;
// In a list-returning tool:
return {
query: { value: input.query, composition: 'anchored' },
products: {
value: results.map((p) => annotateRecord(p, flightProductSchema)),
composition: 'anchored',
},
};
The schema-vs-record field-coverage rule:
- Record has a field the schema doesn't cover → throws at the authoring boundary. An unannotated field reaching the Composer would bypass the tier trust model entirely; this must fail loudly, not silently at render time.
- Schema declares a field the record doesn't have → silently skipped. Domain types have optional fields (a task may or may not have a dueDate); requiring every schema-declared field to appear on every record would defeat the helper's purpose.
Net rule: the schema is the trust contract; the record may be a subset.
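A simplified model of the coverage rule (assuming a stripped-down stand-in for annotateRecord; the real helper is in @cool-ai/beach-core):

```typescript
// Stand-in for annotateRecord showing the asymmetric field-coverage rule.
function annotateRecordSketch(
  record: Record<string, unknown>,
  schema: Record<string, { composition: string }>,
): Record<string, { value: unknown; composition: string }> {
  const out: Record<string, { value: unknown; composition: string }> = {};
  for (const [field, value] of Object.entries(record)) {
    const ann = schema[field];
    // An uncovered record field must fail loudly at the authoring boundary.
    if (!ann) throw new Error(`annotateRecord: no schema entry for "${field}"`);
    out[field] = { value, ...ann };
  }
  // Schema fields absent from the record are silently skipped: the schema
  // is the trust contract; the record may be a subset.
  return out;
}
```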
Wider architectural placement
FilterAndDistribute sits between framework-enforced tool routing and the Composer primitive. The chain:
Generalist tool's handler returns AnnotatedRecord<T>
↓
Framework dispatches through FilterAndDistribute (this primitive)
↓
Each destination receives its filtered view; tier annotations survive
↓
The formatter:* destinations feed into Composer instances
↓
The Composer renders authoritative data + narrative into channel artifact
The trust mechanism (tier annotations) and the format mechanism (filter rules) stay structurally independent through the whole pipeline.
Adopting the primitive
Register a FilterAndDistribute instance, return AnnotatedRecord from your generalist tools, pass the instance to callActor. No router rule is needed for the dispatch step — it's inline in the actor loop.
If you were using @cool-ai/beach-starter's filterAndDistributeHandler (removed in @cool-ai/beach-starter@1.1.0): replace each handler with an AnnotatedRecord-returning tool, register a FilterAndDistribute instance per the wiring above, and pass it to callActor. The handler-by-handler walk-through depends on the destinations your application uses; reach out via the issue tracker if a worked example would help.
Reference
- Wire shape: AnnotatedValue<T>, AnnotatedRecord<T>, FilterRule — @cool-ai/beach-core's public exports.
- Helper: annotateRecord(record, schema) — @cool-ai/beach-core.
- Class: FilterAndDistribute — @cool-ai/beach-core.
- Architectural law: adopt-vs-build gate, point 3.