ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/ns_dev/Python/NinoCode/Active_prgs/Redgrave/ATT-PrivLogQC.py
Revision: 949
Committed: Wed Nov 5 18:18:53 2025 UTC (4 months, 3 weeks ago) by nino.borges
Content type: text/x-python
File size: 23237 byte(s)
Log Message:
added support for names to MAL checks which will compare the normalized names to the MAL, using the metadata values.

File Contents

# User Rev Content
1 nino.borges 887 """
2    
3     ATT-PrivLogQC
4    
5     Created by:
6     Emanuel Borges
7     03.25.2025
8    
9     This program will assist with the process of performing QC on past and present AT&T privilege logs.
10    
11     """
12    
13     import os, re
14     from collections import namedtuple
15     from MyCode.Tool_Box import FileEncodingLib
16 nino.borges 949 import MyCode.Active_prgs.Redgrave.ATT_MasterAttorneyList
17 nino.borges 887
18    
class QcPrivLog(object):
    """Automates QC on the AT&T privilege logs, including names normalization analysis.

    The input is a pipe-delimited export whose first row is the header and whose
    first column is the DocID.  For every document the class keeps two parallel
    records: the raw metadata values and the formatted (privilege log) values for
    the From / To / Cc / Bcc / Author fields.  Each field value is a list of
    semicolon-separated entries (or "" when the cell is empty).
    """
    version = '0.2.0'

    # MAL workbook used by PerformNamesToMalChecks when the caller supplies neither a path nor a loaded MAL.
    defaultMasterAttorneyListFileName = r"C:\Users\eborges\OneDrive - Redgrave LLP\Documents\Cases\AT&T\Cybersecurity FCA Case\_ATT_Current_MAL\RG - ATT Cross-Matter Master Attorney List (20251104)(20251104-0207).xlsx"


    def __init__(self, cleanedDatExportFileName, metaFromFieldName, plogFromFieldName, metaToFieldName, plogToFieldName, metaCcFieldName, plogCcFieldName, metaBccFieldName, plogBccFieldName, metaAuthorFieldName, plogAuthorFieldName, fileEncoding = 'UTF8'):
        """Initializes the data structures.

        cleanedDatExportFileName should be the full path to the file.  Assumes the first
        row of the data file is the header and first column is DocID.  The remaining
        arguments are the header names of the metadata column and of the matching
        formatted (privilege log) column for each of From, To, Cc, Bcc and Author.

        Raises KeyError when one of the field names is not in the header, and
        IndexError when the file is empty or a data row has too few columns.
        """
        print("Initializing data structures...")
        self.issuesMatrix = {}
        self.metadataValuesDict = {}
        self.formattedValuesDict = {}
        self.additionalValuesDict = {}
        self.allPossibleEmailAddressesRegExPattern = r"[\w.+-]+@[\w-]+\.[\w.-]+"

        ## Read everything up front inside a context manager so the handle is always released.
        with open(cleanedDatExportFileName, encoding = fileEncoding) as inputFile:
            contents = inputFile.readlines()
        self.cleanedInputDataFileHeader = contents[0].replace("\n","")
        self.cleanedInputDataFileHeaderList = self.cleanedInputDataFileHeader.split("|")
        self.cleanedInputDataFileHeaderPositionalMatrix = {v: i for i, v in enumerate(self.cleanedInputDataFileHeaderList)}

        ## Blank lines carry no DocID and would otherwise fail with an IndexError, so they are skipped.
        dataRows = [row for row in (line.replace("\n","") for line in contents[1:]) if row.strip()]
        print (f"There are {len(dataRows)} rows of data in this input file.\n\n")

        ## (metadata column, formatted column) pairs, in the same order as the RecordValues fields below.
        fieldPairs = ((metaFromFieldName, plogFromFieldName),
                      (metaToFieldName, plogToFieldName),
                      (metaCcFieldName, plogCcFieldName),
                      (metaBccFieldName, plogBccFieldName),
                      (metaAuthorFieldName, plogAuthorFieldName))

        print (f"The data structure will be made of following field pairs:")
        for metaName, plogName in fieldPairs:
            print(f"{metaName} | {plogName}")
        print("\n")

        RecordValues = namedtuple("RecordValues","fromValues toValues ccValues bccValues docAuthor")
        self.recordValuesFieldList = RecordValues._fields

        ## Resolve the column positions once instead of once per row; an unknown field name raises KeyError here.
        positions = self.cleanedInputDataFileHeaderPositionalMatrix
        metaColumns = [positions[metaName] for metaName, _ in fieldPairs]
        plogColumns = [positions[plogName] for _, plogName in fieldPairs]

        for row in dataRows:
            cells = row.split("|")
            docID = cells[0]
            self.metadataValuesDict[docID] = RecordValues(*[self.__SplitAndClean(cells[i]) for i in metaColumns])
            self.formattedValuesDict[docID] = RecordValues(*[self.__SplitAndClean(cells[i]) for i in plogColumns])

        print("Data structures created.")


    def __SplitAndClean(self, rawVal, delim = ";"):
        """Pseudo-private method which will take a raw string and split this into a list, removing any leading or trailing whitespace.

        Returns "" (not an empty list) when rawVal is empty; callers only rely on it
        being falsy, of length 0 and iterable.
        """
        if not rawVal:
            return ""
        return [x.strip() for x in rawVal.split(delim)]


    def __FieldDedupeByEmailAddress(self, valuesList):
        """Pseudo-private method which will attempt to deduplicate a list of values from a specific field by pulling out email addresses as the deduplication criteria. Returns deduplicated count.

        Addresses are compared case-insensitively.  A value with no email address is
        always counted.  A value with addresses is counted once when at least one of
        its addresses has not been seen on an earlier value.
        """
        ## This should ONLY be used for deduplication for counting and not for true deduplication because it keeps whichever duplicate comes first and sometimes a later one is the value with more information.
        seenEmails = set()
        keptCount = 0
        for item in valuesList:
            itemEmails = {r.upper() for r in re.findall(self.allPossibleEmailAddressesRegExPattern, item)}
            if not itemEmails:
                keptCount += 1
                continue
            ## Count the value once, even when it carries several new addresses (e.g. an address used as the display name).
            if itemEmails - seenEmails:
                keptCount += 1
            seenEmails |= itemEmails
        return keptCount


    def __FieldFullValueDedupe(self, valuesList):
        """Pseudo-private method which will attempt to deduplicate a list of values from a specific field using the FULL VALUE. This was created because there appears to be duplicate values in the formatted fields. Returns the deduplicated count."""
        ## Compared in uppercase because of possible uppercase-lowercase issues. These should all be uppercase but there shouldnt have been dups either...
        return len({item.upper() for item in valuesList})


    def __AddToIssuesMatrix(self,docID,issueMessage):
        """This method will add a single issue to the issues matrix."""
        self.issuesMatrix.setdefault(docID, []).append(issueMessage)


    @staticmethod
    def __WriteMatrixReport(reportFileName, matrix):
        """Writes one 'docID | (entries,)' line per document in matrix to reportFileName."""
        with open(reportFileName, 'w') as reportFile:
            for docID, entries in matrix.items():
                reportFile.write(f"{docID} | {tuple(entries)}\n")


    def PerformValueCountChecks(self, countsOnly = True, outputDir = r"C:\Test_Dir\ATT"):
        """Performs the inital value count checks between the metadata values and formatted values, looking for red flags and warnings.

        By default reports numbers only.  Set countsOnly to False to also export
        warnings.txt, redFlags.txt and dupesInFormattedFields.txt into outputDir.
        """
        redFlagDocMatrix = {}
        warningDocMatrix = {}
        duplicatesInFormattedMatrix = {}

        for docID, metadataRecord in self.metadataValuesDict.items():
            formattedRecord = self.formattedValuesDict[docID]
            for fieldName in self.recordValuesFieldList:
                metadataFieldValues = getattr(metadataRecord, fieldName)
                formattedFieldValues = getattr(formattedRecord, fieldName)
                metadataCount = len(metadataFieldValues)
                formattedCount = len(formattedFieldValues)

                if metadataCount != formattedCount:
                    if metadataCount == 0:
                        if fieldName != 'docAuthor':
                            redFlagDocMatrix.setdefault(docID, []).append(fieldName+"-No_Metadata_Entries-B")
                        elif not metadataRecord.fromValues:
                            ## Have to account for instances where the meta docAuthor is blank because it's an email and the formatted just has the from value in it.
                            redFlagDocMatrix.setdefault(docID, []).append(fieldName+"-No_Metadata_Entries-A")
                    elif formattedCount == 0:
                        redFlagDocMatrix.setdefault(docID, []).append(fieldName+"-No_Formatted_Entries")
                    else:
                        ## try the count again by deduplicating the metadata field values. Never on the formatted field values.
                        deduplicatedFieldCount = self.__FieldDedupeByEmailAddress(metadataFieldValues)
                        distanceBetween = abs(deduplicatedFieldCount - formattedCount)
                        if distanceBetween:
                            ## Large fields tolerate a 10% difference, small ones a difference of 2, before becoming a red flag.
                            if deduplicatedFieldCount > 30:
                                tolerance = (10 * deduplicatedFieldCount)/100
                            else:
                                tolerance = 2
                            targetMatrix = redFlagDocMatrix if distanceBetween > tolerance else warningDocMatrix
                            targetMatrix.setdefault(docID, []).append(fieldName)

                ## Perform a separate check for duplicates in the formatted field.
                if formattedCount != self.__FieldFullValueDedupe(formattedFieldValues):
                    duplicatesInFormattedMatrix.setdefault(docID, []).append(fieldName)

        print(f"There are a total of {len(redFlagDocMatrix)} red flag documents and {len(warningDocMatrix)} warnings where the matching field value counts that do not match.")
        if not countsOnly:
            self.__WriteMatrixReport(os.path.join(outputDir, "warnings.txt"), warningDocMatrix)
            self.__WriteMatrixReport(os.path.join(outputDir, "redFlags.txt"), redFlagDocMatrix)
            self.__WriteMatrixReport(os.path.join(outputDir, "dupesInFormattedFields.txt"), duplicatesInFormattedMatrix)


    def __CheckPersonAgainstNormalizedNames(self, docID, fieldName, malPeopleList, person, sourceLabel, normalizedFullNames, usedFullNames):
        """Matches one MAL person against the normalized names of a field and records any ESQ issue.

        normalizedFullNames maps the uppercased name without "(ESQ.)" to the original
        formatted name.  usedFullNames is updated in place so a formatted name is
        matched at most once per field.  sourceLabel is the text used for the metadata
        value in the "no corresponding normalized name" message.
        """
        if not person:
            return
        attorneyStatus = person.is_attorney
        possibleNames = [name.upper() for name in malPeopleList.return_person_all_name_variations(person)]

        for candidate in possibleNames:
            if candidate not in normalizedFullNames:
                continue
            fullName = normalizedFullNames[candidate]
            if fullName in usedFullNames:
                continue
            ## TO DO: Change this as soon as you support SPLIT ROLE; for now a matched split role is treated as an attorney.
            effectiveStatus = "YES" if attorneyStatus == "SPLIT ROLE" else attorneyStatus
            hasEsq = "(ESQ.)" in fullName
            if effectiveStatus == "NO":
                if hasEsq:
                    self.__AddToIssuesMatrix(docID,f"{fullName} has an ESQ but is a high confidence non-attorney match.")
            elif not (effectiveStatus == "YES" and hasEsq):
                ## NOTE(review): any status other than YES/NO also lands here, with or without an ESQ - confirm that is intended.
                self.__AddToIssuesMatrix(docID,f"{fullName} does not have an ESQ but is a high confidence attorney match.")
            usedFullNames.add(fullName)
            return

        ## No unused normalized name matched this person. Only attorneys are reported (an unmatched SPLIT ROLE is not).
        if attorneyStatus == "YES":
            self.__AddToIssuesMatrix(docID,f"{sourceLabel} from {fieldName} does not have a corresponding normalized name match but is a high confidence attorney match.")


    def PerformNamesToMalChecks(self, masterAttorneyListFileName = None, attMal = None):
        """This method will compare the normalized names to the MAL, using the metadata values.

        Every email address (or, for EXCHANGE style values, the trailing user login id)
        found in a metadata value is looked up in the MAL.  When the person is found,
        their name variations are matched against the formatted names of the same
        field and the "(ESQ.)" marker is checked against the person's attorney status.

        masterAttorneyListFileName overrides the default MAL workbook path.  attMal may
        be an already loaded MAL object, in which case no workbook is read.
        Returns the issues matrix: a dict of docID to a list of issue messages.
        """
        if attMal is None:
            if masterAttorneyListFileName is None:
                masterAttorneyListFileName = self.defaultMasterAttorneyListFileName
            attMal = MyCode.Active_prgs.Redgrave.ATT_MasterAttorneyList.ATT_MasterAttorneyList(masterAttorneyListFileName)
        malPeopleList = attMal.malPeopleList

        for docID, metadataRecord in self.metadataValuesDict.items():
            formattedRecord = self.formattedValuesDict[docID]
            for fieldName in self.recordValuesFieldList:
                metadataFieldValues = getattr(metadataRecord, fieldName)
                if not metadataFieldValues:
                    continue
                formattedFieldValues = getattr(formattedRecord, fieldName)
                normalized_full_names = {name.upper().replace("(ESQ.)","").strip(): name for name in formattedFieldValues}
                used_full_names = set()

                for metadataFieldValue in metadataFieldValues:
                    emailsInValue = re.findall(self.allPossibleEmailAddressesRegExPattern, metadataFieldValue)
                    if emailsInValue:
                        for email in emailsInValue:
                            person = malPeopleList.search_by_email(email.upper().strip())
                            self.__CheckPersonAgainstNormalizedNames(docID, fieldName, malPeopleList, person, f"{email}", normalized_full_names, used_full_names)
                    elif "EXCHANGE" in metadataFieldValue.upper():
                        ## The metadata field parsed value didnt have an email address. Try a username lookup.
                        userNameId = metadataFieldValue.split("-")[-1]
                        ## Finding that there is garbage in the userIDs... cleaning that.
                        userNameId = userNameId.replace("]","").replace('">','').replace('"','')
                        if userNameId:
                            userNameId = userNameId.upper()
                            person = malPeopleList.search_by_login_id(userNameId.strip())
                            self.__CheckPersonAgainstNormalizedNames(docID, fieldName, malPeopleList, person, f"User Login ID {userNameId}", normalized_full_names, used_full_names)
        return self.issuesMatrix
