The streaming API processes text incrementally, making it ideal for real-time LLM output, file processing, and other scenarios where text arrives in chunks.
Streaming is only available in Node.js and Bun. It is not included in the browser build.
Basic Usage
import { createAnonymizerStream } from 'rehydra/streaming';

const stream = await createAnonymizerStream({
  anonymizer: {
    ner: { mode: 'quantized' },
  },
});

// Pipe through the stream
process.stdin.pipe(stream).pipe(process.stdout);
How It Works
The streaming anonymizer uses a sentence buffer that accumulates text until a sentence boundary is detected, then anonymizes complete sentences. This ensures NER has enough context for accurate detection.
Incoming chunks → Sentence Buffer → Anonymize  → Output
                  (accumulates)     (per sentence)
An overlap region preserves context between flushes so that entities spanning buffer boundaries are detected correctly.
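The accumulate-then-flush behavior can be sketched roughly as follows. This is a simplified illustration, not the library's actual implementation: `SentenceBuffer`, `push`, and the `{ context, ready }` return shape are hypothetical names, and the real overlap handling may differ.

```typescript
// Sketch: buffer chunks until a sentence boundary, then flush complete
// sentences, carrying a tail of the previous flush as NER context.
class SentenceBuffer {
  private buffer = "";
  private overlap = ""; // tail of the previous flush, kept as context

  constructor(
    private boundary: RegExp = /[.!?]\s+/g, // must be a global regex
    private overlapChars: number = 100,
  ) {}

  // Accumulate a chunk. Returns complete sentences ready for anonymization
  // plus the overlap context from the previous flush, or null if no
  // sentence boundary has appeared yet.
  push(chunk: string): { context: string; ready: string } | null {
    this.buffer += chunk;
    this.boundary.lastIndex = 0;
    let lastEnd = -1;
    for (let m = this.boundary.exec(this.buffer); m !== null; m = this.boundary.exec(this.buffer)) {
      lastEnd = m.index + m[0].length; // remember the last boundary seen
    }
    if (lastEnd < 0) return null; // still mid-sentence: keep buffering
    const ready = this.buffer.slice(0, lastEnd);
    const context = this.overlap; // used for detection, not re-emitted
    this.overlap = ready.slice(-this.overlapChars);
    this.buffer = this.buffer.slice(lastEnd);
    return { context, ready };
  }
}
```

The key point is the last two assignments: flushed text leaves the buffer, but its tail survives as `overlap`, so an entity that starts near the end of one flush is still visible when the next flush is analyzed.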
Stream Configuration
import { createAnonymizerStream } from 'rehydra/streaming';

const stream = await createAnonymizerStream({
  // Anonymizer configuration (same as createAnonymizer)
  anonymizer: {
    ner: { mode: 'quantized' },
    semantic: { enabled: true },
  },

  // Policy override
  policy: { enableLeakScan: true },

  // Locale hint
  locale: 'de-DE',

  // Buffer configuration
  buffer: {
    overlapChars: 100,            // Context overlap between flushes (default: 100)
    maxBufferSize: 8192,          // Force flush at this size (default: 8192)
    minBufferSize: 50,            // Minimum before flushing (default: 50)
    sentenceBoundary: /[.!?]\s+/, // Custom boundary regex
  },

  // Session persistence
  sessionId: 'chat-123',
  keyProvider: myKeyProvider,
  piiStorageProvider: myStorage,
  saveIntervalMs: 5000, // Periodic PII map saves (ms)

  // Events
  onChunk: (event) => {
    console.log(`Chunk: ${event.anonymizedText}`);
    console.log(`Entities so far: ${event.totalEntities}`);
  },
  onFinish: (event) => {
    console.log(`Done. Total entities: ${event.totalEntities}`);
    console.log(`Total time: ${event.totalProcessingTimeMs}ms`);
  },
});
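The three numeric buffer options interact roughly as sketched below. `shouldFlush` is a hypothetical helper written for illustration, not part of the rehydra API.

```typescript
// Sketch of how the buffer thresholds might combine into a flush decision.
interface BufferThresholds {
  maxBufferSize: number; // force a flush at this size, boundary or not
  minBufferSize: number; // never flush below this size
}

const defaults: BufferThresholds = { maxBufferSize: 8192, minBufferSize: 50 };

function shouldFlush(
  bufferedLength: number,
  hasSentenceBoundary: boolean,
  opts: BufferThresholds = defaults,
): boolean {
  if (bufferedLength >= opts.maxBufferSize) return true;  // buffer full: force flush
  if (bufferedLength < opts.minBufferSize) return false;  // too little context for NER
  return hasSentenceBoundary;                             // otherwise wait for a boundary
}
```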
Low-Latency Mode
For real-time LLM token streams where latency matters more than NER accuracy:
const stream = await createAnonymizerStream({
  buffer: {
    lowLatency: true, // Disables NER, reduces buffer sizes, flushes aggressively
  },
});
Low-latency mode:
- Disables NER (regex-only detection)
- Reduces buffer sizes for faster flushing
- Optimized for token-by-token LLM output
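With NER disabled, detection reduces to pattern matching, which is why latency drops: there is no model inference per flush. The sketch below illustrates what a regex-only pass can and cannot catch; the patterns and placeholder format are illustrative, not rehydra's actual ones.

```typescript
// Illustrative regex-only pass: structured PII such as emails and
// phone-like numbers is still caught, but NER-dependent entities
// (person names, organizations) pass through untouched.
const patterns: Record<string, RegExp> = {
  EMAIL: /[\w.+-]+@[\w-]+\.[\w.]+/g,
  PHONE: /\+?\d[\d\s()-]{7,}\d/g,
};

function regexOnlyAnonymize(text: string): string {
  let out = text;
  for (const [label, re] of Object.entries(patterns)) {
    let i = 0;
    out = out.replace(re, () => `[${label}_${++i}]`);
  }
  return out;
}
```

Note that in this example the name stays in the output; that is the accuracy trade-off low-latency mode makes.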
Events
onChunk
Fires after each buffered chunk is anonymized:
interface StreamChunkEvent {
  anonymizedText: string;
  entities: DetectedEntity[];
  totalEntities: number;
  processingTimeMs: number;
}
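Note that `totalEntities` is a running total, while `entities` appears to list only the detections from the current chunk. Under that assumption, a consumer that wants the full entity list can accumulate it across events; `makeEntityCollector` and the simplified `ChunkEventLike` shape below are illustrative, not part of the library.

```typescript
// Sketch: collect per-chunk entities into one list across the stream.
interface DetectedEntity {
  type: string;
  placeholder: string;
}

interface ChunkEventLike {
  anonymizedText: string;
  entities: DetectedEntity[]; // assumed: this chunk's detections only
  totalEntities: number;      // running total across the stream
}

function makeEntityCollector() {
  const all: DetectedEntity[] = [];
  return {
    onChunk(event: ChunkEventLike) {
      all.push(...event.entities);
    },
    get entities() {
      return all;
    },
  };
}
```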
onFinish
Fires when the stream ends:
interface StreamFinishEvent {
  totalEntities: number;
  piiMap?: EncryptedPIIMap;
  totalProcessingTimeMs: number;
}
Accessing the PII Map
After the stream ends, retrieve the cumulative PII map:
stream.on('finish', () => {
  const piiMap = stream.getPiiMap();
  const stats = stream.stats;
});
Session Integration
Persist PII maps across streaming sessions:
import { createAnonymizerStream } from 'rehydra/streaming';
import { InMemoryKeyProvider, SQLitePIIStorageProvider } from 'rehydra';

const storage = new SQLitePIIStorageProvider('./pii.db');
await storage.initialize();

const stream = await createAnonymizerStream({
  sessionId: 'chat-123',
  keyProvider: new InMemoryKeyProvider(),
  piiStorageProvider: storage,
  saveIntervalMs: 5000, // Save PII map every 5 seconds
});
If a session already has a stored PII map, the stream loads it and continues with consistent entity IDs.
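Consistent entity IDs mean the same value maps to the same placeholder even after a restart. The idea can be sketched as follows; `PlaceholderMap` is a hypothetical, unencrypted stand-in for the real PII map, which rehydra stores encrypted via the key and storage providers.

```typescript
// Sketch: a value-to-placeholder map that survives session restarts.
class PlaceholderMap {
  private counters = new Map<string, number>();

  constructor(private map = new Map<string, string>()) {
    // Rebuild per-type counters from a previously stored map so new
    // entities continue the numbering instead of colliding with old IDs.
    for (const ph of map.values()) {
      const [, type, n] = ph.match(/^\[(\w+)_(\d+)\]$/) ?? [];
      if (type) this.counters.set(type, Math.max(this.counters.get(type) ?? 0, Number(n)));
    }
  }

  placeholderFor(value: string, type: string): string {
    const existing = this.map.get(value);
    if (existing) return existing; // same value -> same ID, even after a restart
    const next = (this.counters.get(type) ?? 0) + 1;
    this.counters.set(type, next);
    const ph = `[${type}_${next}]`;
    this.map.set(value, ph);
    return ph;
  }

  serialize(): [string, string][] {
    return [...this.map.entries()];
  }
}
```

A resumed session constructed from the serialized map returns the original placeholder for a known value and continues numbering for new ones.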
Next Steps
- LLM Proxy: automatic anonymization for LLM API calls
- Streaming API Reference: complete streaming API documentation