Class: Webhookdb::SyncTarget::Routine

Inherits:
Object
  • Object
show all
Defined in:
lib/webhookdb/sync_target.rb

Direct Known Subclasses

DatabaseRoutine, HttpRoutine

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(now, sync_target) ⇒ Routine

Returns a new instance of Routine.



392
393
394
395
396
397
398
# File 'lib/webhookdb/sync_target.rb', line 392

def initialize(now, sync_target)
  @now = now
  @sync_target = sync_target
  @last_synced_at = sync_target.last_synced_at
  @replicator = sync_target.service_integration.replicator
  @timestamp_expr = Sequel[@replicator.timestamp_column.name]
end

Instance Attribute Details

#last_synced_atTime

Returns:

  • (Time)


389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
# File 'lib/webhookdb/sync_target.rb', line 389

class Routine
  attr_reader :now, :sync_target, :replicator, :timestamp_expr

  def initialize(now, sync_target)
    @now = now
    @sync_target = sync_target
    @last_synced_at = sync_target.last_synced_at
    @replicator = sync_target.service_integration.replicator
    @timestamp_expr = Sequel[@replicator.timestamp_column.name]
  end

  def run = raise NotImplementedError

  # Get the dataset of rows that need to be synced.
  # Note that there are a couple race conditions here.
  # First, those in https://github.com/webhookdb/webhookdb/issues/571.
  # There is also the condition that we could send the same row
  # multiple times when the row timestamp is set to last_synced_at but
  # it wasn't in the last sync; however that is likely not a big problem
  # since clients need to handle updates in any case.
  def dataset_to_sync
    # Use admin dataset, since the client could be using all their readonly conns.
    @replicator.admin_dataset do |ds|
      # Find rows updated before we started
      tscond = (@timestamp_expr <= @now)
      # Find rows updated after the last sync was run
      @last_synced_at && (tscond &= (@timestamp_expr >= @last_synced_at))
      ds = ds.where(tscond)
      # We want to paginate from oldest to newest
      ds = ds.order(@timestamp_expr)
      yield(ds)
    end
  end

  def perform_db_op(&)
    yield
  rescue Sequel::NoExistingObject => e
    raise Webhookdb::SyncTarget::Deleted, e
  end

  def record(last_synced_at)
    self.perform_db_op do
      self.sync_target.update(last_synced_at:)
    end
  end

  def with_stat(&)
    start = Time.now
    begin
      yield
      self.sync_target.add_sync_stat(start)
    rescue Webhookdb::Http::Error => e
      self.sync_target.add_sync_stat(start, response_status: e.status)
      raise
    rescue StandardError => e
      self.sync_target.add_sync_stat(start, exception: e)
      raise
    end
  end

  def to_ms(t)
    return (t.to_f * 1000).to_i
  end
end

#nowObject (readonly)

Returns the value of attribute now.



390
391
392
# File 'lib/webhookdb/sync_target.rb', line 390

def now
  @now
end

#replicatorObject (readonly)

Returns the value of attribute replicator.



390
391
392
# File 'lib/webhookdb/sync_target.rb', line 390

def replicator
  @replicator
end

#sync_targetObject (readonly)

Returns the value of attribute sync_target.



390
391
392
# File 'lib/webhookdb/sync_target.rb', line 390

def sync_target
  @sync_target
end

#timestamp_exprObject (readonly)

Returns the value of attribute timestamp_expr.



390
391
392
# File 'lib/webhookdb/sync_target.rb', line 390

def timestamp_expr
  @timestamp_expr
end

Instance Method Details

#dataset_to_syncObject

Get the dataset of rows that need to be synced. Note that there are a couple race conditions here. First, those in github.com/webhookdb/webhookdb/issues/571. There is also the condition that we could send the same row multiple times when the row timestamp is set to last_synced_at but it wasn’t in the last sync; however that is likely not a big problem since clients need to handle updates in any case.



409
410
411
412
413
414
415
416
417
418
419
420
421
# File 'lib/webhookdb/sync_target.rb', line 409

def dataset_to_sync
  # Use admin dataset, since the client could be using all their readonly conns.
  @replicator.admin_dataset do |ds|
    # Find rows updated before we started
    tscond = (@timestamp_expr <= @now)
    # Find rows updated after the last sync was run
    @last_synced_at && (tscond &= (@timestamp_expr >= @last_synced_at))
    ds = ds.where(tscond)
    # We want to paginate from oldest to newest
    ds = ds.order(@timestamp_expr)
    yield(ds)
  end
end

#perform_db_opObject



423
424
425
426
427
# File 'lib/webhookdb/sync_target.rb', line 423

def perform_db_op(&)
  yield
rescue Sequel::NoExistingObject => e
  raise Webhookdb::SyncTarget::Deleted, e
end

#record(last_synced_at) ⇒ Object



429
430
431
432
433
# File 'lib/webhookdb/sync_target.rb', line 429

def record(last_synced_at)
  self.perform_db_op do
    self.sync_target.update(last_synced_at:)
  end
end

#runObject

Raises:

  • (NotImplementedError)


400
# File 'lib/webhookdb/sync_target.rb', line 400

def run = raise NotImplementedError

#to_ms(t) ⇒ Object



449
450
451
# File 'lib/webhookdb/sync_target.rb', line 449

def to_ms(t)
  return (t.to_f * 1000).to_i
end

#with_statObject



435
436
437
438
439
440
441
442
443
444
445
446
447
# File 'lib/webhookdb/sync_target.rb', line 435

def with_stat(&)
  start = Time.now
  begin
    yield
    self.sync_target.add_sync_stat(start)
  rescue Webhookdb::Http::Error => e
    self.sync_target.add_sync_stat(start, response_status: e.status)
    raise
  rescue StandardError => e
    self.sync_target.add_sync_stat(start, exception: e)
    raise
  end
end