Class: MoSQL::Streamer

Inherits:
Object
Includes:
Logging
Defined in:
lib/mosql/streamer.rb

Constant Summary

BATCH = 1000
NEW_KEYS = [:options, :tailer, :mongo, :sql, :schema]

Instance Attribute Summary

Instance Method Summary

Methods included from Logging

#log

Constructor Details

#initialize(opts) ⇒ Streamer

Returns a new instance of Streamer.



# File 'lib/mosql/streamer.rb', line 11

def initialize(opts)
  NEW_KEYS.each do |parm|
    unless opts.key?(parm)
      raise ArgumentError.new("Required argument `#{parm}' not provided to #{self.class.name}#new.")
    end
    instance_variable_set(:"@#{parm.to_s}", opts[parm])
  end

  @done    = false
end
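
A minimal construction sketch. It assumes the collaborator objects (schema, sql, mongo, tailer) have already been built elsewhere; their constructors are not shown on this page, and the option keys shown are only ones referenced by the methods below.

# Illustrative only: schema, sql, mongo, and tailer are pre-built
# collaborators supplied by the caller.
streamer = MoSQL::Streamer.new(
  :options => { :skip_tail => false, :unsafe => false },
  :tailer  => tailer,
  :mongo   => mongo,
  :sql     => sql,
  :schema  => schema
)

streamer.import  # initial import, unless a saved oplog position exists
streamer.optail  # then follow the oplog until #stop is called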

Instance Attribute Details

#options ⇒ Object (readonly)

Returns the value of attribute options.



# File 'lib/mosql/streamer.rb', line 7

def options
  @options
end

#tailer ⇒ Object (readonly)

Returns the value of attribute tailer.



# File 'lib/mosql/streamer.rb', line 7

def tailer
  @tailer
end

Instance Method Details

#bulk_upsert(table, ns, items) ⇒ Object



# File 'lib/mosql/streamer.rb', line 51

def bulk_upsert(table, ns, items)
  begin
    @schema.copy_data(table.db, ns, items)
  rescue Sequel::DatabaseError => e
    log.debug("Bulk insert error (#{e}), attempting invidual upserts...")
    cols = @schema.all_columns(@schema.find_ns(ns))
    items.each do |it|
      h = {}
      cols.zip(it).each { |k,v| h[k] = v }
      unsafe_handle_exceptions(ns, h) do
        @sql.upsert!(table, @schema.primary_sql_key_for_ns(ns), h)
      end
    end
  end
end
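
To illustrate the fallback path: each item in the batch is an array of values ordered like the namespace's columns, so zipping it with the column names rebuilds the hash that gets upserted row by row. The column names and values below are hypothetical.

# Hypothetical mapping for a namespace with columns (id, name, created_at):
cols = [:id, :name, :created_at]
row  = ["507f191e810c19729de860ea", "Alice", Time.utc(2014, 1, 1)]

h = {}
cols.zip(row).each { |k, v| h[k] = v }
# h => { :id => "507f191e810c19729de860ea", :name => "Alice",
#        :created_at => 2014-01-01 00:00:00 UTC }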

#collection_for_ns(ns) ⇒ Object



# File 'lib/mosql/streamer.rb', line 32

def collection_for_ns(ns)
  dbname, collection = ns.split(".", 2)
  @mongo.db(dbname).collection(collection)
end
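
Because the namespace is split on the first dot only, database names come off cleanly while dotted collection names stay intact:

"blog.posts".split(".", 2)          # => ["blog", "posts"]
"blog.system.indexes".split(".", 2) # => ["blog", "system.indexes"]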

#did_truncate ⇒ Object



# File 'lib/mosql/streamer.rb', line 130

def did_truncate; @did_truncate ||= {}; end

#handle_op(op) ⇒ Object



# File 'lib/mosql/streamer.rb', line 192

def handle_op(op)
  log.debug("processing op: #{op.inspect}")
  unless op['ns'] && op['op']
    log.warn("Weird op: #{op.inspect}")
    return
  end

  # First, check if this was an operation performed via applyOps. If so, call
  # handle_op for each op that was applied.
  # The oplog format of applyOps commands can be viewed here:
  # https://groups.google.com/forum/#!topic/mongodb-user/dTf5VEJJWvY
  if op['op'] == 'c' && (ops = op['o']['applyOps'])
    ops.each { |op| handle_op(op) }
    return
  end

  unless @schema.find_ns(op['ns'])
    log.debug("Skipping op for unknown ns #{op['ns']}...")
    return
  end

  ns = op['ns']
  dbname, collection_name = ns.split(".", 2)

  case op['op']
  when 'n'
    log.debug("Skipping no-op #{op.inspect}")
  when 'i'
    if collection_name == 'system.indexes'
      log.info("Skipping index update: #{op.inspect}")
    else
      unsafe_handle_exceptions(ns, op['o'])  do
        @sql.upsert_ns(ns, op['o'])
      end
    end
  when 'u'
    selector = op['o2']
    update   = op['o']
    if update.keys.any? { |k| k.start_with? '$' }
      log.debug("resync #{ns}: #{selector['_id']} (update was: #{update.inspect})")
      sync_object(ns, selector)
    else

      # The update operation replaces the existing object, but
      # preserves its _id field, so grab the _id off of the
      # 'query' field -- it's not guaranteed to be present on the
      # update.
      primary_sql_keys = @schema.primary_sql_key_for_ns(ns)
      schema = @schema.find_ns!(ns)
      keys = {}
      primary_sql_keys.each do |key|
        source =  schema[:columns].find {|c| c[:name] == key }[:source]
        keys[source] = selector[source]
      end

      log.debug("upsert #{ns}: #{keys}")

      update = keys.merge(update)
      unsafe_handle_exceptions(ns, update) do
        @sql.upsert_ns(ns, update)
      end
    end
  when 'd'
    if options[:ignore_delete]
      log.debug("Ignoring delete op on #{ns} as instructed.")
    else
      @sql.delete_ns(ns, op['o'])
    end
  else
    log.info("Skipping unknown op #{op.inspect}")
  end
end
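
A sketch of the oplog entries this method expects, following the standard MongoDB oplog layout; the namespace and document values are made up for illustration.

# Insert: 'o' is the full document.
streamer.handle_op('op' => 'i', 'ns' => 'blog.posts',
                   'o'  => { '_id' => 1, 'title' => 'Hello' })

# Update using $-operators: handled by re-fetching the document (#sync_object).
streamer.handle_op('op' => 'u', 'ns' => 'blog.posts',
                   'o2' => { '_id' => 1 },
                   'o'  => { '$set' => { 'title' => 'Hello again' } })

# Delete: 'o' carries the selector; skipped when options[:ignore_delete] is set.
streamer.handle_op('op' => 'd', 'ns' => 'blog.posts', 'o' => { '_id' => 1 })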

#import ⇒ Object



# File 'lib/mosql/streamer.rb', line 26

def import
  if options[:reimport] || tailer.read_position.nil?
    initial_import
  end
end

#import_collection(ns, collection, filter) ⇒ Object



# File 'lib/mosql/streamer.rb', line 132

def import_collection(ns, collection, filter)
  log.info("Importing for #{ns}...")
  count = 0
  batch = []
  table = @sql.table_for_ns(ns)
  unless options[:no_drop_tables] || did_truncate[table.first_source]
    table.truncate
    did_truncate[table.first_source] = true
  end

  start    = Time.now
  sql_time = 0
  collection.find(filter, :batch_size => BATCH) do |cursor|
    with_retries do
      cursor.each do |obj|
        batch << @schema.transform(ns, obj)
        count += 1

        if batch.length >= BATCH
          sql_time += track_time do
            bulk_upsert(table, ns, batch)
          end
          elapsed = Time.now - start
          log.info("Imported #{count} rows (#{elapsed}s, #{sql_time}s SQL)...")
          batch.clear
          exit(0) if @done
        end
      end
    end
  end

  unless batch.empty?
    bulk_upsert(table, ns, batch)
  end
end
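
A hypothetical call: the filter argument is whatever the schema stores under the collection's :meta filter entry (nil when none is configured), and the target table is truncated on first use unless options[:no_drop_tables] is set.

collection = streamer.collection_for_ns('blog.posts')
streamer.import_collection('blog.posts', collection, { 'published' => true })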

#initial_import ⇒ Object



# File 'lib/mosql/streamer.rb', line 89

def initial_import
  @schema.create_schema(@sql.db, !options[:no_drop_tables])

  unless options[:skip_tail]
    start_state = {
      'time' => nil,
      'position' => @tailer.most_recent_position
    }
  end

  dbnames = []

  if options[:dbname]
    log.info "Skipping DB scan and using db: #{options[:dbname]}"
    dbnames = [ options[:dbname] ]
  else
    dbnames = @mongo.database_names
  end

  dbnames.each do |dbname|
    spec = @schema.find_db(dbname)

    if(spec.nil?)
      log.info("Mongd DB '#{dbname}' not found in config file. Skipping.")
      next
    end

    log.info("Importing for Mongo DB #{dbname}...")
    db = @mongo.db(dbname)
    collections = db.collections.select { |c| spec.key?(c.name) }

    collections.each do |collection|
      ns = "#{dbname}.#{collection.name}"
      import_collection(ns, collection, spec[collection.name][:meta][:filter])
      exit(0) if @done
    end
  end

  tailer.save_state(start_state) unless options[:skip_tail]
end

#optail ⇒ Object



# File 'lib/mosql/streamer.rb', line 168

def optail
  tail_from = options[:tail_from]
  if tail_from.is_a? Time
    tail_from = tailer.most_recent_position(tail_from)
  end
  tailer.tail(:from => tail_from, :filter => options[:oplog_filter])
  until @done
    tailer.stream(1000) do |op|
      handle_op(op)
    end
  end
end
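
One hypothetical way to drive this: pass a Time as :tail_from (converted to an oplog position above) and request shutdown from a signal handler, since @done is only checked between streamed batches.

# Assumes streamer was built with :tail_from => Time.now - 3600 in its options.
trap('INT') { streamer.stop }  # request shutdown; takes effect between batches
streamer.optail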

#stop ⇒ Object



# File 'lib/mosql/streamer.rb', line 22

def stop
  @done = true
end

#sync_object(ns, selector) ⇒ Object



# File 'lib/mosql/streamer.rb', line 181

def sync_object(ns, selector)
  obj = collection_for_ns(ns).find_one(selector)
  if obj
    unsafe_handle_exceptions(ns, obj) do
      @sql.upsert_ns(ns, obj)
    end
  else
    @sql.delete_ns(ns, selector)
  end
end
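
For example, re-syncing a single document by _id (values illustrative): if the document is still present in Mongo it is upserted into SQL; if it has since been deleted, the corresponding SQL row is deleted instead.

streamer.sync_object('blog.posts', { '_id' => 42 })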

#track_time ⇒ Object



# File 'lib/mosql/streamer.rb', line 83

def track_time
  start = Time.now
  yield
  Time.now - start
end
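
A small usage sketch: the return value is the block's wall-clock duration in seconds.

elapsed = streamer.track_time do
  sleep(0.1)  # stand-in for a SQL batch write
end
elapsed  # => roughly 0.1 (a Float)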

#unsafe_handle_exceptions(ns, obj) ⇒ Object



# File 'lib/mosql/streamer.rb', line 37

def unsafe_handle_exceptions(ns, obj)
  begin
    yield
  rescue Sequel::DatabaseError => e
    wrapped = e.wrapped_exception
    if wrapped.result && options[:unsafe]
      log.warn("Ignoring row (#{obj.inspect}): #{e}")
    else
      log.error("Error processing #{obj.inspect} for #{ns}.")
      raise e
    end
  end
end
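
Intended as a wrapper around a single-row write; the document below is hypothetical.

doc = { '_id' => 1, 'title' => 'Hello' }
streamer.unsafe_handle_exceptions('blog.posts', doc) do
  # a Sequel write goes here; on Sequel::DatabaseError the row is logged and
  # skipped when options[:unsafe] is set (and the wrapped exception carries a
  # result), otherwise the error is re-raised
end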

#with_retries(tries = 10) ⇒ Object



# File 'lib/mosql/streamer.rb', line 67

def with_retries(tries=10)
  tries.times do |try|
    begin
      yield
    rescue Mongo::ConnectionError, Mongo::ConnectionFailure, Mongo::OperationFailure => e
      # Duplicate key error
      raise if e.kind_of?(Mongo::OperationFailure) && [11000, 11001].include?(e.error_code)
      # Cursor timeout
      raise if e.kind_of?(Mongo::OperationFailure) && e.message =~ /^Query response returned CURSOR_NOT_FOUND/
      delay = 0.5 * (1.5 ** try)
      log.warn("Mongo exception: #{e}, sleeping #{delay}s...")
      sleep(delay)
    end
  end
end
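
The retry delay grows geometrically as 0.5 * 1.5**try, so the first few sleeps work out to roughly:

(0...5).map { |try| (0.5 * (1.5 ** try)).round(2) }
# => [0.5, 0.75, 1.13, 1.69, 2.53]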