Stream API
The stream API provides reactive state management for streaming HTTP responses. It handles incremental data updates as chunks are received, making it perfect for real-time data or large responses that should be processed as they arrive.
Optimistic Model
The stream API uses an optimistic model where the initial state is used to display loading states. This means that the initial state is displayed while the request is pending. It's also useful to build a streaming UI such as a chat app.
streamState
Create a reactive stream state object that handles streaming responses and updates incrementally as data chunks are received.
Type Signature
type streamState = <T, S extends LinkableSchema = LinkableSchema>(
init: T,
options?: StreamOptions<T, S>
) => FetchState<T>;
Parameters
init
- The initial data value for the stateoptions
- Configuration options including URL, standardRequestInit
parameters, and optional transform function
Return Value
Returns a reactive FetchState
object with the following properties:
data
- The streamed data (initially the provided initial value, grows as chunks arrive)status
- Current request status (pending
,success
, orerror
)error
- Error object if the request failedresponse
- Raw Response object from the fetch API
Options
The options object extends FetchOptions
and includes:
url
- The URL to fetch (string or URL object)transform
- Optional function to transform each chunk before appending it to the data- All standard
RequestInit
properties like method, headers, body, etc.
Transform Function
The transform function allows you to customize how chunks are appended to the existing data:
type transform = (current: T, chunk: T) => T;
Depending on the data type:
- For strings: chunks are concatenated
- For objects: chunks are merged using
anchor.assign
- For arrays: chunk elements are pushed to the array
Usage
import { streamState, derive } from '@anchorlib/core';
// Stream text data
const textStream = streamState(
'', // Initial empty string
{
url: '/api/stream-text',
}
);
// Stream JSON array data
const arrayStream = streamState(
[], // Initial empty array
{
url: '/api/stream-array',
transform: (current, chunk) => {
// Custom transform logic
return [...current, ...chunk];
},
}
);
// Subscribe to state changes
derive(textStream, (state) => {
switch (state.status) {
case 'pending':
console.log('Stream connecting...');
break;
case 'success':
console.log('Stream complete. Final data:', state.data);
break;
case 'error':
console.error('Stream error:', state.error);
break;
default:
// While streaming, data is updated incrementally
console.log('Received data:', state.data);
}
});
streamState.promise
Convert a stream state object to a Promise for traditional async/await usage.
Type Signature
type promise = <T, S extends FetchState<T>>(state: S) => Promise<S>;
Parameters
state
- A stream state object created bystreamState
Return Value
Returns a Promise that resolves with the final state when the stream completes, or rejects with the error when the stream fails.
Usage
import { streamState } from '@anchorlib/core';
const stream = streamState('', { url: '/api/stream' });
try {
const finalState = await streamState.promise(stream);
console.log('Stream complete. Final data:', finalState.data);
} catch (error) {
console.error('Stream failed:', error);
}
Examples
Streaming Text Data
import { streamState } from '@anchorlib/core';
// Stream a large text file
const logStream = streamState('', {
url: '/api/logs',
});
// As chunks arrive, logStream.data will grow with the received text
Streaming JSON Array
import { streamState } from '@anchorlib/core';
interface Item {
id: number;
name: string;
}
// Stream an array of items
const itemStream = streamState<Item[]>([], {
url: '/api/items/stream',
});
// As chunks of items arrive, they will be pushed to the array
Custom Transform Function
import { streamState } from '@anchorlib/core';
interface DataChunk {
items: string[];
timestamp: number;
}
// Stream with custom transform
const customStream = streamState<DataChunk[]>([], {
url: '/api/data/stream',
transform: (current, chunk) => {
// Add timestamp to each chunk
return [...current, { ...chunk, timestamp: Date.now() }];
},
});
Real-time Updates
import { streamState, derive } from '@anchorlib/core';
const chatStream = streamState<string>('', {
url: '/api/chat/stream',
});
// Update UI in real-time as messages arrive
derive(chatStream, (state) => {
if (state.status === 'success') {
// Stream completed
document.getElementById('status')!.textContent = 'Connection closed';
} else if (state.status === 'error') {
// Stream error
document.getElementById('status')!.textContent = 'Connection error';
} else {
// Update with new data as it arrives
document.getElementById('chat')!.textContent = state.data;
}
});
Best Practices
1. Choose Appropriate Initial Values
Initialize stream states with appropriate values that can be incrementally updated:
// For text streaming
const textStream = streamState('', { url: '/api/stream-text' });
// For array streaming
const arrayStream = streamState<Item[]>([], { url: '/api/stream-array' });
// For object merging
const objectStream = streamState<Partial<Data>>({}, { url: '/api/stream-object' });
2. Use Transform Functions for Complex Data Handling
When you need custom logic for handling chunks, use transform functions:
const stream = streamState<Item[]>([], {
url: '/api/items',
transform: (current, chunk) => {
// Deduplicate items based on ID
const existingIds = new Set(current.map((item) => item.id));
const newItems = chunk.filter((item) => !existingIds.has(item.id));
return [...current, ...newItems];
},
});
3. Handle Different Stream Completion States
Make sure to handle all possible states of a stream:
import { streamState, derive } from '@anchorlib/core';
const stream = streamState<string>('', { url: '/api/stream' });
derive(stream, (state) => {
switch (state.status) {
case 'pending':
showConnectingStatus();
break;
case 'success':
showCompletedStatus();
break;
case 'error':
showStreamError(state.error);
break;
default:
// Streaming in progress
updateWithIncrementalData(state.data);
}
});
4. Clean Up Long-running Streams
For long-running streams, ensure you clean up properly:
import { streamState, derive } from '@anchorlib/core';
const stream = streamState('', { url: '/api/stream' });
const unsubscribe = derive(stream, (state) => {
// Handle streaming data
});
// When component unmounts or stream no longer needed:
// unsubscribe();
5. Handle Large Data Streams Efficiently
For streams that produce large amounts of data, consider throttling updates:
import { streamState, derive } from '@anchorlib/core';
const stream = streamState('', { url: '/api/large-stream' });
let lastUpdate = Date.now();
const UPDATE_INTERVAL = 100; // ms
derive(stream, (state) => {
const now = Date.now();
// Only update UI at most every 100ms to prevent performance issues
if (state.status === 'pending' && now - lastUpdate > UPDATE_INTERVAL) {
updateUIWithStreamData(state.data);
lastUpdate = now;
} else if (state.status !== 'pending') {
// Always update for final states
updateUIWithStreamData(state.data);
}
});
6. Validate Stream Data
When working with streamed JSON data, validate chunks before processing:
import { streamState } from '@anchorlib/core';
interface ValidatedItem {
id: number;
name: string;
}
const stream = streamState<ValidatedItem[]>([], {
url: '/api/stream',
transform: (current, chunk) => {
// Validate chunk before adding to current data
if (Array.isArray(chunk) && chunk.every((item) => typeof item.id === 'number' && typeof item.name === 'string')) {
return [...current, ...chunk];
}
console.warn('Invalid chunk received, skipping:', chunk);
return current;
},
});