Skip to main content

subcog/io/formats/
parquet.rs

1//! Apache Parquet format adapter for export.
2//!
3//! Provides columnar storage format for efficient analytics queries.
4//! Requires the `parquet-export` feature.
5//!
6//! Note: Parquet import is not supported as it's primarily an analytics format.
7
8use crate::io::traits::{ExportSink, ExportableMemory};
9use crate::{Error, Result};
10use arrow::array::{ArrayRef, StringArray, UInt64Array};
11use arrow::datatypes::{DataType, Field, Schema};
12use arrow::record_batch::RecordBatch;
13use parquet::arrow::ArrowWriter;
14// Note: SNAPPY/ZSTD compression requires additional features on parquet crate.
15// Using uncompressed for simplicity since memory export files are typically small.
16use parquet::file::properties::WriterProperties;
17use std::io::Write;
18use std::sync::Arc;
19
/// Parquet export sink.
///
/// Buffers memories in `write` and serializes them as a single Parquet
/// file on `finalize`. Output is columnar but uncompressed — SNAPPY/ZSTD
/// would require extra features on the `parquet` crate, and export files
/// are typically small.
pub struct ParquetExportSink<W: Write + Send> {
    /// Destination for the Parquet bytes; `Option` so `finalize` can move it
    /// out of the struct and hand ownership to the `ArrowWriter`.
    writer: Option<W>,
    /// Buffered memories for batch writing.
    memories: Vec<ExportableMemory>,
}
29
30impl<W: Write + Send> ParquetExportSink<W> {
31    /// Creates a new Parquet export sink.
32    ///
33    /// # Errors
34    ///
35    /// Returns an error if initialization fails.
36    pub const fn new(writer: W) -> Result<Self> {
37        Ok(Self {
38            writer: Some(writer),
39            memories: Vec::new(),
40        })
41    }
42
43    /// Creates the Arrow schema for memories.
44    fn schema() -> Schema {
45        Schema::new(vec![
46            Field::new("id", DataType::Utf8, false),
47            Field::new("content", DataType::Utf8, false),
48            Field::new("namespace", DataType::Utf8, false),
49            Field::new("domain", DataType::Utf8, false),
50            Field::new("project_id", DataType::Utf8, true),
51            Field::new("branch", DataType::Utf8, true),
52            Field::new("file_path", DataType::Utf8, true),
53            Field::new("status", DataType::Utf8, false),
54            Field::new("created_at", DataType::UInt64, false),
55            Field::new("updated_at", DataType::UInt64, false),
56            Field::new("tags", DataType::Utf8, false), // Stored as comma-separated
57            Field::new("source", DataType::Utf8, true),
58        ])
59    }
60
61    /// Converts buffered memories to a record batch.
62    fn to_record_batch(&self) -> Result<RecordBatch> {
63        let schema = Arc::new(Self::schema());
64
65        let ids: StringArray = self.memories.iter().map(|m| Some(m.id.as_str())).collect();
66        let contents: StringArray = self
67            .memories
68            .iter()
69            .map(|m| Some(m.content.as_str()))
70            .collect();
71        let namespaces: StringArray = self
72            .memories
73            .iter()
74            .map(|m| Some(m.namespace.as_str()))
75            .collect();
76        let domains: StringArray = self
77            .memories
78            .iter()
79            .map(|m| Some(m.domain.as_str()))
80            .collect();
81        let project_ids: StringArray = self
82            .memories
83            .iter()
84            .map(|m| m.project_id.as_deref())
85            .collect();
86        let branches: StringArray = self.memories.iter().map(|m| m.branch.as_deref()).collect();
87        let file_paths: StringArray = self
88            .memories
89            .iter()
90            .map(|m| m.file_path.as_deref())
91            .collect();
92        let statuses: StringArray = self
93            .memories
94            .iter()
95            .map(|m| Some(m.status.as_str()))
96            .collect();
97        let created_ats: UInt64Array = self.memories.iter().map(|m| Some(m.created_at)).collect();
98        let updated_ats: UInt64Array = self.memories.iter().map(|m| Some(m.updated_at)).collect();
99        let tags: StringArray = self
100            .memories
101            .iter()
102            .map(|m| Some(m.tags.join(",")))
103            .collect();
104        let sources: StringArray = self.memories.iter().map(|m| m.source.as_deref()).collect();
105
106        let columns: Vec<ArrayRef> = vec![
107            Arc::new(ids),
108            Arc::new(contents),
109            Arc::new(namespaces),
110            Arc::new(domains),
111            Arc::new(project_ids),
112            Arc::new(branches),
113            Arc::new(file_paths),
114            Arc::new(statuses),
115            Arc::new(created_ats),
116            Arc::new(updated_ats),
117            Arc::new(tags),
118            Arc::new(sources),
119        ];
120
121        RecordBatch::try_new(schema, columns).map_err(|e| Error::OperationFailed {
122            operation: "create_record_batch".to_string(),
123            cause: format!("Failed to create record batch: {e}"),
124        })
125    }
126}
127
128impl<W: Write + Send + 'static> ExportSink for ParquetExportSink<W> {
129    fn write(&mut self, memory: &ExportableMemory) -> Result<()> {
130        self.memories.push(memory.clone());
131        Ok(())
132    }
133
134    fn finalize(mut self: Box<Self>) -> Result<()> {
135        if self.memories.is_empty() {
136            return Ok(());
137        }
138
139        let writer = self.writer.take().ok_or_else(|| Error::OperationFailed {
140            operation: "parquet_finalize".to_string(),
141            cause: "Writer already consumed".to_string(),
142        })?;
143
144        let schema = Arc::new(Self::schema());
145        let props = WriterProperties::builder().build();
146
147        let mut arrow_writer = ArrowWriter::try_new(writer, schema, Some(props)).map_err(|e| {
148            Error::OperationFailed {
149                operation: "parquet_writer_create".to_string(),
150                cause: format!("Failed to create Parquet writer: {e}"),
151            }
152        })?;
153
154        let batch = self.to_record_batch()?;
155        arrow_writer
156            .write(&batch)
157            .map_err(|e| Error::OperationFailed {
158                operation: "parquet_write".to_string(),
159                cause: format!("Failed to write Parquet batch: {e}"),
160            })?;
161
162        arrow_writer.close().map_err(|e| Error::OperationFailed {
163            operation: "parquet_close".to_string(),
164            cause: format!("Failed to close Parquet writer: {e}"),
165        })?;
166
167        Ok(())
168    }
169}
170
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write as IoWrite;
    use std::sync::{Arc, Mutex};

    /// Shared buffer writer for tests that satisfies the `'static` bound.
    #[derive(Clone)]
    struct SharedBuffer(Arc<Mutex<Vec<u8>>>);

    impl SharedBuffer {
        fn new() -> Self {
            Self(Arc::new(Mutex::new(Vec::new())))
        }

        /// Extracts the written bytes.
        ///
        /// # Panics
        ///
        /// Panics if other clones are still alive or the mutex is poisoned.
        /// Failing loudly here is deliberate: silently falling back to an
        /// empty `Vec` would let `test_parquet_empty_export` pass even when
        /// the test setup is broken.
        fn into_inner(self) -> Vec<u8> {
            Arc::try_unwrap(self.0)
                .expect("buffer still shared; drop other clones before reading")
                .into_inner()
                .expect("buffer mutex poisoned")
        }
    }

    impl IoWrite for SharedBuffer {
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            self.0
                .lock()
                .map_err(|e| std::io::Error::other(e.to_string()))?
                .write(buf)
        }

        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    #[test]
    fn test_parquet_export() {
        let buffer = SharedBuffer::new();
        let buffer_clone = buffer.clone();

        let mut sink = ParquetExportSink::new(buffer).unwrap();
        sink.write(&ExportableMemory {
            id: "test-1".to_string(),
            content: "Test memory content".to_string(),
            namespace: "decisions".to_string(),
            domain: "project".to_string(),
            project_id: Some("test-repo".to_string()),
            branch: Some("main".to_string()),
            file_path: None,
            status: "active".to_string(),
            created_at: 1_234_567_890,
            updated_at: 1_234_567_890,
            tags: vec!["rust".to_string(), "test".to_string()],
            source: Some("test.rs".to_string()),
        })
        .unwrap();
        Box::new(sink).finalize().unwrap();

        // Verify Parquet magic bytes (PAR1)
        let data = buffer_clone.into_inner();
        assert!(!data.is_empty());
        assert_eq!(&data[0..4], b"PAR1");
    }

    #[test]
    fn test_parquet_empty_export() {
        let buffer = SharedBuffer::new();
        let buffer_clone = buffer.clone();

        let sink = ParquetExportSink::new(buffer).unwrap();
        Box::new(sink).finalize().unwrap();

        // Empty export should produce empty output
        let data = buffer_clone.into_inner();
        assert!(data.is_empty());
    }

    #[test]
    fn test_schema_fields() {
        let schema = ParquetExportSink::<Vec<u8>>::schema();
        assert_eq!(schema.fields().len(), 12);

        // Verify required fields are non-nullable
        let id_field = schema.field_with_name("id").unwrap();
        assert!(!id_field.is_nullable());

        // Verify optional fields are nullable
        let project_id_field = schema.field_with_name("project_id").unwrap();
        assert!(project_id_field.is_nullable());
    }
}