346    
347    
348    
if __name__ == '__main__':
    ## Script entry point: run the value count checks and the names-to-MAL checks on one export and write the reports.
    cleanedDatExportFileName = r"C:\Test_Dir\ATT\PrivLogTest\ESI_Custodial\export_20250930_215204_Converted.txt"
    ## Shorter export for quick test runs:
    #cleanedDatExportFileName = r"C:\Test_Dir\ATT\PrivLogTest\export_20250807_175927_Converted(SHORT).txt"
    qcP = QcPrivLog(cleanedDatExportFileName, "From", "MA Normalized From::Full Name", "To", "MA Normalized To::Full Name",
                    "CC", "MA Normalized Cc::Full Name", "BCC", "MA Normalized Bcc::Full Name", "DocAuthor", "Privilege Log From/Author", fileEncoding = 'utf-8')
    print(qcP.cleanedInputDataFileHeaderPositionalMatrix)
    qcP.PerformValueCountChecks(countsOnly = False)
    ## PerformNamesToMalChecks loads the Master Attorney List itself, so it is not loaded a second time here.
    issuesMatrix = qcP.PerformNamesToMalChecks()
    ## One line per document: docID, then its issues separated by semicolons.
    with open(r"C:\Test_Dir\ATT\namesNormTestOutput.txt",'w',encoding='utf-8') as outputFile:
        for docID, issues in issuesMatrix.items():
            outputFile.write(f"{docID}|{';'.join(issues)}\n")