1use crate::io::traits::{ExportSink, ExportableMemory};
9use crate::{Error, Result};
10use arrow::array::{ArrayRef, StringArray, UInt64Array};
11use arrow::datatypes::{DataType, Field, Schema};
12use arrow::record_batch::RecordBatch;
13use parquet::arrow::ArrowWriter;
14use parquet::file::properties::WriterProperties;
17use std::io::Write;
18use std::sync::Arc;
19
/// Export sink that buffers memories in memory and writes them out as a
/// single Parquet file when finalized.
pub struct ParquetExportSink<W: Write + Send> {
    /// Destination writer; `Option` so `finalize` can move it out of the sink.
    writer: Option<W>,
    /// Memories accumulated by `write`, flushed in one batch at finalize.
    memories: Vec<ExportableMemory>,
}
29
30impl<W: Write + Send> ParquetExportSink<W> {
31 pub const fn new(writer: W) -> Result<Self> {
37 Ok(Self {
38 writer: Some(writer),
39 memories: Vec::new(),
40 })
41 }
42
43 fn schema() -> Schema {
45 Schema::new(vec![
46 Field::new("id", DataType::Utf8, false),
47 Field::new("content", DataType::Utf8, false),
48 Field::new("namespace", DataType::Utf8, false),
49 Field::new("domain", DataType::Utf8, false),
50 Field::new("project_id", DataType::Utf8, true),
51 Field::new("branch", DataType::Utf8, true),
52 Field::new("file_path", DataType::Utf8, true),
53 Field::new("status", DataType::Utf8, false),
54 Field::new("created_at", DataType::UInt64, false),
55 Field::new("updated_at", DataType::UInt64, false),
56 Field::new("tags", DataType::Utf8, false), Field::new("source", DataType::Utf8, true),
58 ])
59 }
60
61 fn to_record_batch(&self) -> Result<RecordBatch> {
63 let schema = Arc::new(Self::schema());
64
65 let ids: StringArray = self.memories.iter().map(|m| Some(m.id.as_str())).collect();
66 let contents: StringArray = self
67 .memories
68 .iter()
69 .map(|m| Some(m.content.as_str()))
70 .collect();
71 let namespaces: StringArray = self
72 .memories
73 .iter()
74 .map(|m| Some(m.namespace.as_str()))
75 .collect();
76 let domains: StringArray = self
77 .memories
78 .iter()
79 .map(|m| Some(m.domain.as_str()))
80 .collect();
81 let project_ids: StringArray = self
82 .memories
83 .iter()
84 .map(|m| m.project_id.as_deref())
85 .collect();
86 let branches: StringArray = self.memories.iter().map(|m| m.branch.as_deref()).collect();
87 let file_paths: StringArray = self
88 .memories
89 .iter()
90 .map(|m| m.file_path.as_deref())
91 .collect();
92 let statuses: StringArray = self
93 .memories
94 .iter()
95 .map(|m| Some(m.status.as_str()))
96 .collect();
97 let created_ats: UInt64Array = self.memories.iter().map(|m| Some(m.created_at)).collect();
98 let updated_ats: UInt64Array = self.memories.iter().map(|m| Some(m.updated_at)).collect();
99 let tags: StringArray = self
100 .memories
101 .iter()
102 .map(|m| Some(m.tags.join(",")))
103 .collect();
104 let sources: StringArray = self.memories.iter().map(|m| m.source.as_deref()).collect();
105
106 let columns: Vec<ArrayRef> = vec![
107 Arc::new(ids),
108 Arc::new(contents),
109 Arc::new(namespaces),
110 Arc::new(domains),
111 Arc::new(project_ids),
112 Arc::new(branches),
113 Arc::new(file_paths),
114 Arc::new(statuses),
115 Arc::new(created_ats),
116 Arc::new(updated_ats),
117 Arc::new(tags),
118 Arc::new(sources),
119 ];
120
121 RecordBatch::try_new(schema, columns).map_err(|e| Error::OperationFailed {
122 operation: "create_record_batch".to_string(),
123 cause: format!("Failed to create record batch: {e}"),
124 })
125 }
126}
127
128impl<W: Write + Send + 'static> ExportSink for ParquetExportSink<W> {
129 fn write(&mut self, memory: &ExportableMemory) -> Result<()> {
130 self.memories.push(memory.clone());
131 Ok(())
132 }
133
134 fn finalize(mut self: Box<Self>) -> Result<()> {
135 if self.memories.is_empty() {
136 return Ok(());
137 }
138
139 let writer = self.writer.take().ok_or_else(|| Error::OperationFailed {
140 operation: "parquet_finalize".to_string(),
141 cause: "Writer already consumed".to_string(),
142 })?;
143
144 let schema = Arc::new(Self::schema());
145 let props = WriterProperties::builder().build();
146
147 let mut arrow_writer = ArrowWriter::try_new(writer, schema, Some(props)).map_err(|e| {
148 Error::OperationFailed {
149 operation: "parquet_writer_create".to_string(),
150 cause: format!("Failed to create Parquet writer: {e}"),
151 }
152 })?;
153
154 let batch = self.to_record_batch()?;
155 arrow_writer
156 .write(&batch)
157 .map_err(|e| Error::OperationFailed {
158 operation: "parquet_write".to_string(),
159 cause: format!("Failed to write Parquet batch: {e}"),
160 })?;
161
162 arrow_writer.close().map_err(|e| Error::OperationFailed {
163 operation: "parquet_close".to_string(),
164 cause: format!("Failed to close Parquet writer: {e}"),
165 })?;
166
167 Ok(())
168 }
169}
170
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write as IoWrite;
    use std::sync::{Arc, Mutex};

    /// Clonable in-memory writer: the sink consumes one handle while the
    /// test keeps a second one to inspect the bytes afterwards.
    #[derive(Clone)]
    struct SharedBuffer(Arc<Mutex<Vec<u8>>>);

    impl SharedBuffer {
        fn new() -> Self {
            Self(Arc::new(Mutex::new(Vec::new())))
        }

        /// Recovers the accumulated bytes, or an empty vec if another
        /// handle is still alive or the lock was poisoned.
        fn into_inner(self) -> Vec<u8> {
            match Arc::try_unwrap(self.0) {
                Ok(mutex) => mutex.into_inner().unwrap_or_default(),
                Err(_) => Vec::new(),
            }
        }
    }

    impl IoWrite for SharedBuffer {
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            let mut guard = self
                .0
                .lock()
                .map_err(|e| std::io::Error::other(e.to_string()))?;
            guard.write(buf)
        }

        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    #[test]
    fn test_parquet_export() {
        let sink_buffer = SharedBuffer::new();
        let inspect = sink_buffer.clone();

        let memory = ExportableMemory {
            id: "test-1".to_string(),
            content: "Test memory content".to_string(),
            namespace: "decisions".to_string(),
            domain: "project".to_string(),
            project_id: Some("test-repo".to_string()),
            branch: Some("main".to_string()),
            file_path: None,
            status: "active".to_string(),
            created_at: 1_234_567_890,
            updated_at: 1_234_567_890,
            tags: vec!["rust".to_string(), "test".to_string()],
            source: Some("test.rs".to_string()),
        };

        let mut sink = ParquetExportSink::new(sink_buffer).unwrap();
        sink.write(&memory).unwrap();
        Box::new(sink).finalize().unwrap();

        let bytes = inspect.into_inner();
        assert!(!bytes.is_empty());
        // Every Parquet file begins with the magic bytes "PAR1".
        assert_eq!(&bytes[0..4], b"PAR1");
    }

    #[test]
    fn test_parquet_empty_export() {
        let sink_buffer = SharedBuffer::new();
        let inspect = sink_buffer.clone();

        let sink = ParquetExportSink::new(sink_buffer).unwrap();
        Box::new(sink).finalize().unwrap();

        // No memories were written, so finalize must emit no bytes.
        assert!(inspect.into_inner().is_empty());
    }

    #[test]
    fn test_schema_fields() {
        let schema = ParquetExportSink::<Vec<u8>>::schema();
        assert_eq!(schema.fields().len(), 12);

        assert!(!schema.field_with_name("id").unwrap().is_nullable());
        assert!(schema.field_with_name("project_id").unwrap().is_nullable());
    }
}
261}