ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/ns_dev/Python/NinoCode/Active_prgs/Redgrave/NS-PrivLogQC.py
Revision: 975
Committed: Thu Feb 12 19:58:06 2026 UTC (6 weeks, 1 day ago) by nino.borges
Content type: text/x-python
File size: 22947 byte(s)
Log Message:
This program will assist with the process of performing QC on past and present NS privilege logs.

File Contents

# Content
1 """
2
3 NS-PrivLogQC
4
5 Created by:
6 Emanuel Borges
7 11.17.2025
8
9 This program will assist with the process of performing QC on past and present NS privilege logs.
10
11 """
12
13 import os, re
14 from collections import namedtuple
15 from MyCode.Tool_Box import FileEncodingLib
16 import MyCode.Active_prgs.Redgrave.NS_MasterAttorneyList
17
18
class QcPrivLog(object):
    """A class for automating the process of performing QC on the NS privilege logs, including names normalization analysis.

    The input is a pipe ("|") delimited text export whose first row is the header and
    whose first column is the DocID.  For every row two parallel records are kept:
    the raw metadata values and the formatted (privilege log) values, each holding
    the from / to / cc / bcc / author fields split on ";".
    """
    version = '0.2.0'

    ## Record layout shared by metadataValuesDict and formattedValuesDict.
    RecordValues = namedtuple("RecordValues","fromValues toValues ccValues bccValues docAuthor")

    ## Defaults used when the caller does not supply a location.
    defaultOutputDirectory = r"C:\Test_Dir\NS"
    defaultMasterAttorneyListFileName = r"C:\Users\eborges\OneDrive - Redgrave LLP\Documents\Cases\Norfolk Southern\_NS_Current_MAL\RG - NS Cross-Matter Master Attorney List 20260120 (20260120-0136).xlsx"


    def __init__(self, cleanedDatExportFileName, metaFromFieldName, plogFromFieldName, metaToFieldName, plogToFieldName, metaCcFieldName, plogCcFieldName, metaBccFieldName, plogBccFieldName, metaAuthorFieldName, plogAuthorFieldName, fileEncoding = 'UTF8'):
        """Initializes the data structures.

        cleanedDatExportFileName should be the full path to the file.  Assumes the first
        row of the data file is the header and first column is DocID.  The remaining
        arguments are the header names of the metadata / privilege-log field pairs.

        Raises KeyError when any of the supplied field names is not in the header.
        """
        print("Initializing data structures...")
        self.issuesMatrix = {}
        self.metadataValuesDict = {}
        self.formattedValuesDict = {}
        self.additionalValuesDict = {}
        self.allPossibleEmailAddressesRegExPattern = r"[\w.+-]+@[\w-]+\.[\w.-]+"

        ## Context manager so the handle is released even if reading fails.
        with open(cleanedDatExportFileName,encoding = fileEncoding) as inputFile:
            contents = inputFile.readlines()
        self.cleanedInputDataFileHeader = contents[0].replace("\n","")
        self.cleanedInputDataFileHeaderList = self.cleanedInputDataFileHeader.split("|")
        self.cleanedInputDataFileHeaderPositionalMatrix = {v: i for i, v in enumerate(self.cleanedInputDataFileHeaderList)}
        contents = contents[1:]
        print (f"There are {len(contents)} rows of data in this input file.\n\n")

        print (f"The data structure will be made of following field pairs:")
        print(f"{metaFromFieldName} | {plogFromFieldName}")
        print(f"{metaToFieldName} | {plogToFieldName}")
        print(f"{metaCcFieldName} | {plogCcFieldName}")
        print(f"{metaBccFieldName} | {plogBccFieldName}")
        print(f"{metaAuthorFieldName} | {plogAuthorFieldName}\n\n")

        ## Field names in the same order as the RecordValues fields.
        metaFieldNames = (metaFromFieldName, metaToFieldName, metaCcFieldName, metaBccFieldName, metaAuthorFieldName)
        plogFieldNames = (plogFromFieldName, plogToFieldName, plogCcFieldName, plogBccFieldName, plogAuthorFieldName)

        ## Fail up front, naming every missing column, instead of part way through the rows.
        positions = self.cleanedInputDataFileHeaderPositionalMatrix
        missingFieldNames = [name for name in metaFieldNames + plogFieldNames if name not in positions]
        if missingFieldNames:
            raise KeyError(f"Field name(s) not found in the input file header: {', '.join(missingFieldNames)}")

        ## Resolve the column positions once rather than once per row.
        metaColumns = [positions[name] for name in metaFieldNames]
        plogColumns = [positions[name] for name in plogFieldNames]

        self.recordValuesFieldList = self.RecordValues._fields

        for line in contents:
            line = line.replace("\n","")
            if not line:
                ## A blank line (typically the last one in the file) carries no record.
                continue
            rowValues = line.split("|")
            docID = rowValues[0]
            self.metadataValuesDict[docID] = self.RecordValues(*[self.__SplitAndClean(rowValues[column]) for column in metaColumns])
            self.formattedValuesDict[docID] = self.RecordValues(*[self.__SplitAndClean(rowValues[column]) for column in plogColumns])

        print("Data structures created.")


    def __SplitAndClean(self, rawVal, delim = ";"):
        """Pseudo-private method which will take a raw string and split this into a list, removing any leading or trailing whitespace.

        An empty rawVal yields an empty list so that callers always receive a list.
        """
        if not rawVal:
            return []
        return [x.strip() for x in rawVal.split(delim)]


    def __FieldDedupeByEmailAddress(self, valuesList):
        """Pseudo-private method which will attempt to deduplicate a list of values from a specific field by pulling out email addresses as the deduplication criteria. Returns deduplicated count.

        Email addresses are compared case-insensitively.  A value with no email address
        cannot be deduplicated and is always counted.  A value is counted once when at
        least one of its addresses has not been seen in an earlier value.
        """
        ## This should ONLY be used for deduplication for counting and not for true deduplication
        ## because which of the duplicate values survives is not chosen by information content.
        seenEmails = set()
        uniqueCount = 0
        for item in valuesList:
            itemEmails = {email.upper() for email in re.findall(self.allPossibleEmailAddressesRegExPattern, item)}
            if not itemEmails:
                uniqueCount += 1
            elif not itemEmails <= seenEmails:
                ## Count the value a single time, however many new addresses it carries.
                uniqueCount += 1
            seenEmails |= itemEmails
        return uniqueCount


    def __FieldFullValueDedupe(self, valuesList):
        """Pseudo-private method which will attempt to deduplicate a list of values from a specific field using the FULL VALUE. This was created because there appears to be duplicate values in the formatted fields.

        Returns the number of distinct values, compared case-insensitively.
        """
        return len({item.upper() for item in valuesList})


    def __AddToIssuesMatrix(self,docID,issueMessage):
        """This method will add a single issue to the issues matrix."""
        self.issuesMatrix.setdefault(docID, []).append(issueMessage)


    def __ClassifyCountMismatch(self, fieldName, metadataFieldValues, formattedFieldValues, metadataFromValues):
        """Pseudo-private method which compares the value counts of one field for one document.

        Returns None when there is nothing to report, otherwise a (severity, label) tuple
        where severity is "redFlag" or "warning" and label is the text stored in the report.
        """
        formattedCount = len(formattedFieldValues)
        if len(metadataFieldValues) == formattedCount:
            return None

        if len(metadataFieldValues) == 0:
            if fieldName != 'docAuthor':
                return ("redFlag", fieldName+"-No_Metadata_Entries-B")
            ## Have to account for instances where the meta docAuthor is blank because it's an email and the formatted just has the from value in it.
            if metadataFromValues:
                return None
            return ("redFlag", fieldName+"-No_Metadata_Entries-A")

        if formattedCount == 0:
            return ("redFlag", fieldName+"-No_Formatted_Entries")

        ## try the count again by deduplicating the metadata field values. Never on the formatted field values.
        deduplicatedFieldCount = self.__FieldDedupeByEmailAddress(metadataFieldValues)
        distanceBetween = abs(deduplicatedFieldCount - formattedCount)
        if distanceBetween == 0:
            return None

        ## More than 30 values: tolerate a 10% difference.  Otherwise tolerate a difference of 2.
        if deduplicatedFieldCount > 30:
            tolerance = (10 * deduplicatedFieldCount)/100
        else:
            tolerance = 2
        if distanceBetween > tolerance:
            return ("redFlag", fieldName)
        return ("warning", fieldName)


    def PerformValueCountChecks(self, countsOnly = True, outputDirectory = None):
        """Performs the inital value count checks between the metadata values and formatted values, looking for red flags and warnings.

        By default reports numbers. Set countsOnly to false to export reports; the files
        warnings.txt, redFlags.txt and dupesInFormattedFields.txt are then written to
        outputDirectory (defaultOutputDirectory when not given).  Each report line is
        "docID | (field labels)".
        """
        redFlagDocMatrix = {}
        warningDocMatrix = {}
        duplicatesInFormattedMatrix = {}
        matrixBySeverity = {"redFlag": redFlagDocMatrix, "warning": warningDocMatrix}

        for docID, metadataRecord in self.metadataValuesDict.items():
            formattedRecord = self.formattedValuesDict[docID]
            for fieldName in self.recordValuesFieldList:
                metadataFieldValues = getattr(metadataRecord, fieldName)
                formattedFieldValues = getattr(formattedRecord, fieldName)

                verdict = self.__ClassifyCountMismatch(fieldName, metadataFieldValues, formattedFieldValues, metadataRecord.fromValues)
                if verdict is not None:
                    severity, label = verdict
                    matrixBySeverity[severity].setdefault(docID, []).append(label)

                ## Perform a separate check for duplicates in the formatted field.
                ## NOTE(review): the reviewed copy of this file had lost its indentation; this check is
                ## assumed to run for every field, independent of the count comparison above - confirm.
                if len(formattedFieldValues) != self.__FieldFullValueDedupe(formattedFieldValues):
                    duplicatesInFormattedMatrix.setdefault(docID, []).append(fieldName)

        ## Every flagged document has exactly one key in its matrix, so the key count is the document count.
        print(f"There are a total of {len(redFlagDocMatrix)} red flag documents and {len(warningDocMatrix)} warnings where the matching field value counts that do not match.")
        if not countsOnly:
            if outputDirectory is None:
                outputDirectory = self.defaultOutputDirectory
            reports = (("warnings.txt", warningDocMatrix),
                       ("redFlags.txt", redFlagDocMatrix),
                       ("dupesInFormattedFields.txt", duplicatesInFormattedMatrix))
            for reportFileName, reportMatrix in reports:
                with open(os.path.join(outputDirectory, reportFileName),'w') as reportFile:
                    for reportDocID, labels in reportMatrix.items():
                        reportFile.write(f"{reportDocID} | {tuple(labels)}\n")


    def __CheckPersonAgainstNames(self, docID, person, malPeopleList, normalized_full_names, used_full_names, sourceDescription):
        """Pseudo-private method which looks for one MAL person among the normalized names of a field and records any ^ marker issue.

        normalized_full_names maps the upper-cased, ^-stripped name to the name as logged.
        used_full_names is updated in place so a logged name is matched to one person only.
        sourceDescription is the text used to identify the metadata value in the issue message.
        """
        ## Generate all possible names for this person.
        possible_names = [name.upper() for name in malPeopleList.return_person_all_name_variations(person)]
        attorneyStatus = person.is_attorney
        ## TO DO: Change this as soon as you support SPLIT ROLE
        markerStatus = "YES" if attorneyStatus == "SPLIT ROLE" else attorneyStatus

        ## Attempt to find a full name match.
        for candidate in possible_names:
            if candidate not in normalized_full_names:
                continue
            full_name = normalized_full_names[candidate]
            if full_name in used_full_names:
                continue
            hasMarker = "^" in full_name
            if markerStatus == "YES" and hasMarker:
                pass
            elif markerStatus == "NO" and not hasMarker:
                pass
            elif markerStatus == "NO":
                self.__AddToIssuesMatrix(docID,f"{full_name} has an ^ but is a high confidence non-attorney match.")
            else:
                ## NOTE(review): any status other than YES / NO / SPLIT ROLE also lands here,
                ## whether or not the name has an ^ - confirm which statuses the MAL can return.
                self.__AddToIssuesMatrix(docID,f"{full_name} does not have an ^ but is a high confidence attorney match.")
            used_full_names.add(full_name)
            return

        ## No normalized name matched.  Only attorneys are reported.
        ## NOTE(review): this compares the unconverted status, so an unmatched "SPLIT ROLE"
        ## person is not reported.  Kept as found - confirm whether that is intended.
        if attorneyStatus == "YES":
            self.__AddToIssuesMatrix(docID,f"{sourceDescription} does not have a corresponding normalized name match but is a high confidence attorney match.")


    def PerformNamesToMalChecks(self, masterAttorneyListFileName = None, nsMal = None):
        """This method will compare the normalized names to the MAL, using the metadata values.

        masterAttorneyListFileName is the MAL workbook to load (defaultMasterAttorneyListFileName
        when not given).  nsMal may be supplied instead to reuse a MAL that is already loaded.
        Returns the issues matrix, a dictionary of docID to list of issue messages.  Issues
        are appended to the matrix held on this object.
        """
        if nsMal is None:
            if masterAttorneyListFileName is None:
                masterAttorneyListFileName = self.defaultMasterAttorneyListFileName
            nsMal = MyCode.Active_prgs.Redgrave.NS_MasterAttorneyList.NS_MasterAttorneyList(masterAttorneyListFileName)
        malPeopleList = nsMal.malPeopleList

        for docID, metadataRecord in self.metadataValuesDict.items():
            formattedRecord = self.formattedValuesDict[docID]
            for fieldName in self.recordValuesFieldList:
                metadataFieldValues = getattr(metadataRecord, fieldName)
                if not metadataFieldValues:
                    continue
                formattedFieldValues = getattr(formattedRecord, fieldName)
                normalized_full_names = {name.upper().replace("^","").strip(): name for name in formattedFieldValues}
                used_full_names = set()

                for metadataFieldValue in metadataFieldValues:
                    emailsFound = re.findall(self.allPossibleEmailAddressesRegExPattern, metadataFieldValue)
                    if emailsFound:
                        for email in emailsFound:
                            person = malPeopleList.search_by_email(email.upper().strip())
                            if person:
                                self.__CheckPersonAgainstNames(docID, person, malPeopleList, normalized_full_names, used_full_names, f"{email} from {fieldName}")
                    elif "EXCHANGE" in metadataFieldValue.upper():
                        ## The metadata field parsed value didnt have an email address. Try a username lookup.
                        userNameId = metadataFieldValue.split("-")[-1]
                        ## Finding that there is garbage in the userIDs... cleaning that.
                        userNameId = userNameId.replace("]","").replace('">','').replace('"','').upper()
                        ## NOTE(review): the reviewed copy of this file had lost its indentation; the
                        ## lookup is assumed to be skipped when no ID remains after cleaning - confirm.
                        lookupId = userNameId.strip()
                        if not lookupId:
                            continue
                        person = malPeopleList.search_by_login_id(lookupId)
                        if person:
                            self.__CheckPersonAgainstNames(docID, person, malPeopleList, normalized_full_names, used_full_names, f"User Login ID {userNameId} from {fieldName}")

        return self.issuesMatrix
