ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/ns_dev/Python/NinoCode/Active_prgs/Redgrave/RelativityAuditFlattenerXml.py
Revision: 964
Committed: Tue Dec 9 18:56:37 2025 UTC (3 months, 2 weeks ago) by nino.borges
Content type: text/x-python
File size: 12708 byte(s)
Log Message:
Added a method that performs a faster bulk write through the Excel API, and another method that writes straight to CSV and doesn't use the API at all.

File Contents

# User Rev Content
1 nino.borges 960 """
2    
3     RelativityAuditFlattenerXml
4    
5     Created by
6     Emanuel Borges
7     12.08.2025
8    
9     Using my XML inspector, this program will attempt to take a raw Relativity audit history report and generate a flattened spreadsheet report.
10    
11     """
12    
13     import csv
14     import xml.etree.ElementTree as ET
15 nino.borges 962 from typing import Any, Dict, List, Optional
16 nino.borges 960
17    
18 nino.borges 962
19 nino.borges 963
def load_choice_map(
    choice_csv_path: str,
    artifact_id_col: str = "Artifact ID",
    name_col: str = "Name",
) -> Dict[str, str]:
    """
    Load a CSV containing workspace choices into a dict:
        { artifact_id (str) -> choice_name (str) }

    By default expects columns named:
        Artifact ID, Name

    If your file uses different headers, adjust artifact_id_col / name_col.

    Values are stripped of surrounding whitespace. Rows where either the
    artifact ID or the name is empty (or missing) are skipped. If the same
    artifact ID appears more than once, the last row wins.

    If either requested column is absent from the CSV header, a warning is
    issued (the function still returns, with an empty map), because every
    lookup against an empty map falls back to the raw ID.
    """
    # Imported locally so this function does not depend on a module-level
    # import that the rest of the file does not need.
    import warnings

    choice_map: Dict[str, str] = {}

    # utf-8-sig transparently drops a leading BOM if the file has one.
    with open(choice_csv_path, newline="", encoding="utf-8-sig") as f:
        reader = csv.DictReader(f)

        # fieldnames is None for a completely empty file.
        found_headers = list(reader.fieldnames or [])
        missing = [
            col for col in (artifact_id_col, name_col) if col not in found_headers
        ]
        if missing:
            warnings.warn(
                f"Column(s) {missing} not found in {choice_csv_path!r}; "
                f"headers present: {found_headers}. The choice map will be empty.",
                stacklevel=2,
            )

        for row in reader:
            # Short rows yield None for the missing cells, hence the `or ""`.
            artifact_id = (row.get(artifact_id_col) or "").strip()
            name = (row.get(name_col) or "").strip()
            if artifact_id and name:
                choice_map[artifact_id] = name

    return choice_map
