| 13 |
|
import os, re |
| 14 |
|
from collections import namedtuple |
| 15 |
|
from MyCode.Tool_Box import FileEncodingLib |
| 16 |
+ |
import MyCode.Active_prgs.Redgrave.ATT_MasterAttorneyList |
| 17 |
|
|
| 18 |
|
|
| 19 |
|
class QcPrivLog(object): |
| 20 |
|
"""A class for automating the process of performing QC on the AT&T privilege logs, including names normalization analysis""" |
| 21 |
< |
version = '0.1.0' |
| 21 |
> |
version = '0.2.0' |
| 22 |
|
|
| 23 |
|
|
| 24 |
|
def __init__(self, cleanedDatExportFileName, metaFromFieldName, plogFromFieldName, metaToFieldName, plogToFieldName, metaCcFieldName, plogCcFieldName, metaBccFieldName, plogBccFieldName, metaAuthorFieldName, plogAuthorFieldName, fileEncoding = 'UTF8'): |
| 25 |
|
"""Initializes the data structures. cleanedDatExportFileName should be the full path to the file. Assumes the first row of the data file is the header and first column is DocID.""" |
| 26 |
|
print("Initializing data structures...") |
| 27 |
+ |
self.issuesMatrix = {} |
| 28 |
|
self.metadataValuesDict = {} |
| 29 |
|
self.formattedValuesDict = {} |
| 30 |
|
self.additionalValuesDict = {} |
| 64 |
|
self.__SplitAndClean(line[self.cleanedInputDataFileHeaderPositionalMatrix[plogCcFieldName]]), |
| 65 |
|
self.__SplitAndClean(line[self.cleanedInputDataFileHeaderPositionalMatrix[plogBccFieldName]]), |
| 66 |
|
self.__SplitAndClean(line[self.cleanedInputDataFileHeaderPositionalMatrix[plogAuthorFieldName]])) |
| 67 |
+ |
#print(self.formattedValuesDict[docID]) |
| 68 |
|
|
| 69 |
|
print("Data structures created.") |
| 70 |
|
|
| 106 |
|
return len(newSet) |
| 107 |
|
|
| 108 |
|
|
| 109 |
+ |
def __AddToIssuesMatrix(self, docID, issueMessage):
    """Record a single issue message for a document in the issues matrix.

    self.issuesMatrix maps a docID to the list of issue messages recorded
    for it. The list is created on the first issue for a docID and appended
    to afterwards, so messages keep the order in which they were added.

    Args:
        docID: Key of the document the issue belongs to.
        issueMessage: The message to record for that document.
    """
    # setdefault does one hash lookup; the previous membership test copied
    # every key into a new list on each call.
    self.issuesMatrix.setdefault(docID, []).append(issueMessage)
| 115 |
+ |
|
| 116 |
|
def PerformValueCountChecks(self, countsOnly = True): |
| 117 |
|
"""Performs the inital value count checks between the metadata values and formatted values, looking for red flags and warnings. By default reports numbers. Set countsOnly to false to export reports.""" |
| 118 |
|
workList = self.metadataValuesDict.keys() |
| 220 |
|
duplicatesInFormattedOutputFile.write(f"{z} | {*duplicatesInFormattedMatrix[z],}\n") |
| 221 |
|
duplicatesInFormattedOutputFile.close() |
| 222 |
|
|
| 223 |
+ |
def PerformNamesToMalChecks(self, masterAttorneyListFileName=None, attMal=None):
    """Compare the normalized names to the Master Attorney List (MAL), using the metadata values.

    For every docID in self.metadataValuesDict and every field named in
    self.recordValuesFieldList, each parsed metadata value is resolved to a
    MAL person:

    * by email address, for every match of
      self.allPossibleEmailAddressesRegExPattern found in the value;
    * otherwise, when the value contains "EXCHANGE" (case-insensitive), by
      the login id taken from the text after the last "-" in the value.

    Values that resolve to no MAL person are skipped silently (reporting
    them is deliberately disabled). For each resolved person the normalized
    names of the same field are searched for one of the person's name
    variations, and any disagreement between the MAL attorney status and
    the "(ESQ.)" marker is recorded in the issues matrix. Normalized names
    that are never matched to a metadata value are not reported (that
    check is also deliberately disabled).

    Args:
        masterAttorneyListFileName: Optional full path to the MAL workbook.
            Only used when attMal is not supplied. When omitted, the path
            that used to be hard-coded in this method is used.
        attMal: Optional, already loaded ATT_MasterAttorneyList object. Pass
            it to avoid loading the workbook again.

    Returns:
        self.issuesMatrix, the dict of docID -> list of issue messages. The
        same dict is extended on every call; it is not reset here.
    """
    if attMal is None:
        if masterAttorneyListFileName is None:
            masterAttorneyListFileName = r"C:\Users\eborges\OneDrive - Redgrave LLP\Documents\Cases\AT&T\Cybersecurity FCA Case\_ATT_Current_MAL\RG - ATT Cross-Matter Master Attorney List (20251104)(20251104-0207).xlsx"
        attMal = MyCode.Active_prgs.Redgrave.ATT_MasterAttorneyList.ATT_MasterAttorneyList(masterAttorneyListFileName)
    malPeople = attMal.malPeopleList

    for docID, metadataTuple in self.metadataValuesDict.items():
        # Convert each record once per document instead of once per field.
        metadataRecord = metadataTuple._asdict()
        formattedRecord = self.formattedValuesDict[docID]._asdict()

        for fieldName in self.recordValuesFieldList:
            # A normalized name may satisfy only one metadata value per field.
            usedFullNames = set()
            metadataFieldValues = metadataRecord[fieldName]
            formattedFieldValues = formattedRecord[fieldName]
            # Upper-cased name with the "(ESQ.)" marker removed -> name as logged.
            normalizedFullNames = {name.upper().replace("(ESQ.)", "").strip(): name for name in formattedFieldValues}
            if not metadataFieldValues:
                continue

            for metadataFieldValue in metadataFieldValues:
                emails = re.findall(self.allPossibleEmailAddressesRegExPattern, metadataFieldValue)
                if emails:
                    for email in emails:
                        person = malPeople.search_by_email(email.upper().strip())
                        if person:
                            self.__CheckMalPersonAgainstNames(docID, fieldName, person, email, malPeople, normalizedFullNames, usedFullNames)
                elif "EXCHANGE" in metadataFieldValue.upper():
                    ## The metadata field parsed value didn't have an email address. Try a username lookup.
                    userNameId = metadataFieldValue.split("-")[-1]
                    ## There is garbage in the user ids; clean that.
                    userNameId = userNameId.replace("]", "").replace('">', '').replace('"', '')
                    if not userNameId:
                        continue
                    userNameId = userNameId.upper()
                    person = malPeople.search_by_login_id(userNameId.strip())
                    if person:
                        self.__CheckMalPersonAgainstNames(docID, fieldName, person, f"User Login ID {userNameId}", malPeople, normalizedFullNames, usedFullNames)
    return self.issuesMatrix