346
347
348
if __name__ == '__main__':
    ## Ad-hoc driver: runs the value count checks and the names-to-MAL checks against a
    ## single export and writes the resulting issues to a pipe delimited text file.
    cleanedDatExportFileName = r"C:\Test_Dir\NS\PrivLogTest\20260210\Search 1\export_20260211_182928_Converted.txt"

    qcP = QcPrivLog(cleanedDatExportFileName, "From", "PLOG_Norm_From", "To", "PLOG_Norm_To",
                    "CC", "PLOG_Norm_CC", "BCC", "PLOG_Norm_BCC", "Author", "PLOG_Norm_Author", fileEncoding = 'utf-8')
    print(qcP.cleanedInputDataFileHeaderPositionalMatrix)
    qcP.PerformValueCountChecks(countsOnly = False)
    ## NOTE(review): nsMal is not used by anything below in this script.  It is kept
    ## because the constructor's side effects are not visible from here - confirm and remove.
    masterAttorneyListFileName = r"C:\Users\eborges\OneDrive - Redgrave LLP\Documents\Cases\Norfolk Southern\_NS_Current_MAL\RG - NS Cross-Matter Master Attorney List 20260120 (20260120-0136).xlsx"
    nsMal = MyCode.Active_prgs.Redgrave.NS_MasterAttorneyList.NS_MasterAttorneyList(masterAttorneyListFileName)
    issuesMatrix = qcP.PerformNamesToMalChecks()
    ## Context manager so the report is flushed and closed even if a write fails.
    with open(r"C:\Test_Dir\NS\namesNormTestOutput.txt",'w',encoding='utf-8') as outputFile:
        ## One line per document: docID, then its issue messages joined with ";".
        for docID, issueMessages in issuesMatrix.items():
            outputFile.write(f"{docID}|{';'.join(issueMessages)}\n")
363