ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/ns_dev/Python/NinoCode/Active_prgs/Redgrave/ATT-PrivLogQC.py
Revision: 887
Committed: Thu May 22 20:04:49 2025 UTC (10 months ago) by nino.borges
Content type: text/x-python
File size: 12844 byte(s)
Log Message:
This program will assist with the process of performing QC on past and present AT&T privilege logs.

File Contents

# User Rev Content
1 nino.borges 887 """
2    
3     ATT-PrivLogQC
4    
5     Created by:
6     Emanuel Borges
7     03.25.2025
8    
9     This program will assist with the process of performing QC on past and present AT&T privilege logs.
10    
11     """
12    
13     import os, re
14     from collections import namedtuple
15     from MyCode.Tool_Box import FileEncodingLib
16    
17    
class QcPrivLog(object):
    """A class for automating the process of performing QC on the AT&T privilege logs,
    including names normalization analysis.

    Compares, per document, the value counts of metadata recipient/author fields
    against their formatted (privilege-log) counterparts and classifies mismatches
    as red flags or warnings.
    """

    version = '0.1.0'


    def __init__(self, cleanedDatExportFileName, metaFromFieldName, plogFromFieldName,
                 metaToFieldName, plogToFieldName, metaCcFieldName, plogCcFieldName,
                 metaBccFieldName, plogBccFieldName, metaAuthorFieldName,
                 plogAuthorFieldName, fileEncoding = 'UTF8'):
        """Initializes the data structures.

        cleanedDatExportFileName should be the full path to the file. Assumes the
        first row of the data file is the pipe-delimited header and the first column
        is DocID. Each meta*/plog* argument names a metadata column and its
        formatted (privilege-log) counterpart for the From/To/CC/BCC/Author pairs.
        """
        print("Initializing data structures...")
        self.metadataValuesDict = {}    # DocID -> RecordValues from the metadata columns
        self.formattedValuesDict = {}   # DocID -> RecordValues from the formatted (plog) columns
        self.additionalValuesDict = {}  # reserved; never populated in this class
        self.allPossibleEmailAddressesRegExPattern = r"[\w.+-]+@[\w-]+\.[\w.-]+"

        # Context manager guarantees the input handle is closed (the previous
        # version left the file open for the lifetime of the process).
        with open(cleanedDatExportFileName, encoding = fileEncoding) as inputFile:
            contents = inputFile.readlines()
        self.cleanedInputDataFileHeader = contents[0].replace("\n","")
        self.cleanedInputDataFileHeaderList = self.cleanedInputDataFileHeader.split("|")
        # Maps a column name to its positional index within each data row.
        self.cleanedInputDataFileHeaderPositionalMatrix = {v: i for i, v in enumerate(self.cleanedInputDataFileHeaderList)}
        contents = contents[1:]
        print(f"There are {len(contents)} rows of data in this input file.\n\n")

        print("The data structure will be made of following field pairs:")
        print(f"{metaFromFieldName} | {plogFromFieldName}")
        print(f"{metaToFieldName} | {plogToFieldName}")
        print(f"{metaCcFieldName} | {plogCcFieldName}")
        print(f"{metaBccFieldName} | {plogBccFieldName}")
        print(f"{metaAuthorFieldName} | {plogAuthorFieldName}\n\n")

        RecordValues = namedtuple("RecordValues","fromValues toValues ccValues bccValues docAuthor")
        self.recordValuesFieldList = RecordValues._fields

        # Column names in RecordValues field order, for each of the two sides.
        metaFieldNames = (metaFromFieldName, metaToFieldName, metaCcFieldName,
                          metaBccFieldName, metaAuthorFieldName)
        plogFieldNames = (plogFromFieldName, plogToFieldName, plogCcFieldName,
                          plogBccFieldName, plogAuthorFieldName)

        for rawLine in contents:
            row = rawLine.replace("\n","").split("|")
            docID = row[0]
            self.metadataValuesDict[docID] = RecordValues(*self.__ExtractFieldValues(row, metaFieldNames))
            self.formattedValuesDict[docID] = RecordValues(*self.__ExtractFieldValues(row, plogFieldNames))

        print("Data structures created.")


    def __ExtractFieldValues(self, row, fieldNames):
        """Pseudo-private helper: pull each named column out of a split data row and
        return the split-and-cleaned value lists in the order the names were given."""
        positions = self.cleanedInputDataFileHeaderPositionalMatrix
        return [self.__SplitAndClean(row[positions[name]]) for name in fieldNames]


    def __SplitAndClean(self, rawVal, delim = ";"):
        """Pseudo-private method which will take a raw string and split this into a
        list, removing any leading or trailing whitespace from each entry.

        Returns an empty list for an empty/blank value. (A previous version returned
        "" here; both are falsy with len() == 0, but a list keeps the return type
        consistent for callers.)
        """
        if rawVal:
            return [x.strip() for x in rawVal.split(delim)]
        return []


    def __FieldDedupeByEmailAddress(self, valuesList):
        """Pseudo-private method which deduplicates a list of field values by the
        email address(es) embedded in each value, returning the deduplicated COUNT.

        This should ONLY be used for deduplication for counting and not for true
        deduplication, because it keeps whichever duplicate appears first and the
        later duplicate sometimes carries more information. Values containing no
        email address are always kept. Email comparison is case-insensitive (both
        sides are uppercased). NOTE: a value containing two previously-unseen
        addresses is intentionally counted once per address, matching the original
        counting behavior.
        """
        seenEmails = []
        keptValues = []
        for item in valuesList:
            foundAddresses = re.findall(self.allPossibleEmailAddressesRegExPattern, item)
            if foundAddresses:
                for address in foundAddresses:
                    if address.upper() not in seenEmails:
                        keptValues.append(item)
                        seenEmails.append(address.upper())
            else:
                keptValues.append(item)
        return len(keptValues)


    def __FieldFullValueDedupe(self, valuesList):
        """Pseudo-private method which deduplicates a list of field values using the
        FULL VALUE, case-insensitively, and returns the deduplicated count. Created
        because there appear to be duplicate values in the formatted fields."""
        return len({item.upper() for item in valuesList})


    def PerformValueCountChecks(self, countsOnly = True, outputDir = r"C:\Test_Dir\ATT"):
        """Performs the initial value count checks between the metadata values and the
        formatted values, looking for red flags and warnings.

        By default only reports numbers. Set countsOnly to False to also export the
        warnings / red flags / formatted-field-duplicates reports as text files
        under outputDir (directory must already exist; defaults to the historical
        hard-coded location for backward compatibility).
        """
        workList = self.metadataValuesDict.keys()
        redFlagDocSet = set()
        redFlagDocMatrix = {}             # DocID -> list of offending field labels
        warningDocSet = set()
        warningDocMatrix = {}             # DocID -> list of fields with small mismatches
        duplicatesInFormattedMatrix = {}  # DocID -> fields whose formatted values repeat

        for docID in workList:
            for fieldName in self.recordValuesFieldList:
                metadataFieldValues = self.metadataValuesDict[docID]._asdict()[fieldName]
                formattedFieldValues = self.formattedValuesDict[docID]._asdict()[fieldName]

                if len(metadataFieldValues) != len(formattedFieldValues):
                    if len(metadataFieldValues) == 0:
                        ## Account for instances where the meta docAuthor is blank because
                        ## it's an email and the formatted field just holds the from value.
                        if fieldName == 'docAuthor':
                            if not self.metadataValuesDict[docID].fromValues:
                                redFlagDocSet.add(docID)
                                redFlagDocMatrix.setdefault(docID, []).append(fieldName + "-No_Metadata_Entries-A")
                        else:
                            redFlagDocSet.add(docID)
                            redFlagDocMatrix.setdefault(docID, []).append(fieldName + "-No_Metadata_Entries-B")
                    elif len(formattedFieldValues) == 0:
                        redFlagDocSet.add(docID)
                        redFlagDocMatrix.setdefault(docID, []).append(fieldName + "-No_Formatted_Entries")
                    else:
                        ## Try the count again after deduplicating the metadata values by
                        ## email address. Never deduplicate the formatted values here.
                        deduplicatedFieldCount = self.__FieldDedupeByEmailAddress(metadataFieldValues)
                        if deduplicatedFieldCount != len(formattedFieldValues):
                            distanceBetween = abs(deduplicatedFieldCount - len(formattedFieldValues))
                            if deduplicatedFieldCount > 30:
                                ## Large field: red flag when the mismatch exceeds 10%.
                                if distanceBetween > (10 * deduplicatedFieldCount)/100:
                                    redFlagDocSet.add(docID)
                                    redFlagDocMatrix.setdefault(docID, []).append(fieldName)
                                else:
                                    warningDocSet.add(docID)
                                    warningDocMatrix.setdefault(docID, []).append(fieldName)
                            else:
                                ## Small field: red flag when counts differ by more than 2.
                                if distanceBetween > 2:
                                    redFlagDocSet.add(docID)
                                    redFlagDocMatrix.setdefault(docID, []).append(fieldName)
                                else:
                                    warningDocSet.add(docID)
                                    warningDocMatrix.setdefault(docID, []).append(fieldName)

                ## Perform a separate check for duplicates in the formatted field itself.
                if len(formattedFieldValues) != self.__FieldFullValueDedupe(formattedFieldValues):
                    duplicatesInFormattedMatrix.setdefault(docID, []).append(fieldName)

        print(f"There are a total of {len(redFlagDocSet)} red flag documents and {len(warningDocSet)} warnings where the matching field value counts do not match.")
        if countsOnly == False:
            ## Context managers ensure each report file is flushed and closed even on error.
            with open(os.path.join(outputDir, "warnings.txt"), 'w') as warningsOutputFile:
                for x in warningDocMatrix:
                    warningsOutputFile.write(f"{x} | {*warningDocMatrix[x],}\n")
            with open(os.path.join(outputDir, "redFlags.txt"), 'w') as redFlagsOutputFile:
                for y in redFlagDocMatrix:
                    redFlagsOutputFile.write(f"{y} | {*redFlagDocMatrix[y],}\n")
            with open(os.path.join(outputDir, "dupesInFormattedFields.txt"), 'w') as duplicatesInFormattedOutputFile:
                for z in duplicatesInFormattedMatrix:
                    duplicatesInFormattedOutputFile.write(f"{z} | {*duplicatesInFormattedMatrix[z],}\n")
212    
213    
if __name__ == '__main__':
    # Path to the pipe-delimited, cleaned DAT export to run QC against.
    inputFilePath = r"C:\Users\eborges\OneDrive - Redgrave LLP\Documents\Cases\AT&T\Cybersecurity FCA Case\PLOG_Test\Shiny\20250325-Shiny-PLOG-Export-Test_Converted.txt"
    # Pair each metadata column with its formatted (privilege-log) counterpart.
    qcRun = QcPrivLog(inputFilePath,
                      "From", "MA Normalized From::Full Name",
                      "To", "MA Normalized To::Full Name",
                      "CC", "MA Normalized Cc::Full Name",
                      "BCC", "MA Normalized Bcc::Full Name",
                      "Author", "DocAuthor",
                      fileEncoding = 'UTF8')
    # Show how header names map to column positions, then run the full report pass.
    print(qcRun.cleanedInputDataFileHeaderPositionalMatrix)
    qcRun.PerformValueCountChecks(countsOnly = False)