Class: Webhookdb::SyncTarget::Routine
- Inherits:
-
Object
- Object
- Webhookdb::SyncTarget::Routine
- Defined in:
- lib/webhookdb/sync_target.rb
Direct Known Subclasses
Instance Attribute Summary collapse
- #last_synced_at ⇒ Time
-
#now ⇒ Object
readonly
Returns the value of attribute now.
-
#replicator ⇒ Object
readonly
Returns the value of attribute replicator.
-
#sync_target ⇒ Object
readonly
Returns the value of attribute sync_target.
-
#timestamp_expr ⇒ Object
readonly
Returns the value of attribute timestamp_expr.
Instance Method Summary collapse
-
#dataset_to_sync ⇒ Object
Get the dataset of rows that need to be synced.
-
#initialize(now, sync_target) ⇒ Routine
constructor
A new instance of Routine.
- #perform_db_op ⇒ Object
- #record(last_synced_at) ⇒ Object
- #run ⇒ Object
- #to_ms(t) ⇒ Object
- #with_stat ⇒ Object
Constructor Details
#initialize(now, sync_target) ⇒ Routine
Returns a new instance of Routine.
392 393 394 395 396 397 398 |
# File 'lib/webhookdb/sync_target.rb', line 392 def initialize(now, sync_target) @now = now @sync_target = sync_target @last_synced_at = sync_target.last_synced_at @replicator = sync_target.service_integration.replicator @timestamp_expr = Sequel[@replicator.timestamp_column.name] end |
Instance Attribute Details
#last_synced_at ⇒ Time
389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 |
# File 'lib/webhookdb/sync_target.rb', line 389 class Routine attr_reader :now, :sync_target, :replicator, :timestamp_expr def initialize(now, sync_target) @now = now @sync_target = sync_target @last_synced_at = sync_target.last_synced_at @replicator = sync_target.service_integration.replicator @timestamp_expr = Sequel[@replicator.timestamp_column.name] end def run = raise NotImplementedError # Get the dataset of rows that need to be synced. # Note that there are a couple race conditions here. # First, those in https://github.com/webhookdb/webhookdb/issues/571. # There is also the condition that we could send the same row # multiple times when the row timestamp is set to last_synced_at but # it wasn't in the last sync; however that is likely not a big problem # since clients need to handle updates in any case. def dataset_to_sync # Use admin dataset, since the client could be using all their readonly conns. @replicator.admin_dataset do |ds| # Find rows updated before we started tscond = (@timestamp_expr <= @now) # Find rows updated after the last sync was run @last_synced_at && (tscond &= (@timestamp_expr >= @last_synced_at)) ds = ds.where(tscond) # We want to paginate from oldest to newest ds = ds.order(@timestamp_expr) yield(ds) end end def perform_db_op(&) yield rescue Sequel::NoExistingObject => e raise Webhookdb::SyncTarget::Deleted, e end def record(last_synced_at) self.perform_db_op do self.sync_target.update(last_synced_at:) end end def with_stat(&) start = Time.now begin yield self.sync_target.add_sync_stat(start) rescue Webhookdb::Http::Error => e self.sync_target.add_sync_stat(start, response_status: e.status) raise rescue StandardError => e self.sync_target.add_sync_stat(start, exception: e) raise end end def to_ms(t) return (t.to_f * 1000).to_i end end |
#now ⇒ Object (readonly)
Returns the value of attribute now.
390 391 392 |
# File 'lib/webhookdb/sync_target.rb', line 390 def now @now end |
#replicator ⇒ Object (readonly)
Returns the value of attribute replicator.
390 391 392 |
# File 'lib/webhookdb/sync_target.rb', line 390 def replicator @replicator end |
#sync_target ⇒ Object (readonly)
Returns the value of attribute sync_target.
390 391 392 |
# File 'lib/webhookdb/sync_target.rb', line 390 def sync_target @sync_target end |
#timestamp_expr ⇒ Object (readonly)
Returns the value of attribute timestamp_expr.
390 391 392 |
# File 'lib/webhookdb/sync_target.rb', line 390 def timestamp_expr @timestamp_expr end |
Instance Method Details
#dataset_to_sync ⇒ Object
Get the dataset of rows that need to be synced. Note that there are a couple race conditions here. First, those in github.com/webhookdb/webhookdb/issues/571. There is also the condition that we could send the same row multiple times when the row timestamp is set to last_synced_at but it wasn’t in the last sync; however that is likely not a big problem since clients need to handle updates in any case.
409 410 411 412 413 414 415 416 417 418 419 420 421 |
# File 'lib/webhookdb/sync_target.rb', line 409 def dataset_to_sync # Use admin dataset, since the client could be using all their readonly conns. @replicator.admin_dataset do |ds| # Find rows updated before we started tscond = (@timestamp_expr <= @now) # Find rows updated after the last sync was run @last_synced_at && (tscond &= (@timestamp_expr >= @last_synced_at)) ds = ds.where(tscond) # We want to paginate from oldest to newest ds = ds.order(@timestamp_expr) yield(ds) end end |
#perform_db_op ⇒ Object
423 424 425 426 427 |
# File 'lib/webhookdb/sync_target.rb', line 423 def perform_db_op(&) yield rescue Sequel::NoExistingObject => e raise Webhookdb::SyncTarget::Deleted, e end |
#record(last_synced_at) ⇒ Object
429 430 431 432 433 |
# File 'lib/webhookdb/sync_target.rb', line 429 def record(last_synced_at) self.perform_db_op do self.sync_target.update(last_synced_at:) end end |
#run ⇒ Object
400 |
# File 'lib/webhookdb/sync_target.rb', line 400 def run = raise NotImplementedError |
#to_ms(t) ⇒ Object
449 450 451 |
# File 'lib/webhookdb/sync_target.rb', line 449 def to_ms(t) return (t.to_f * 1000).to_i end |
#with_stat ⇒ Object
435 436 437 438 439 440 441 442 443 444 445 446 447 |
# File 'lib/webhookdb/sync_target.rb', line 435 def with_stat(&) start = Time.now begin yield self.sync_target.add_sync_stat(start) rescue Webhookdb::Http::Error => e self.sync_target.add_sync_stat(start, response_status: e.status) raise rescue StandardError => e self.sync_target.add_sync_stat(start, exception: e) raise end end |