This repository was archived by the owner on May 1, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdynamodb-cli
More file actions
executable file
·242 lines (208 loc) · 8.91 KB
/
dynamodb-cli
File metadata and controls
executable file
·242 lines (208 loc) · 8.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import gevent
from gevent import monkey; monkey.patch_all(thread=False)
from itertools import izip, izip_longest
from boto.exception import BotoServerError
from boto.dynamodb.exceptions import DynamoDBValidationError
from utils import utils
import json
import logging
import math
import os
import random
import subprocess
import sys
import time
import argparse
import boto.dynamodb
import gevent.pool
import gevent.queue
#import ipdb
# Number of lines grouped into one batch write (the DynamoDB batch maximum).
BATCHSIZE = 25

# Every handler emits the bare message, with no timestamp or level prefix.
formatter = logging.Formatter('%(message)s')

# 'validation-errors' receives JSON dumps of records that could not be
# written; it only goes to its own file.
datalogger = logging.getLogger('validation-errors')
dfh = logging.FileHandler('validation-errors.log')

# 'dynamo-cli' is the progress log: INFO and above go to a file, while only
# ERROR and above reach the console stream.
logger = logging.getLogger('dynamo-cli')
logger.setLevel(logging.INFO)
fh = logging.FileHandler('dynamo-cli.log')
ch = logging.StreamHandler()

for _handler, _level in ((dfh, logging.INFO),
                         (fh, logging.INFO),
                         (ch, logging.ERROR)):
    _handler.setLevel(_level)
    _handler.setFormatter(formatter)
del _handler, _level

datalogger.addHandler(dfh)
logger.addHandler(fh)
logger.addHandler(ch)
def _coerce_retry_value(name, text):
    """Pick the Python type for one attribute that was unwrapped to text.

    ``street_number`` and ``dma_id`` stay strings when they parse as numbers;
    other numeric text becomes an ``int`` (no fractional part) or a ``float``.
    Anything that does not parse as a number is returned unchanged.
    """
    try:
        as_float = float(text)
        fraction = as_float - math.floor(as_float)
        if name in ("street_number", "dma_id"):
            return str(text)
        if fraction == 0.0:
            return int(as_float)
        if fraction > 0.0:
            return as_float
        # Only reachable when the difference is not comparable (e.g. NaN).
        return text
    except ValueError:
        return text


def pool_single_responses(table, conn, batch):
    """Retry every put in ``batch`` as an individual write.

    Used when a whole batch write fails: each put request is unpacked,
    rebuilt as an item on ``table`` and written in its own greenlet through
    ``get_single_response``.  Blocks until all greenlets finish and returns
    ``None``.  ``conn`` is accepted for signature parity but is not used.
    """
    workers = gevent.pool.Pool()
    pending = []
    for bwrite in batch:
        # to_dict() yields a pair; the put requests are its second element.
        requests = bwrite.to_dict()[1]
        for request in requests:
            wire_item = request['PutRequest']['Item']
            # The hash key is hard-wired to the numeric 'personid' attribute.
            hkey = int(wire_item['personid']['N'])
            del wire_item['personid']
            # Each remaining value is a one-entry wrapper dict; unwrapping it
            # gives text even for numbers, so the types are re-derived below.
            unwrapped = dict(
                (attr, unicode(wrapper.values()[0], errors='replace'))
                for attr, wrapper in wire_item.items()
            )
            typed_attrs = {}
            for attr, text in unwrapped.items():
                attr = attr.strip()
                typed_attrs[attr] = _coerce_retry_value(attr, text.strip())
            item = table.new_item(hash_key=hkey, attrs=typed_attrs)
            pending.append(workers.spawn(get_single_response, item))
    logger.info("running single response greenlets")
    gevent.joinall(pending)
def get_single_response(record):
    """Write one item via ``record.put()`` without letting bad data raise.

    On ``UnicodeDecodeError`` or ``DynamoDBValidationError`` the error is
    logged and the record is dumped as indented JSON to the validation-error
    log.  Always returns ``None``; any other exception propagates.
    """
    try:
        record.put()
        # Kept inside the try: formatting the record can itself raise
        # UnicodeDecodeError, which must be handled the same way.
        logger.info("%s written" % record)
    except (UnicodeDecodeError, DynamoDBValidationError) as e:
        logger.error(e)
        dumped = json.dumps(record, sort_keys=True, indent=4)
        # Strip trailing whitespace from each line of the JSON dump.
        cleaned = '\n'.join(part.rstrip() for part in dumped.splitlines())
        datalogger.error("%s" % cleaned)
    return None
