-
Notifications
You must be signed in to change notification settings - Fork 0
/
FileReader.py
122 lines (94 loc) · 3.17 KB
/
FileReader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# FileReader
#
import boto3
from pathlib import Path
from Config import state
class FileReader():
    """Sequential reader for a delimited text file addressed by a URI.

    The URI is split on "/" into a protocol (e.g. ``s3:``), a bucket and a
    key.  The file is looked up at ``local_dir/key`` first and only fetched
    through the protocol handler when no such local file exists.  The first
    line of the file is treated as the header; every later line is returned
    by :meth:`getSample` as a dict keyed by the header's column names.

    NOTE: records are parsed with a plain ``str.split`` on the separator, so
    a separator embedded inside a quoted field is not supported.
    """

    def __init__(self, fileURI=None, local_dir="/tmp", record_separator=",", quote_records=False):
        """Create a reader and, when ``fileURI`` is given, open it.

        Args:
            fileURI: URI of the file to read, or None to create an idle reader.
            local_dir: Directory used as the local cache / download target.
            record_separator: String separating the fields of a line.
            quote_records: When true, surrounding double quotes are stripped
                from every header name and every field.
        """
        super().__init__()
        self.file = None
        self.fileURI = None
        self.cols = []
        self.local_dir = local_dir
        self.record_separator = record_separator
        self.quote_records = quote_records
        self._setLocalFile(None)
        self.useFileURI(fileURI)

    def __del__(self):
        """Release the file handle when the reader is garbage collected."""
        self.close()

    def useFileURI(self, fileURI):
        """Switch the reader to ``fileURI``, closing any file currently open.

        Does nothing when ``fileURI`` equals the URI already in use.  Passing
        None detaches the reader without trying to open anything.
        """
        if self.getFileURI() == fileURI:
            return
        if self.isOpen():
            self.close()
        self.cols = []
        self.fileURI = fileURI
        if fileURI is not None:
            self.open()

    def getFileURI(self):
        """Return the URI currently in use, or None when none has been set."""
        return getattr(self, "fileURI", None)

    def isOpen(self):
        """Return True when a file handle is currently held."""
        # getattr keeps this safe on a partially initialised instance, which
        # matters because __del__ goes through close() -> isOpen().
        return getattr(self, "file", None) is not None

    def _getLocalFilePath(self, key):
        """Return the local path for ``key`` inside ``local_dir``."""
        return "/".join([self.local_dir, key])

    def _setLocalFile(self, filename):
        """Record ``filename`` as the local file that open() will read."""
        self.localFile = filename

    def _fetchFromS3(self, bucket, key):
        """Download ``key`` from the S3 ``bucket`` into the local directory."""
        s3 = boto3.client('s3')
        localFile = self._getLocalFilePath(key)
        # Keys may contain "/" - make sure the target directory exists,
        # otherwise the download cannot create the destination file.
        Path(localFile).parent.mkdir(parents=True, exist_ok=True)
        s3.download_file(bucket, key, localFile)
        self._setLocalFile(localFile)

    def _fetchFileFromURI(self):
        """Resolve the current URI to a local file, downloading if needed.

        Best effort: problems are reported on stdout and leave ``localFile``
        unset; nothing is raised to the caller.
        """
        uri = self.getFileURI()
        if not uri:
            print("error fetching file: no file URI set")
            return

        handlers = {'s3:': self._fetchFromS3}
        # "s3://bucket/some/key" -> ['s3:', '', 'bucket', 'some', 'key']
        src = uri.split("/")
        if len(src) < 4:
            print(f"error fetching {uri}: expected <protocol>//<bucket>/<key>")
            return
        protocol = src[0]
        bucket = src[2]
        key = "/".join(src[3:])

        # check local cache before downloading
        localFile = self._getLocalFilePath(key)
        if Path(localFile).is_file():
            self._setLocalFile(localFile)
            return

        handler = handlers.get(protocol)
        if handler is None:
            print(f"error fetching {uri}: unsupported protocol '{protocol}'")
            return
        try:
            handler(bucket, key)
        except Exception as err:
            # Download failures are reported rather than silently dropped.
            print(f"error fetching {uri}: {err}")

    def open(self):
        """Open the local file and read its header line into ``self.cols``.

        Fetches the file first when no local file is known.  Errors are
        reported on stdout and leave the reader closed.
        """
        try:
            if self.isOpen():
                # Do not leak the previous handle when open() is called twice.
                self.file.close()
                self.file = None
            if self.localFile is None:
                self._fetchFileFromURI()
            if self.localFile is None:
                print(f'error opening {self.localFile}: no local file available')
                return
            self.file = open(self.localFile, 'r')
            header = self.file.readline().rstrip()
            self.cols = header.split(self.record_separator)
            if self.quote_records:
                self.cols = [c.strip('"') for c in self.cols]
        except Exception as err:
            print(f'error opening {self.localFile}: {err}')

    def close(self):
        """Close the file handle, if any, and forget the local file."""
        if self.isOpen():
            self.file.close()
        self.file = None
        self.localFile = None

    def _makeSample(self, lineCSV):
        """Turn one line of text into a dict keyed by the header columns.

        Fields beyond the number of columns are ignored.

        Raises:
            IndexError: when the line has fewer fields than there are columns.
        """
        fields = lineCSV.split(self.record_separator)
        if self.quote_records:
            fields = [f.strip('"') for f in fields]
        if len(fields) < len(self.cols):
            raise IndexError(
                f"expected {len(self.cols)} fields, got {len(fields)}"
            )
        return dict(zip(self.cols, fields))

    def getSample(self):
        """Read the next record and return it as a dict.

        Returns an empty dict when no record could be produced: at end of
        file, on a read error, or when the line has too few fields.  At end
        of file the reader is closed and, if ``state['at_end']`` is
        ``'repeat'``, reopened so the next call starts over.
        """
        readbuffer = {}
        try:
            raw = self.file.readline()
        except Exception as e:
            print("Exception while reading from file")
            return readbuffer

        # readline() returns "" only at end of file; a blank line is "\n".
        # Testing this explicitly (instead of relying on a short row raising
        # IndexError) also terminates correctly for single-column files.
        if raw == "":
            print("End of File Reached...")
            self.close()
            if state['at_end'] == 'repeat':
                self.open()
            return readbuffer

        try:
            readbuffer = self._makeSample(raw.rstrip())
        except IndexError as ie:
            # A short or blank line is a bad record, not the end of the file.
            print(f"Skipping malformed record: {ie}")
        except Exception as e:
            print("Exception while reading from file")
        return readbuffer