The streaming API processes text incrementally, making it ideal for real-time LLM output, file processing, and other scenarios where text arrives in chunks.
Streaming is only available in Node.js and Bun. It is not included in the browser build.

Basic Usage

import { createAnonymizerStream } from 'rehydra/streaming';

const stream = await createAnonymizerStream({
  anonymizer: {
    ner: { mode: 'quantized' },
  },
});

// Pipe through the stream
process.stdin.pipe(stream).pipe(process.stdout);

How It Works

The streaming anonymizer uses a sentence buffer that accumulates text until a sentence boundary is detected, then anonymizes complete sentences. This ensures NER has enough context for accurate detection.
Incoming chunks → Sentence Buffer → Anonymize → Output
                  (accumulates)     (per sentence)
An overlap region preserves context between flushes so that entities spanning buffer boundaries are detected correctly.
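The buffering strategy can be sketched as follows. This is an illustrative model, not rehydra's actual internals: all names here (`SentenceBuffer`, `push`, `finish`) are invented for the sketch, and the force-flush at `maxBufferSize` is omitted for brevity.

```typescript
// Accumulate text until a sentence boundary appears, release complete
// sentences, and carry an overlap tail forward as NER context so that
// entities spanning a flush boundary stay visible.
class SentenceBuffer {
  private pending = "";
  private overlap = ""; // tail of previously released text, context only
  private overlapChars: number;
  private boundary: RegExp;

  constructor(overlapChars = 100, boundary = /[.!?]\s+/g) {
    this.overlapChars = overlapChars;
    this.boundary = boundary; // global flag: we scan for the LAST boundary
  }

  // Feed a chunk. Returns complete sentences ready for anonymization plus
  // the overlap context to hand to NER, or null if nothing is ready yet.
  push(chunk: string): { text: string; context: string } | null {
    this.pending += chunk;
    this.boundary.lastIndex = 0;
    let end = -1;
    let m: RegExpExecArray | null;
    while ((m = this.boundary.exec(this.pending)) !== null) {
      end = m.index + m[0].length;
    }
    if (end === -1) return null; // still mid-sentence: keep buffering
    const text = this.pending.slice(0, end);
    const context = this.overlap;
    this.pending = this.pending.slice(end);
    this.overlap = text.slice(-this.overlapChars); // context for next flush
    return { text, context };
  }

  // Called when the stream ends: release whatever remains.
  finish(): string {
    const rest = this.pending;
    this.pending = "";
    return rest;
  }
}
```

Only complete sentences are released; a trailing fragment stays buffered until the next boundary arrives or the stream ends, which is why NER sees whole sentences rather than arbitrary chunk slices.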

Stream Configuration

import { createAnonymizerStream } from 'rehydra/streaming';

const stream = await createAnonymizerStream({
  // Anonymizer configuration (same as createAnonymizer)
  anonymizer: {
    ner: { mode: 'quantized' },
    semantic: { enabled: true },
  },

  // Policy override
  policy: { enableLeakScan: true },

  // Locale hint
  locale: 'de-DE',

  // Buffer configuration
  buffer: {
    overlapChars: 100,     // Context overlap between flushes (default: 100)
    maxBufferSize: 8192,   // Force flush at this size (default: 8192)
    minBufferSize: 50,     // Minimum before flushing (default: 50)
    sentenceBoundary: /[.!?]\s+/,  // Custom boundary regex
  },

  // Session persistence
  sessionId: 'chat-123',
  keyProvider: myKeyProvider,
  piiStorageProvider: myStorage,
  saveIntervalMs: 5000,  // Periodic PII map saves (ms)

  // Events
  onChunk: (event) => {
    console.log(`Chunk: ${event.anonymizedText}`);
    console.log(`Entities so far: ${event.totalEntities}`);
  },
  onFinish: (event) => {
    console.log(`Done. Total entities: ${event.totalEntities}`);
    console.log(`Total time: ${event.totalProcessingTimeMs}ms`);
  },
});

Low-Latency Mode

For real-time LLM token streams where latency matters more than NER accuracy:

const stream = await createAnonymizerStream({
  buffer: {
    lowLatency: true,  // Disables NER, reduces buffer sizes, flushes aggressively
  },
});

Low-latency mode:
  • Disables NER (regex-only detection)
  • Reduces buffer sizes for faster flushing
  • Optimized for token-by-token LLM output
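The trade-off behind regex-only detection can be illustrated with a self-contained sketch. This is not rehydra's detector set or implementation; the patterns and names below are assumptions chosen for the example.

```typescript
// In low-latency mode, each flush runs only cheap regex detectors, so no
// model inference sits on the hot path. Two toy patterns stand in for the
// real detector set.
const DETECTORS: Array<[RegExp, string]> = [
  [/[\w.+-]+@[\w-]+\.[\w.]+/g, "[EMAIL]"],
  [/\b\d{3}-\d{2}-\d{4}\b/g, "[SSN]"],
];

function anonymizeFlush(text: string): string {
  let out = text;
  for (const [pattern, label] of DETECTORS) {
    out = out.replace(pattern, label);
  }
  return out;
}
```

Because flushes are small and frequent in this mode, a value split across two flushes can escape a regex that would have matched it whole; that is the accuracy cost paid for lower latency.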

Events

onChunk

Fires after each buffered chunk is anonymized:

interface StreamChunkEvent {
  anonymizedText: string;
  entities: DetectedEntity[];
  totalEntities: number;
  processingTimeMs: number;
}

onFinish

Fires when the stream ends:

interface StreamFinishEvent {
  totalEntities: number;
  piiMap?: EncryptedPIIMap;
  totalProcessingTimeMs: number;
}
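The per-chunk and finish payloads compose naturally: `totalEntities` is a running cumulative count, while per-chunk `processingTimeMs` values sum to the finish-time total. A small self-contained sketch (the `summarize` helper is our own, not part of rehydra, and the interface is trimmed to the fields used):

```typescript
interface ChunkEventLike {
  anonymizedText: string;
  totalEntities: number;    // cumulative, as in StreamChunkEvent
  processingTimeMs: number; // per-chunk cost
}

// Collect chunk events and derive figures comparable to onFinish's report.
function summarize(events: ChunkEventLike[]) {
  const totalProcessingTimeMs = events.reduce((s, e) => s + e.processingTimeMs, 0);
  return {
    chunks: events.length,
    totalEntities: events.at(-1)?.totalEntities ?? 0, // last cumulative value
    totalProcessingTimeMs,
    avgChunkMs: events.length ? totalProcessingTimeMs / events.length : 0,
  };
}
```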

Accessing the PII Map

After the stream ends, retrieve the cumulative PII map:

stream.on('finish', () => {
  const piiMap = stream.getPiiMap();
  const stats = stream.stats;
});

Session Integration

Persist PII maps across streaming sessions:

import { createAnonymizerStream } from 'rehydra/streaming';
import { InMemoryKeyProvider, SQLitePIIStorageProvider } from 'rehydra';

const storage = new SQLitePIIStorageProvider('./pii.db');
await storage.initialize();

const stream = await createAnonymizerStream({
  sessionId: 'chat-123',
  keyProvider: new InMemoryKeyProvider(),
  piiStorageProvider: storage,
  saveIntervalMs: 5000,  // Save PII map every 5 seconds
});

If a session already has a stored PII map, the stream loads it and continues with consistent entity IDs.
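Why persistence yields consistent IDs can be shown with a toy value-to-placeholder map. This is an assumed data shape for illustration only: rehydra's real PII maps are encrypted via the key provider, which this sketch omits.

```typescript
// A stored value → placeholder mapping means a reloaded session reuses
// "PERSON_1" for the same name instead of minting a new placeholder.
class PiiMapSketch {
  private byValue = new Map<string, string>();
  private counters = new Map<string, number>();

  placeholderFor(type: string, value: string): string {
    const existing = this.byValue.get(value);
    if (existing) return existing; // same value, same placeholder
    const n = (this.counters.get(type) ?? 0) + 1;
    this.counters.set(type, n);
    const placeholder = `${type}_${n}`;
    this.byValue.set(value, placeholder);
    return placeholder;
  }

  // Serialize for storage between sessions (unencrypted in this sketch).
  toJSON(): Record<string, string> {
    return Object.fromEntries(this.byValue);
  }

  static fromJSON(data: Record<string, string>): PiiMapSketch {
    const m = new PiiMapSketch();
    for (const [value, placeholder] of Object.entries(data)) {
      m.byValue.set(value, placeholder);
      const sep = placeholder.lastIndexOf("_");
      const type = placeholder.slice(0, sep);
      const n = Number(placeholder.slice(sep + 1));
      // Restore counters so new entities continue the numbering.
      m.counters.set(type, Math.max(m.counters.get(type) ?? 0, n));
    }
    return m;
  }
}
```

Restoring the counters is the key step: a resumed session both reuses existing placeholders and keeps numbering new entities from where the previous session stopped.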

Next Steps

LLM Proxy

Automatic anonymization for LLM API calls

Streaming API Reference

Complete streaming API documentation