From 15e250153d2b9d822bc797718d48524c0c26b236 Mon Sep 17 00:00:00 2001 From: Florian Meinel Date: Wed, 17 Jan 2018 17:16:01 +0100 Subject: [PATCH 1/4] add copy-all-script to move all contents of one fossildb into another --- client/copy-all-script | 76 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100755 client/copy-all-script diff --git a/client/copy-all-script b/client/copy-all-script new file mode 100755 index 0000000..3822eaf --- /dev/null +++ b/client/copy-all-script @@ -0,0 +1,76 @@ +#!/usr/bin/env python3 + +import argparse +import grpc +import sys + +import fossildbapi_pb2 as proto +import fossildbapi_pb2_grpc as proto_rpc + + +def main(): + verbose = False + + collections = ['skeletons', 'volumes', 'volumeData', 'skeletonUpdates'] + + srcPort = 7156 + dstPort = 7155 + + listKeysBatchSize = 2 + + + + + srcChannel = grpc.insecure_channel('localhost:{}'.format(srcPort)) + srcStub = proto_rpc.FossilDBStub(srcChannel) + + dstChannel = grpc.insecure_channel('localhost:{}'.format(dstPort)) + dstStub = proto_rpc.FossilDBStub(dstChannel) + + testHealth(srcStub, 'source fossildb at {}'.format(srcPort)) + testHealth(dstStub, 'destination fossildb at {}'.format(dstPort)) + + putCount = 0 + + for collection in collections: + print('copying collection ' + collection) + lastKey = '' + while True: + listKeysReply = srcStub.ListKeys(proto.ListKeysRequest(collection=collection, limit=listKeysBatchSize, startAfterKey=lastKey)) + assertSuccess(listKeysReply) + if len(listKeysReply.keys) == 0: + break + if verbose: + print(' copying key batch ', listKeysReply.keys) + for key in listKeysReply.keys: + if verbose: + print(' copying key ', key) + getMultipleVersionsReply = srcStub.GetMultipleVersions(proto.GetMultipleVersionsRequest(collection=collection, key=key)) + assertSuccess(getMultipleVersionsReply) + for versionValueTuple in zip(getMultipleVersionsReply.versions, getMultipleVersionsReply.values): + if verbose: + print(' copying version ', versionValueTuple[0]) + putReply = dstStub.Put(proto.PutRequest(collection=collection, key=key, version=versionValueTuple[0], value=versionValueTuple[1])) + assertSuccess(putReply) + putCount += 1 + if (verbose and putCount % 10 == 0) or putCount % 10000 == 0: + print("total put count:", putCount) + + lastKey = listKeysReply.keys[-1] + print("total put count:", putCount) + +def testHealth(stub, label): + try: + reply = stub.Health(proto.HealthRequest()) + assertSuccess(reply) + print('successfully connected to ' + label) + except Exception as e: + print('failed to connect to ' + label + ': ' + str(e)) + sys.exit(1) + +def assertSuccess(reply): + if not reply.success: + raise Exception("reply.success failed: " + reply.errorMessage) + +if __name__ == '__main__': + main() From 23c07406761a8b977b8ccc3105bc846345ffb409 Mon Sep 17 00:00:00 2001 From: Florian Meinel Date: Wed, 17 Jan 2018 17:19:22 +0100 Subject: [PATCH 2/4] copy-all-script: increase batch size --- client/copy-all-script | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/copy-all-script b/client/copy-all-script index 3822eaf..e32c4d3 100755 --- a/client/copy-all-script +++ b/client/copy-all-script @@ -16,7 +16,7 @@ def main(): srcPort = 7156 dstPort = 7155 - listKeysBatchSize = 2 + listKeysBatchSize = 300 @@ -57,7 +57,7 @@ def main(): print("total put count:", putCount) lastKey = listKeysReply.keys[-1] - print("total put count:", putCount) + print("Done. total put count:", putCount) def testHealth(stub, label): try: From 3c4219cce9ae51c7d02a4a3121e04716c777ae1c Mon Sep 17 00:00:00 2001 From: Florian Meinel Date: Thu, 18 Jan 2018 11:40:32 +0100 Subject: [PATCH 3/4] add copy-some script, reading tracing references from json file --- client/copy-some-script | 67 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100755 client/copy-some-script diff --git a/client/copy-some-script b/client/copy-some-script new file mode 100755 index 0000000..6139215 --- /dev/null +++ b/client/copy-some-script @@ -0,0 +1,67 @@ +#!/usr/bin/env python3 + +import json +import grpc +import sys + +import fossildbapi_pb2 as proto +import fossildbapi_pb2_grpc as proto_rpc + +def main(): + verbose = False + + collectionsByTyp = { + 'skeleton': ['skeletons', 'skeletonUpdates'], + 'volume': ['volumes', 'volumeData'] + } + + srcPort = 7156 + dstPort = 7156 + + tracingReferences = json.load(open('tracingReferences.json')) + + srcChannel = grpc.insecure_channel('localhost:{}'.format(srcPort)) + srcStub = proto_rpc.FossilDBStub(srcChannel) + + dstChannel = grpc.insecure_channel('localhost:{}'.format(dstPort)) + dstStub = proto_rpc.FossilDBStub(dstChannel) + + testHealth(srcStub, 'source fossildb at {}'.format(srcPort)) + testHealth(dstStub, 'destination fossildb at {}'.format(dstPort)) + + putCount = 0 + + for tracingReference in tracingReferences: + key = tracingReference['id'] + if verbose: + print(' copying key ', key) + for collection in collectionsByTyp[tracingReference['typ']]: + getMultipleVersionsReply = srcStub.GetMultipleVersions(proto.GetMultipleVersionsRequest(collection=collection, key=key)) + assertSuccess(getMultipleVersionsReply) + if len(getMultipleVersionsReply.versions) == 0: + print('[warn] no data for', key, 'in', collection) + for versionValueTuple in zip(getMultipleVersionsReply.versions, getMultipleVersionsReply.values): + if verbose: + print(' copying version ', versionValueTuple[0]) + putReply = dstStub.Put(proto.PutRequest(collection=collection, key=key, version=versionValueTuple[0], value=versionValueTuple[1])) + assertSuccess(putReply) + putCount += 1 + if (verbose and putCount % 10 == 0) or putCount % 10000 == 0: + print("total put count:", putCount) + print("Done. total put count:", putCount) + +def testHealth(stub, label): + try: + reply = stub.Health(proto.HealthRequest()) + assertSuccess(reply) + print('successfully connected to ' + label) + except Exception as e: + print('failed to connect to ' + label + ': ' + str(e)) + sys.exit(1) + +def assertSuccess(reply): + if not reply.success: + raise Exception("reply.success failed: " + reply.errorMessage) + +if __name__ == '__main__': + main() From de5bab0bf68d60bca7f6964e20be61f59216e09f Mon Sep 17 00:00:00 2001 From: root Date: Wed, 31 Jan 2018 14:00:12 +0100 Subject: [PATCH 4/4] adds client scripts --- client/copy-all-script | 18 +++++++++--------- client/copy-some-script | 14 +++++++++----- docker-compose.yaml | 8 ++++++++ 3 files changed, 26 insertions(+), 14 deletions(-) diff --git a/client/copy-all-script b/client/copy-all-script index e32c4d3..67bd683 100755 --- a/client/copy-all-script +++ b/client/copy-all-script @@ -7,24 +7,24 @@ import sys import fossildbapi_pb2 as proto import fossildbapi_pb2_grpc as proto_rpc +MAX_MESSAGE_LENGTH = 1073741824 def main(): - verbose = False + verbose = True collections = ['skeletons', 'volumes', 'volumeData', 'skeletonUpdates'] - srcPort = 7156 - dstPort = 7155 - listKeysBatchSize = 300 + srcPort = 2000 + dstPort = 7155 - - - srcChannel = grpc.insecure_channel('localhost:{}'.format(srcPort)) + srcChannel = grpc.insecure_channel('localhost:{}'.format(srcPort), options=[('grpc.max_send_message_length', MAX_MESSAGE_LENGTH), ( + 'grpc.max_receive_message_length', MAX_MESSAGE_LENGTH)]) srcStub = proto_rpc.FossilDBStub(srcChannel) - dstChannel = grpc.insecure_channel('localhost:{}'.format(dstPort)) + dstChannel = grpc.insecure_channel('localhost:{}'.format(dstPort), options=[('grpc.max_send_message_length', MAX_MESSAGE_LENGTH), ( + 'grpc.max_receive_message_length', MAX_MESSAGE_LENGTH)]) dstStub = proto_rpc.FossilDBStub(dstChannel) testHealth(srcStub, 'source fossildb at {}'.format(srcPort)) @@ -34,7 +34,7 @@ def main(): for collection in collections: print('copying collection ' + collection) - lastKey = '' + lastKey = None while True: listKeysReply = srcStub.ListKeys(proto.ListKeysRequest(collection=collection, limit=listKeysBatchSize, startAfterKey=lastKey)) assertSuccess(listKeysReply) diff --git a/client/copy-some-script b/client/copy-some-script index 6139215..7c9cc74 100755 --- a/client/copy-some-script +++ b/client/copy-some-script @@ -7,23 +7,27 @@ import sys import fossildbapi_pb2 as proto import fossildbapi_pb2_grpc as proto_rpc +MAX_MESSAGE_LENGTH = 1073741824 + def main(): - verbose = False + verbose = True collectionsByTyp = { 'skeleton': ['skeletons', 'skeletonUpdates'], 'volume': ['volumes', 'volumeData'] } - srcPort = 7156 - dstPort = 7156 + srcPort = 2000 + dstPort = 7155 tracingReferences = json.load(open('tracingReferences.json')) - srcChannel = grpc.insecure_channel('localhost:{}'.format(srcPort)) + srcChannel = grpc.insecure_channel('localhost:{}'.format(srcPort), options=[('grpc.max_send_message_length', MAX_MESSAGE_LENGTH), ( + 'grpc.max_receive_message_length', MAX_MESSAGE_LENGTH)]) srcStub = proto_rpc.FossilDBStub(srcChannel) - dstChannel = grpc.insecure_channel('localhost:{}'.format(dstPort)) + dstChannel = grpc.insecure_channel('localhost:{}'.format(dstPort), options=[('grpc.max_send_message_length', MAX_MESSAGE_LENGTH), ( + 'grpc.max_receive_message_length', MAX_MESSAGE_LENGTH)]) dstStub = proto_rpc.FossilDBStub(dstChannel) testHealth(srcStub, 'source fossildb at {}'.format(srcPort)) diff --git a/docker-compose.yaml b/docker-compose.yaml index d4a5903..70c5aff 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -21,6 +21,14 @@ services: links: - fossildb + client: + image: scalableminds/fossildb-client:${FOSSILDB_CLIENT_TAG:-master} + volumes: + - ".:/app" + working_dir: /app + entrypoint: /bin/bash + network_mode: host + sbt: image: scalableminds/sbt:${SBT_VERSION_TAG:-sbt-0.13.15_mongo-3.2.17_node-8.x_jdk-8} environment: