1use crate::io::traits::{ExportSink, ExportableMemory, ImportSource, ImportedMemory};
6use crate::{Error, Result};
7use std::io::{BufRead, Write};
8
9pub struct CsvImportSource<R: BufRead> {
14 reader: csv::Reader<R>,
16 column_map: ColumnMap,
18}
19
20#[derive(Debug, Default)]
22struct ColumnMap {
23 content: Option<usize>,
24 namespace: Option<usize>,
25 domain: Option<usize>,
26 tags: Option<usize>,
27 source: Option<usize>,
28 created_at: Option<usize>,
29 ttl_seconds: Option<usize>,
30}
31
32impl ColumnMap {
33 fn from_headers(headers: &csv::StringRecord) -> Result<Self> {
35 let mut map = Self::default();
36
37 for (i, header) in headers.iter().enumerate() {
38 match header.to_lowercase().as_str() {
39 "content" | "text" | "memory" | "body" => map.content = Some(i),
40 "namespace" | "ns" | "category" | "type" => map.namespace = Some(i),
41 "domain" | "scope" => map.domain = Some(i),
42 "tags" | "labels" | "keywords" => map.tags = Some(i),
43 "source" | "src" | "origin" | "file" => map.source = Some(i),
44 "created_at" | "created" | "timestamp" | "date" => map.created_at = Some(i),
45 "ttl" | "ttl_seconds" | "expiry" => map.ttl_seconds = Some(i),
46 _ => {}, }
48 }
49
50 if map.content.is_none() {
52 return Err(Error::InvalidInput(
53 "CSV must have a 'content' column (or 'text', 'memory', 'body')".to_string(),
54 ));
55 }
56
57 Ok(map)
58 }
59}
60
61impl<R: BufRead> CsvImportSource<R> {
62 pub fn new(reader: R) -> Result<Self> {
68 let mut csv_reader = csv::ReaderBuilder::new()
69 .has_headers(true)
70 .flexible(true) .trim(csv::Trim::All)
72 .from_reader(reader);
73
74 let headers = csv_reader
75 .headers()
76 .map_err(|e| Error::OperationFailed {
77 operation: "read_csv_headers".to_string(),
78 cause: e.to_string(),
79 })?
80 .clone();
81 let column_map = ColumnMap::from_headers(&headers)?;
82
83 Ok(Self {
84 reader: csv_reader,
85 column_map,
86 })
87 }
88
89 fn parse_record(&self, record: &csv::StringRecord) -> Result<ImportedMemory> {
91 let get_field = |idx: Option<usize>| -> Option<String> {
92 idx.and_then(|i| record.get(i))
93 .map(str::trim)
94 .filter(|s| !s.is_empty())
95 .map(String::from)
96 };
97
98 let content = get_field(self.column_map.content)
99 .ok_or_else(|| Error::InvalidInput("Missing content field".to_string()))?;
100
101 let tags = get_field(self.column_map.tags)
102 .map(|t| {
103 t.split([',', ';', '|'])
104 .map(str::trim)
105 .filter(|s| !s.is_empty())
106 .map(String::from)
107 .collect()
108 })
109 .unwrap_or_default();
110
111 let created_at = get_field(self.column_map.created_at).and_then(|s| s.parse::<u64>().ok());
112
113 let ttl_seconds =
114 get_field(self.column_map.ttl_seconds).and_then(|s| s.parse::<u64>().ok());
115
116 Ok(ImportedMemory {
117 content,
118 namespace: get_field(self.column_map.namespace),
119 domain: get_field(self.column_map.domain),
120 tags,
121 source: get_field(self.column_map.source),
122 created_at,
123 ttl_seconds,
124 })
125 }
126}
127
128impl<R: BufRead> ImportSource for CsvImportSource<R> {
129 fn next(&mut self) -> Result<Option<ImportedMemory>> {
130 let mut record = csv::StringRecord::new();
131
132 let has_record =
133 self.reader
134 .read_record(&mut record)
135 .map_err(|e| Error::OperationFailed {
136 operation: "read_csv".to_string(),
137 cause: e.to_string(),
138 })?;
139 if !has_record {
140 return Ok(None);
141 }
142
143 let memory = self.parse_record(&record)?;
144 Ok(Some(memory))
145 }
146}
147
148pub struct CsvExportSink<W: Write> {
152 writer: csv::Writer<W>,
153 headers_written: bool,
155}
156
157impl<W: Write> CsvExportSink<W> {
158 pub fn new(writer: W) -> Result<Self> {
164 let csv_writer = csv::WriterBuilder::new()
165 .has_headers(false) .from_writer(writer);
167
168 Ok(Self {
169 writer: csv_writer,
170 headers_written: false,
171 })
172 }
173
174 fn ensure_headers(&mut self) -> Result<()> {
176 if !self.headers_written {
177 self.writer
178 .write_record([
179 "id",
180 "content",
181 "namespace",
182 "domain",
183 "project_id",
184 "branch",
185 "file_path",
186 "status",
187 "created_at",
188 "updated_at",
189 "tags",
190 "source",
191 ])
192 .map_err(|e| Error::OperationFailed {
193 operation: "write_csv_headers".to_string(),
194 cause: e.to_string(),
195 })?;
196 self.headers_written = true;
197 }
198 Ok(())
199 }
200}
201
202impl<W: Write + Send> ExportSink for CsvExportSink<W> {
203 fn write(&mut self, memory: &ExportableMemory) -> Result<()> {
204 self.ensure_headers()?;
205
206 self.writer
207 .write_record([
208 &memory.id,
209 &memory.content,
210 &memory.namespace,
211 &memory.domain,
212 memory.project_id.as_deref().unwrap_or(""),
213 memory.branch.as_deref().unwrap_or(""),
214 memory.file_path.as_deref().unwrap_or(""),
215 &memory.status,
216 &memory.created_at.to_string(),
217 &memory.updated_at.to_string(),
218 &memory.tags.join(","),
219 memory.source.as_deref().unwrap_or(""),
220 ])
221 .map_err(|e| Error::OperationFailed {
222 operation: "write_csv".to_string(),
223 cause: e.to_string(),
224 })?;
225
226 Ok(())
227 }
228
229 fn finalize(mut self: Box<Self>) -> Result<()> {
230 self.writer.flush().map_err(|e| Error::OperationFailed {
231 operation: "flush_csv".to_string(),
232 cause: e.to_string(),
233 })?;
234 Ok(())
235 }
236}
237
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Opens an import source over in-memory CSV text, panicking if the
    /// header row is rejected.
    fn open(csv_text: &str) -> CsvImportSource<Cursor<&str>> {
        CsvImportSource::new(Cursor::new(csv_text)).unwrap()
    }

    /// Pulls the next memory, panicking on a read error or end of input.
    fn expect_row<R: BufRead>(source: &mut CsvImportSource<R>) -> ImportedMemory {
        source.next().unwrap().unwrap()
    }

    // Canonical headers: content, namespace and comma-separated tags.
    #[test]
    fn test_import_basic_csv() {
        let mut source = open(concat!(
            "content,namespace,tags\n",
            "\"First memory\",decisions,\"rust,test\"\n",
            "\"Second memory\",learnings,\"\"\n",
        ));

        let first = expect_row(&mut source);
        assert_eq!(first.content, "First memory");
        assert_eq!(first.namespace.as_deref(), Some("decisions"));
        assert_eq!(first.tags, vec!["rust", "test"]);

        let second = expect_row(&mut source);
        assert_eq!(second.content, "Second memory");
        assert_eq!(second.namespace.as_deref(), Some("learnings"));
        assert!(second.tags.is_empty());

        assert!(source.next().unwrap().is_none());
    }

    // Alias headers (`text`, `category`, `labels`) map to the same fields.
    #[test]
    fn test_import_alternative_headers() {
        let mut source = open(concat!(
            "text,category,labels\n",
            "\"Memory content\",decisions,\"tag1|tag2\"\n",
        ));

        let memory = expect_row(&mut source);
        assert_eq!(memory.content, "Memory content");
        assert_eq!(memory.namespace.as_deref(), Some("decisions"));
        assert_eq!(memory.tags, vec!["tag1", "tag2"]);
    }

    // A header row without any content column must be rejected up front.
    #[test]
    fn test_import_missing_content_column() {
        let input = "namespace,tags\ndecisions,test\n";
        assert!(CsvImportSource::new(Cursor::new(input)).is_err());
    }

    // One exported memory produces the header row plus its data row.
    #[test]
    fn test_export_csv() {
        let memory = ExportableMemory {
            id: "1".to_string(),
            content: "Test memory".to_string(),
            namespace: "decisions".to_string(),
            domain: "project".to_string(),
            project_id: Some("repo".to_string()),
            branch: None,
            file_path: None,
            status: "active".to_string(),
            created_at: 1_234_567_890,
            updated_at: 1_234_567_890,
            tags: vec!["rust".to_string(), "test".to_string()],
            source: None,
        };

        let mut output = Vec::new();
        {
            // Scoped so the mutable borrow of `output` ends before it is read.
            let mut sink = CsvExportSink::new(&mut output).unwrap();
            sink.write(&memory).unwrap();
            Box::new(sink).finalize().unwrap();
        }

        let output_str = String::from_utf8(output).unwrap();
        for expected in ["id,content,namespace", "Test memory", "rust,test"] {
            assert!(output_str.contains(expected));
        }
    }

    // Tags may be separated by commas, semicolons or pipes.
    #[test]
    fn test_tag_delimiters() {
        let mut source = open(concat!(
            "content,tags\n",
            "\"Memory 1\",\"a,b,c\"\n",
            "\"Memory 2\",\"x;y;z\"\n",
            "\"Memory 3\",\"p|q|r\"\n",
        ));

        let m1 = expect_row(&mut source);
        assert_eq!(m1.tags, vec!["a", "b", "c"]);

        let m2 = expect_row(&mut source);
        assert_eq!(m2.tags, vec!["x", "y", "z"]);

        let m3 = expect_row(&mut source);
        assert_eq!(m3.tags, vec!["p", "q", "r"]);
    }
}