ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/ns_dev/Python/NinoCode/Active_prgs/Redgrave/ConcordanceDatToDictionary.py
Revision: 940
Committed: Wed Sep 3 20:51:54 2025 UTC (6 months, 3 weeks ago) by nino.borges
Content type: text/x-python
File size: 4193 byte(s)
Log Message:
Adding named tuple method

File Contents

# User Rev Content
1 nino.borges 938 """
2    
3     ConcordanceDatToDictionary
4    
5     Created by:
6     Emanuel Borges
7     09.03.2025
8    
9     This program will take a DAT export from Relativity and will return a dictionary where the column names are the keys.
10    
11     """
12    
13     import csv
14 nino.borges 939 from collections import namedtuple
15     import re
16 nino.borges 938
17    
class ConcordanceLoader:
    """Read a Concordance-style DAT export into memory.

    The file is parsed with the ``csv`` module using the Concordance default
    separators: ASCII 20 as the field delimiter and ASCII 254 (thorn) as the
    quote character.  The first parsed row is treated as the header.

    Attributes set by this class:
        filePath: Path handed to ``open()`` by ``load()``.
        encoding: Text encoding used by ``load()``.
        delimiter / quotechar: Separators passed to ``csv.reader``.
        records: List of parsed rows (header first); empty until ``load()``.
        header_to_field, namedtuple_type, namedtuple_records: Populated by
            ``load_as_namedtuples()``.
    """

    def __init__(self, filePath, *, encoding="utf-8-sig"):
        """Remember where the DAT file lives; nothing is read until ``load()``.

        Args:
            filePath: Path of the DAT file.
            encoding: Text encoding of the file.  The default ``utf-8-sig``
                decodes plain UTF-8 and additionally drops a leading BOM, which
                would otherwise stay glued to the first header name and stop
                the csv module from recognising its opening quote character.
        """
        self.filePath = filePath
        self.encoding = encoding
        self.delimiter = '\x14'  # ASCII 20
        self.quotechar = '\xfe'  # ASCII 254
        self.records = []

    def load(self):
        """Parse the whole file into ``self.records`` (a list of row lists)."""
        # newline='' lets the csv module handle line endings itself, as the
        # csv documentation requires for file objects.
        with open(self.filePath, 'r', encoding=self.encoding, newline='') as file:
            reader = csv.reader(file, delimiter=self.delimiter, quotechar=self.quotechar)
            self.records = list(reader)

    def get_headers(self):
        """Return the header row, or ``[]`` when nothing is loaded."""
        return self.records[0] if self.records else []

    def get_data(self):
        """Return every row after the header, or ``[]`` when there are none."""
        return self.records[1:]

    def get_all(self):
        """Return all loaded rows, header included."""
        return self.records

    @staticmethod
    def _sanitize_field_name(name):
        """Turn one raw header value into a name ``namedtuple`` will accept."""
        import keyword  # local import: only needed here

        text = "" if name is None else str(name).strip()
        # Replace runs of non-word characters with a single underscore.
        text = re.sub(r"\W+", "_", text)
        # namedtuple rejects field names that start with an underscore.
        text = text.lstrip("_")
        if not text:
            return "field"
        # Identifiers cannot start with a digit; use a prefix that does not
        # itself begin with an underscore.
        if re.match(r"\d", text):
            text = "field_" + text
        # Keywords such as "class" or "from" are not allowed as field names.
        if keyword.iskeyword(text):
            text += "_"
        # \w is broader than what Python allows in identifiers; fall back to a
        # generic name for anything that still is not a valid identifier.
        if not text.isidentifier():
            return "field"
        return text

    @classmethod
    def _unique_field_names(cls, header):
        """Sanitize every header value and de-duplicate with _2, _3, ... suffixes."""
        names = []
        taken = set()
        for raw in header:
            base = cls._sanitize_field_name(raw)
            candidate = base
            suffix = 2
            while candidate in taken:
                candidate = f"{base}_{suffix}"
                suffix += 1
            names.append(candidate)
            taken.add(candidate)
        return names

    def load_as_namedtuples(self, document_identifier: str, *, strict: bool = True):
        """
        Parse the loaded records into namedtuples keyed by `document_identifier`.

        If no records are loaded yet, ``load()`` is called first.

        Args:
            document_identifier: Header name (as it appears in the file) whose
                value is used as the dictionary key for each row.
            strict: When True, a repeated key raises ValueError; when False,
                the last row with that key wins.

        Returns:
            dict[str, ConcordanceRow]: Mapping from the document_identifier value
            to a namedtuple containing the row's data.  Empty when the file
            has no rows at all.

        Raises:
            ValueError: If `document_identifier` is not in the header, or if
                `strict` is True and a key occurs more than once.

        Notes:
            - Header values are sanitized into valid Python identifiers for the
              namedtuple field names (leading underscores are dropped, names
              starting with a digit get a "field_" prefix, Python keywords get
              a trailing underscore, duplicates get a numeric suffix).
            - Rows shorter than the header are padded with "".
            - Rows longer than the header are trimmed.
            - Side effects: sets `header_to_field`, `namedtuple_type` and
              `namedtuple_records` on the instance.
        """
        # Ensure we have records loaded
        if not self.records:
            self.load()

        if not self.records:
            return {}

        header = self.records[0]
        data_rows = self.records[1:]

        # Build index map for the original header (for a repeated header name
        # the last position is the one kept).
        idx_map = {h: i for i, h in enumerate(header)}
        if document_identifier not in idx_map:
            raise ValueError(
                f"document_identifier {document_identifier!r} not found in header: {header}"
            )
        key_idx = idx_map[document_identifier]

        sanitized = self._unique_field_names(header)

        # Build namedtuple type
        RowNT = namedtuple("ConcordanceRow", sanitized)

        # Keep a mapping to help debugging (original header -> namedtuple field)
        self.header_to_field = dict(zip(header, sanitized))
        self.namedtuple_type = RowNT  # expose the type if you want to use it later

        result = {}
        n_fields = len(header)

        # The header is line 1, so data rows are numbered from 2 in messages.
        for row_idx, row in enumerate(data_rows, start=2):
            # Normalize row length
            if len(row) < n_fields:
                row = row + [""] * (n_fields - len(row))
            elif len(row) > n_fields:
                row = row[:n_fields]

            key = row[key_idx]

            if strict and key in result:
                raise ValueError(
                    f"Duplicate key {key!r} found at data line {row_idx} for document_identifier={document_identifier!r}."
                )
            result[key] = RowNT(*row)

        self.namedtuple_records = result  # optional: stash for later access
        return result