
Streaming Plan

This plan explores options for adding streaming input capabilities to rlm-rs, enabling processing of unbounded data streams without loading entire files into memory.

User Requirements:

  • Primary use case: CLI pipeline integration (cat | rlm-rs | grep)
  • Input sources: stdin, large files, AND network/API streams
  • Exploration of all architecture options before committing

Based on codebase exploration:

| Component | File | Current Pattern |
| --- | --- | --- |
| File I/O | src/io/reader.rs | read_file() → Arc<[u8]> (full file in memory) |
| Chunking | src/chunking/traits.rs | chunk(&str) → Vec<Chunk> (batch processing) |
| Storage | src/storage/sqlite.rs | insert_chunks(Vec<Chunk>) (batch insert) |
| Search | src/search/mod.rs | hybrid_search() → Vec<SearchResult> |

Key Traits: All traits require Send + Sync for thread safety.


Option A: Sync Iterator

Approach: Extend existing traits with iterator-based streaming methods.

API Design:

```rust
pub trait StreamingChunker: Send + Sync {
    fn stream_chunks<R: Read + Send>(
        &self,
        buffer_id: i64,
        source: R,
        metadata: Option<&ChunkMetadata>,
    ) -> Result<Box<dyn ChunkStream>>;
}
```
| Aspect | Assessment |
| --- | --- |
| Complexity | Low: standard Rust patterns |
| Dependencies | None new |
| Binary size impact | Minimal |
| CLI pipeline fit | Excellent |
| Network support | Limited (blocking I/O) |
| Memory control | Easy to bound |
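As a concrete sketch of Option A, the returned ChunkStream can be an ordinary Iterator. The FixedChunkStream below is illustrative rather than the planned FixedChunker API: Chunk is reduced to a hypothetical two-field struct, and the iterator pulls fixed-size windows from any Read source while carrying incomplete UTF-8 tails across reads:

```rust
use std::io::{BufReader, Read};

// Hypothetical minimal Chunk for illustration; the real rlm-rs Chunk differs.
#[derive(Debug)]
pub struct Chunk {
    pub index: usize,
    pub text: String,
}

// Iterator-based stream: pulls ~`size`-byte windows from any `Read` source,
// never splitting a UTF-8 code point (incomplete tail bytes carry over).
pub struct FixedChunkStream<R: Read> {
    reader: BufReader<R>,
    size: usize, // target chunk size in bytes; keep >= 4 (max UTF-8 char len)
    carry: Vec<u8>,
    index: usize,
}

impl<R: Read> FixedChunkStream<R> {
    pub fn new(source: R, size: usize) -> Self {
        Self { reader: BufReader::new(source), size, carry: Vec::new(), index: 0 }
    }
}

impl<R: Read> Iterator for FixedChunkStream<R> {
    type Item = std::io::Result<Chunk>;

    fn next(&mut self) -> Option<Self::Item> {
        // Start from any bytes carried over, then top up with one read.
        let mut buf = std::mem::take(&mut self.carry);
        let old_len = buf.len();
        buf.resize(old_len + self.size, 0);
        let n = match self.reader.read(&mut buf[old_len..]) {
            Ok(n) => n,
            Err(e) => return Some(Err(e)),
        };
        buf.truncate(old_len + n);
        if buf.is_empty() {
            return None; // clean end of stream
        }
        // Cut at the last complete UTF-8 boundary; carry the remainder.
        let valid = match std::str::from_utf8(&buf) {
            Ok(s) => s.len(),
            Err(e) => e.valid_up_to(),
        };
        if n == 0 && valid == 0 {
            self.carry.clear();
            return Some(Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "stream ended inside a UTF-8 sequence",
            )));
        }
        self.carry = buf.split_off(valid);
        let text = String::from_utf8(buf).expect("validated prefix");
        let index = self.index;
        self.index += 1;
        Some(Ok(Chunk { index, text }))
    }
}

fn main() {
    let source = std::io::Cursor::new("héllo wörld");
    let chunks: Vec<String> =
        FixedChunkStream::new(source, 4).map(|c| c.unwrap().text).collect();
    assert_eq!(chunks.concat(), "héllo wörld"); // nothing lost at boundaries
}
```

Note that a single read() may return fewer than size bytes, so chunk sizes vary slightly; the trait above would wrap such an iterator behind Box<dyn ChunkStream>.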

CLI Usage:

```sh
cat large_file.txt | rlm-cli load --stdin --name "piped"
rlm-cli search "query" --stream | head -10
tail -f /var/log/app.log | rlm-cli load --stdin --incremental
```

Option B: Async Streams

Approach: Use the Stream trait with async/await for non-blocking I/O.

API Design:

```rust
#[async_trait]
pub trait AsyncChunker: Send + Sync {
    async fn stream_chunks<R: AsyncRead + Send + Unpin + 'static>(
        &self,
        buffer_id: i64,
        source: R,
    ) -> Result<Pin<Box<dyn Stream<Item = Result<Chunk>> + Send>>>;
}
```
| Aspect | Assessment |
| --- | --- |
| Complexity | Medium-High |
| Dependencies | +tokio, +futures, +async-trait |
| Binary size impact | +1-2 MB |
| CLI pipeline fit | Good (via block_on) |
| Network support | Excellent (timeouts, cancellation) |
| Memory control | Good |

Option C: Channel Pipeline

Approach: Multi-stage pipeline with dedicated threads per stage.

[Reader Thread] ──channel──> [Chunker Thread] ──channel──> [Storage Thread]
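The three-stage diagram above can be sketched with threads joined by bounded channels for backpressure. The plan proposes crossbeam; std::sync::mpsc is used here only to keep the sketch dependency-free, and run_pipeline, the uppercasing "chunker", and the collecting "storage" stage are all stand-ins:

```rust
use std::sync::mpsc;
use std::thread;

// Three-stage sketch of Option C: reader -> chunker -> storage, one thread
// per stage, joined by bounded channels so slow stages apply backpressure.
fn run_pipeline(lines: Vec<String>) -> Vec<String> {
    let (line_tx, line_rx) = mpsc::sync_channel::<String>(64);
    let (chunk_tx, chunk_rx) = mpsc::sync_channel::<String>(64);

    // Reader stage: canned input here; stdin or a file in rlm-rs.
    let reader = thread::spawn(move || {
        for line in lines {
            line_tx.send(line).unwrap();
        } // dropping line_tx closes the channel, ending the next stage's loop
    });

    // Chunker stage: uppercasing stands in for real chunking work.
    let chunker = thread::spawn(move || {
        for line in line_rx {
            chunk_tx.send(line.to_uppercase()).unwrap();
        }
    });

    // Storage stage: collecting stands in for batched SQLite inserts.
    let storage = thread::spawn(move || chunk_rx.iter().collect::<Vec<_>>());

    reader.join().unwrap();
    chunker.join().unwrap();
    storage.join().unwrap()
}

fn main() {
    let out = run_pipeline(vec!["alpha".into(), "beta".into()]);
    assert_eq!(out, ["ALPHA", "BETA"]);
}
```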
| Aspect | Assessment |
| --- | --- |
| Complexity | High |
| Dependencies | +crossbeam |
| Binary size impact | Minimal |
| CLI pipeline fit | Good |
| Network support | Good |
| Throughput | Highest (parallel stages) |

Best for: Batch processing servers, heavy embedding workloads.


Option D: Hybrid Approach (Recommended for Full Requirements)

Approach: Sync iterators for file/stdin, async for network (feature-gated).

```rust
pub enum InputSource {
    Sync(Box<dyn Read + Send>),
    #[cfg(feature = "async-network")]
    Async(Box<dyn AsyncRead + Send + Unpin>),
}
```
| Aspect | Assessment |
| --- | --- |
| Complexity | Medium |
| CLI pipeline fit | Excellent |
| Network support | Excellent (when enabled) |
| Flexibility | Maximum |
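A dependency-free sketch of how callers would dispatch on the hybrid enum: only the sync arm is shown (the async variant sits behind the async-network feature), and read_all is a hypothetical helper, not a planned API:

```rust
use std::io::{Cursor, Read};

// Sync-only slice of the hybrid enum; the feature-gated Async variant
// from the plan is omitted so this sketch compiles without tokio.
pub enum InputSource {
    Sync(Box<dyn Read + Send>),
}

// Single entry point: downstream code never cares which transport
// produced the bytes, which is the point of the hybrid design.
fn read_all(source: InputSource) -> std::io::Result<Vec<u8>> {
    match source {
        InputSource::Sync(mut r) => {
            let mut buf = Vec::new();
            r.read_to_end(&mut buf)?;
            Ok(buf)
        }
    }
}

fn main() -> std::io::Result<()> {
    let src = InputSource::Sync(Box::new(Cursor::new("piped bytes")));
    let bytes = read_all(src)?;
    assert_eq!(bytes, b"piped bytes");
    Ok(())
}
```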

| Requirement | Recommended Option |
| --- | --- |
| CLI file/stdin only | Sync Iterator |
| CLI + occasional network | Hybrid |
| Heavy network/API use | Async Streams |
| Batch processing server | Channel Pipeline |

For the primary rlm-rs CLI use case: start with the Sync Iterator, then extend to the Hybrid approach later.


┌──────┐    ┌───────────────────────────────────────┐
│ cat  │───>│ rlm-cli load --stdin                  │
│      │    │ stdin → BufReader → Chunker → Storage │
└──────┘    └───────────────────────────────────────┘

Key points:

  • Detect pipe vs TTY: !std::io::stdin().is_terminal() (std IsTerminal, Rust 1.70+) or !atty::is(atty::Stream::Stdin)
  • Use BufReader with 64KB buffer
  • Batch storage writes (100 chunks per transaction)
  • Progress to stderr (doesn’t interfere with pipeline)
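The points above can be sketched as a single loading loop. load_stream and its commit callback are hypothetical names; commit stands in for one SQLite transaction per batch:

```rust
use std::io::{BufRead, BufReader, Read};

// Drain any reader with a 64 KB buffer, committing every `batch` lines;
// `commit` stands in for one SQLite transaction per batch of chunks.
fn load_stream<R: Read>(
    source: R,
    batch: usize,
    mut commit: impl FnMut(&[String]),
) -> std::io::Result<usize> {
    let reader = BufReader::with_capacity(64 * 1024, source);
    let mut pending = Vec::with_capacity(batch);
    let mut total = 0;
    for line in reader.lines() {
        pending.push(line?);
        if pending.len() == batch {
            commit(&pending);
            total += pending.len();
            eprint!("\rloaded {total} lines"); // progress to stderr, not stdout
            pending.clear();
        }
    }
    if !pending.is_empty() {
        total += pending.len();
        commit(&pending); // final partial batch
    }
    Ok(total)
}

fn main() -> std::io::Result<()> {
    let input = std::io::Cursor::new("a\nb\nc\nd\ne\n");
    let mut commits = 0;
    let total = load_stream(input, 2, |_| commits += 1)?;
    assert_eq!((total, commits), (5, 3)); // 2 full batches + 1 partial
    Ok(())
}
```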
┌─────────────────────┐    ┌──────┐
│ rlm-cli search      │───>│ head │
│   write(line)       │    │      │
│   if EPIPE: break   │    └──────┘
└─────────────────────┘

Key points:

  • Install signal(SIGPIPE, SIG_IGN) on Unix
  • Check write_all() for BrokenPipe error
  • Flush after each line
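Those three points combine into a write loop like the following sketch. emit_lines and the ClosedAfter test double are hypothetical names; the double simulates a downstream head closing the pipe after two lines:

```rust
use std::io::{self, Write};

// Write lines downstream, flushing each one, and stop quietly when the
// consumer (e.g. `head`) closes the pipe and writes start failing with EPIPE.
fn emit_lines<W: Write>(
    out: &mut W,
    lines: impl Iterator<Item = String>,
) -> io::Result<usize> {
    let mut written = 0;
    for line in lines {
        // One write per line, then flush so downstream sees output promptly.
        match out.write_all(format!("{line}\n").as_bytes()).and_then(|_| out.flush()) {
            Ok(()) => written += 1,
            Err(e) if e.kind() == io::ErrorKind::BrokenPipe => break, // reader gone
            Err(e) => return Err(e),
        }
    }
    Ok(written)
}

// Test double: a writer that accepts N writes, then reports EPIPE.
struct ClosedAfter(usize);

impl Write for ClosedAfter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if self.0 == 0 {
            return Err(io::Error::new(io::ErrorKind::BrokenPipe, "EPIPE"));
        }
        self.0 -= 1;
        Ok(buf.len())
    }
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

fn main() {
    let mut sink = ClosedAfter(2);
    let n = emit_lines(&mut sink, (0..10).map(|i| format!("result {i}"))).unwrap();
    assert_eq!(n, 2); // stopped cleanly at EPIPE instead of erroring out
}
```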
```rust
use std::sync::atomic::{AtomicBool, Ordering};

static SHUTDOWN: AtomicBool = AtomicBool::new(false);

// SIGPIPE: ignore, let EPIPE propagate to write() calls.
// SIGINT (Ctrl+C): set the shutdown flag; a second Ctrl+C force-exits.
ctrlc::set_handler(|| {
    if SHUTDOWN.swap(true, Ordering::SeqCst) {
        std::process::exit(130); // 128 + SIGINT
    }
})?;
```

Phase 1: stdin input (load command)

  1. Add StdinReader to src/io/stdin.rs
  2. Add StreamingChunker trait to src/chunking/streaming.rs
  3. Implement streaming for FixedChunker
  4. Add --stdin flag to load command
  5. Add signal handling (SIGPIPE, SIGINT)
  6. Add progress reporting to stderr

Phase 2: streaming search output

  1. Add --stream flag to search command
  2. Line-by-line output mode
  3. Handle head -N gracefully (EPIPE)

Phase 3: storage batching

  1. Add stream_insert_chunks() with configurable batch size
  2. Transaction batching for efficiency

Phase 4: network input (feature-gated)

  1. Add async-network feature flag
  2. Add --url option to load command
  3. Implement AsyncChunker with timeout support

| File | Changes |
| --- | --- |
| src/io/stdin.rs | NEW - StdinReader implementation |
| src/io/mod.rs | Re-export stdin module |
| src/chunking/streaming.rs | NEW - StreamingChunker trait |
| src/chunking/fixed.rs | Add streaming implementation |
| src/chunking/mod.rs | Re-export streaming types |
| src/storage/sqlite.rs | Add stream_insert_chunks() |
| src/cli/parser.rs | Add --stdin, --stream flags |
| src/cli/commands.rs | Wire up streaming pipeline |
| src/cli/signals.rs | NEW - Signal handling |
| src/cli/progress.rs | NEW - Progress reporting |
| src/main.rs | Install signal handlers |

  1. Unit tests:

    • Stream chunking with various inputs
    • UTF-8 boundary handling at chunk edges
    • Overlap buffer correctness
  2. Integration tests:

    • echo "test" | cargo run -- load --stdin
    • cargo run -- search "query" | head -5
    • Large file streaming without OOM
  3. Memory tests:

    • Process 1GB file, verify <50MB peak memory
    • valgrind --tool=massif profiling
  4. Signal tests:

    • Ctrl+C graceful shutdown
    • Pipe to head without errors
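The "UTF-8 boundary handling at chunk edges" unit test can be sketched as below. split_valid is a hypothetical helper that cuts a byte buffer at the last complete UTF-8 boundary; the check feeds a multi-byte string one byte at a time, the worst case for chunk edges:

```rust
// Cut `buf` at the last complete UTF-8 boundary: returns the valid prefix
// as &str plus the (possibly incomplete) remainder to carry forward.
fn split_valid(buf: &[u8]) -> (&str, &[u8]) {
    match std::str::from_utf8(buf) {
        Ok(s) => (s, &[]),
        Err(e) => {
            let n = e.valid_up_to();
            (std::str::from_utf8(&buf[..n]).unwrap(), &buf[n..])
        }
    }
}

fn main() {
    // Feed 2-, 3-, and 4-byte characters one byte at a time and verify the
    // stream reassembles exactly, with nothing split mid-code-point.
    let text = "héllo wörld 🦀";
    let mut rebuilt = String::new();
    let mut carry: Vec<u8> = Vec::new();
    for &b in text.as_bytes() {
        carry.push(b);
        let (valid, rest) = split_valid(&carry);
        rebuilt.push_str(valid);
        carry = rest.to_vec();
    }
    assert!(carry.is_empty()); // no dangling partial character at EOF
    assert_eq!(rebuilt, text);
}
```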