def __CheckMalPersonAgainstNames(self, docID, fieldName, person, sourceLabel, malPeople, normalizedFullNames, usedFullNames):
    """Match one MAL person against a field's normalized names and record any ESQ issue.

    Args:
        docID: Document whose field is being checked.
        fieldName: Name of the field being checked (used in messages).
        person: The MAL person resolved from the metadata value.
        sourceLabel: Text identifying the metadata value in the "no
            normalized name" message (the email address, or
            "User Login ID <id>").
        malPeople: The MAL people list used to expand name variations.
        normalizedFullNames: Dict of upper-cased, ESQ-stripped name -> name
            as it appears in the normalized field.
        usedFullNames: Set of normalized names already consumed for this
            field; the matched name is added to it.
    """
    possibleNames = [name.upper() for name in malPeople.return_person_all_name_variations(person)]
    attorneyStatus = person.is_attorney

    for candidate in possibleNames:
        if candidate not in normalizedFullNames:
            continue
        fullName = normalizedFullNames[candidate]
        if fullName in usedFullNames:
            continue

        ## TO DO: Change these next 2 lines as soon as you support SPLIT ROLE
        if attorneyStatus == "SPLIT ROLE":
            attorneyStatus = "YES"

        # NOTE(review): the marker test is case-sensitive while the
        # normalization above upper-cases first -- confirm "(Esq.)" never occurs.
        hasEsq = "(ESQ.)" in fullName
        if attorneyStatus == "NO":
            if hasEsq:
                self.__AddToIssuesMatrix(docID, f"{fullName} has an ESQ but is a high confidence non-attorney match.")
        elif not (attorneyStatus == "YES" and hasEsq):
            # NOTE(review): any status other than "YES"/"NO" also lands here,
            # whether or not the name carries the marker -- confirm intended.
            self.__AddToIssuesMatrix(docID, f"{fullName} does not have an ESQ but is a high confidence attorney match.")
        usedFullNames.add(fullName)
        return

    # No normalized name matched this person. Only a plain "YES" is reported.
    # NOTE(review): "SPLIT ROLE" is not treated as an attorney on this path,
    # unlike the matched path above -- confirm intended.
    if attorneyStatus == "YES":
        self.__AddToIssuesMatrix(docID, f"{sourceLabel} from {fieldName} does not have a corresponding normalized name match but is a high confidence attorney match.")
| 346 |
+ |
|
| 347 |
+ |
|
| 348 |
|
|
| 349 |
|
if __name__ == '__main__':
    # Ad-hoc test driver: run the value count checks and the names-to-MAL
    # checks against a converted export, then write one line per document
    # to the output file as "docID|issue;issue;...".
    cleanedDatExportFileName = r"C:\Test_Dir\ATT\PrivLogTest\ESI_Custodial\export_20250930_215204_Converted.txt"
    #cleanedDatExportFileName = r"C:\Test_Dir\ATT\PrivLogTest\export_20250807_175927_Converted(SHORT).txt"
    qcP = QcPrivLog(cleanedDatExportFileName, "From", "MA Normalized From::Full Name", "To", "MA Normalized To::Full Name",
                    "CC", "MA Normalized Cc::Full Name", "BCC", "MA Normalized Bcc::Full Name", "DocAuthor", "Privilege Log From/Author", fileEncoding = 'utf-8')
    print(qcP.cleanedInputDataFileHeaderPositionalMatrix)
    qcP.PerformValueCountChecks(countsOnly = False)
    # PerformNamesToMalChecks loads the Master Attorney List itself, so the
    # workbook is not loaded a second time here.
    issuesMatrix = qcP.PerformNamesToMalChecks()
    # The context manager closes the file even if a write fails.
    with open(r"C:\Test_Dir\ATT\namesNormTestOutput.txt", 'w', encoding='utf-8') as outputFile:
        for docID, issues in issuesMatrix.items():
            outputFile.write(f"{docID}|{';'.join(issues)}\n")
    #qcP.PerformValueCountChecks()