ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/ns_dev/Python/NinoCode/Active_prgs/Redgrave/ATT-PrivLogQC.py
Revision: 887
Committed: Thu May 22 20:04:49 2025 UTC (10 months ago) by nino.borges
Content type: text/x-python
File size: 12844 byte(s)
Log Message:
This program will assist with the process of performing QC on past and present AT&T privilege logs.

File Contents

# Content
1 """
2
3 ATT-PrivLogQC
4
5 Created by:
6 Emanuel Borges
7 03.25.2025
8
9 This program will assist with the process of performing QC on past and present AT&T privilege logs.
10
11 """
12
13 import os, re
14 from collections import namedtuple
15 from MyCode.Tool_Box import FileEncodingLib
16
17
class QcPrivLog(object):
    """Automates QC of the AT&T privilege logs, including names-normalization analysis.

    Loads a pipe-delimited export and, per document, compares the raw metadata
    recipient fields (From/To/CC/BCC/Author) against their formatted
    privilege-log counterparts, flagging documents whose value counts disagree.
    """
    version = '0.1.0'


    def __init__(self, cleanedDatExportFileName, metaFromFieldName, plogFromFieldName, metaToFieldName, plogToFieldName, metaCcFieldName, plogCcFieldName, metaBccFieldName, plogBccFieldName, metaAuthorFieldName, plogAuthorFieldName, fileEncoding = 'UTF8'):
        """Initializes the data structures.

        cleanedDatExportFileName should be the full path to the file. Assumes
        the first row of the data file is the header and first column is DocID.
        The meta*/plog* arguments name the header columns holding, respectively,
        the raw metadata values and the formatted privilege-log values for each
        of the five recipient fields.
        """
        print("Initializing data structures...")
        self.metadataValuesDict = {}     # docID -> RecordValues from the raw metadata columns
        self.formattedValuesDict = {}    # docID -> RecordValues from the formatted plog columns
        self.additionalValuesDict = {}   # populated elsewhere; unused in this block
        self.allPossibleEmailAddressesRegExPattern = r"[\w.+-]+@[\w-]+\.[\w.-]+"

        # FIX: context manager so the input file handle is closed promptly
        # (the original opened the file and never closed it).
        with open(cleanedDatExportFileName, encoding = fileEncoding) as inputFile:
            contents = inputFile.readlines()
        self.cleanedInputDataFileHeader = contents[0].replace("\n","")
        self.cleanedInputDataFileHeaderList = self.cleanedInputDataFileHeader.split("|")
        # Column name -> positional index for O(1) field lookups below.
        self.cleanedInputDataFileHeaderPositionalMatrix = {v: i for i, v in enumerate(self.cleanedInputDataFileHeaderList)}
        contents = contents[1:]
        print(f"There are {len(contents)} rows of data in this input file.\n\n")

        print("The data structure will be made of following field pairs:")
        print(f"{metaFromFieldName} | {plogFromFieldName}")
        print(f"{metaToFieldName} | {plogToFieldName}")
        print(f"{metaCcFieldName} | {plogCcFieldName}")
        print(f"{metaBccFieldName} | {plogBccFieldName}")
        print(f"{metaAuthorFieldName} | {plogAuthorFieldName}\n\n")

        RecordValues = namedtuple("RecordValues","fromValues toValues ccValues bccValues docAuthor")
        self.recordValuesFieldList = RecordValues._fields

        # Resolve each column name to its index once, outside the row loop
        # (fails fast with KeyError if a named column is missing).
        positions = self.cleanedInputDataFileHeaderPositionalMatrix
        metaIndexes = [positions[name] for name in
                       (metaFromFieldName, metaToFieldName, metaCcFieldName, metaBccFieldName, metaAuthorFieldName)]
        plogIndexes = [positions[name] for name in
                       (plogFromFieldName, plogToFieldName, plogCcFieldName, plogBccFieldName, plogAuthorFieldName)]

        for line in contents:
            row = line.replace("\n","").split("|")
            docID = row[0]
            self.metadataValuesDict[docID] = RecordValues(*(self.__SplitAndClean(row[i]) for i in metaIndexes))
            self.formattedValuesDict[docID] = RecordValues(*(self.__SplitAndClean(row[i]) for i in plogIndexes))

        print("Data structures created.")


    def __SplitAndClean(self, rawVal, delim = ";"):
        """Pseudo-private method which will take a raw string and split this
        into a list, removing any leading or trailing whitespace.

        FIX: returns an empty list (the original returned an empty string) for
        blank input, so callers always receive a list. Both are falsy with
        len() == 0, so existing callers behave identically.
        """
        if not rawVal:
            return []
        return [value.strip() for value in rawVal.split(delim)]


    def __FieldDedupeByEmailAddress(self, valuesList):
        """Pseudo-private method which deduplicates a field's values by the
        email address(es) embedded in each value (case-insensitively) and
        returns the deduplicated count.

        Values with no recognizable email address are always kept. This should
        ONLY be used for counting, not true deduplication, because which
        duplicate survives is arbitrary and may be the less informative one.

        FIX: a value containing several previously-unseen addresses is now
        counted once; the original counted it once per new address, inflating
        the result for multi-address entries.
        """
        seenEmails = set()
        keptCount = 0
        for item in valuesList:
            emails = re.findall(self.allPossibleEmailAddressesRegExPattern, item)
            if emails:
                newEmails = {e.upper() for e in emails} - seenEmails
                if newEmails:
                    keptCount += 1
                    seenEmails |= newEmails
            else:
                keptCount += 1
        return keptCount


    def __FieldFullValueDedupe(self, valuesList):
        """Pseudo-private method which deduplicates a field's values using the
        FULL VALUE (case-insensitively) and returns the deduplicated count.
        Exists because duplicate values have been observed in the formatted
        fields."""
        return len({item.upper() for item in valuesList})


    def PerformValueCountChecks(self, countsOnly = True, outputDirectory = r"C:\Test_Dir\ATT"):
        """Performs the initial value count checks between the metadata values
        and formatted values, looking for red flags and warnings.

        By default reports numbers only. Set countsOnly to False to also write
        warnings.txt, redFlags.txt and dupesInFormattedFields.txt into
        outputDirectory (new keyword argument; defaults to the original
        hard-coded location, so existing callers are unaffected).
        """
        redFlagDocSet = set()
        redFlagDocMatrix = {}            # docID -> [flag labels]
        warningDocSet = set()
        warningDocMatrix = {}            # docID -> [field names]
        duplicatesInFormattedMatrix = {} # docID -> [fields with duplicate formatted values]

        for docID in self.metadataValuesDict:
            for fieldName in self.recordValuesFieldList:
                # getattr avoids rebuilding a dict on every lookup (vs _asdict()).
                metadataFieldValues = getattr(self.metadataValuesDict[docID], fieldName)
                formattedFieldValues = getattr(self.formattedValuesDict[docID], fieldName)

                if len(metadataFieldValues) != len(formattedFieldValues):
                    if len(metadataFieldValues) == 0:
                        if fieldName == 'docAuthor':
                            # A blank metadata docAuthor is acceptable when the
                            # item is an email and the From field is populated.
                            if not self.metadataValuesDict[docID].fromValues:
                                redFlagDocSet.add(docID)
                                redFlagDocMatrix.setdefault(docID, []).append(fieldName + "-No_Metadata_Entries-A")
                        else:
                            redFlagDocSet.add(docID)
                            redFlagDocMatrix.setdefault(docID, []).append(fieldName + "-No_Metadata_Entries-B")
                    elif len(formattedFieldValues) == 0:
                        redFlagDocSet.add(docID)
                        redFlagDocMatrix.setdefault(docID, []).append(fieldName + "-No_Formatted_Entries")
                    else:
                        # Retry the count after deduplicating the metadata field
                        # values by email address. Never the formatted values.
                        deduplicatedFieldCount = self.__FieldDedupeByEmailAddress(metadataFieldValues)
                        formattedCount = len(formattedFieldValues)
                        if deduplicatedFieldCount != formattedCount:
                            distanceBetween = abs(deduplicatedFieldCount - formattedCount)
                            # Large fields (> 30 entries) tolerate up to 10% drift
                            # as a warning; small fields tolerate a drift of 2.
                            if deduplicatedFieldCount > 30:
                                isRedFlag = distanceBetween > (10 * deduplicatedFieldCount) / 100
                            else:
                                isRedFlag = distanceBetween > 2
                            if isRedFlag:
                                redFlagDocSet.add(docID)
                                redFlagDocMatrix.setdefault(docID, []).append(fieldName)
                            else:
                                warningDocSet.add(docID)
                                warningDocMatrix.setdefault(docID, []).append(fieldName)

                # Perform a separate check for duplicates in the formatted field.
                if len(formattedFieldValues) != self.__FieldFullValueDedupe(formattedFieldValues):
                    duplicatesInFormattedMatrix.setdefault(docID, []).append(fieldName)

        print(f"There are a total of {len(redFlagDocSet)} red flag documents and {len(warningDocSet)} warnings where the matching field value counts that do not match.")
        if not countsOnly:
            # FIX: context managers guarantee the reports are flushed and closed
            # even on error (also fixes the 'redFlads' typo in the handle name).
            with open(os.path.join(outputDirectory, "warnings.txt"), 'w') as warningsOutputFile:
                for docID in warningDocMatrix:
                    warningsOutputFile.write(f"{docID} | {*warningDocMatrix[docID],}\n")
            with open(os.path.join(outputDirectory, "redFlags.txt"), 'w') as redFlagsOutputFile:
                for docID in redFlagDocMatrix:
                    redFlagsOutputFile.write(f"{docID} | {*redFlagDocMatrix[docID],}\n")
            with open(os.path.join(outputDirectory, "dupesInFormattedFields.txt"), 'w') as duplicatesOutputFile:
                for docID in duplicatesInFormattedMatrix:
                    duplicatesOutputFile.write(f"{docID} | {*duplicatesInFormattedMatrix[docID],}\n")
212
213
if __name__ == '__main__':
    # Driver: run the QC checks against the converted Shiny PLOG test export,
    # printing the header position map before exporting the report files.
    cleanedDatExportFileName = r"C:\Users\eborges\OneDrive - Redgrave LLP\Documents\Cases\AT&T\Cybersecurity FCA Case\PLOG_Test\Shiny\20250325-Shiny-PLOG-Export-Test_Converted.txt"
    privLogChecker = QcPrivLog(
        cleanedDatExportFileName,
        "From", "MA Normalized From::Full Name",
        "To", "MA Normalized To::Full Name",
        "CC", "MA Normalized Cc::Full Name",
        "BCC", "MA Normalized Bcc::Full Name",
        "Author", "DocAuthor",
        fileEncoding='UTF8',
    )
    print(privLogChecker.cleanedInputDataFileHeaderPositionalMatrix)
    privLogChecker.PerformValueCountChecks(countsOnly=False)