def get_batch_response(table, conn, batch):
    """Do a batch write to DynamoDB, retrying for as long as we are throttled.

    Args:
        table: table object, only used when falling back to per-item writes.
        conn: connection object providing ``batch_write_item``.
        batch: the batch write list to submit.

    Returns:
        Whatever ``conn.batch_write_item(batch)`` returns on success, or the
        result of ``pool_single_responses`` when the batch is rejected with a
        ``UnicodeDecodeError`` or ``DynamoDBValidationError``.

    Any other ``BotoServerError`` is treated as throttling: sleep for 1-10
    seconds and try again.  The retry is a loop rather than a recursive call
    so that a long throttling episode cannot exhaust the recursion limit.
    """
    while True:
        try:
            return conn.batch_write_item(batch)
        except (UnicodeDecodeError, DynamoDBValidationError):
            # Listed before BotoServerError on purpose: bad data is retried
            # item by item so that one bad record does not sink the batch.
            return pool_single_responses(table, conn, batch)
        except BotoServerError as e:
            logger.info(e)
            duration = random.randint(1, 10)
            logger.info("sleeping %d seconds" % duration)
            time.sleep(duration)
def submit_batch(mytable, conn, queue):
    """Worker loop: take batches off ``queue`` and write them, forever.

    Args:
        mytable: table object used to rebuild items that were not processed.
        conn: connection object used to create and submit batch write lists.
        queue: joinable queue of batch write lists.

    Every dequeued batch is acknowledged with ``queue.task_done()`` exactly
    once, whatever the outcome, so that ``queue.join()`` in the caller cannot
    hang on a batch that failed or fell back to per-item writes.  Items
    reported under ``UnprocessedItems`` are rebuilt into a fresh batch and
    resubmitted until none remain.  This function never returns.
    """
    while True:
        batch = queue.get()
        logger.info("%s grabbed a work item: qsize: %s" % (os.getpid(), queue.qsize()))
        starttime = time.time()
        try:
            while True:
                response = get_batch_response(mytable, conn, batch)
                if not response:
                    # Falsy response: the batch was handled by the per-item
                    # fallback, which reports nothing back to inspect.
                    break
                unprocessed = response.get('UnprocessedItems', None)
                if not unprocessed:
                    elapsedtime = time.time() - starttime
                    logger.info("%s completed work in %s" % (os.getpid(), elapsedtime))
                    break
                logger.debug("%d items for %s not processed" % (len(unprocessed), batch))
                # Build a new batch holding only the leftovers and go round
                # again instead of dropping them.
                unprocessed_list = unprocessed[mytable._dict['TableName']]
                batch_items = [
                    mytable.new_item(attrs=u['PutRequest']['Item'])
                    for u in unprocessed_list
                ]
                batch = conn.new_batch_write_list()
                batch.add_batch(mytable, puts=batch_items)
                # Back off before resubmitting, using the same 1-10 second
                # pause that get_batch_response uses when throttled.
                duration = random.randint(1, 10)
                logger.info("sleeping %d seconds" % duration)
                time.sleep(duration)
        except Exception as e:
            # Best effort: log and move on to the next batch.
            logger.info(e)
        finally:
            queue.task_done()
            # Yield so other greenlets get a turn between batches.
            gevent.sleep(0)
def _typed_field(name, value):
    """Pick the Python type for one non-empty text field from the input file.

    ``street_number`` and ``dma_id`` stay strings when they parse as numbers;
    other numeric text becomes an ``int`` (no fractional part) or a ``float``.
    Text that does not parse as a number is returned unchanged.
    """
    try:
        as_float = float(value)
        fraction = as_float - math.floor(as_float)
        # The original test `k == "street_number" == "dma_id"` was a chained
        # comparison that can never be true; test each name separately.
        if name == "street_number" or name == "dma_id":
            return str(value)
        if fraction == 0.0:
            return int(as_float)
        if fraction > 0.0:
            return as_float
        # Only reachable when the difference is not comparable (e.g. NaN).
        return value
    except ValueError:
        return value