45    
46    
47    
class RelativityAuditFlattenerXml:
    """
    Reads a Relativity audit CSV and flattens XML 'Details' so that
    each <field> modification becomes its own row, with the original
    columns (Name through Request Origination) repeated.

    For choice fields, setChoice/unSetChoice/unsetChoice values are
    artifactIDs; we map them to choice names using a choice_map dict.

    Typical use: construct, call load_and_flatten(), then call one of the
    export_* methods.
    """

    # The base columns we want to carry through from the CSV
    BASE_COLUMNS = [
        "Name",
        "Action",
        "User Name",
        "Timestamp",
        "ArtifactID",
        "Execution Time",
        "ID",
        "Object Type",
        "Request Origination",
    ]

    # New columns derived from the XML
    DETAIL_COLUMNS = [
        "Field Name",   # from <field name="...">
        "Field Type",   # from <field type="...">
        "unSetChoice",  # choice names removed
        "setChoice",    # choice names added
        "oldValue",     # free-text oldValue
        "newValue",     # free-text newValue
    ]

    # Numeric value of Excel's XlCalculation.xlCalculationManual. Spelled out
    # as a literal because win32com.client.constants is only populated when
    # generated (makepy / EnsureDispatch) type-library support is present,
    # which a plain late-bound Dispatch() does not guarantee.
    _XL_CALCULATION_MANUAL = -4135

    def __init__(self, csv_path: str, choice_map: Optional[Dict[str, str]] = None) -> None:
        """
        csv_path:   path of the Relativity audit CSV to read.
        choice_map: optional { artifactID (str) -> choice name (str) }.
                    When omitted, choice IDs are reported unchanged.
        """
        self.csv_path = csv_path
        self.rows: List[Dict[str, Any]] = []
        self.choice_map: Dict[str, str] = choice_map or {}

    def load_and_flatten(self) -> None:
        """
        Read the CSV, parse Details XML, and populate self.rows with
        one row per <field> modification.

        Audit events whose Details yield no field-level changes (empty,
        unparseable, or without <field> elements) still produce a single
        row with blank detail columns so the event isn't lost.

        self.rows is replaced, not extended, so calling this method again
        does not duplicate rows. It is only replaced once the whole file
        has been read successfully.
        """
        blank_details = dict.fromkeys(self.DETAIL_COLUMNS, "")
        flattened: List[Dict[str, Any]] = []

        with open(self.csv_path, newline="", encoding="utf-8-sig") as f:
            for src_row in csv.DictReader(f):
                details_str = (src_row.get("Details") or "").strip()
                field_change_entries = self._parse_details_xml(details_str)

                # No field-level changes: keep the event as one blank-detail row.
                if not field_change_entries:
                    out_row = self._copy_base_columns(src_row)
                    out_row.update(blank_details)
                    flattened.append(out_row)
                    continue

                # One row per <field> node
                for fc in field_change_entries:
                    out_row = self._copy_base_columns(src_row)
                    out_row.update({
                        "Field Name": fc.get("field_name", ""),
                        "Field Type": fc.get("field_type", ""),
                        "unSetChoice": fc.get("unset_choice", ""),
                        "setChoice": fc.get("set_choice", ""),
                        "oldValue": fc.get("old_value", ""),
                        "newValue": fc.get("new_value", ""),
                    })
                    flattened.append(out_row)

        self.rows = flattened

    def total_field_changes(self) -> int:
        """
        Number of output rows currently held in self.rows.

        Note this also counts the blank-detail placeholder rows emitted for
        events that had no parseable field changes.
        """
        return len(self.rows)

    ## This works well but it's too slow on really large reports...
    def export_to_excel(self, excel_path: str) -> None:
        """
        Export flattened rows to Excel using win32com, one cell per COM call.

        Requires pywin32 and a local Excel installation. Excel is always
        shut down, even if writing or saving fails, so no hidden Excel
        process is left behind.
        """
        from win32com.client import Dispatch

        headers = self.BASE_COLUMNS + self.DETAIL_COLUMNS

        xlApp = Dispatch("Excel.Application")
        wb = None
        try:
            xlApp.Visible = False

            wb = xlApp.Workbooks.Add()
            ws = wb.Worksheets(1)
            ws.Name = "Audit Changes"

            # Header row
            for col_idx, header in enumerate(headers, start=1):
                ws.Cells(1, col_idx).Value = header

            # Data rows (row 1 is the header, so data starts at row 2)
            for row_idx, row in enumerate(self.rows, start=2):
                for col_idx, header in enumerate(headers, start=1):
                    ws.Cells(row_idx, col_idx).Value = row.get(header, "")

            # Autofit and (optionally) make a table
            last_row = len(self.rows) + 1
            last_col = len(headers)
            data_range = ws.Range(ws.Cells(1, 1), ws.Cells(last_row, last_col))
            data_range.EntireColumn.AutoFit()
            self._try_format_as_table(ws, data_range)

            wb.SaveAs(excel_path)
        finally:
            self._shutdown_excel(xlApp, wb)

    ## Still using the Excel win32com API but as a bulk write, which is more complicated but faster.
    def export_to_excel_fast(self, excel_path: str) -> None:
        """
        Export flattened rows to Excel using win32com, but in a single
        bulk write for speed.

        Requires pywin32 and a local Excel installation. Excel is always
        shut down, even if writing or saving fails.
        """
        from win32com.client import Dispatch

        headers = self.BASE_COLUMNS + self.DETAIL_COLUMNS
        n_rows = len(self.rows)
        n_cols = len(headers)

        # Build a 2D list: first row = headers, rest = data
        data_matrix = [headers]
        data_matrix.extend([row.get(h, "") for h in headers] for row in self.rows)

        xlApp = Dispatch("Excel.Application")
        wb = None
        try:
            xlApp.Visible = False
            # Turn off some expensive stuff
            xlApp.ScreenUpdating = False
            xlApp.DisplayAlerts = False

            wb = xlApp.Workbooks.Add()

            # NOTE(review): Application.Calculation is touched only while a
            # workbook is open; Excel is reported to reject reads/writes of
            # this property when no workbook exists - confirm against the
            # Excel object model reference.
            calc_original = xlApp.Calculation
            xlApp.Calculation = self._XL_CALCULATION_MANUAL

            ws = wb.Worksheets(1)
            ws.Name = "Audit Changes"

            # Define the full range (including header row)
            top_left = ws.Cells(1, 1)
            bottom_right = ws.Cells(n_rows + 1, n_cols)
            data_range = ws.Range(top_left, bottom_right)

            # Bulk write: ONE COM call instead of one per cell
            data_range.Value = data_matrix

            # Autofit and optionally make a table
            data_range.EntireColumn.AutoFit()
            self._try_format_as_table(ws, data_range)

            # Restore the calculation mode before saving, while the workbook
            # is still open, so the manual setting is not what is in effect
            # when the file is written.
            xlApp.Calculation = calc_original

            wb.SaveAs(excel_path)
        finally:
            try:
                xlApp.ScreenUpdating = True
                xlApp.DisplayAlerts = True
            finally:
                self._shutdown_excel(xlApp, wb)

    ## Here I'm trying without using Excel API at all.
    def export_to_csv(self, csv_path: str) -> None:
        """
        Export flattened rows as a CSV file (very fast).
        Excel opens CSV directly.

        The file is written as utf-8-sig (UTF-8 with a leading BOM) with
        BASE_COLUMNS followed by DETAIL_COLUMNS as the header row.
        """
        headers = self.BASE_COLUMNS + self.DETAIL_COLUMNS

        with open(csv_path, "w", newline="", encoding="utf-8-sig") as f:
            writer = csv.writer(f)
            writer.writerow(headers)
            writer.writerows([row.get(h, "") for h in headers] for row in self.rows)

    def _copy_base_columns(self, src_row: Dict[str, Any]) -> Dict[str, Any]:
        """
        Copy only the base columns from the source CSV row.
        Columns absent from the source row come through as "".
        """
        return {col: src_row.get(col, "") for col in self.BASE_COLUMNS}

    def _map_choice_id_to_name(self, choice_id: str) -> str:
        """
        Convert a choice artifactID to its choice name using self.choice_map.
        If not found, fall back to the original ID string.
        An empty / None ID maps to "".
        """
        choice_id = (choice_id or "").strip()
        if not choice_id:
            return ""
        return self.choice_map.get(choice_id, choice_id)

    def _parse_details_xml(self, xml_str: str) -> List[Dict[str, str]]:
        """
        Parse the XML in Details and return a list of dicts with keys:
          - field_name
          - field_type
          - unset_choice  (names, combined with "; ")
          - set_choice    (names, combined with "; ")
          - old_value
          - new_value

        Returns [] when the input is empty, is not well-formed XML, or
        contains no <field> elements below the root.
        """
        xml_str = (xml_str or "").strip()
        if not xml_str:
            return []

        try:
            root = ET.fromstring(xml_str)
        except ET.ParseError:
            # Best effort: the caller still emits a blank-detail row.
            return []

        results: List[Dict[str, str]] = []

        # Find all <field> elements anywhere under the root
        for field_elem in root.findall(".//field"):
            # Collect choice changes as IDs. Both spellings of the "unset"
            # tag are accepted: unSetChoice first, then unsetChoice.
            set_choice_ids = self._child_texts(field_elem, "setChoice")
            unset_choice_ids = (
                self._child_texts(field_elem, "unSetChoice")
                + self._child_texts(field_elem, "unsetChoice")
            )

            results.append(
                {
                    "field_name": field_elem.attrib.get("name", ""),
                    "field_type": field_elem.attrib.get("type", ""),
                    "unset_choice": self._join_choice_names(unset_choice_ids),
                    "set_choice": self._join_choice_names(set_choice_ids),
                    # old/new value tags (for non-choice cases)
                    "old_value": self._first_child_text(field_elem, "oldValue"),
                    "new_value": self._first_child_text(field_elem, "newValue"),
                }
            )

        return results

    def _join_choice_names(self, choice_ids: List[str]) -> str:
        """Map choice IDs to names and join the non-empty ones with '; '."""
        names = (self._map_choice_id_to_name(cid) for cid in choice_ids)
        return "; ".join(name for name in names if name)

    @staticmethod
    def _child_texts(parent: ET.Element, tag: str) -> List[str]:
        """Stripped, non-empty text of every direct child of parent named tag."""
        stripped = ((child.text or "").strip() for child in parent.findall(tag))
        return [text for text in stripped if text]

    @staticmethod
    def _first_child_text(parent: ET.Element, tag: str) -> str:
        """Stripped text of the first direct child named tag, or "" if absent/empty."""
        node = parent.find(tag)
        if node is None:
            return ""
        return (node.text or "").strip()

    @staticmethod
    def _try_format_as_table(ws: Any, data_range: Any) -> None:
        """Turn data_range into a styled Excel table; failures are ignored."""
        try:
            table = ws.ListObjects.Add(
                SourceType=0,  # xlSrcRange
                Source=data_range,
                XlListObjectHasHeaders=1,  # xlYes
            )
            table.Name = "AuditChangesTable"
            table.TableStyle = "TableStyleLight9"
        except Exception:
            # Table formatting is purely cosmetic; if ListObjects/TableStyle
            # fails the plain data is still exported, so deliberately ignore.
            pass

    @staticmethod
    def _shutdown_excel(xl_app: Any, workbook: Any) -> None:
        """
        Close workbook (if one was created) without saving again, then quit
        Excel. Quit is attempted even if closing the workbook fails.
        """
        try:
            if workbook is not None:
                # Any successful export has already been written by SaveAs;
                # on a failure path there is nothing worth saving.
                workbook.Close(SaveChanges=False)
        finally:
            xl_app.Quit()
346    
347    
348    
if __name__ == "__main__":
    # Hard-coded test inputs/outputs for a local run.
    audit_csv_path = r"C:\Test_Dir\NS\RelExport-DocHistory-DateFiltered(MED).csv"
    choice_csv_path = r"C:\Test_Dir\NS\20251208 - NS-AllChoicesLookupList.csv"
    output_excel = r"C:\Test_Dir\NS\RelativityAuditReport_Flattened_XML.xlsx"

    ## Step 1: build the artifactID -> choice name lookup
    choice_map = load_choice_map(choice_csv_path)
    loaded_count = len(choice_map)
    print(f"Loaded {loaded_count} choices from {choice_csv_path}")

    ## Step 2: flatten the audit CSV, translating choice IDs via the lookup
    flattener = RelativityAuditFlattenerXml(audit_csv_path, choice_map=choice_map)
    flattener.load_and_flatten()
    row_total = flattener.total_field_changes()
    print(f"Total field-level changes: {row_total}")

    ## Step 3: write the flattened rows out as an Excel workbook
    flattener.export_to_excel(output_excel)
    print(f"Excel report written to: {output_excel}")