Filter-and-distribute: per-destination views from one tool result

Every Beach application with multi-destination delivery used to rebuild the same primitive: take a routed tool result, apply per-destination filtering, dispatch to N destinations with per-destination views. The third time was the limit. From @cool-ai/beach-core@0.6.0, the primitive ships in core.

This guide describes what the primitive does, how to wire it, and the architectural law it enforces. The surface is generic; the trust model is what makes it Beach-shaped.

The shape

A generalist tool's handler returns an AnnotatedRecord<T>, in which every field carries its own trust metadata declared at the data source. The framework dispatches that record through FilterAndDistribute. The six canonical destinations each receive their own filtered view in parallel:

                        +-- llm-session  (filtered for the LLM's tool_result)
                        |
                        +-- ui-streaming  (full record over SSE/WebSocket)
                        |
generalist tool returns +-- cache  (full record persisted)
AnnotatedRecord<T>  --->|
                        +-- formatter:* (one per channel format — email-html,
                        |               sms, voice, push, PDF, ...)
                        |
                        +-- audit  (consumer-registered redaction rules)
                        |
                        +-- peer-response  (federation inbound; via A2A)

The orchestrator's actor loop awaits the llm-session delivery (the LLM needs its tool_result synchronously); other destinations dispatch in parallel via Promise.all. Failures in non-llm-session destinations log via the observer hook and do not propagate.
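The await-one, fan-out-the-rest behaviour described above can be sketched as follows. This is a minimal illustration, not Beach's implementation: SketchDestination, SketchOutcome, and sketchDispatch are hypothetical names, and waiting for the side deliveries to settle before returning is an assumption of the sketch.

```typescript
// Illustrative sketch: these types and names are assumptions, not Beach's API.
interface SketchDestination {
  id: string;
  deliver: (record: unknown) => Promise<void>;
}

interface SketchOutcome {
  destinationId: string;
  outcome: 'success' | 'failure';
  error?: unknown;
}

async function sketchDispatch(
  record: unknown,
  llmSession: SketchDestination,
  others: SketchDestination[],
  onDispatch: (outcome: SketchOutcome) => void,
): Promise<void> {
  // Start the side destinations first so they run alongside llm-session.
  // Each delivery catches its own error, so this Promise.all never rejects.
  const side = Promise.all(
    others.map(async (dest) => {
      let outcome: SketchOutcome;
      try {
        await dest.deliver(record);
        outcome = { destinationId: dest.id, outcome: 'success' };
      } catch (error) {
        outcome = { destinationId: dest.id, outcome: 'failure', error };
      }
      try {
        onDispatch(outcome);
      } catch {
        // Observer errors are swallowed so they cannot fail the dispatch.
      }
    }),
  );

  try {
    // The llm-session delivery is awaited; an error here reaches the caller.
    await llmSession.deliver(record);
  } finally {
    // Assumption of this sketch: wait for the side deliveries to settle too.
    await side;
  }
}
```

Catching inside each mapped delivery is what lets Promise.all be used without one failing destination rejecting the whole batch.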

The wire shape

AnnotatedRecord<T> is a per-field map of values plus annotations. Annotations declare per-field trust metadata — composition tier, temporal-render hint, and any consumer-registered namespaced slots.

import { type AnnotatedRecord } from '@cool-ai/beach-core';

const result: AnnotatedRecord<{
  taskId: number;
  status: string;
  description: string;
  internalId: string;
}> = {
  taskId:      { value: 42,                    composition: 'anchored' },
  status:      { value: 'AwaitingReply',       composition: 'anchored' },
  description: { value: 'The Heritage Fund',   composition: 'narrative' },
  internalId:  { value: 'INT-99',              composition: 'anchored' },
};

Three composition tiers:

  • anchored — fact-bearing values read from authoritative source data (a database row, an API response, a confirmed booking reference). Rendered by token substitution at the Composer; the LLM never paraphrases an anchored value.
  • validated — deterministically derived from anchored facts, with the derivation flagged in a code comment. Rendered against an invariant declared at the field's schema. Use for aggregates, summaries, classifications.
  • narrative — LLM-authored end-to-end. No structural constraint beyond the channel format. Use for prose descriptions, framing, context.

Plus optional:

  • temporalRender: 'absolute' | 'relative' | 'either' for date/time fields. Lets the rendering boundary decide between "2026-05-13T17:00Z", "in three days", or either.
  • Namespaced consumer slots — keys with a colon prefix (legal:provenance, po:confidence, etc.). Beach reserves no canonical names; consumers register their own domain vocabulary.
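To make the optional annotations concrete, here is a hedged type sketch. The interface below is an assumption about how temporalRender and namespaced slots might sit beside composition; the real types are whatever @cool-ai/beach-core exports. SketchAnnotatedValue, namespacedSlotKeys, and the legal:provenance value are hypothetical.

```typescript
// Assumed shapes for illustration; the real types live in @cool-ai/beach-core.
type Composition = 'anchored' | 'validated' | 'narrative';
type TemporalRender = 'absolute' | 'relative' | 'either';

interface SketchAnnotatedValue<T> {
  value: T;
  composition: Composition;
  temporalRender?: TemporalRender;
  // Namespaced consumer slots: any key of the form "<namespace>:<name>".
  [slot: `${string}:${string}`]: unknown;
}

// A date field that the rendering boundary may show either way.
const dueDate: SketchAnnotatedValue<string> = {
  value: '2026-05-13T17:00Z',
  composition: 'anchored',
  temporalRender: 'either',
  'legal:provenance': 'contract-clause-4', // hypothetical consumer slot
};

// Lists the consumer-registered slots on an annotated value.
function namespacedSlotKeys(annotated: object): string[] {
  return Object.keys(annotated).filter((key) => key.indexOf(':') > 0);
}
```

The template-literal index signature lets the type checker accept any colon-prefixed slot while still rejecting a misspelt core key such as temporalrender.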

The canonical filter DSL

Filter rules at each destination are declarative, not function-based. The DSL is JSON-shaped, so rules can be version-controlled as data, audited from outside the running application, and transferred to a future port of Beach to another language.

import {
  applyFilter,
  passThrough,
  pickFields,
  omitFields,
  redactField,
  renameFields,
  truncate,
  mapList,
  compose,
} from '@cool-ai/beach-core';

const llmFilter = compose(
  pickFields(['products']),
  mapList('products', pickFields(['ref', 'price', 'summary'])),
);

const cacheFilter = passThrough();

const auditFilter = redactField('contactEmail', '[PII]');

The eight canonical operations cover the 90%+ case:

| Operation | Effect |
|---|---|
| passThrough() | Returns input unchanged. |
| pickFields(fields) | Keeps named fields, drops the rest. |
| omitFields(fields) | Drops named fields, keeps the rest. The destination doesn't know they ever existed. |
| redactField(field, placeholder?) | Preserves the field's presence and tier annotation; replaces the value with a placeholder (default '[redacted]'). |
| renameFields({ from: to }) | Renames at the destination. The framework reverse-renames LLM tool-call args automatically (see "Reverse-rename" below). |
| truncate(field, maxChars) | Truncates a string field to N chars. |
| mapList(field, rule) | Applies rule to each element of a list-of-records field. |
| compose(rules...) | Chains rules left-to-right. |

The custom escape hatch is available for rules genuinely beyond the DSL. The framework strips and re-applies annotations on custom output — custom-rule authors cannot mutate the trust contract.
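To illustrate what "rules as data" can look like, the sketch below interprets a small JSON-serialisable rule tree. The encoding ({ op: ... }) and the sketchApply interpreter are assumptions made for this example; they are not the wire format or the applyFilter implementation in @cool-ai/beach-core, and only four of the eight operations plus compose are shown.

```typescript
// Illustrative only: this rule encoding and interpreter are assumptions.
type Annotated = { value: unknown; composition: 'anchored' | 'validated' | 'narrative' };
type Rec = Record<string, Annotated>;

type SketchRule =
  | { op: 'passThrough' }
  | { op: 'pickFields'; fields: string[] }
  | { op: 'omitFields'; fields: string[] }
  | { op: 'redactField'; field: string; placeholder?: string }
  | { op: 'compose'; rules: SketchRule[] };

function sketchApply(rule: SketchRule, input: Rec): Rec {
  switch (rule.op) {
    case 'passThrough':
      return input;
    case 'pickFields': {
      const out: Rec = {};
      for (const field of rule.fields) {
        const current = input[field];
        if (current !== undefined) out[field] = current;
      }
      return out;
    }
    case 'omitFields': {
      const out: Rec = { ...input };
      for (const field of rule.fields) delete out[field];
      return out;
    }
    case 'redactField': {
      const current = input[rule.field];
      if (current === undefined) return input;
      // Keep the annotation; replace only the value.
      const redacted: Annotated = { ...current, value: rule.placeholder ?? '[redacted]' };
      return { ...input, [rule.field]: redacted };
    }
    case 'compose':
      // Left-to-right: each rule sees the previous rule's output.
      return rule.rules.reduce((acc, next) => sketchApply(next, acc), input);
  }
}

// A rule is plain data, so it survives JSON.stringify / JSON.parse unchanged.
const auditRule: SketchRule = {
  op: 'compose',
  rules: [
    { op: 'omitFields', fields: ['internalId'] },
    { op: 'redactField', field: 'contactEmail', placeholder: '[PII]' },
  ],
};
```

Because no rule carries a function, the same tree could be stored in a config file, diffed in review, or interpreted by a port in another language.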

Architectural law: filter rules are pure projection

This is the rule that distinguishes Beach's primitive from a generic transform pipeline:

Filter rules can strip fields, reshape values, or transform views per destination, but cannot demote tier annotations on the fields that survive.

If a field is composition: 'anchored' at the data source, no filter rule can route it through LLM-rendered narrative downstream; its annotation travels intact. Filters never relabel a field's composition or rewrite a value's namespaced slots.

This is enforced at two architectural seams:

  1. At dispatch — the canonical filter operations are pure projection by construction. The custom escape hatch's output has annotations stripped and re-applied from the input; custom rules cannot expand the trust contract.
  2. At rendering (the Composer primitive) — anchored fields are token-substituted, validated fields invariant-checked, narrative fields LLM-authored. The Composer never elevates trust beyond the field's declared tier.
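The strip-and-re-apply step at the first seam can be sketched like this. It is a hedged illustration only: applyCustomSketch and its types are hypothetical, and dropping a field that the custom rule invents (because it has no source annotation to inherit) is a choice made for the sketch, not documented Beach behaviour.

```typescript
// Illustrative sketch; names and the drop-unknown-fields choice are assumptions.
type Tier = 'anchored' | 'validated' | 'narrative';
type AnnotatedField = { value: unknown; composition: Tier };
type AnnotatedRec = Record<string, AnnotatedField>;

// In this sketch a custom rule only ever sees and returns plain values.
type CustomProjection = (values: Record<string, unknown>) => Record<string, unknown>;

function hasOwn(obj: object, key: string): boolean {
  return Object.prototype.hasOwnProperty.call(obj, key);
}

function applyCustomSketch(input: AnnotatedRec, custom: CustomProjection): AnnotatedRec {
  // 1. Strip: hand the custom rule values without annotations.
  const plain: Record<string, unknown> = {};
  for (const key of Object.keys(input)) {
    const field = input[key];
    if (field !== undefined) plain[key] = field.value;
  }

  const projected = custom(plain);

  // 2. Re-apply: annotations always come from the input, never from the rule.
  const out: AnnotatedRec = {};
  for (const key of Object.keys(projected)) {
    const source = hasOwn(input, key) ? input[key] : undefined;
    if (source === undefined) continue; // no source annotation to inherit
    out[key] = { ...source, value: projected[key] };
  }
  return out;
}
```

Since the custom function never receives or returns a composition value, it has no channel through which to relabel a field.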

Wiring the primitive

Construct one FilterAndDistribute per Beach application. Register destinations at startup.

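import {
  FilterAndDistribute,
  omitFields,
  passThrough,
  type Destination,
  type DestinationTransport,
} from '@cool-ai/beach-core';

// missiveStore, uiTransport, and auditTransport are application-defined and not shown here.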

const llmTransport: DestinationTransport = {
  async deliver(_filteredRecord, _ctx) {
    // The actor loop owns the LLM tool_result hand-off; this transport
    // is a no-op. The framework returns the filtered record from
    // FilterAndDistribute.dispatch() as outcome.llmResult.
  },
};

const cacheTransport: DestinationTransport = {
  async deliver(filteredRecord, ctx) {
    await missiveStore.write({
      sessionId: ctx.sessionId!,
      turnId: ctx.turnId!,
      payload: filteredRecord,
      capturedAt: new Date().toISOString(),
    });
  },
};

const fnd = new FilterAndDistribute({
  destinations: [
    { id: 'llm-session', type: 'llm-session', filter: passThrough(),   transport: llmTransport },
    { id: 'cache',       type: 'cache',       filter: passThrough(),   transport: cacheTransport, retryPolicy: { attempts: 3, backoff: 'exponential' } },
    { id: 'ui-streaming', type: 'ui-streaming', filter: omitFields(['internalId']), transport: uiTransport },
    { id: 'audit',       type: 'audit',       filter: passThrough(),   transport: auditTransport },
  ],
});

Pass the instance to callActor:

import { callActor } from '@cool-ai/beach-llm';

const result = await callActor({
  config: actorConfig,
  messages: [{ role: 'user', content: userInput }],
  sessionId,
  slotKey: 'main',
  registry: toolRegistry,
  provider: anthropicProvider,
  filterAndDistribute: fnd,
});

When a generalist tool's handler returns an AnnotatedRecord, the framework dispatches inline. The llm-session destination's filtered record (annotations stripped for the LLM's JSON-shaped tool_result) becomes the LLM's tool_result content; the other destinations receive their per-destination filtered views in parallel.

Specialist tools and non-AnnotatedRecord returns continue to use the legacy JSON-stringify path — fully backwards compatible.
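For intuition, stripping annotations to produce the LLM's JSON-shaped tool_result might look like the sketch below. The functions are hypothetical stand-ins written for this guide, not the framework's code, and they assume an annotated field is any object carrying both value and composition.

```typescript
// Illustrative sketch; not the framework's implementation.
interface AnnotatedLeaf {
  value: unknown;
  composition: string;
}

function isAnnotated(x: unknown): x is AnnotatedLeaf {
  return typeof x === 'object' && x !== null && 'value' in x && 'composition' in x;
}

// Replaces each annotated field with its bare value.
function stripAnnotations(record: Record<string, unknown>): Record<string, unknown> {
  const out: Record<string, unknown> = {};
  for (const key of Object.keys(record)) {
    const field = record[key];
    out[key] = isAnnotated(field) ? stripValue(field.value) : field;
  }
  return out;
}

// Recurses into lists of records so nested annotations are removed too.
function stripValue(value: unknown): unknown {
  if (Array.isArray(value)) return value.map((item) => stripValue(item));
  if (typeof value === 'object' && value !== null) {
    return stripAnnotations(value as Record<string, unknown>);
  }
  return value;
}

const annotatedResult = {
  query: { value: 'LHR-JFK', composition: 'anchored' },
  products: {
    value: [
      {
        ref: { value: 'F1', composition: 'anchored' },
        price: { value: 94, composition: 'anchored' },
      },
    ],
    composition: 'anchored',
  },
};

const toolResultJson = JSON.stringify(stripAnnotations(annotatedResult));
```

The annotations are still present on the record delivered to the other destinations; only the llm-session view is flattened.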

Filter precedence: type > class > destination default

The destination's default filter is the floor. Two override mechanisms layer on top:

Class-level overrides (per event class) on the destination:

{
  id: 'llm-session',
  type: 'llm-session',
  filter: passThrough(),
  classOverrides: {
    flight_results: compose(
      pickFields(['products']),
      mapList('products', pickFields(['summary', 'highlights'])),
    ),
    hotel_results: pickFields(['summary', 'starRating', 'pricePerNight']),
  },
  transport,
}

Type-level overrides (per data-part type) on PartTypeRegistry:

import { PartTypeRegistry } from '@cool-ai/beach-core';

PartTypeRegistry.register({
  name: 'package-holiday',
  description: 'A complete travel package',
  filterOverrides: {
    'llm-session': pickFields(['packagePrice', 'description', 'startDate', 'endDate']),
    // The LLM never sees per-component pricing for a package holiday,
    // even though the cache and UI destinations get the full record.
  },
});

When dispatching, the framework resolves per destination + context (partType, eventClass):

  1. If ctx.partType is registered with a filterOverrides[dest.id] entry, that wins.
  2. Else if ctx.eventClass matches a key in dest.classOverrides, that wins.
  3. Else the destination's default filter applies.

Pass dispatchContext through callActor:

await callActor({
  // ...
  filterAndDistribute: fnd,
  dispatchContext: { partType: 'package-holiday', eventClass: 'flight_results' },
});
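The three-step resolution order above can be expressed as a small lookup. This is a sketch under assumptions: resolveFilter, SketchDest, SketchPartType, and the Map standing in for PartTypeRegistry are all hypothetical, and a filter is reduced to a named placeholder so the result is easy to inspect.

```typescript
// Illustrative sketch; a filter is reduced to a named placeholder.
type Filter = { name: string };

interface SketchDest {
  id: string;
  filter: Filter;
  classOverrides?: Record<string, Filter>;
}

interface SketchPartType {
  name: string;
  filterOverrides?: Record<string, Filter>;
}

interface SketchCtx {
  partType?: string;
  eventClass?: string;
}

// Own-property lookup that tolerates a missing map or key.
function own<T>(map: Record<string, T> | undefined, key: string | undefined): T | undefined {
  if (map === undefined || key === undefined) return undefined;
  return Object.prototype.hasOwnProperty.call(map, key) ? map[key] : undefined;
}

function resolveFilter(
  dest: SketchDest,
  ctx: SketchCtx,
  partTypes: Map<string, SketchPartType>,
): Filter {
  const partType = ctx.partType !== undefined ? partTypes.get(ctx.partType) : undefined;
  return (
    own(partType?.filterOverrides, dest.id) ?? // 1. type-level override
    own(dest.classOverrides, ctx.eventClass) ?? // 2. class-level override
    dest.filter // 3. destination default
  );
}

const partTypes = new Map<string, SketchPartType>([
  ['package-holiday', { name: 'package-holiday', filterOverrides: { 'llm-session': { name: 'type-level' } } }],
]);
const llmDest: SketchDest = {
  id: 'llm-session',
  filter: { name: 'default' },
  classOverrides: { flight_results: { name: 'class-level' } },
};
const cacheDest: SketchDest = { id: 'cache', filter: { name: 'cache-default' } };
```

With the dispatchContext shown above (partType 'package-holiday', eventClass 'flight_results'), llm-session resolves to the type-level rule even though a class-level override also matches, while cache falls through to its default.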

Reverse-rename: tool-call round trips

When the llm-session destination's filter renames canonical fields to aliases for the LLM's view (renameFields({ totalPrice: 'priceLabel' })), the LLM may write a tool call referencing the alias:

{ "tool": "book_flight", "input": { "priceLabel": 94, "route": "LHR-JFK" } }

Beach reverse-renames the args before invoking the handler. The handler sees the canonical names:

async function bookFlightHandler(input: { totalPrice: number; route: string }) {
  // input.totalPrice is 94; input.route is 'LHR-JFK'.
  // The handler has no awareness that the LLM saw a renamed view.
}

The reverse-rename is automatic for renameFields rules registered on the llm-session destination's default filter. Beach rejects conflicting rename targets at registration: a destination cannot have two distinct canonical sources mapping to the same alias.

Other destinations (ui-streaming, cache, formatter, audit, peer-response) don't generate tool calls, so reverse-rename is meaningless for them; the rename is purely directional in those cases.
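A minimal sketch of the round trip, assuming the rename map is available as canonical-name to alias pairs. invertRenames and reverseRenameArgs are hypothetical helpers written for this guide; they show the inversion and the conflict check, not how Beach implements either.

```typescript
// Illustrative sketch; helper names are assumptions.
type RenameMap = Record<string, string>; // canonical name -> alias

// Builds alias -> canonical, rejecting two canonical fields that share an alias.
function invertRenames(renames: RenameMap): Map<string, string> {
  const inverse = new Map<string, string>();
  for (const canonical of Object.keys(renames)) {
    const alias = renames[canonical];
    if (alias === undefined) continue;
    const existing = inverse.get(alias);
    if (existing !== undefined) {
      throw new Error(`Alias "${alias}" maps back to both "${existing}" and "${canonical}"`);
    }
    inverse.set(alias, canonical);
  }
  return inverse;
}

// Rewrites aliased keys in the LLM's tool-call args back to canonical names.
function reverseRenameArgs(
  args: Record<string, unknown>,
  inverse: Map<string, string>,
): Record<string, unknown> {
  const out: Record<string, unknown> = {};
  for (const key of Object.keys(args)) {
    out[inverse.get(key) ?? key] = args[key];
  }
  return out;
}

const inverse = invertRenames({ totalPrice: 'priceLabel' });
const handlerInput = reverseRenameArgs({ priceLabel: 94, route: 'LHR-JFK' }, inverse);
```

Keys the rename map does not mention (route here) pass through untouched, which is why the handler signature needs no knowledge of the alias.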

The six canonical destinations

| Destination | Purpose | Default filter | Awaited at dispatch? |
|---|---|---|---|
| llm-session | The orchestrating LLM's tool_result view | Default unfiltered (the LLM is allowed to see the data; the trust mechanism lives in tier annotations + Composer token-substitution) | Yes |
| ui-streaming | Browser / native UI rendering authoritative data via SSE / WebSocket | Default full pass-through | No |
| cache | Persistent store (Redis, DB) for replay and re-rendering | Default full pass-through | No |
| formatter | Composer input for batched channels (one instance per channel format the consumer uses) | Default unfiltered; tier annotations protect content-rendering trust at the Composer | No |
| audit | Trust-audit log; per-event records for compliance, drift detection, replay | Default full pass-through with consumer-registered redaction rules | No |
| peer-response | Federation inbound: results arriving from a peer via A2A flow into this destination, fan out from there | Default full pass-through; tier annotations cross the wire intact | No |

Multiple formatter:* destinations are the canonical pattern, one per channel format:

{ id: 'formatter:email-html', type: 'formatter', filter: emailHtmlFilter, transport: emailHtmlComposer },
{ id: 'formatter:sms',        type: 'formatter', filter: smsFilter,       transport: smsComposer },
{ id: 'formatter:voice',      type: 'formatter', filter: voiceFilter,     transport: voiceComposer },

Each formatter:* destination's filter shapes what its Composer sees. Channels share the destination type (which Beach owns) but never the filter rules (which the consumer authors per channel). This separation keeps formatter channel-blind in core while letting consumers shape per-channel behaviour explicitly.

Failure semantics

  • The llm-session delivery is awaited; if it throws, the actor's tool_result is the error and the actor handles it as it would any other tool failure.
  • Non-llm-session destinations are dispatched in parallel via Promise.all. Each delivery's outcome is reported to the observer hook (success / failure / latency / attempts). Failures do not propagate.
  • Per-destination retry policy is opt-in: declare retryPolicy: { attempts: 3, backoff: 'exponential' } on a destination that needs retry semantics. Default: no retry.
  • No cross-destination guarantees. A failure in cache does not affect the success of ui-streaming. A consumer that needs "cache must succeed before UI sees the data" should serialise via their own wiring; the framework does not implement transactional cross-destination delivery (planned for a future minor release).
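One way to picture the opt-in retry policy is the sketch below. Several things here are assumptions rather than documented behaviour: that attempts counts total tries (not retries), the 100 ms base delay, and the injected sleep function; deliverWithRetry itself is a hypothetical helper.

```typescript
// Illustrative sketch; the meaning of `attempts` and the base delay are assumptions.
interface SketchRetryPolicy {
  attempts: number;
  backoff: 'exponential';
}

interface SketchDeliveryResult {
  outcome: 'success' | 'failure';
  attempts: number;
  error?: unknown;
}

async function deliverWithRetry(
  deliver: () => Promise<void>,
  policy: SketchRetryPolicy | undefined,
  sleep: (ms: number) => Promise<void>,
  baseDelayMs: number = 100,
): Promise<SketchDeliveryResult> {
  // No policy means a single attempt, matching "Default: no retry".
  const maxAttempts = Math.max(1, policy?.attempts ?? 1);
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      await deliver();
      return { outcome: 'success', attempts: attempt };
    } catch (error) {
      lastError = error;
      // Wait 100 ms, 200 ms, 400 ms, ... between attempts, never after the last.
      if (attempt < maxAttempts) {
        await sleep(baseDelayMs * Math.pow(2, attempt - 1));
      }
    }
  }
  // The failure is returned, not thrown, so it cannot propagate to the actor loop.
  return { outcome: 'failure', attempts: maxAttempts, error: lastError };
}
```

Injecting sleep keeps the sketch testable without real timers; a caller would typically pass a setTimeout-based delay.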

Wire the observer hook for OpenTelemetry span emission, audit logging, or whatever the consumer's observability stack expects:

const fnd = new FilterAndDistribute({
  destinations: [...],
  observer: {
    onDispatch(result) {
      // result: { destinationId, destinationType, outcome, attempts, latencyMs, error? }
      otelSpan.addEvent('beach.fnd.dispatch', { ...result });
    },
  },
});

Observer errors are caught and logged to console.error; they never propagate.

The annotateRecord helper

Annotating every field of every record manually is verbose for tools returning lists. The annotateRecord helper takes a per-type schema and applies it uniformly:

import { annotateRecord, type AnnotationSchema } from '@cool-ai/beach-core';

interface FlightProduct {
  ref: string;
  price: number;
  summary: string;
  highlights: string;
  internalRef: string;
}

const flightProductSchema = {
  ref:         { composition: 'anchored' },
  price:       { composition: 'anchored' },
  summary:     { composition: 'narrative' },
  highlights:  { composition: 'narrative' },
  internalRef: { composition: 'anchored' },
} as const satisfies AnnotationSchema<FlightProduct>;

// In a list-returning tool:
return {
  query:    { value: input.query, composition: 'anchored' },
  products: {
    value: results.map((p) => annotateRecord(p, flightProductSchema)),
    composition: 'anchored',
  },
};

The schema-vs-record field-coverage rule:

  • Record has a field the schema doesn't cover → throws at the authoring boundary. An unannotated field reaching the Composer would bypass the tier trust model entirely; this must fail loudly, not silently at render time.
  • Schema declares a field the record doesn't have → silently skipped. Domain types have optional fields (a task may or may not have a dueDate); requiring every schema-declared field to appear on every record would defeat the helper's purpose.

Net rule: the schema is the trust contract; the record may be a subset.
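The coverage rule can be illustrated with a stand-in for the helper. annotateRecordSketch below is a hypothetical reimplementation written to show the two cases; the real annotateRecord in @cool-ai/beach-core may differ in signature and error type.

```typescript
// Illustrative stand-in; not the real annotateRecord.
type FieldAnnotation = { composition: 'anchored' | 'validated' | 'narrative' };
type SketchSchema = Record<string, FieldAnnotation>;
type SketchAnnotated = Record<string, FieldAnnotation & { value: unknown }>;

function annotateRecordSketch(
  record: Record<string, unknown>,
  schema: SketchSchema,
): SketchAnnotated {
  const out: SketchAnnotated = {};
  for (const key of Object.keys(record)) {
    const covered = Object.prototype.hasOwnProperty.call(schema, key);
    const annotation = covered ? schema[key] : undefined;
    if (annotation === undefined) {
      // A field the schema does not cover fails at the authoring boundary.
      throw new Error(`Field "${key}" has no annotation in the schema`);
    }
    out[key] = { ...annotation, value: record[key] };
  }
  // Schema fields absent from the record are never visited, so they are skipped.
  return out;
}

const taskSchema: SketchSchema = {
  taskId: { composition: 'anchored' },
  status: { composition: 'anchored' },
  dueDate: { composition: 'anchored' }, // optional on the domain type
};

// dueDate is missing from this record, which is allowed.
const annotatedTask = annotateRecordSketch({ taskId: 42, status: 'AwaitingReply' }, taskSchema);
```

Iterating over the record's keys, not the schema's, is what makes the rule asymmetric: every record field must be covered, but not every schema field must be present.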

Wider architectural placement

FilterAndDistribute sits between framework-enforced tool routing and the Composer primitive. The chain:

Generalist tool's handler returns AnnotatedRecord<T>
       ↓
Framework dispatches through FilterAndDistribute (this primitive)
       ↓
Each destination receives its filtered view; tier annotations survive
       ↓
The formatter:* destinations feed into Composer instances
       ↓
The Composer renders authoritative data + narrative into channel artifact

The trust mechanism (tier annotations) and the format mechanism (filter rules) stay structurally independent through the whole pipeline.

Adopting the primitive

Register a FilterAndDistribute instance, return AnnotatedRecord from your generalist tools, pass the instance to callActor. No router rule is needed for the dispatch step — it's inline in the actor loop.

If you were using @cool-ai/beach-starter's filterAndDistributeHandler (removed in @cool-ai/beach-starter@1.1.0): replace each handler with an AnnotatedRecord-returning tool, register a FilterAndDistribute instance per the wiring above, and pass it to callActor. The handler-by-handler walk-through depends on the destinations your application uses; reach out via the issue tracker if a worked example would help.

Reference

  • Wire shape: AnnotatedValue<T>, AnnotatedRecord<T>, FilterRule, from @cool-ai/beach-core's public exports.
  • Helper: annotateRecord(record, schema), from @cool-ai/beach-core.
  • Class: FilterAndDistribute, from @cool-ai/beach-core.
  • Architectural law: adopt-vs-build gate, point 3.