def dynamoit(filename, key, queue, conn, mytable, dynamo_cols=None):
    """Read a tab-delimited file and queue its rows as batch writes.

    Args:
        filename: path of the tab-delimited input file.
        key: name of the column whose value is used as each item's hash key.
        queue: queue that receives one batch write list per BATCHSIZE lines.
        conn: connection object used to create batch write lists.
        mytable: table object used to create items.
        dynamo_cols: column names; when falsy, the first line of the file is
            read and split on tabs to obtain them.

    Raises:
        KeyError: if a non-blank row has no value for ``key``.

    Blank lines are skipped, and a group of lines that yields no items is not
    queued.
    """
    with open(filename, 'r') as f:
        if not dynamo_cols:
            dynamo_cols = f.readline().rstrip().split('\t')
        logger.info("Started reading from %s" % filename)
        # Passing the same file iterator BATCHSIZE times makes izip_longest
        # hand back BATCHSIZE consecutive lines at a time, padded with None
        # at the end of the file.
        for lines in izip_longest(*[f] * BATCHSIZE):
            batch_items = []
            for line in lines:
                # Skip the None padding and blank lines; a blank line has no
                # hash key and would otherwise abort the import with KeyError.
                if not line or not line.strip():
                    continue
                # NOTE(review): strip() also removes a leading tab, so a row
                # whose first column is empty has its values shifted left.
                raw_data = dict(izip(dynamo_cols, line.strip().split('\t')))
                data = {}
                for k, v in raw_data.items():
                    if v:  # No empty attributes allowed by DynamoDB
                        data[k] = _typed_field(k, v)
                batch_items.append(mytable.new_item(hash_key=data[key], attrs=data))
            if not batch_items:
                continue
            batch = conn.new_batch_write_list()
            batch.add_batch(mytable, puts=batch_items)
            # Give the worker greenlets a turn before queueing more work.
            gevent.sleep(0)
            queue.put(batch)
            logger.info("%s added to queue: size: %s" % (os.getpid(), queue.qsize()))
if __name__ == "__main__":
    # Command-line entry point.  Two modes:
    #   --throttle : change the table's read/write capacity and exit.
    #   otherwise  : import a tab-delimited file into the table.
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--columns', help='An unquoted, comma-delimited list of dynamo column headers (personid, first_name); otherwise we assume the first line of the file contains column headers')
    parser.add_argument('-cf', '--credfile', help='File containing AWS credentials. Contains 2 lines like AWSAccessKeyId=ABCDE\nAWSSecretKey=12345\n')
    parser.add_argument('-f', '--file', help='The input filename; the first line is discarded; the remainder is expected to be tab-delimited')
    parser.add_argument('-i', '--id', help='AWS Access Key ID')
    parser.add_argument('-k', '--key', help='AWS Secret Key')
    parser.add_argument('-H', '--hash', help='DynamoDB hash key')
    parser.add_argument('-t', '--table', help='DynamoDB table name', required=True)
    parser.add_argument('-T', '--throttle', action='store_true', help='Alter table capcaity. See --read and --write. All other actions ignored!')
    parser.add_argument('-R', '--read', help='How many read capacity units if in --throttle mode.')
    parser.add_argument('-W', '--write', help='How many write capacity units if in --throttle mode.')
    args = parser.parse_args()

    AWSAccessKeyId, AWSSecretKey = utils.aws_authentication(args)
    table = args.table

    if args.throttle:
        # Fail with a usage message instead of a TypeError from int(None).
        if args.read is None or args.write is None:
            parser.error("--throttle requires both --read and --write")
        logger.info("Updating %s to %s read capacity and %s write capcity"
                    % (table, args.read, args.write))
        # The returned handles are unused here; the call is kept as in the
        # original (presumably it confirms the table is reachable).
        myconn, mytable = utils.get_table(table, AWSAccessKeyId, AWSSecretKey)
        utils.alter_capacity(table, int(args.read), int(args.write),
                             AWSAccessKeyId, AWSSecretKey)
        sys.exit(0)

    if not args.file:
        sys.stdout.write("Since we're not updating table capacity, I need a file to import\n")
        sys.exit(1)
    if not args.hash:
        # Every item is built with hash_key=data[<hash column>], so an import
        # cannot proceed without knowing which column that is.
        parser.error("--hash is required when importing a file")

    # None makes dynamoit read the column names from the first line of the
    # file.  Names are stripped so "personid, first_name" works as shown in
    # the --columns help text.
    dynamo_cols = None
    if args.columns:
        dynamo_cols = [col.strip() for col in args.columns.split(',')]

    myconn, mytable = utils.get_table(table, AWSAccessKeyId, AWSSecretKey)
    # This is typically run on an AWS cc.8xl machine so 32 "cores"
    queue = gevent.queue.JoinableQueue(512)
    pool = gevent.pool.Pool(1024)
    pool.spawn(submit_batch, mytable, myconn, queue)
    pool.spawn(submit_batch, mytable, myconn, queue)
    # Arguments follow dynamoit's signature:
    # (filename, key, queue, conn, mytable, dynamo_cols).
    dynamoit(args.file, args.hash, queue, myconn, mytable, dynamo_cols=dynamo_cols)
    # Wait until the workers have acknowledged every queued batch.
    queue.join()