@@ -25,7 +25,7 @@
 from azure.storage.blob import BlockBlobService
 from azure.storage.blob.models import ContentSettings
 from azure.storage.queue import QueueService
-from azure.storage.table import TableService
+from azure.storage.table import TableBatch, TableService

 _PY3 = sys.version_info[0] == 3

@@ -48,8 +48,12 @@ def __init__(self,
                  zip_compression=False,
                  max_connections=1,
                  max_retries=5,
-                 retry_wait=1.0):
-        self.service = BlockBlobService(account_name, account_key, protocol)
+                 retry_wait=1.0,
+                 is_emulated=False):
+        self.service = BlockBlobService(account_name=account_name,
+                                        account_key=account_key,
+                                        is_emulated=is_emulated,
+                                        protocol=protocol)
         self.container_created = False
         hostname = gethostname()
         self.meta = {'hostname': hostname.replace('_', '-'),
@@ -82,7 +86,7 @@ def put_file_into_storage(self, dirName, fileName):
             suffix, content_type = '', 'text/plain'
         self.service.create_blob_from_path(container_name=self.container,
                                            blob_name=fileName + suffix,
-                                           file_path=fileName,
+                                           file_path=file_path,
                                            content_settings=ContentSettings(content_type=content_type),
                                            max_connections=self.max_connections
                                            )  # max_retries and retry_wait no longer arguments in azure 0.33
@@ -113,7 +117,8 @@ def __init__(self,
                  zip_compression=False,
                  max_connections=1,
                  max_retries=5,
-                 retry_wait=1.0):
+                 retry_wait=1.0,
+                 is_emulated=False):
         meta = {'hostname': gethostname(), 'process': os.getpid()}
         RotatingFileHandler.__init__(self,
                                      filename % meta,
@@ -123,14 +128,15 @@ def __init__(self,
                                      encoding=encoding,
                                      delay=delay)
         _BlobStorageFileHandler.__init__(self,
-                                         account_name,
-                                         account_key,
-                                         protocol,
-                                         container,
-                                         zip_compression,
-                                         max_connections,
-                                         max_retries,
-                                         retry_wait)
+                                         account_name=account_name,
+                                         account_key=account_key,
+                                         protocol=protocol,
+                                         container=container,
+                                         zip_compression=zip_compression,
+                                         max_connections=max_connections,
+                                         max_retries=max_retries,
+                                         retry_wait=retry_wait,
+                                         is_emulated=is_emulated)

     def doRollover(self):
         """
@@ -172,7 +178,8 @@ def __init__(self,
                  zip_compression=False,
                  max_connections=1,
                  max_retries=5,
-                 retry_wait=1.0):
+                 retry_wait=1.0,
+                 is_emulated=False):
         meta = {'hostname': gethostname(), 'process': os.getpid()}
         TimedRotatingFileHandler.__init__(self,
                                           filename % meta,
@@ -183,14 +190,15 @@ def __init__(self,
                                           delay=delay,
                                           utc=utc)
         _BlobStorageFileHandler.__init__(self,
-                                         account_name,
-                                         account_key,
-                                         protocol,
-                                         container,
-                                         zip_compression,
-                                         max_connections,
-                                         max_retries,
-                                         retry_wait)
+                                         account_name=account_name,
+                                         account_key=account_key,
+                                         protocol=protocol,
+                                         container=container,
+                                         zip_compression=zip_compression,
+                                         max_connections=max_connections,
+                                         max_retries=max_retries,
+                                         retry_wait=retry_wait,
+                                         is_emulated=is_emulated)

     def emit(self, record):
         """
@@ -233,13 +241,15 @@ def __init__(self,
                  message_ttl=None,
                  visibility_timeout=None,
                  base64_encoding=False,
+                 is_emulated=False,
                  ):
         """
         Initialize the handler.
         """
         logging.Handler.__init__(self)
         self.service = QueueService(account_name=account_name,
                                     account_key=account_key,
+                                    is_emulated=is_emulated,
                                     protocol=protocol)
         self.meta = {'hostname': gethostname(), 'process': os.getpid()}
         self.queue = _formatName(queue, self.meta)
@@ -272,6 +282,10 @@ def emit(self, record):
     def _encode_text(self, text):
         if self.base64_encoding:
             text = b64encode(text.encode('utf-8')).decode('ascii')
+        # fallback for the breaking change in azure-storage 0.33
+        elif sys.version_info < (3,):
+            if not isinstance(text, unicode):
+                text = text.decode('utf-8')
         return text


@@ -290,13 +304,15 @@ def __init__(self,
                  extra_properties=None,
                  partition_key_formatter=None,
                  row_key_formatter=None,
+                 is_emulated=False,
                  ):
         """
         Initialize the handler.
         """
         logging.Handler.__init__(self)
         self.service = TableService(account_name=account_name,
                                     account_key=account_key,
+                                    is_emulated=is_emulated,
                                     protocol=protocol)
         self.meta = {'hostname': gethostname(), 'process': os.getpid()}
         self.table = _formatName(table, self.meta)
@@ -327,10 +343,10 @@ def __init__(self,
                 self.extra_property_formatters[extra] = f
                 self.extra_property_names[extra] = self._getFormatName(extra)
         # the storage emulator doesn't support batch operations
-        if batch_size <= 1 or self.service.use_local_storage:
-            self.batch = False
+        if batch_size <= 1 or is_emulated:
+            self.batch = None
         else:
-            self.batch = True
+            self.batch = TableBatch()
         if batch_size > TableStorageHandler.MAX_BATCH_SIZE:
             self.batch_size = TableStorageHandler.MAX_BATCH_SIZE
         else:
@@ -369,8 +385,6 @@ def emit(self, record):
         try:
             if not self.ready:
                 self.service.create_table(self.table)
-                if self.batch:
-                    self.service.begin_batch()
                 self.ready = True
             # generate partition key for the entity
             record.hostname = self.meta['hostname']
@@ -394,12 +408,13 @@ def emit(self, record):
             copy.rowno = self.rowno
             row_key = self.row_key_formatter.format(copy)
             # add entitiy to the table
-            self.service.insert_or_replace_entity(self.table,
-                                                  partition_key,
-                                                  row_key,
-                                                  entity)
-            # commit the ongoing batch if it reaches the high mark
-            if self.batch:
+            entity['PartitionKey'] = partition_key
+            entity['RowKey'] = row_key
+            if not self.batch:
+                self.service.insert_or_replace_entity(self.table, entity)
+            else:
+                self.batch.insert_or_replace_entity(entity)
+                # commit the ongoing batch if it reaches the high mark
                 self.rowno += 1
                 if self.rowno >= self.batch_size:
                     self.flush()
@@ -414,10 +429,10 @@ def flush(self):
         """
         if self.batch and self.rowno > 0:
             try:
-                self.service.commit_batch()
+                self.service.commit_batch(self.table, self.batch)
             finally:
                 self.rowno = 0
-                self.service.begin_batch()
+                self.batch = TableBatch()

     def setFormatter(self, fmt):
         """
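For context, here is a minimal usage sketch of the new `is_emulated` flag. It assumes these handlers are importable from `azure_storage_logging.handlers` (as in the upstream package) and that the local storage emulator is running; with `is_emulated=True` the azure-storage SDK falls back to the well-known development storage account, and, per the change above, table batching is disabled against the emulator, so each record is written with a single `insert_or_replace_entity` call.

```python
# Usage sketch only -- not part of this diff. Assumes the handlers are exposed
# as azure_storage_logging.handlers and that the storage emulator is running.
import logging

from azure_storage_logging.handlers import TableStorageHandler

# is_emulated=True makes the underlying TableService target the local emulator
# with the development storage account, so no account_name/account_key is needed.
# Because the emulator doesn't support batch operations, the handler keeps
# self.batch = None and writes every record individually.
handler = TableStorageHandler(is_emulated=True,
                              table='logs',
                              batch_size=10)

logger = logging.getLogger('example')
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.info('hello from the storage emulator')
```

Against a real storage account with `batch_size > 1`, the handler now accumulates entities in a `TableBatch` and flushes them via `commit_batch(self.table, self.batch)`, the azure-storage 0.33 replacement for the removed `begin_batch()`/`commit_batch()` calls on the service object.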