ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/ns_dev/Python/NinoCode/Active_prgs/Redgrave/NTRS-TopSenderAnalysis.py
Revision: 803
Committed: Thu Oct 12 16:00:49 2023 UTC (2 years, 5 months ago) by nino.borges
Content type: text/x-python
File size: 9423 byte(s)
Log Message:
This version adds support for gathering all possible email addresses across all CSV files, which also let me gather the unique email address count in the TO field per CSV.  Also adds the date column, as captured in the folder name, and changes the NTRS Domain Criteria to @NTRS.com or .NTRS.COM.

File Contents

# Content
1 """
2
3 NTRS-TopSenderAnalysis
4
5 Created by:
6 Emanuel Borges
7 09.20.2023
8
9 Very simple program that will read multiple CSV files and export a report based on LanID and other general information.
10 To-Do: Add a method to QC for parsing errors; every CSV should have the same number of fields.
11
12 """
13
14 import csv, os, re
15
class TopSendersAnalyzer(object):
    """Walk a directory tree of CSV exports and report on each file's 'To' column.

    For every file found under ``startDir`` one pipe-delimited line is printed
    containing: file name, dates found in the folder path, row count, rows whose
    'To' value is exactly 80 characters, rows where it is not, whether any 'To'
    value exceeded 80 characters, the number of unique email addresses seen in
    'To', and the three NTRS bucket counts (no NTRS domain, NTRS domain with a
    true LAN ID, NTRS domain without one).

    LAN ID matches are accumulated on the instance in ``trueLanIdAddressesSet``
    and ``falsePositiveLanIdAddressesSet`` and can optionally be written out as
    sorted log files.
    """
    version = "0.04"

    # Default locations; these reproduce the originally hard-coded paths exactly.
    _defaultStartDir = r"C:\Users\eborges\Documents\Cases\Northern Trust\20230919 - FileNetTopSenderAnalysis-Req"
    _defaultOutputDir = r"C:\Users\eborges\Documents\Cases\Northern Trust"

    # A regex match with more letters than this is logged as a false positive.
    # The count is taken over the WHOLE match, so it includes the 7 letters of
    # "NTRS" and "COM"; with the shipped pattern that leaves at most 3 letters
    # for the part before the digits.
    # NOTE(review): confirm that "3 letters max" is the intended specification.
    maxLanIdAlphaCount = 10

    def __init__(self, startDir=None, outputDir=None):
        """Set up paths, result sets and regex patterns.

        startDir  -- root folder to walk for CSV files (defaults to the original
                     hard-coded case folder).
        outputDir -- folder that receives the three log files (defaults to the
                     original hard-coded folder).
        """
        self.startDir = self._defaultStartDir if startDir is None else startDir

        ## Encoding used to read the CSV files ('ANSI' where available).
        self.csvEncoding = self._ResolveCsvEncoding()

        ## All possible email addresses across all CSV files
        self.allPossibleEmailAddressesSet = set()
        self.allPossibleEmailAddressesOutputFileName = self._OutputPath(outputDir, "Extracted_AllEmailAddresses_All-CSVs.txt")

        ## All email addresses with an @NTRS.COM domain. Currently unsupported.

        ## All true NTRS LAN ID matches, per specification provided to me.
        self.trueLanIdAddressesSet = set()
        self.trueLanIdAddressesOutputFileName = self._OutputPath(outputDir, "Extracted_TRUE_LanAddresses.txt")

        ## False positive NTRS LAN ID matches, per specification provided to me. Close but just outside of specification. (for analysis)
        self.falsePositiveLanIdAddressesSet = set()
        self.falsePositiveLanIdAddressesOutputFileName = self._OutputPath(outputDir, "Extracted_FALSE-POSTIVE_LanAddresses.txt")

        ## Letters followed by 1-3 digits at NTRS.COM. The dot is escaped so that
        ## it only matches a literal '.' (it previously matched any character).
        ## An earlier, stricter pattern was '[A-Za-z]{2}[0-9]{2,3}@NTRS.COM'.
        self.lanIdRegExPattern = r'[A-Za-z]{1,15}[0-9]{1,3}@NTRS\.COM'

        ## Simple match to pull out the date as recorded in the path
        self.dateInPathRegExPattern = r'2023-[0-9]{2}-[0-9]{2}'

        ## Match for pulling out all email addresses, regardless of domain.
        self.allPossibleEmailAddressesRegExPattern = r"[\w.+-]+@[\w-]+\.[\w.-]+"

    @classmethod
    def _OutputPath(cls, outputDir, fileName):
        """Return the log file path for fileName inside outputDir (or the default folder)."""
        if outputDir is None:
            # Built by plain concatenation so the default stays byte-identical
            # to the original Windows path on every platform.
            return cls._defaultOutputDir + "\\" + fileName
        return os.path.join(outputDir, fileName)

    @staticmethod
    def _ResolveCsvEncoding():
        """Return 'ANSI' when Python knows that codec, otherwise 'cp1252'."""
        import codecs
        try:
            codecs.lookup('ANSI')
        except LookupError:
            # 'ANSI' is only registered on Windows; cp1252 is the usual
            # Western-European ANSI code page and keeps the script portable.
            return 'cp1252'
        return 'ANSI'

    def AnalyzeTopSenders(self, writeTrueLanIDLogFile = False, writeFalsePositiveLanIDLogFile = False, writeAllPossibleEmailAddressesLogFile = False):
        """Main Method in this program.

        Walks self.startDir, prints one report line per file and, depending on
        the three flags, writes the corresponding log files afterwards.
        The all-addresses set is only accumulated when
        writeAllPossibleEmailAddressesLogFile is True.
        Raises KeyError if a file has no 'To' column.
        """
        # Compile once per run instead of once per row.
        datePattern = re.compile(self.dateInPathRegExPattern)
        emailPattern = re.compile(self.allPossibleEmailAddressesRegExPattern)
        lanIdPattern = re.compile(self.lanIdRegExPattern)

        for (root, _dirs, files) in os.walk(self.startDir):
            # The date comes from the folder path, so it is the same for every file in root.
            dateInPath = datePattern.findall(root)
            for fl in files:
                (fileRowCount, eightyCharRowCount, nonEightyCharRowCount, charsOverEighty,
                 allEmailAddressesInCSVSet, noNtrsDomainBucketCount, ntrsAndLanBucketCount,
                 ntrsNoLanBucketCount) = self._AnalyzeCsvFile(os.path.join(root, fl), emailPattern, lanIdPattern)
                print(f"{fl}|{dateInPath}|{fileRowCount}|{eightyCharRowCount}|{nonEightyCharRowCount}|{charsOverEighty}|{len(allEmailAddressesInCSVSet)}|{noNtrsDomainBucketCount}|{ntrsAndLanBucketCount}|{ntrsNoLanBucketCount}")
                ## Update the global all email addresses set, if they selected this option
                if writeAllPossibleEmailAddressesLogFile:
                    self.allPossibleEmailAddressesSet.update(allEmailAddressesInCSVSet)

        if writeTrueLanIDLogFile:
            print("Writing the True LAN ID log file...")
            self.WriteLogFile(self.trueLanIdAddressesSet, self.trueLanIdAddressesOutputFileName)
            print("Done.\n")
        if writeFalsePositiveLanIDLogFile:
            print("Writing the False-Positive LAN ID log file...")
            self.WriteLogFile(self.falsePositiveLanIdAddressesSet, self.falsePositiveLanIdAddressesOutputFileName)
            print("Done.\n")
        if writeAllPossibleEmailAddressesLogFile:
            print("Writing the All Possible Email Addresses Across All CSV Files log file...")
            self.WriteLogFile(self.allPossibleEmailAddressesSet, self.allPossibleEmailAddressesOutputFileName)
            print("Done.\n")

    def _AnalyzeCsvFile(self, csvPath, emailPattern, lanIdPattern):
        """Scan one CSV file and return its counters, in report order.

        Returns a tuple: (row count, rows with an 80-character 'To', rows with
        any other length, True if any 'To' exceeded 80 characters, set of
        upper-cased email addresses found in 'To', rows without an NTRS domain,
        rows with a true LAN ID, rows with the NTRS domain but no true LAN ID).
        """
        fileRowCount = 0
        eightyCharRowCount = 0
        nonEightyCharRowCount = 0
        charsOverEighty = False
        noNtrsDomainBucketCount = 0
        ntrsAndLanBucketCount = 0
        ntrsNoLanBucketCount = 0
        allEmailAddressesInCSVSet = set()

        # newline='' lets the csv module handle line endings itself, as its
        # documentation requires for correct parsing of quoted fields.
        with open(csvPath, mode='r', encoding=self.csvEncoding, newline='') as csv_file:
            for row in csv.DictReader(csv_file):
                fileRowCount += 1
                toValue = row['To']
                if toValue is None:
                    # DictReader fills missing trailing fields of a short row with None.
                    toValue = ''

                toLength = len(toValue)
                if toLength == 80:
                    eightyCharRowCount += 1
                else:
                    nonEightyCharRowCount += 1
                    if toLength > 80:
                        charsOverEighty = True

                # All matching below is done on the upper-cased value.
                toValue = toValue.upper()

                ## Match and gather all possible email addresses, adding it to the per CSV set.
                allEmailAddressesInCSVSet.update(emailPattern.findall(toValue))

                ## Perform the main logic tests
                if "@NTRS.COM" in toValue or ".NTRS.COM" in toValue:
                    ## The domain was found. Apply next test.
                    if self.LanIDTrueTest(lanIdPattern.findall(toValue)):
                        ## At least 1 LAN ID was found, using the True Test
                        ntrsAndLanBucketCount += 1
                    else:
                        ## Not 1 true LAN ID was found, using the True Test
                        ntrsNoLanBucketCount += 1
                else:
                    ## No ntrs addresses found at all,
                    noNtrsDomainBucketCount += 1

        return (fileRowCount, eightyCharRowCount, nonEightyCharRowCount, charsOverEighty,
                allEmailAddressesInCSVSet, noNtrsDomainBucketCount, ntrsAndLanBucketCount,
                ntrsNoLanBucketCount)

    def LanIDTrueTest(self, listOfIds):
        """Classify regex matches as true or false-positive LAN IDs.

        Every entry of listOfIds is added to either trueLanIdAddressesSet or
        falsePositiveLanIdAddressesSet. Returns True if at least 1 true LAN ID
        is found in the list, otherwise False (including for an empty list).
        """
        lanIDTestResult = False
        for lanID in listOfIds:
            # Letters are counted over the full matched string (domain included).
            alphaCount = sum(1 for ch in lanID if ch.isalpha())
            if alphaCount > self.maxLanIdAlphaCount:
                ## I'm too big to be a true LAN ID
                self.falsePositiveLanIdAddressesSet.add(lanID)
            else:
                self.trueLanIdAddressesSet.add(lanID)
                lanIDTestResult = True
        return lanIDTestResult

    def WriteLogFile(self, setOfValues, outputFilePath):
        """Sort the values and write them, one per line, to outputFilePath.

        An existing file is never overwritten: if outputFilePath exists, a
        counter is inserted before the extension (name1.txt, name2.txt, ...)
        until an unused name is found.
        """
        # Split once, up front, so the counter replaces the previous suffix
        # instead of being appended to it (name1 -> name12 -> name123).
        baseName, extension = os.path.splitext(outputFilePath)
        fileNameInc = 0
        while os.path.isfile(outputFilePath):
            fileNameInc += 1
            outputFilePath = f"{baseName}{fileNameInc}{extension}"
        with open(outputFilePath, 'w') as outFl:
            outFl.writelines(f"{i}\n" for i in sorted(setOfValues))
149
if __name__ == '__main__':
    # Script entry point: run the analysis over the default folder and
    # write all three log files (true LAN IDs, false positives, all addresses).
    logFileOptions = {
        "writeTrueLanIDLogFile": True,
        "writeFalsePositiveLanIDLogFile": True,
        "writeAllPossibleEmailAddressesLogFile": True,
    }
    tsa = TopSendersAnalyzer()
    tsa.AnalyzeTopSenders(**logFileOptions)
154
155
156 #print(f"There are {len(allToLanAddressesSet)} unique LAN ID addresses.")
157 #outputFile = open(r"C:\Users\eborges\Documents\Cases\Northern Trust\ExtractedLanAddresses.txt",'w')
158 #allToLanAddressesList = list(allToLanAddressesSet)
159 #allToLanAddressesList.sort()
160 #for i in allToLanAddressesList:
161 # outputFile.write(f"{i}\n")
162 #outputFile.close()
163
164 ## Initially gathering some very basic information across the CSV files, not using csv lib
165 # for (root,dirs,files) in os.walk(r"C:\Users\eborges\Documents\Cases\Northern Trust"):
166 # for fl in files:
167 # contents = open(os.path.join(root,fl), encoding='ANSI').readlines()
168 # print(f"{fl}|{len(contents)-1}")