fork
Run multiple steps in parallel with race, all, or settle semantics.
Quick Example
import { fork, step } from '@noetic-tools/core';

const fastest = fork({
  id: 'fastest-response',
  mode: 'race',
  paths: (input: string) => [
    step.llm({
      id: 'model-a',
      model: 'openai/gpt-4o',
    }),
    step.llm({
      id: 'model-b',
      model: 'openai/gpt-4o-mini',
    }),
  ],
});

What It Does
fork runs multiple steps concurrently and combines their results. The mode (race, all, or settle) determines how those results are collected.
Modes
race
Returns the output of the first step to complete. All other paths are discarded.
import { fork, step } from '@noetic-tools/core';

const raceExample = fork({
  id: 'race-providers',
  mode: 'race',
  paths: (query: string) => [
    step.run({
      id: 'provider-a',
      execute: async (q) => fetchProviderA(q),
    }),
    step.run({
      id: 'provider-b',
      execute: async (q) => fetchProviderB(q),
    }),
  ],
});

No merge function is needed: the winner's output is used directly.
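
As a rough mental model only, race mode resembles `Promise.race` over the path outputs. The sketch below uses plain promises rather than the library; `delayed`, `raceSketch`, and `demoRace` are hypothetical names, and the sketch does not model what happens when the first path to finish fails, which the text above does not specify.

```typescript
// Hypothetical helper: resolves with `value` after `ms` milliseconds.
function delayed<T>(value: T, ms: number): Promise<T> {
  return new Promise((resolve) => setTimeout(() => resolve(value), ms));
}

// Sketch of race semantics: start every path, keep the first output to arrive.
// This mirrors Promise.race and is an assumption about behavior, not library code.
function raceSketch<T>(paths: Array<() => Promise<T>>): Promise<T> {
  return Promise.race(paths.map((run) => run()));
}

// The 5 ms path finishes first, so its output is the fork's output.
function demoRace(): Promise<string> {
  return raceSketch([
    () => delayed('provider-a', 50),
    () => delayed('provider-b', 5),
  ]);
}
```

Because only one output survives, there is nothing to combine, which is why race takes no merge function.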
all
Waits for every path to complete, then calls merge with an array of all results.
import { fork, step } from '@noetic-tools/core';

const allExample = fork({
  id: 'gather-research',
  mode: 'all',
  paths: (topic: string) => [
    step.run({
      id: 'academic',
      execute: async (t) => searchAcademic(t),
    }),
    step.run({
      id: 'news',
      execute: async (t) => searchNews(t),
    }),
    step.run({
      id: 'social',
      execute: async (t) => searchSocial(t),
    }),
  ],
  merge: (results) => results.flat(),
});

If any path throws, the entire fork throws fork_partial. The first genuine failure also cancels the remaining paths: siblings still queued behind the concurrency limit are skipped, and in-flight siblings are aborted cooperatively (they stop at their next step boundary or blocked channel operation) and then awaited. Cancelled siblings appear in fork_partial.failed with error.kind === 'cancelled'; paths that completed before the failure stay in succeeded. If the parent context itself is aborted mid-fork, the fork throws cancelled instead of fork_partial.
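
The cancellation behavior described above can be approximated with plain promises and an `AbortController`. This is a sketch under stated assumptions, not the library's implementation: `PathOutcome`, `runPath`, `allWithCancel`, and `demoCancel` are hypothetical names, the concurrency queue is not modeled, and the outcome shape is a stand-in for whatever `fork_partial` actually carries.

```typescript
// Hypothetical outcome shape; the real fork_partial error shape may differ.
type PathOutcome =
  | { id: string; status: 'succeeded'; value: string }
  | { id: string; status: 'failed'; kind: 'error' | 'cancelled' };

// Hypothetical path: does `steps` units of work, checks the abort signal at each
// step boundary (cooperative cancellation), and optionally fails at one step.
async function runPath(
  id: string,
  steps: number,
  signal: AbortSignal,
  failAtStep?: number,
): Promise<string> {
  for (let i = 0; i < steps; i++) {
    if (signal.aborted) throw new Error('cancelled');
    if (i === failAtStep) throw new Error('boom');
    await new Promise((resolve) => setTimeout(resolve, 10));
  }
  return `${id}-done`;
}

// Runs all paths; the first failure aborts the shared signal, then every
// path is awaited so in-flight siblings get a chance to stop.
function allWithCancel(
  paths: Array<{ id: string; run: (signal: AbortSignal) => Promise<string> }>,
): Promise<PathOutcome[]> {
  const controller = new AbortController();
  const tasks = paths.map(async ({ id, run }): Promise<PathOutcome> => {
    try {
      const value = await run(controller.signal);
      return { id, status: 'succeeded', value };
    } catch {
      // Simplification: any failure seen after the abort counts as a cancellation.
      const kind = controller.signal.aborted ? 'cancelled' : 'error';
      controller.abort();
      return { id, status: 'failed', kind };
    }
  });
  return Promise.all(tasks);
}

function demoCancel(): Promise<PathOutcome[]> {
  return allWithCancel([
    { id: 'fast', run: (signal) => runPath('fast', 1, signal) },
    { id: 'failing', run: (signal) => runPath('failing', 3, signal, 2) },
    { id: 'slow', run: (signal) => runPath('slow', 5, signal) },
  ]);
}
```

In this sketch, `fast` completes before the failure and stays a success, `failing` is the genuine error, and `slow` stops at its next step boundary and is reported as cancelled.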
settle
Like all, but collects both successes and failures. Each result is a SettleResult with a status field.
import type { SettleResult } from '@noetic-tools/core';
import { fork, step } from '@noetic-tools/core';

const settleExample = fork({
  id: 'resilient-search',
  mode: 'settle',
  paths: (query: string) => [
    step.run({
      id: 'source-a',
      execute: async (q) => fetchSourceA(q),
    }),
    step.run({
      id: 'source-b',
      execute: async (q) => fetchSourceB(q),
    }),
  ],
  merge: (results: SettleResult<string>[]) => {
    const successes = results
      .filter((r) => r.status === 'fulfilled')
      .map((r) => r.value);
    return successes.join('\n');
  },
});

SettleResult Type
| Property | Type | Description |
|---|---|---|
| `stepId` | `string` | The id of the step that produced this result |
| `status` | `'fulfilled' \| 'rejected'` | Whether the step succeeded or failed |
| `value` | `O \| undefined` | The output value (present when fulfilled) |
| `error` | `NoeticError \| undefined` | The error (present when rejected) |
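
Since `value` and `error` are both optional at the type level, a merge function usually has to narrow each result before using it. The sketch below shows one way to do that against a local mirror of the table above. `SettleResultLike`, `ErrorLike`, and `partition` are hypothetical names, and `ErrorLike` is a minimal stand-in because the shape of `NoeticError` is not shown here.

```typescript
// Minimal stand-in for NoeticError (assumption: it exposes a message).
interface ErrorLike {
  message: string;
}

// Local mirror of the SettleResult table; the real type comes from the library.
interface SettleResultLike<O> {
  stepId: string;
  status: 'fulfilled' | 'rejected';
  value: O | undefined;
  error: ErrorLike | undefined;
}

// Split settled results into successful outputs and per-step failure messages.
function partition<O>(
  results: SettleResultLike<O>[],
): { values: O[]; failures: string[] } {
  const values: O[] = [];
  const failures: string[] = [];
  for (const r of results) {
    if (r.status === 'fulfilled' && r.value !== undefined) {
      values.push(r.value);
    } else if (r.status === 'rejected') {
      failures.push(`${r.stepId}: ${r.error?.message ?? 'unknown error'}`);
    }
  }
  return { values, failures };
}
```

A merge function could then combine `values` and log or report `failures` separately.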
API Reference
Common Options (all modes)
| Property | Type | Required | Description |
|---|---|---|---|
| `id` | `string` | Yes | Unique step identifier |
| `mode` | `'race' \| 'all' \| 'settle'` | Yes | Execution mode |
| `paths` | `(input: I, ctx: Context<TMemory>) => Step<TMemory, I, O>[]` | Yes | Function returning the steps to run |
| `concurrency` | `number` | No | Max parallel executions |
Mode-Specific Options
| Property | Modes | Type | Description |
|---|---|---|---|
| `merge` | `all` | `(results: O[], ctx: Context<TMemory>) => O` | Combine all outputs into one |
| `merge` | `settle` | `(results: SettleResult<O>[], ctx: Context<TMemory>) => O` | Combine settled results into one |
Concurrency Control
Limit how many paths run at once with concurrency.
import { fork, step } from '@noetic-tools/core';

const throttled = fork({
  id: 'throttled-fork',
  mode: 'all',
  paths: (input: string) => generateManySteps(input),
  merge: (results) => results,
  concurrency: 3, // at most 3 paths execute simultaneously
});
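
To make the effect of a concurrency cap concrete, here is a minimal sketch of a worker-pool limiter over plain promises. It illustrates the general technique and is not the library's scheduler. `runLimited` and `demoLimit` are hypothetical names, the limit is assumed to be at least 1, and keeping results in input order is a choice made by this sketch.

```typescript
// Sketch of a concurrency cap: at most `limit` tasks are in flight at once.
// Results are stored by index, so they keep the order of `tasks`.
async function runLimited<T>(
  tasks: Array<() => Promise<T>>,
  limit: number,
): Promise<T[]> {
  const results: T[] = new Array(tasks.length);
  let next = 0;

  // Each worker pulls the next unclaimed task until none remain.
  async function worker(): Promise<void> {
    while (next < tasks.length) {
      const index = next++;
      results[index] = await tasks[index]();
    }
  }

  const workerCount = Math.min(limit, tasks.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

// Runs 7 short tasks with a cap of 3 and records the peak number in flight.
async function demoLimit(): Promise<{ peak: number; results: number[] }> {
  let active = 0;
  let peak = 0;
  const tasks = [1, 2, 3, 4, 5, 6, 7].map((n) => async () => {
    active++;
    peak = Math.max(peak, active);
    await new Promise((resolve) => setTimeout(resolve, 5));
    active--;
    return n * 2;
  });
  const results = await runLimited(tasks, 3);
  return { peak, results };
}
```

With a cap of 3, the remaining four tasks wait in the queue until a worker frees up, which is the same queueing that the all-mode failure handling refers to when it skips siblings still waiting behind the limit.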