Streaming Plan
Overview
This plan explores options for adding streaming input capabilities to rlm-rs, enabling processing of unbounded data streams without loading entire files into memory.
User Requirements:
- Primary use case: CLI pipeline integration (`cat | rlm-rs | grep`)
- Input sources: stdin, large files, AND network/API streams
- Exploration of all architecture options before committing
Current Architecture Summary
Based on codebase exploration:
| Component | File | Current Pattern |
|---|---|---|
| File I/O | src/io/reader.rs | read_file() → Arc<[u8]> (full file in memory) |
| Chunking | src/chunking/traits.rs | chunk(&str) → Vec<Chunk> (batch processing) |
| Storage | src/storage/sqlite.rs | insert_chunks(Vec<Chunk>) (batch insert) |
| Search | src/search/mod.rs | hybrid_search() → Vec<SearchResult> |
Key Traits: All traits require Send + Sync for thread safety.
Architecture Options Comparison
Option A: Synchronous Iterator Pattern
Approach: Extend existing traits with iterator-based streaming methods.
API Design:
```rust
pub trait StreamingChunker: Send + Sync {
    fn stream_chunks<R: Read + Send>(
        &self,
        buffer_id: i64,
        source: R,
        metadata: Option<&ChunkMetadata>,
    ) -> Result<Box<dyn ChunkStream>>;
}
```

| Aspect | Assessment |
|---|---|
| Complexity | Low - standard Rust patterns |
| Dependencies | None new |
| Binary size impact | Minimal |
| CLI pipeline fit | Excellent |
| Network support | Limited (blocking I/O) |
| Memory control | Easy to bound |
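
To make the iterator idea concrete, here is a minimal sketch of a fixed-size streaming chunker built only on `std`. It is an illustration under stated assumptions, not the planned implementation: `Chunk` is a hypothetical stand-in for the crate's type, `FixedChunkIter` is an invented name, and the iterator is returned directly rather than as the `Box<dyn ChunkStream>` from the trait above. Memory stays bounded at roughly one chunk, and a multi-byte UTF-8 character that straddles a read boundary is carried over to the next chunk.

```rust
use std::io::{self, ErrorKind, Read};

/// Hypothetical stand-in for the crate's `Chunk` type.
#[derive(Debug, PartialEq)]
pub struct Chunk {
    pub buffer_id: i64,
    pub index: usize,
    pub text: String,
}

/// Yields chunks of at most `chunk_size` bytes, never splitting a UTF-8 character.
pub struct FixedChunkIter<R: Read> {
    source: R,
    buffer_id: i64,
    chunk_size: usize,
    next_index: usize,
    carry: Vec<u8>, // tail of a character cut off by the previous read
    done: bool,
}

impl<R: Read> FixedChunkIter<R> {
    pub fn new(source: R, buffer_id: i64, chunk_size: usize) -> Self {
        // A UTF-8 character is at most 4 bytes, so every chunk can make progress.
        assert!(chunk_size >= 4, "chunk_size must be at least 4 bytes");
        Self { source, buffer_id, chunk_size, next_index: 0, carry: Vec::new(), done: false }
    }
}

impl<R: Read> Iterator for FixedChunkIter<R> {
    type Item = io::Result<Chunk>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.done {
            return None;
        }
        // Start from the carried-over bytes, then fill up to chunk_size.
        let mut buf = std::mem::take(&mut self.carry);
        let mut eof = false;
        while buf.len() < self.chunk_size {
            let filled = buf.len();
            buf.resize(self.chunk_size, 0);
            match self.source.read(&mut buf[filled..]) {
                Ok(0) => {
                    buf.truncate(filled);
                    eof = true;
                    break;
                }
                Ok(n) => buf.truncate(filled + n),
                Err(e) if e.kind() == ErrorKind::Interrupted => buf.truncate(filled),
                Err(e) => {
                    self.done = true;
                    return Some(Err(e));
                }
            }
        }
        self.done = eof;
        if buf.is_empty() {
            return None;
        }
        // Keep only the valid prefix; an incomplete trailing character is carried over.
        let valid_len = match std::str::from_utf8(&buf) {
            Ok(_) => buf.len(),
            Err(e) if e.error_len().is_none() && !eof => e.valid_up_to(),
            Err(e) => {
                self.done = true;
                return Some(Err(io::Error::new(ErrorKind::InvalidData, e)));
            }
        };
        self.carry = buf.split_off(valid_len);
        let text = String::from_utf8(buf).expect("prefix was validated above");
        let chunk = Chunk { buffer_id: self.buffer_id, index: self.next_index, text };
        self.next_index += 1;
        Some(Ok(chunk))
    }
}
```

In a CLI, the same iterator could wrap `std::io::stdin().lock()`, so chunks are produced as data arrives instead of after the whole input has been read.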
CLI Usage:
```bash
cat large_file.txt | rlm-cli load --stdin --name "piped"
rlm-cli search "query" --stream | head -10
tail -f /var/log/app.log | rlm-cli load --stdin --incremental
```

Option B: Async Streams (tokio/futures)
Approach: Use Stream trait with async/await for non-blocking I/O.
API Design:
```rust
#[async_trait]
pub trait AsyncChunker: Send + Sync {
    async fn stream_chunks<R: AsyncRead + Send + Unpin + 'static>(
        &self,
        buffer_id: i64,
        source: R,
    ) -> Result<Pin<Box<dyn Stream<Item = Result<Chunk>> + Send>>>;
}
```

| Aspect | Assessment |
|---|---|
| Complexity | Medium-High |
| Dependencies | +tokio, +futures, +async-trait |
| Binary size impact | +1-2MB |
| CLI pipeline fit | Good (via block_on) |
| Network support | Excellent (timeouts, cancellation) |
| Memory control | Good |
Option C: Channel-Based Pipeline
Approach: Multi-stage pipeline with dedicated threads.

```text
[Reader Thread] ──channel──> [Chunker Thread] ──channel──> [Storage Thread]
```

| Aspect | Assessment |
|---|---|
| Complexity | High |
| Dependencies | +crossbeam |
| Binary size impact | Minimal |
| CLI pipeline fit | Good |
| Network support | Good |
| Throughput | Highest (parallel stages) |
Best for: Batch processing servers, heavy embedding workloads.
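
The three-stage layout can be sketched with the standard library alone. Note the substitution: the plan lists crossbeam as the dependency, while this sketch uses `std::sync::mpsc::sync_channel` as a stand-in for a bounded channel. The function name `run_pipeline`, the character-based splitting, and the in-memory "storage" stage are all assumptions made for illustration.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Runs reader -> chunker -> storage on separate threads and returns
/// what the storage stage received, in arrival order.
pub fn run_pipeline(lines: Vec<String>, chunk_len: usize, capacity: usize) -> Vec<String> {
    assert!(chunk_len > 0, "chunk_len must be positive");
    // Bounded channels provide backpressure: a full channel blocks the sender.
    let (raw_tx, raw_rx) = sync_channel::<String>(capacity);
    let (chunk_tx, chunk_rx) = sync_channel::<String>(capacity);

    // Stage 1: the reader thread feeds raw lines into the first channel.
    let reader = thread::spawn(move || {
        for line in lines {
            if raw_tx.send(line).is_err() {
                break; // downstream hung up
            }
        }
        // raw_tx is dropped here, which closes the channel.
    });

    // Stage 2: the chunker thread splits each line into pieces of chunk_len characters.
    let chunker = thread::spawn(move || {
        for line in raw_rx {
            let chars: Vec<char> = line.chars().collect();
            for piece in chars.chunks(chunk_len) {
                let text: String = piece.iter().collect();
                if chunk_tx.send(text).is_err() {
                    return;
                }
            }
        }
    });

    // Stage 3: the storage thread drains chunks until the channel closes.
    let storage = thread::spawn(move || chunk_rx.into_iter().collect::<Vec<String>>());

    reader.join().expect("reader thread panicked");
    chunker.join().expect("chunker thread panicked");
    storage.join().expect("storage thread panicked")
}
```

The `capacity` argument bounds how far any stage can run ahead of the next, which is what keeps memory use predictable when the storage stage is the slowest.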
Option D: Hybrid Approach (Recommended for Full Requirements)
Approach: Sync iterators for file/stdin, async for network (feature-gated).

```rust
pub enum InputSource {
    Sync(Box<dyn Read + Send>),
    #[cfg(feature = "async-network")]
    Async(Box<dyn AsyncRead + Send + Unpin>),
}
```

| Aspect | Assessment |
|---|---|
| Complexity | Medium |
| CLI pipeline fit | Excellent |
| Network support | Excellent (when enabled) |
| Flexibility | Maximum |
Recommendation Matrix
| Requirement | Recommended Option |
|---|---|
| CLI file/stdin only | Sync Iterator |
| CLI + occasional network | Hybrid |
| Heavy network/API use | Async Streams |
| Batch processing server | Channel Pipeline |
For the primary rlm-rs CLI use case: start with Sync Iterator, extend to Hybrid later.
CLI Pipeline Implementation Details
cat file.txt | rlm-rs load --stdin

```text
┌──────┐    ┌─────────────────────────────────────────┐
│ cat  │───>│ rlm-cli load --stdin                    │
│      │    │  stdin → BufReader → Chunker → Storage  │
└──────┘    └─────────────────────────────────────────┘
```

Key points:
- Detect pipe vs TTY: `!atty::is(atty::Stream::Stdin)`
- Use `BufReader` with 64KB buffer
- Batch storage writes (100 chunks per transaction)
- Progress to stderr (doesn’t interfere with pipeline)
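
The input side of these key points might look like the sketch below. It substitutes `std::io::IsTerminal` (stable since Rust 1.70) for the `atty` check named above, so it needs no extra dependency. The function names, the line-oriented reading, and the progress interval of 1000 lines are assumptions for illustration.

```rust
use std::io::{self, BufRead, BufReader, IsTerminal, Read, Write};

/// Read buffer size from the key points above.
const READ_BUFFER_BYTES: usize = 64 * 1024;

/// True when stdin is a pipe or file rather than an interactive terminal.
pub fn stdin_is_piped() -> bool {
    !io::stdin().is_terminal()
}

/// Reads `source` line by line through a 64KB buffer, hands each line to
/// `on_line`, and writes progress messages to `progress` (stderr in a CLI).
/// Returns the number of lines and bytes consumed.
pub fn consume_lines<R: Read, W: Write>(
    source: R,
    mut progress: W,
    mut on_line: impl FnMut(&str),
) -> io::Result<(usize, usize)> {
    let mut reader = BufReader::with_capacity(READ_BUFFER_BYTES, source);
    let mut line = String::new();
    let (mut lines, mut bytes) = (0usize, 0usize);
    loop {
        line.clear();
        // read_line returns an error if the input is not valid UTF-8.
        let n = reader.read_line(&mut line)?;
        if n == 0 {
            break; // end of input
        }
        lines += 1;
        bytes += n;
        on_line(&line);
        if lines % 1000 == 0 {
            writeln!(progress, "read {lines} lines ({bytes} bytes)")?;
        }
    }
    writeln!(progress, "done: {lines} lines ({bytes} bytes)")?;
    Ok((lines, bytes))
}
```

A `load --stdin` command could call `consume_lines(io::stdin().lock(), io::stderr(), ...)` only when `stdin_is_piped()` is true, so progress output never mixes with data flowing through stdout.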
rlm-rs search "query" | head -10
```text
┌─────────────────────┐    ┌──────┐
│ rlm-cli search      │───>│ head │
│   write(line)       │    │      │
│   if EPIPE: break   │    └──────┘
└─────────────────────┘
```

Key points:
- Install `signal(SIGPIPE, SIG_IGN)` on Unix
- Check `write_all()` for `BrokenPipe` error
- Flush after each line
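
A minimal sketch of the output loop, assuming results are already formatted as lines. The name `write_results` is hypothetical. It uses `writeln!` with explicit error handling rather than `println!`, because `println!` panics when writing to stdout fails.

```rust
use std::io::{self, ErrorKind, Write};

/// Writes one result per line and flushes after each line.
/// Stops quietly when the downstream reader has closed the pipe.
/// Returns how many lines were delivered.
pub fn write_results<W: Write>(mut out: W, results: &[&str]) -> io::Result<usize> {
    let mut written = 0;
    for line in results {
        let outcome = writeln!(out, "{line}").and_then(|_| out.flush());
        match outcome {
            Ok(()) => written += 1,
            // e.g. `head -10` exited: treat as a normal end of output.
            Err(e) if e.kind() == ErrorKind::BrokenPipe => break,
            Err(e) => return Err(e),
        }
    }
    Ok(written)
}
```

In the CLI this would be called with `io::stdout().lock()`; any error other than `BrokenPipe` still propagates to the caller.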
Signal Handling
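```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Assumed definition: set on the first Ctrl+C; long-running loops poll it to stop cleanly.
static SHUTDOWN: AtomicBool = AtomicBool::new(false);

// SIGPIPE: Ignore, let EPIPE propagate
// SIGINT (Ctrl+C): Set shutdown flag, second Ctrl+C force exits
ctrlc::set_handler(|| {
    if SHUTDOWN.swap(true, Ordering::SeqCst) {
        std::process::exit(130); // Second Ctrl+C
    }
})?;
```

Implementation Plan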
Phase 1: Sync Iterator Foundation
- Add `StdinReader` to `src/io/stdin.rs`
- Add `StreamingChunker` trait to `src/chunking/streaming.rs`
- Implement streaming for `FixedChunker`
- Add `--stdin` flag to `load` command
- Add signal handling (SIGPIPE, SIGINT)
- Add progress reporting to stderr
Phase 2: Streaming Output
- Add `--stream` flag to `search` command
- Line-by-line output mode
- Handle `head -N` gracefully (EPIPE)
Phase 3: Storage Batching
- Add `stream_insert_chunks()` with configurable batch size
- Transaction batching for efficiency
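
One possible shape for `stream_insert_chunks()` is sketched below. Only the function name comes from this plan. The `ChunkSink` trait, the `RecordingSink` helper, and the use of `String` in place of the crate's `Chunk` type are assumptions made to keep the example self-contained. A real SQLite-backed sink would wrap each `insert_batch` call in one transaction.

```rust
/// Hypothetical sink standing in for the SQLite storage layer.
pub trait ChunkSink {
    type Error;
    /// Persists one batch; a real implementation would use a single transaction.
    fn insert_batch(&mut self, batch: &[String]) -> Result<(), Self::Error>;
}

/// Drains `chunks` into `sink` in batches of `batch_size`, holding at most
/// one batch in memory. Returns the number of chunks written.
pub fn stream_insert_chunks<S, I>(
    sink: &mut S,
    chunks: I,
    batch_size: usize,
) -> Result<usize, S::Error>
where
    S: ChunkSink,
    I: IntoIterator<Item = String>,
{
    assert!(batch_size > 0, "batch_size must be positive");
    let mut batch = Vec::with_capacity(batch_size);
    let mut total = 0;
    for chunk in chunks {
        batch.push(chunk);
        if batch.len() == batch_size {
            sink.insert_batch(&batch)?;
            total += batch.len();
            batch.clear();
        }
    }
    // Flush the final, possibly partial, batch.
    if !batch.is_empty() {
        sink.insert_batch(&batch)?;
        total += batch.len();
    }
    Ok(total)
}

/// In-memory sink that records batch sizes, for demonstration only.
#[derive(Default)]
pub struct RecordingSink {
    pub batch_sizes: Vec<usize>,
}

impl ChunkSink for RecordingSink {
    type Error = std::convert::Infallible;

    fn insert_batch(&mut self, batch: &[String]) -> Result<(), Self::Error> {
        self.batch_sizes.push(batch.len());
        Ok(())
    }
}
```

With a batch size of 100, as suggested in the CLI pipeline details, 250 chunks would reach the sink as three batches of 100, 100, and 50.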
Phase 4 (Optional): Network Sources
- Add `async-network` feature flag
- Add `--url` option to load command
- Implement `AsyncChunker` with timeout support
Files to Modify
| File | Changes |
|---|---|
| src/io/stdin.rs | NEW - StdinReader implementation |
| src/io/mod.rs | Re-export stdin module |
| src/chunking/streaming.rs | NEW - StreamingChunker trait |
| src/chunking/fixed.rs | Add streaming implementation |
| src/chunking/mod.rs | Re-export streaming types |
| src/storage/sqlite.rs | Add stream_insert_chunks() |
| src/cli/parser.rs | Add --stdin, --stream flags |
| src/cli/commands.rs | Wire up streaming pipeline |
| src/cli/signals.rs | NEW - Signal handling |
| src/cli/progress.rs | NEW - Progress reporting |
| src/main.rs | Install signal handlers |
Verification
- Unit tests:
  - Stream chunking with various inputs
  - UTF-8 boundary handling at chunk edges
  - Overlap buffer correctness
- Integration tests:
  - `echo "test" | cargo run -- load --stdin`
  - `cargo run -- search "query" | head -5`
  - Large file streaming without OOM
- Memory tests:
  - Process 1GB file, verify <50MB peak memory
  - `valgrind --tool=massif` profiling
- Signal tests:
  - Ctrl+C graceful shutdown
  - Pipe to `head` without errors
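
For the overlap check among the unit tests, one option is to compare the streaming output against a simple non-streaming reference. The sketch below is such a reference, under the assumption that overlap is measured in characters and that consecutive chunks share exactly `overlap` characters. The name `overlapping_chunks` is hypothetical.

```rust
/// Reference (non-streaming) splitter: windows of `size` characters where
/// each window starts `size - overlap` characters after the previous one.
pub fn overlapping_chunks(text: &str, size: usize, overlap: usize) -> Vec<String> {
    assert!(overlap < size, "overlap must be smaller than the chunk size");
    let chars: Vec<char> = text.chars().collect();
    let step = size - overlap;
    let mut out = Vec::new();
    let mut start = 0;
    while start < chars.len() {
        let end = (start + size).min(chars.len());
        out.push(chars[start..end].iter().collect());
        if end == chars.len() {
            break; // the last window reached the end of the text
        }
        start += step;
    }
    out
}
```

A unit test could feed the same input to the streaming chunker and assert that both produce identical chunk sequences.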