Class: Webhookdb::SyncTarget::Routine
- Inherits:
-
Object
- Object
- Webhookdb::SyncTarget::Routine
- Defined in:
- lib/webhookdb/sync_target.rb
Direct Known Subclasses
Instance Attribute Summary collapse
- #last_synced_at ⇒ Time
-
#now ⇒ Object
readonly
Returns the value of attribute now.
-
#replicator ⇒ Object
readonly
Returns the value of attribute replicator.
-
#sync_target ⇒ Object
readonly
Returns the value of attribute sync_target.
-
#timestamp_expr ⇒ Object
readonly
Returns the value of attribute timestamp_expr.
Instance Method Summary collapse
-
#dataset_to_sync ⇒ Object
Get the dataset of rows that need to be synced.
-
#initialize(now, sync_target) ⇒ Routine
constructor
A new instance of Routine.
- #record(last_synced_at) ⇒ Object
- #run ⇒ Object
Constructor Details
#initialize(now, sync_target) ⇒ Routine
Returns a new instance of Routine.
312 313 314 315 316 317 318 |
# File 'lib/webhookdb/sync_target.rb', line 312 def initialize(now, sync_target) @now = now @sync_target = sync_target @last_synced_at = sync_target.last_synced_at @replicator = sync_target.service_integration.replicator @timestamp_expr = Sequel[@replicator..name] end |
Instance Attribute Details
#last_synced_at ⇒ Time
309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 |
# File 'lib/webhookdb/sync_target.rb', line 309 class Routine attr_reader :now, :sync_target, :replicator, :timestamp_expr def initialize(now, sync_target) @now = now @sync_target = sync_target @last_synced_at = sync_target.last_synced_at @replicator = sync_target.service_integration.replicator @timestamp_expr = Sequel[@replicator..name] end def run = raise NotImplementedError # Get the dataset of rows that need to be synced. # Note that there are a couple race conditions here. # First, those in https://github.com/webhookdb/webhookdb/issues/571. # There is also the condition that we could send the same row # multiple times when the row timestamp is set to last_synced_at but # it wasn't in the last sync; however that is likely not a big problem # since clients need to handle updates in any case. def dataset_to_sync @replicator.readonly_dataset do |ds| # Find rows updated before we started tscond = (@timestamp_expr <= @now) # Find rows updated after the last sync was run @last_synced_at && (tscond &= (@timestamp_expr >= @last_synced_at)) ds = ds.where(tscond) # We want to paginate from oldest to newest ds = ds.order(@timestamp_expr) yield(ds) end end def record(last_synced_at) self.sync_target.update(last_synced_at:) rescue Sequel::NoExistingObject => e raise Webhookdb::SyncTarget::Deleted, e end end |
#now ⇒ Object (readonly)
Returns the value of attribute now.
310 311 312 |
# File 'lib/webhookdb/sync_target.rb', line 310 def now @now end |
#replicator ⇒ Object (readonly)
Returns the value of attribute replicator.
310 311 312 |
# File 'lib/webhookdb/sync_target.rb', line 310 def replicator @replicator end |
#sync_target ⇒ Object (readonly)
Returns the value of attribute sync_target.
310 311 312 |
# File 'lib/webhookdb/sync_target.rb', line 310 def sync_target @sync_target end |
#timestamp_expr ⇒ Object (readonly)
Returns the value of attribute timestamp_expr.
310 311 312 |
# File 'lib/webhookdb/sync_target.rb', line 310 def @timestamp_expr end |
Instance Method Details
#dataset_to_sync ⇒ Object
Get the dataset of rows that need to be synced. Note that there are a couple race conditions here. First, those in github.com/webhookdb/webhookdb/issues/571. There is also the condition that we could send the same row multiple times when the row timestamp is set to last_synced_at but it wasn’t in the last sync; however that is likely not a big problem since clients need to handle updates in any case.
329 330 331 332 333 334 335 336 337 338 339 340 |
# File 'lib/webhookdb/sync_target.rb', line 329 def dataset_to_sync @replicator.readonly_dataset do |ds| # Find rows updated before we started tscond = (@timestamp_expr <= @now) # Find rows updated after the last sync was run @last_synced_at && (tscond &= (@timestamp_expr >= @last_synced_at)) ds = ds.where(tscond) # We want to paginate from oldest to newest ds = ds.order(@timestamp_expr) yield(ds) end end |
#record(last_synced_at) ⇒ Object
342 343 344 345 346 |
# File 'lib/webhookdb/sync_target.rb', line 342 def record(last_synced_at) self.sync_target.update(last_synced_at:) rescue Sequel::NoExistingObject => e raise Webhookdb::SyncTarget::Deleted, e end |
#run ⇒ Object
320 |
# File 'lib/webhookdb/sync_target.rb', line 320 def run = raise NotImplementedError |