Class: Webhookdb::SyncTarget::Routine

Inherits:
Object
  • Object
show all
Defined in:
lib/webhookdb/sync_target.rb

Direct Known Subclasses

DatabaseRoutine, HttpRoutine

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(now, sync_target) ⇒ Routine

Returns a new instance of Routine.



312
313
314
315
316
317
318
# File 'lib/webhookdb/sync_target.rb', line 312

def initialize(now, sync_target)
  @now = now
  @sync_target = sync_target
  @last_synced_at = sync_target.last_synced_at
  @replicator = sync_target.service_integration.replicator
  @timestamp_expr = Sequel[@replicator.timestamp_column.name]
end

Instance Attribute Details

#last_synced_atTime

Returns:

  • (Time)


309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
# File 'lib/webhookdb/sync_target.rb', line 309

class Routine
  attr_reader :now, :sync_target, :replicator, :timestamp_expr

  def initialize(now, sync_target)
    @now = now
    @sync_target = sync_target
    @last_synced_at = sync_target.last_synced_at
    @replicator = sync_target.service_integration.replicator
    @timestamp_expr = Sequel[@replicator.timestamp_column.name]
  end

  def run = raise NotImplementedError

  # Get the dataset of rows that need to be synced.
  # Note that there are a couple race conditions here.
  # First, those in https://github.com/webhookdb/webhookdb/issues/571.
  # There is also the condition that we could send the same row
  # multiple times when the row timestamp is set to last_synced_at but
  # it wasn't in the last sync; however that is likely not a big problem
  # since clients need to handle updates in any case.
  def dataset_to_sync
    @replicator.readonly_dataset do |ds|
      # Find rows updated before we started
      tscond = (@timestamp_expr <= @now)
      # Find rows updated after the last sync was run
      @last_synced_at && (tscond &= (@timestamp_expr >= @last_synced_at))
      ds = ds.where(tscond)
      # We want to paginate from oldest to newest
      ds = ds.order(@timestamp_expr)
      yield(ds)
    end
  end

  def record(last_synced_at)
    self.sync_target.update(last_synced_at:)
  rescue Sequel::NoExistingObject => e
    raise Webhookdb::SyncTarget::Deleted, e
  end
end

#nowObject (readonly)

Returns the value of attribute now.



310
311
312
# File 'lib/webhookdb/sync_target.rb', line 310

def now
  @now
end

#replicatorObject (readonly)

Returns the value of attribute replicator.



310
311
312
# File 'lib/webhookdb/sync_target.rb', line 310

def replicator
  @replicator
end

#sync_targetObject (readonly)

Returns the value of attribute sync_target.



310
311
312
# File 'lib/webhookdb/sync_target.rb', line 310

def sync_target
  @sync_target
end

#timestamp_exprObject (readonly)

Returns the value of attribute timestamp_expr.



310
311
312
# File 'lib/webhookdb/sync_target.rb', line 310

def timestamp_expr
  @timestamp_expr
end

Instance Method Details

#dataset_to_syncObject

Get the dataset of rows that need to be synced. Note that there are a couple race conditions here. First, those in github.com/webhookdb/webhookdb/issues/571. There is also the condition that we could send the same row multiple times when the row timestamp is set to last_synced_at but it wasn’t in the last sync; however that is likely not a big problem since clients need to handle updates in any case.



329
330
331
332
333
334
335
336
337
338
339
340
# File 'lib/webhookdb/sync_target.rb', line 329

def dataset_to_sync
  @replicator.readonly_dataset do |ds|
    # Find rows updated before we started
    tscond = (@timestamp_expr <= @now)
    # Find rows updated after the last sync was run
    @last_synced_at && (tscond &= (@timestamp_expr >= @last_synced_at))
    ds = ds.where(tscond)
    # We want to paginate from oldest to newest
    ds = ds.order(@timestamp_expr)
    yield(ds)
  end
end

#record(last_synced_at) ⇒ Object



342
343
344
345
346
# File 'lib/webhookdb/sync_target.rb', line 342

def record(last_synced_at)
  self.sync_target.update(last_synced_at:)
rescue Sequel::NoExistingObject => e
  raise Webhookdb::SyncTarget::Deleted, e
end

#runObject

Raises:

  • (NotImplementedError)


320
# File 'lib/webhookdb/sync_target.rb', line 320

def run = raise NotImplementedError