| 1 |
nino.borges |
938 |
"""
|
| 2 |
|
|
|
| 3 |
|
|
ConcordanceDatToDictionary
|
| 4 |
|
|
|
| 5 |
|
|
Created by:
|
| 6 |
|
|
Emanuel Borges
|
| 7 |
|
|
09.03.2025
|
| 8 |
|
|
|
| 9 |
|
|
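This program takes a DAT export from Relativity and loads its rows; it can also return a dictionary keyed by a document-identifier column, where each value is a namedtuple whose fields are the (sanitized) column names.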
|
| 10 |
|
|
|
| 11 |
|
|
"""
|
| 12 |
|
|
|
| 13 |
|
|
import csv
|
| 14 |
nino.borges |
939 |
from collections import namedtuple
|
| 15 |
nino.borges |
950 |
import re, keyword
|
| 16 |
nino.borges |
938 |
|
| 17 |
|
|
|
| 18 |
|
|
class ConcordanceLoader:
    """Load a delimited DAT export into memory.

    The file is parsed with the ``csv`` module using ASCII 20 as the field
    delimiter and ASCII 254 as the quote character. The first parsed record
    is treated as the header row.
    """

    def __init__(self, filePath, encoding='utf-8-sig'):
        """Remember where the file lives; nothing is read until ``load()``.

        Args:
            filePath: Path to the DAT file.
            encoding: Text encoding used to read the file. The default,
                ``utf-8-sig``, reads plain UTF-8 and also drops a leading
                byte-order mark, which would otherwise stick to the first
                header and stop its quote characters from being recognised.
        """
        self.filePath = filePath
        self.encoding = encoding
        self.delimiter = '\x14'  # ASCII 20
        self.quotechar = '\xfe'  # ASCII 254
        self.records = []

    def load(self):
        """Read the whole file into ``self.records`` (a list of row lists)."""
        # newline='' lets the csv module handle line breaks inside quoted fields.
        with open(self.filePath, 'r', encoding=self.encoding, newline='') as file:
            reader = csv.reader(file, delimiter=self.delimiter, quotechar=self.quotechar)
            self.records = list(reader)

    def get_headers(self):
        """Return the header row, or ``[]`` when nothing is loaded."""
        return self.records[0] if self.records else []

    def get_data(self):
        """Return every row after the header, or ``[]`` when there is none."""
        return self.records[1:] if len(self.records) > 1 else []

    def get_all(self):
        """Return all loaded rows, header included."""
        return self.records

    @staticmethod
    def _sanitize_field_name(name):
        """Turn one raw header value into a name that ``namedtuple`` accepts."""
        name = "" if name is None else str(name).strip()
        if not name:
            name = "field"
        # Replace each run of non-word characters with a single underscore.
        name = re.sub(r"\W+", "_", name)
        # namedtuple rejects field names that start with an underscore
        # (e.g. "(Custodian)" would otherwise become "_Custodian_").
        name = name.lstrip("_") or "field"
        # Names can't start with a digit.
        if re.match(r"^\d", name):
            name = "f_" + name
        # namedtuple also rejects Python keywords such as "class" or "from".
        if keyword.iskeyword(name):
            name += "_"
        return name

    @classmethod
    def _unique_field_names(cls, header):
        """Sanitize every header value, suffixing ``_2``, ``_3``... on clashes."""
        names = []
        seen = set()  # O(1) membership test instead of scanning the list
        for raw in header:
            base = cls._sanitize_field_name(raw)
            candidate = base
            suffix = 2
            while candidate in seen:
                candidate = f"{base}_{suffix}"
                suffix += 1
            names.append(candidate)
            seen.add(candidate)
        return names

    def load_as_namedtuples(self, document_identifier: str, *, strict: bool = True):
        """
        Parse the loaded records into namedtuples keyed by `document_identifier`.

        Args:
            document_identifier: Header value (exactly as it appears in the
                file) of the column whose values become the dictionary keys.
            strict: When True, a repeated key raises ValueError; when False,
                the last row with that key wins.

        Returns:
            dict[str, ConcordanceRow]: Mapping from the document_identifier value
            to a namedtuple containing the row's data. Empty when the file
            has no records.

        Raises:
            ValueError: If `document_identifier` is not in the header, or if
                `strict` is True and a key occurs more than once.

        Notes:
            - `load()` is called automatically when no records are loaded yet.
            - Header values are sanitized into valid Python identifiers for the
              namedtuple field names (keywords get a trailing underscore,
              leading underscores are removed, duplicates get a numeric suffix).
            - Completely blank lines are skipped.
            - Rows shorter than the header are padded with "".
            - Rows longer than the header are trimmed.
            - Sets `header_to_field`, `namedtuple_type` and
              `namedtuple_records` on the instance for later inspection.
        """
        # Ensure we have records loaded
        if not getattr(self, "records", None):
            self.load()

        if not self.records:
            return {}

        header = self.records[0]
        data_rows = self.records[1:]

        # Build index map for the original header
        idx_map = {h: i for i, h in enumerate(header)}
        if document_identifier not in idx_map:
            raise ValueError(
                f"document_identifier {document_identifier!r} not found in header: {header}"
            )
        key_idx = idx_map[document_identifier]

        # Sanitize header names -> valid unique identifiers for namedtuple fields
        sanitized = self._unique_field_names(header)

        # Build namedtuple type
        RowNT = namedtuple("ConcordanceRow", sanitized)

        # Keep a mapping to help debugging (original header -> namedtuple field)
        self.header_to_field = dict(zip(header, sanitized))
        self.namedtuple_type = RowNT  # expose the type if you want to use it later

        result = {}
        n_fields = len(header)

        # The header is record 1, so data records are numbered from 2.
        for row_idx, row in enumerate(data_rows, start=2):
            # csv.reader yields [] for a blank line; that is not a record.
            if not row:
                continue

            # Normalize row length
            if len(row) < n_fields:
                row = row + [""] * (n_fields - len(row))
            elif len(row) > n_fields:
                row = row[:n_fields]

            key = row[key_idx]

            if strict and key in result:
                raise ValueError(
                    f"Duplicate key {key!r} found at data line {row_idx} for document_identifier={document_identifier!r}."
                )
            result[key] = RowNT(*row)

        self.namedtuple_records = result  # optional: stash for later access
        return result
|
| 134 |
|
|
|
| 135 |
|
|
|
| 136 |
|
|
|
| 137 |
|
|
if __name__ == '__main__':
    # Demo run: parse one export and print a single record.
    # Full path to the input file.
    dat_path = r"C:\Test_Dir\ATT\export_20250903_214056.dat"

    dat_loader = ConcordanceLoader(dat_path)
    dat_loader.load()

    # Index every row by the value in its privilege-log number column.
    rows_by_doc = dat_loader.load_as_namedtuples(document_identifier='DOJ_Privilege_Log_ Number')

    # Show the available field names, then one field of a known record.
    sample = rows_by_doc['ATT-DOJ-SHINY-PRIV0000001']
    print(sample._fields)
    print(sample.DOJ_CID_Final_Privilege_Description)