This repository has been archived by the owner on Dec 22, 2020. It is now read-only.

Karl-batch-inserts-fixes #137

Closed
wants to merge 46 commits
Changes from all commits
46 commits
e7be442
Batch inserts
Sep 18, 2014
13000cd
bikash changes
ketanrathod Sep 20, 2018
87fdac9
folder for haystack
ketanrathod Nov 7, 2018
954452a
not ignoring collections.yml
ketanrathod Nov 7, 2018
7894e72
variable for usr/pwd
ketanrathod Nov 7, 2018
3146568
template to make conf file
ketanrathod Nov 7, 2018
0e7bab0
deploy script
ketanrathod Nov 7, 2018
5e80b61
logging all commands
ketanrathod Nov 7, 2018
c00d6fe
correcting the path
ketanrathod Nov 7, 2018
fa5373a
adding service name and yml dir
ketanrathod Nov 8, 2018
8e90985
separating events from other collections
ketanrathod Nov 8, 2018
9b7a5ec
events and misc folders
ketanrathod Nov 8, 2018
c252a4c
making sure all haystack related stuff is in haystack dir
ketanrathod Nov 8, 2018
1ec1798
removing the xx, was added for safety
ketanrathod Nov 8, 2018
534dfdd
trying out banner for small size
ketanrathod Nov 8, 2018
1a6cc44
removing empty spaces
ketanrathod Nov 8, 2018
788432c
trying out events now from a point in recent past
ketanrathod Nov 8, 2018
11cbc0d
accepting cmd line param
ketanrathod Nov 8, 2018
74492d1
separating playlist instead of events
ketanrathod Nov 8, 2018
dd8c660
plural playlist
ketanrathod Nov 8, 2018
061621e
adding prod details
ketanrathod Nov 8, 2018
141acb7
don't need these folders anymore
ketanrathod Nov 14, 2018
706fbcf
splitting users and events into its separate folders
ketanrathod Nov 14, 2018
85b6061
delete after testing
ketanrathod Nov 14, 2018
b3dfb27
erroring
ketanrathod Dec 20, 2018
f862ee7
master slave urls for prod
ketanrathod Dec 20, 2018
00c07d8
combining playlists and events and keeping users alone
ketanrathod Jan 17, 2019
6402698
changing to new user mosqluser
ketanrathod May 7, 2019
8468fe9
trying to test with channels table since its small
ketanrathod May 7, 2019
d5e9172
reverting to using users table instead of channels table
ketanrathod May 7, 2019
3a2136f
temp change to revert soon
ketanrathod Jun 11, 2019
f23a4a3
reverting
ketanrathod Jun 11, 2019
0149d78
new column campaigndata
ketanrathod Feb 4, 2020
b1dba7f
new slave
ketanrathod Feb 29, 2020
442d492
new slave 2
ketanrathod Mar 2, 2020
56bedfa
removing the creds from git
ketanrathod Mar 25, 2020
105bdbc
gitignore
Mar 31, 2020
2965a70
haystack
Mar 31, 2020
86e7919
Merge branch 'master' into karl-batch-inserts3
Mar 31, 2020
b2b3da1
disable auto save
Apr 7, 2020
4ff80ee
fix for SIGTERM signal and insert error handle
Apr 7, 2020
07335a5
.gitignore
Apr 7, 2020
8b0fc34
.gitignore
Apr 7, 2020
e602b44
Merge branch 'master' into karl-batch-inserts3
Apr 7, 2020
37de152
undo minor change
Apr 7, 2020
29f80b5
unnecessary files
Apr 7, 2020
2 changes: 1 addition & 1 deletion .gitignore
@@ -1,3 +1,3 @@
collections.yml
/.bundle/
Gemfile.lock
Gemfile.lock
9 changes: 7 additions & 2 deletions lib/mosql/cli.rb
@@ -160,7 +160,7 @@ def run
metadata_table = MoSQL::Tailer.create_table(@sql.db, 'mosql_tailers')

@tailer = MoSQL::Tailer.new([@mongo], :existing, metadata_table,
:service => options[:service])
{:service => options[:service], :batch => true})

@streamer = Streamer.new(:options => @options,
:tailer => @tailer,
@@ -173,7 +173,12 @@ def run
end

unless options[:skip_tail]
@streamer.optail
begin
@streamer.optail
rescue => e
puts 'Unexpected error. Attempting to save'
@streamer.saveAll(true)
end
end
end
end
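
The begin/rescue added around optail flushes pending work when streaming dies unexpectedly. Commit 4ff80ee also mentions a SIGTERM fix, which is not visible in this excerpt; a minimal sketch of how such a shutdown hook could be wired, assuming only the stop and saveAll methods visible in the streamer.rb diff below:

# Sketch only, not part of the diff: let a deploy or restart drain the
# queued batch inserts before the process exits.
Signal.trap('TERM') do
  @streamer.stop             # sets @done so optail's loop exits after the current iteration
end

begin
  @streamer.optail
rescue => e
  log.error("Unexpected error during tailing: #{e}")
ensure
  @streamer.saveAll(true)    # force-save tailer state on any exit path
end
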
7 changes: 5 additions & 2 deletions lib/mosql/schema.rb
@@ -209,8 +209,11 @@ def transform(ns, obj, schema=nil)

# Do a deep clone, because we're potentially going to be
# mutating embedded objects.
obj = BSON.deserialize(BSON.serialize(obj))

# Bikash - Sep 8, 2016
# failure due to large doc. changing below line
# obj = BSON.deserialize(BSON.serialize(obj))
obj = BSON.deserialize(BSON::BSON_CODER.serialize(obj, false, false, 16*1024*1024))

row = []
schema[:columns].each do |col|

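
The replacement line keeps the deep clone (transform mutates embedded objects, so an independent copy is needed) while raising the serializer's size cap to 16 MB, MongoDB's per-document limit. A small illustration of the same round-trip pattern, under the assumption that the legacy bson 1.x gem used here exposes BSON::BSON_CODER.serialize(obj, check_keys, move_id, max_bson_size):

# Illustrative only: serializing and deserializing through BSON yields a
# deep copy, so mutating the copy leaves the source document untouched.
original = { 'name' => 'a', 'nested' => { 'tags' => ['x'] } }
copy = BSON.deserialize(BSON::BSON_CODER.serialize(original, false, false, 16 * 1024 * 1024))
copy['nested']['tags'] << 'y'
original['nested']['tags']   # => ["x"]
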
87 changes: 78 additions & 9 deletions lib/mosql/streamer.rb
@@ -2,9 +2,11 @@ module MoSQL
class Streamer
include MoSQL::Logging

BATCH = 1000
BATCH_SIZE = 1000
# How long (seconds) to wait before flushing unsaved inserts to postgres
INSERT_BATCH_TIMELIMIT = 5.0

attr_reader :options, :tailer
attr_reader :options, :tailer, :batch_done

NEW_KEYS = [:options, :tailer, :mongo, :sql, :schema]

@@ -16,7 +18,10 @@ def initialize(opts)
instance_variable_set(:"@#{parm.to_s}", opts[parm])
end

@done = false
# Hash from namespace -> inserts that need to be made
@batch_insert_lists = Hash.new { |hash, key| hash[key] = [] }
@done = false
@batch_done = true
end

def stop
@@ -42,6 +47,7 @@ def unsafe_handle_exceptions(ns, obj)
if wrapped.result && options[:unsafe]
log.warn("Ignoring row (#{obj.inspect}): #{e}")
else
log.error("Erroring row (#{obj.inspect}): #{e}")
log.error("Error processing #{obj.inspect} for #{ns}.")
raise e
end
@@ -51,7 +57,8 @@ def unsafe_handle_exceptions(ns, obj)
def bulk_upsert(table, ns, items)
begin
@schema.copy_data(table.db, ns, items)
rescue Sequel::DatabaseError => e
@batch_done = true
rescue => e #Sequel::DatabaseError
log.debug("Bulk insert error (#{e}), attempting invidual upserts...")
cols = @schema.all_columns(@schema.find_ns(ns))
items.each do |it|
@@ -61,6 +68,7 @@ def bulk_upsert(table, ns, items)
@sql.upsert!(table, @schema.primary_sql_key_for_ns(ns), h)
end
end
@batch_done = true
end
end

@@ -141,13 +149,13 @@ def import_collection(ns, collection, filter)

start = Time.now
sql_time = 0
collection.find(filter, :batch_size => BATCH) do |cursor|
collection.find(filter, :batch_size => BATCH_SIZE) do |cursor|
with_retries do
cursor.each do |obj|
batch << @schema.transform(ns, obj)
count += 1

if batch.length >= BATCH
if batch.length >= BATCH_SIZE
sql_time += track_time do
bulk_upsert(table, ns, batch)
end
@@ -165,19 +173,48 @@ def import_collection(ns, collection, filter)
end
end

def saveAll(force = false)
return unless @batch_done
if force
tailer.save_state
else
tailer.maybe_save_state
end
end

def optail
tail_from = options[:tail_from]
if tail_from.is_a? Time
tail_from = tailer.most_recent_position(tail_from)
end

last_batch_insert = Time.now
tailer.tail(:from => tail_from, :filter => options[:oplog_filter])
until @done
tailer.stream(1000) do |op|
handle_op(op)
end
time = Time.now
if time - last_batch_insert >= INSERT_BATCH_TIMELIMIT
last_batch_insert = time
do_batch_inserts
end

saveAll
end

log.info("Finishing, doing last batch inserts.")
do_batch_inserts
saveAll
end

# Handle $set, $inc and other operators in updates. Done by querying
# mongo and setting the value to whatever mongo holds at the time.
# Note that this somewhat messes with consistency, as postgres will be
# "ahead" of everything else if the tailer is behind.
#
# If no such object is found, try to delete according to the primary keys that
# must be present in the selector (and not behind $set, etc.).
def sync_object(ns, selector)
obj = collection_for_ns(ns).find_one(selector)
if obj
@@ -189,6 +226,36 @@ def sync_object(ns, selector)
end
end

# Add this op to the namespace's batch-insert queue; it is flushed when the
# batch fills or the next non-insert op for that namespace arrives
def queue_to_batch_insert(op, namespace)
@batch_insert_lists[namespace] << @schema.transform(namespace, op['o'])
if @batch_insert_lists[namespace].length >= BATCH_SIZE
do_batch_inserts(namespace)
end
end

# Do a batch insert for that namespace, writing the queued rows to postgres.
# If no namespace is given, all namespaces are flushed.
def do_batch_inserts(namespace=nil)
if namespace.nil?
@batch_insert_lists.keys.each do |ns|
do_batch_inserts(ns)
end
else
to_batch = @batch_insert_lists[namespace]
@batch_insert_lists[namespace] = []
if to_batch.empty?
@batch_done = true
return
end

table = @sql.table_for_ns(namespace)
log.debug("Batch inserting #{to_batch.length} items to #{table} from #{namespace}.")
bulk_upsert(table, namespace, to_batch)
end
end

def handle_op(op)
log.debug("processing op: #{op.inspect}")
unless op['ns'] && op['op']
@@ -220,9 +287,8 @@ def handle_op(op)
if collection_name == 'system.indexes'
log.info("Skipping index update: #{op.inspect}")
else
unsafe_handle_exceptions(ns, op['o']) do
@sql.upsert_ns(ns, op['o'])
end
@batch_done = false
queue_to_batch_insert(op, ns)
end
when 'u'
selector = op['o2']
@@ -231,6 +297,7 @@ def handle_op(op)
log.debug("resync #{ns}: #{selector['_id']} (update was: #{update.inspect})")
sync_object(ns, selector)
else
do_batch_inserts(ns)

# The update operation replaces the existing object, but
# preserves its _id field, so grab the _id off of the
@@ -252,6 +319,8 @@ def handle_op(op)
end
end
when 'd'
do_batch_inserts(ns)

if options[:ignore_delete]
log.debug("Ignoring delete op on #{ns} as instructed.")
else
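
Taken together, the streamer changes replace per-row upserts for insert ops with per-namespace queues that are flushed in bulk. A rough summary of the flow, using the names from the diff above (illustrative calls, not code from the PR; 'db.users' is a placeholder namespace):

# 'i' ops are queued per namespace; a queue flushes itself at BATCH_SIZE rows.
streamer.queue_to_batch_insert(op, 'db.users')
# Replacement updates and deletes flush that namespace first so ordering is preserved.
streamer.do_batch_inserts('db.users')
# optail additionally flushes every INSERT_BATCH_TIMELIMIT seconds and once more
# on shutdown, then calls saveAll to persist the tailer position.
streamer.do_batch_inserts
streamer.saveAll(true)
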
1 change: 1 addition & 0 deletions test/functional/streamer.rb
@@ -392,6 +392,7 @@ def build_streamer
"ns" => "db.has_timestamp",
"o" => mongo['db']['has_timestamp'].find_one({_id: id})
})
@streamer.do_batch_inserts
got = @sequel[:has_timestamp].where(:_id => id.to_s).select.first[:ts]
assert_equal(ts.to_i, got.to_i)
assert_equal(ts.tv_usec, got.tv_usec)
1 change: 1 addition & 0 deletions test/functional/transform.rb
@@ -114,6 +114,7 @@ class MoSQL::Test::Functional::TransformTest < MoSQL::Test::Functional
"ns" => "test.test_transform",
"o" => collection.find_one(_id: id)
})
streamer.do_batch_inserts

got = @sequel[:test_transform].where(_id: id).to_a
assert_equal(sql, got.first[:value], "was able to transform a #{typ} field while streaming")
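
Both test changes follow from the new queueing behaviour: insert ops are no longer written to Postgres inside handle_op, so a test that asserts on the SQL side immediately afterwards must flush first. The pattern, with doc standing in as a placeholder document:

streamer.handle_op({ 'op' => 'i', 'ns' => 'db.has_timestamp', 'o' => doc })
streamer.do_batch_inserts   # flush the queued insert before querying Postgres
row = @sequel[:has_timestamp].where(:_id => doc['_id'].to_s).first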