ViewVC Help
View File | Revision Log | Show Annotations | View Changeset | Root Listing
root/ns_dev/Python/NinoCode/Active_prgs/Redgrave/NTRS-TopSenderAnalysis.py
Revision: 803
Committed: Thu Oct 12 16:00:49 2023 UTC (2 years, 5 months ago) by nino.borges
Content type: text/x-python
File size: 9423 byte(s)
Log Message:
This version adds support for gathering all possible email addresses across all CSV files, which also lets me gather the unique email address count in the TO field per CSV.  It also adds the date column, as captured in the folder name, and changes the NTRS domain criteria to @NTRS.COM or .NTRS.COM.

File Contents

# User Rev Content
1 nino.borges 797 """
2    
3     NTRS-TopSenderAnalysis
4    
5     Created by:
6     Emanuel Borges
7     09.20.2023
8    
9     Very simple program that will read multiple CSV files and export a report based on LanID and other general information.
10 nino.borges 798 To-Do: Add a method to QC for parsing errors; there should be the same number of fields across all of the CSVs.
11 nino.borges 797
12     """
13    
14 nino.borges 798 import csv, os, re
15 nino.borges 797
class TopSendersAnalyzer(object):
    """Walk a directory tree of CSV files and report on the 'To' column of each file.

    For every file found under ``startDir`` one pipe-delimited line is printed:

        file name | dates found in the folder path | row count |
        rows whose To value is exactly 80 chars | rows whose To value is not 80 chars |
        whether any To value is longer than 80 chars | unique email addresses in To |
        rows with no NTRS domain | rows with NTRS domain and a true LAN ID |
        rows with NTRS domain but no true LAN ID

    Along the way the LAN ID matches are collected into ``trueLanIdAddressesSet``
    and ``falsePositiveLanIdAddressesSet``, and these (plus the set of all email
    addresses) can optionally be written out as sorted log files.
    """

    version = "0.04"

    ## Encoding used to read the CSV files.  "ANSI" is an alias for the "mbcs" codec,
    ## which is only available on Windows; override this on the class or on an
    ## instance to read the files on another platform.
    csvEncoding = "ANSI"

    def __init__(self):
        """Set up the input directory, output file names, regex patterns and empty result sets."""
        self.startDir = r"C:\Users\eborges\Documents\Cases\Northern Trust\20230919 - FileNetTopSenderAnalysis-Req"
        #self.startDir = r"C:\Users\eborges\Documents\Cases\Northern Trust\20230919 - FileNetTopSenderAnalysis-Req\_LocalVersion\2023-08-14 FileNet Messages Delete Project - Top sender analysis\8_14_2023\xact_report_Dec_2014_CSV"

        ## All possible email addresses across all CSV files
        self.allPossibleEmailAddressesSet = set()
        self.allPossibleEmailAddressesOutputFileName = r"C:\Users\eborges\Documents\Cases\Northern Trust\Extracted_AllEmailAddresses_All-CSVs.txt"

        ## All email addresses with an @NTRS.COM domain.  Currently unsupported.
        #self.allToNtrsAddressesSet = set()
        #self.allToNtrsAddressesOutputFileName = r""

        ## All true NTRS LAN ID matches, per specification provided to me.
        self.trueLanIdAddressesSet = set()
        self.trueLanIdAddressesOutputFileName = r"C:\Users\eborges\Documents\Cases\Northern Trust\Extracted_TRUE_LanAddresses.txt"

        ## False positive NTRS LAN ID matches, per specification provided to me.  Close but just outside of specification. (for analysis)
        self.falsePositiveLanIdAddressesSet = set()
        self.falsePositiveLanIdAddressesOutputFileName = r"C:\Users\eborges\Documents\Cases\Northern Trust\Extracted_FALSE-POSTIVE_LanAddresses.txt"

        ## LAN ID candidate: 1-15 letters, 1-3 digits, then the literal @NTRS.COM domain.
        ## The dots are escaped so that only a real "." matches (an unescaped "."
        ## would also accept strings such as AB12@NTRSXCOM).
        #self.lanIdRegExPattern = '[A-Za-z]{2}[0-9]{2,3}@NTRS.COM'
        self.lanIdRegExPattern = r'[A-Za-z]{1,15}[0-9]{1,3}@NTRS\.COM'

        ## Simple match to pull out the date as recorded in the path
        self.dateInPathRegExPattern = r'2023-[0-9]{2}-[0-9]{2}'

        ## Match for pulling out all email addresses, regardless of domain.
        ## Earlier attempts are kept below for reference.
        #self.allPossibleEmailAddressesRegExPattern = '([A-Za-z0-9]+[.-_])*[A-Za-z0-9]+@[A-Za-z0-9-]+(\.[A-Z|a-z]{2,})+'
        #self.allPossibleEmailAddressesRegExPattern = """(?:[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\.[a-z0-9!#$%&'*+/=^_`{|}~-]+)*|"(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21\x23-\x5b\x5d-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])*")@(?:(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?|\[(?:(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9]))\.){3}(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9])|[a-z0-9-]*[a-z0-9]:(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21-\x5a\x53-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])+)\])"""
        #self.allPossibleEmailAddressesRegExPattern = r"([-!#-'*+/-9=?A-Z^-~]+(\.[-!#-'*+/-9=?A-Z^-~]+)*|\"([]!#-[^-~ \t]|(\\[\t -~]))+\")@([-!#-'*+/-9=?A-Z^-~]+(\.[-!#-'*+/-9=?A-Z^-~]+)*|\[[\t -Z^-~]*])"
        #self.allPossibleEmailAddressesRegExPattern = r"[\w\.-]+@[\w\.-]+\.\w+"
        self.allPossibleEmailAddressesRegExPattern = r"[\w.+-]+@[\w-]+\.[\w.-]+"

    def AnalyzeTopSenders(self, writeTrueLanIDLogFile = False, writeFalsePositiveLanIDLogFile = False, writeAllPossibleEmailAddressesLogFile = False):
        """Main Method in this program.

        Walks ``self.startDir``, prints one pipe-delimited report line per file and
        then writes whichever log files were requested.

        Args:
            writeTrueLanIDLogFile: write ``trueLanIdAddressesSet`` to
                ``trueLanIdAddressesOutputFileName`` when finished.
            writeFalsePositiveLanIDLogFile: write ``falsePositiveLanIdAddressesSet`` to
                ``falsePositiveLanIdAddressesOutputFileName`` when finished.
            writeAllPossibleEmailAddressesLogFile: accumulate every email address seen
                across all files into ``allPossibleEmailAddressesSet`` and write it to
                ``allPossibleEmailAddressesOutputFileName`` when finished.  The set is
                only populated when this flag is set.

        Raises:
            KeyError: if a file has no 'To' column in its header row.
        """
        for (root, dirs, files) in os.walk(self.startDir):
            ## The date comes from the folder path, so it is the same for every file in this folder.
            dateInPath = re.findall(self.dateInPathRegExPattern, root)
            for fl in files:
                (fileRowCount, eightyCharRowCount, nonEightyCharRowCount, charsOverEighty,
                 noNtrsDomainBucketCount, ntrsAndLanBucketCount, ntrsNoLanBucketCount,
                 allEmailAddressesInCSVSet) = self._AnalyzeCsvFile(os.path.join(root, fl))
                print(f"{fl}|{dateInPath}|{fileRowCount}|{eightyCharRowCount}|{nonEightyCharRowCount}|{charsOverEighty}|{len(allEmailAddressesInCSVSet)}|{noNtrsDomainBucketCount}|{ntrsAndLanBucketCount}|{ntrsNoLanBucketCount}")
                ## Update the global all email addresses set, if they selected this option
                if writeAllPossibleEmailAddressesLogFile:
                    self.allPossibleEmailAddressesSet.update(allEmailAddressesInCSVSet)
        if writeTrueLanIDLogFile:
            print("Writing the True LAN ID log file...")
            self.WriteLogFile(self.trueLanIdAddressesSet, self.trueLanIdAddressesOutputFileName)
            print("Done.\n")
        if writeFalsePositiveLanIDLogFile:
            print("Writing the False-Positive LAN ID log file...")
            self.WriteLogFile(self.falsePositiveLanIdAddressesSet, self.falsePositiveLanIdAddressesOutputFileName)
            print("Done.\n")
        if writeAllPossibleEmailAddressesLogFile:
            print("Writing the All Possible Email Addresses Across All CSV Files log file...")
            self.WriteLogFile(self.allPossibleEmailAddressesSet, self.allPossibleEmailAddressesOutputFileName)
            print("Done.\n")

    def _AnalyzeCsvFile(self, csvPath):
        """Tally the 'To' column of a single CSV file.

        Returns a tuple of (fileRowCount, eightyCharRowCount, nonEightyCharRowCount,
        charsOverEighty, noNtrsDomainBucketCount, ntrsAndLanBucketCount,
        ntrsNoLanBucketCount, allEmailAddressesInCSVSet).  LAN ID matches are also
        recorded on the instance through LanIDTrueTest.
        """
        fileRowCount = 0
        eightyCharRowCount = 0
        nonEightyCharRowCount = 0
        charsOverEighty = False
        noNtrsDomainBucketCount = 0
        ntrsAndLanBucketCount = 0
        ntrsNoLanBucketCount = 0
        allEmailAddressesInCSVSet = set()

        ## newline='' lets the csv module handle line endings itself, which it needs
        ## in order to read newlines embedded in quoted fields correctly.
        with open(csvPath, mode='r', encoding=self.csvEncoding, newline='') as csv_file:
            for row in csv.DictReader(csv_file):
                fileRowCount += 1
                ## A row with fewer fields than the header yields None here; treat it as empty.
                rawToValue = row['To'] or ''
                ## Lengths are measured before upper-casing.
                if len(rawToValue) == 80:
                    eightyCharRowCount += 1
                else:
                    nonEightyCharRowCount += 1
                    if len(rawToValue) > 80:
                        charsOverEighty = True
                toValue = rawToValue.upper()

                ## Match and gather all possible email addresses, adding it to the per CSV set.
                allEmailAddressesInCSVSet.update(re.findall(self.allPossibleEmailAddressesRegExPattern, toValue))

                ## Perform the main logic tests
                if "@NTRS.COM" in toValue or ".NTRS.COM" in toValue:
                    ## The domain was found.  Apply next test.
                    ## NOTE(review): lanIdRegExPattern only matches addresses directly
                    ## at @NTRS.COM, so rows that only contain a sub-domain address
                    ## (x.NTRS.COM) land in the "no LAN ID" bucket - confirm intended.
                    ntrsLanAddressesList = re.findall(self.lanIdRegExPattern, toValue)
                    if self.LanIDTrueTest(ntrsLanAddressesList):
                        ## At least 1 LAN ID was found, using the True Test
                        ntrsAndLanBucketCount += 1
                    else:
                        ## Not 1 true LAN ID was found, using the True Test
                        ntrsNoLanBucketCount += 1
                else:
                    ## No ntrs addresses found at all,
                    noNtrsDomainBucketCount += 1

        return (fileRowCount, eightyCharRowCount, nonEightyCharRowCount, charsOverEighty,
                noNtrsDomainBucketCount, ntrsAndLanBucketCount, ntrsNoLanBucketCount,
                allEmailAddressesInCSVSet)

    def LanIDTrueTest(self, listOfIds):
        """A need for a more complicated LAN ID test was needed.

        Every entry of ``listOfIds`` is filed into either ``trueLanIdAddressesSet``
        or ``falsePositiveLanIdAddressesSet``; an entry with more than 10 alphabetic
        characters is considered too big to be a true LAN ID.

        The count covers every letter in the string.  AnalyzeTopSenders passes
        full addresses ending in @NTRS.COM, so the 7 letters of the domain are
        included and the cut-off effectively allows up to 3 letters before the
        digits.  NOTE(review): confirm that this is the intended specification.

        Returns True if at least 1 true LAN ID is found in the list, else False.
        """
        lanIDTestResult = False
        for lanID in listOfIds:
            alphaCount = sum(1 for ch in lanID if ch.isalpha())
            if alphaCount > 10:
                ## I'm too big to be a true LAN ID
                self.falsePositiveLanIdAddressesSet.add(lanID)
            else:
                self.trueLanIdAddressesSet.add(lanID)
                lanIDTestResult = True
        return lanIDTestResult

    def WriteLogFile(self, setOfValues, outputFilePath):
        """Takes a Set containing values, sorts these, and then writes them, one per line, to the given outputFilePath.

        An existing file is never overwritten: if ``outputFilePath`` already exists,
        the first free name of the form <name>1<ext>, <name>2<ext>, ... is used instead.
        """
        ## Split the requested path once so that the counter is always appended to the
        ## original base name (name1, name2, name3) and not to the previous candidate
        ## (name1, name12, name123).
        baseName, extension = os.path.splitext(outputFilePath)
        fileNameInc = 0
        while os.path.isfile(outputFilePath):
            fileNameInc += 1
            outputFilePath = baseName + str(fileNameInc) + extension
        ## The context manager closes the file even if a write fails.
        with open(outputFilePath, 'w') as outFl:
            outFl.writelines(f"{value}\n" for value in sorted(setOfValues))
149    
if __name__ == '__main__':
    # Script entry point: run the full analysis and write all three log files.
    analyzer = TopSendersAnalyzer()
    analyzer.AnalyzeTopSenders(
        writeTrueLanIDLogFile=True,
        writeFalsePositiveLanIDLogFile=True,
        writeAllPossibleEmailAddressesLogFile=True,
    )
154 nino.borges 797
155 nino.borges 801
156 nino.borges 800 #print(f"There are {len(allToLanAddressesSet)} unique LAN ID addresses.")
157     #outputFile = open(r"C:\Users\eborges\Documents\Cases\Northern Trust\ExtractedLanAddresses.txt",'w')
158     #allToLanAddressesList = list(allToLanAddressesSet)
159     #allToLanAddressesList.sort()
160     #for i in allToLanAddressesList:
161     # outputFile.write(f"{i}\n")
162     #outputFile.close()
163 nino.borges 797
164     ## Initially gathering some very basic information across the CSV files, not using csv lib
165     # for (root,dirs,files) in os.walk(r"C:\Users\eborges\Documents\Cases\Northern Trust"):
166     # for fl in files:
167     # contents = open(os.path.join(root,fl), encoding='ANSI').readlines()
168     # print(f"{fl}|{len(contents)-1}")