Skip to main content

subcog/io/formats/
csv.rs

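//! CSV format adapter for import/export.
//!
//! Import maps columns to memory fields by header name, accepting common aliases
//! (for example `text` or `body` for `content`); export writes a fixed set of columns.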
4
5use crate::io::traits::{ExportSink, ExportableMemory, ImportSource, ImportedMemory};
6use crate::{Error, Result};
7use std::io::{BufRead, Write};
8
9/// CSV import source.
10///
11/// Reads CSV files with configurable column mapping.
12/// First row is expected to be headers unless configured otherwise.
13pub struct CsvImportSource<R: BufRead> {
14    /// CSV reader.
15    reader: csv::Reader<R>,
16    /// Column indices for each field.
17    column_map: ColumnMap,
18}
19
/// Positions of the recognized CSV columns, one slot per memory field.
///
/// A slot is `None` when the header row has no column for that field.
#[derive(Default, Debug)]
struct ColumnMap {
    /// Column holding the memory text (the only required column).
    content: Option<usize>,
    /// Column holding the namespace.
    namespace: Option<usize>,
    /// Column holding the domain.
    domain: Option<usize>,
    /// Column holding the delimited tag list.
    tags: Option<usize>,
    /// Column holding the source reference.
    source: Option<usize>,
    /// Column holding the creation timestamp.
    created_at: Option<usize>,
    /// Column holding the time-to-live value.
    ttl_seconds: Option<usize>,
}
31
32impl ColumnMap {
33    /// Creates a column map from CSV headers.
34    fn from_headers(headers: &csv::StringRecord) -> Result<Self> {
35        let mut map = Self::default();
36
37        for (i, header) in headers.iter().enumerate() {
38            match header.to_lowercase().as_str() {
39                "content" | "text" | "memory" | "body" => map.content = Some(i),
40                "namespace" | "ns" | "category" | "type" => map.namespace = Some(i),
41                "domain" | "scope" => map.domain = Some(i),
42                "tags" | "labels" | "keywords" => map.tags = Some(i),
43                "source" | "src" | "origin" | "file" => map.source = Some(i),
44                "created_at" | "created" | "timestamp" | "date" => map.created_at = Some(i),
45                "ttl" | "ttl_seconds" | "expiry" => map.ttl_seconds = Some(i),
46                _ => {}, // Ignore unknown columns
47            }
48        }
49
50        // Content is required
51        if map.content.is_none() {
52            return Err(Error::InvalidInput(
53                "CSV must have a 'content' column (or 'text', 'memory', 'body')".to_string(),
54            ));
55        }
56
57        Ok(map)
58    }
59}
60
61impl<R: BufRead> CsvImportSource<R> {
62    /// Creates a new CSV import source.
63    ///
64    /// # Errors
65    ///
66    /// Returns an error if headers cannot be read or 'content' column is missing.
67    pub fn new(reader: R) -> Result<Self> {
68        let mut csv_reader = csv::ReaderBuilder::new()
69            .has_headers(true)
70            .flexible(true) // Allow varying number of fields
71            .trim(csv::Trim::All)
72            .from_reader(reader);
73
74        let headers = csv_reader
75            .headers()
76            .map_err(|e| Error::OperationFailed {
77                operation: "read_csv_headers".to_string(),
78                cause: e.to_string(),
79            })?
80            .clone();
81        let column_map = ColumnMap::from_headers(&headers)?;
82
83        Ok(Self {
84            reader: csv_reader,
85            column_map,
86        })
87    }
88
89    /// Parses a record into an imported memory.
90    fn parse_record(&self, record: &csv::StringRecord) -> Result<ImportedMemory> {
91        let get_field = |idx: Option<usize>| -> Option<String> {
92            idx.and_then(|i| record.get(i))
93                .map(str::trim)
94                .filter(|s| !s.is_empty())
95                .map(String::from)
96        };
97
98        let content = get_field(self.column_map.content)
99            .ok_or_else(|| Error::InvalidInput("Missing content field".to_string()))?;
100
101        let tags = get_field(self.column_map.tags)
102            .map(|t| {
103                t.split([',', ';', '|'])
104                    .map(str::trim)
105                    .filter(|s| !s.is_empty())
106                    .map(String::from)
107                    .collect()
108            })
109            .unwrap_or_default();
110
111        let created_at = get_field(self.column_map.created_at).and_then(|s| s.parse::<u64>().ok());
112
113        let ttl_seconds =
114            get_field(self.column_map.ttl_seconds).and_then(|s| s.parse::<u64>().ok());
115
116        Ok(ImportedMemory {
117            content,
118            namespace: get_field(self.column_map.namespace),
119            domain: get_field(self.column_map.domain),
120            tags,
121            source: get_field(self.column_map.source),
122            created_at,
123            ttl_seconds,
124        })
125    }
126}
127
128impl<R: BufRead> ImportSource for CsvImportSource<R> {
129    fn next(&mut self) -> Result<Option<ImportedMemory>> {
130        let mut record = csv::StringRecord::new();
131
132        let has_record =
133            self.reader
134                .read_record(&mut record)
135                .map_err(|e| Error::OperationFailed {
136                    operation: "read_csv".to_string(),
137                    cause: e.to_string(),
138                })?;
139        if !has_record {
140            return Ok(None);
141        }
142
143        let memory = self.parse_record(&record)?;
144        Ok(Some(memory))
145    }
146}
147
148/// CSV export sink.
149///
150/// Writes memories as CSV with headers.
151pub struct CsvExportSink<W: Write> {
152    writer: csv::Writer<W>,
153    /// Whether headers have been written.
154    headers_written: bool,
155}
156
157impl<W: Write> CsvExportSink<W> {
158    /// Creates a new CSV export sink.
159    ///
160    /// # Errors
161    ///
162    /// Returns an error if the writer cannot be created.
163    pub fn new(writer: W) -> Result<Self> {
164        let csv_writer = csv::WriterBuilder::new()
165            .has_headers(false) // We write headers manually
166            .from_writer(writer);
167
168        Ok(Self {
169            writer: csv_writer,
170            headers_written: false,
171        })
172    }
173
174    /// Writes headers if not already written.
175    fn ensure_headers(&mut self) -> Result<()> {
176        if !self.headers_written {
177            self.writer
178                .write_record([
179                    "id",
180                    "content",
181                    "namespace",
182                    "domain",
183                    "project_id",
184                    "branch",
185                    "file_path",
186                    "status",
187                    "created_at",
188                    "updated_at",
189                    "tags",
190                    "source",
191                ])
192                .map_err(|e| Error::OperationFailed {
193                    operation: "write_csv_headers".to_string(),
194                    cause: e.to_string(),
195                })?;
196            self.headers_written = true;
197        }
198        Ok(())
199    }
200}
201
202impl<W: Write + Send> ExportSink for CsvExportSink<W> {
203    fn write(&mut self, memory: &ExportableMemory) -> Result<()> {
204        self.ensure_headers()?;
205
206        self.writer
207            .write_record([
208                &memory.id,
209                &memory.content,
210                &memory.namespace,
211                &memory.domain,
212                memory.project_id.as_deref().unwrap_or(""),
213                memory.branch.as_deref().unwrap_or(""),
214                memory.file_path.as_deref().unwrap_or(""),
215                &memory.status,
216                &memory.created_at.to_string(),
217                &memory.updated_at.to_string(),
218                &memory.tags.join(","),
219                memory.source.as_deref().unwrap_or(""),
220            ])
221            .map_err(|e| Error::OperationFailed {
222                operation: "write_csv".to_string(),
223                cause: e.to_string(),
224            })?;
225
226        Ok(())
227    }
228
229    fn finalize(mut self: Box<Self>) -> Result<()> {
230        self.writer.flush().map_err(|e| Error::OperationFailed {
231            operation: "flush_csv".to_string(),
232            cause: e.to_string(),
233        })?;
234        Ok(())
235    }
236}
237
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Builds an import source over an in-memory CSV document.
    fn import_from(csv_text: &str) -> CsvImportSource<Cursor<&str>> {
        CsvImportSource::new(Cursor::new(csv_text)).unwrap()
    }

    /// Canonical headers: fields, tag splitting and end-of-input are all honored.
    #[test]
    fn test_import_basic_csv() {
        let mut source = import_from(concat!(
            "content,namespace,tags\n",
            "\"First memory\",decisions,\"rust,test\"\n",
            "\"Second memory\",learnings,\"\"\n",
        ));

        let first = source.next().unwrap().unwrap();
        assert_eq!(first.content, "First memory");
        assert_eq!(first.namespace, Some("decisions".to_string()));
        assert_eq!(first.tags, vec!["rust", "test"]);

        let second = source.next().unwrap().unwrap();
        assert_eq!(second.content, "Second memory");
        assert_eq!(second.namespace, Some("learnings".to_string()));
        assert!(second.tags.is_empty());

        // Input is exhausted after the two data rows.
        assert!(source.next().unwrap().is_none());
    }

    /// Alias headers (`text`, `category`, `labels`) map to the same fields.
    #[test]
    fn test_import_alternative_headers() {
        let mut source = import_from(concat!(
            "text,category,labels\n",
            "\"Memory content\",decisions,\"tag1|tag2\"\n",
        ));

        let memory = source.next().unwrap().unwrap();
        assert_eq!(memory.content, "Memory content");
        assert_eq!(memory.namespace, Some("decisions".to_string()));
        assert_eq!(memory.tags, vec!["tag1", "tag2"]);
    }

    /// Construction fails when no header maps to the content field.
    #[test]
    fn test_import_missing_content_column() {
        let outcome = CsvImportSource::new(Cursor::new("namespace,tags\ndecisions,test\n"));
        assert!(outcome.is_err());
    }

    /// A written memory shows up in the output together with the header row.
    #[test]
    fn test_export_csv() {
        let memory = ExportableMemory {
            id: "1".to_string(),
            content: "Test memory".to_string(),
            namespace: "decisions".to_string(),
            domain: "project".to_string(),
            project_id: Some("repo".to_string()),
            branch: None,
            file_path: None,
            status: "active".to_string(),
            created_at: 1_234_567_890,
            updated_at: 1_234_567_890,
            tags: vec!["rust".to_string(), "test".to_string()],
            source: None,
        };

        let mut buffer = Vec::new();
        {
            // Scope the sink so its borrow of `buffer` ends before we read it.
            let mut sink = CsvExportSink::new(&mut buffer).unwrap();
            sink.write(&memory).unwrap();
            Box::new(sink).finalize().unwrap();
        }

        let rendered = String::from_utf8(buffer).unwrap();
        for expected in ["id,content,namespace", "Test memory", "rust,test"] {
            assert!(rendered.contains(expected));
        }
    }

    /// Each supported tag delimiter (`,`, `;`, `|`) splits the tag cell.
    #[test]
    fn test_tag_delimiters() {
        let mut source = import_from(concat!(
            "content,tags\n",
            "\"Memory 1\",\"a,b,c\"\n",
            "\"Memory 2\",\"x;y;z\"\n",
            "\"Memory 3\",\"p|q|r\"\n",
        ));

        let expected_rows = [
            vec!["a", "b", "c"],
            vec!["x", "y", "z"],
            vec!["p", "q", "r"],
        ];
        for expected in expected_rows {
            let memory = source.next().unwrap().unwrap();
            assert_eq!(memory.tags, expected);
        }
    }
}