Method: Webhookdb::Replicator::Base#backfill
Defined in: lib/webhookdb/replicator/base.rb

#backfill(job) ⇒ Object
In order to backfill, we need to:

- Iterate through pages of records from the external service.
- Upsert each record.

The caveats/complexities are:

- The backfill method should take care of retrying fetches for failed pages.
- That means it needs to keep track of some pagination token.
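The page-iteration loop described above can be sketched as follows. This is a minimal illustration, not WebhookDB's actual fetcher API: the `fetcher`/`upserter` callables and the `MAX_RETRIES` constant are hypothetical names introduced here.

```ruby
# Hypothetical retry budget for a single page fetch (not a WebhookDB constant).
MAX_RETRIES = 2

# Fetch pages until the external service stops returning a pagination
# token, retrying each individual page fetch on transient errors.
# `fetcher` takes the current token and returns [rows, next_token];
# `upserter` upserts one row. Both are illustrative stand-ins.
def backfill_pages(fetcher, upserter)
  pagination_token = nil
  loop do
    attempts = 0
    begin
      rows, pagination_token = fetcher.call(pagination_token)
    rescue StandardError
      attempts += 1
      retry if attempts <= MAX_RETRIES
      raise
    end
    rows.each { |row| upserter.call(row) }
    # A nil token means the external service has no more pages.
    break if pagination_token.nil?
  end
end
```

Because the pagination token is threaded through each iteration, a retried fetch re-requests the same page rather than skipping ahead.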
```ruby
# File 'lib/webhookdb/replicator/base.rb', line 987

def backfill(job)
  raise Webhookdb::InvalidPrecondition, "job is for different service integration" unless
    job.service_integration === self.service_integration
  raise Webhookdb::InvariantViolation, "manual backfill not supported" unless self.descriptor.supports_backfill?
  sint = self.service_integration
  raise Webhookdb::Replicator::CredentialsMissing if
    sint.backfill_key.blank? && sint.backfill_secret.blank? && sint.depends_on.blank?
  last_backfilled = job.incremental? ? sint.last_backfilled_at : nil
  new_last_backfilled = Time.now
  job.update(started_at: Time.now)

  backfillers = self._backfillers(**job.criteria.symbolize_keys)
  if self._parallel_backfill && self._parallel_backfill > 1
    # Create a dedicated threadpool for these backfillers,
    # with max parallelism determined by the replicator.
    pool = Concurrent::FixedThreadPool.new(self._parallel_backfill)
    # Record any errors that occur, since they won't raise otherwise.
    # Initialize a sized array to avoid any potential race conditions
    # (though GIL should make it not an issue?).
    errors = Array.new(backfillers.size)
    backfillers.each_with_index do |bf, idx|
      pool.post do
        bf.backfill(last_backfilled)
      rescue StandardError => e
        errors[idx] = e
      end
    end
    # We've enqueued all backfillers; do not accept any more work.
    pool.shutdown
    loop do
      # We want to stop early if we find an error, so check for errors every 10 seconds.
      completed = pool.wait_for_termination(10)
      first_error = errors.find { |e| !e.nil? }
      if first_error.nil?
        # No error, and wait_for_termination returned true, so all work is done.
        break if completed
        # No error, but work is still going on, so loop again.
        next
      end
      # We have an error; don't run any more backfillers.
      pool.kill
      # Wait for all ongoing backfills before raising.
      pool.wait_for_termination
      raise first_error
    end
  else
    backfillers.each do |backfiller|
      backfiller.backfill(last_backfilled)
    end
  end

  sint.update(last_backfilled_at: new_last_backfilled) if job.incremental?
  job.update(finished_at: Time.now)
  job.enqueue_children
end
```
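The parallel branch above records each backfiller's error into a pre-sized array, then polls for the first failure so it can kill the pool early. A rough stdlib-only sketch of that record-errors-then-raise pattern, using plain threads instead of concurrent-ruby's `Concurrent::FixedThreadPool` (the `run_all_collecting_errors` helper and its `jobs` argument are illustrative names, not WebhookDB API):

```ruby
# Run each callable job on its own thread, recording (not raising)
# errors, then surface the first recorded error after all workers
# settle. This mirrors the pattern in #backfill: writes into a
# pre-sized array are indexed per worker, so no two threads touch
# the same slot.
def run_all_collecting_errors(jobs)
  errors = Array.new(jobs.size)
  threads = jobs.each_with_index.map do |job, idx|
    Thread.new do
      job.call
    rescue StandardError => e
      errors[idx] = e
    end
  end
  threads.each(&:join)
  first_error = errors.find { |e| !e.nil? }
  raise first_error if first_error
end
```

Unlike the real method, this sketch waits for every thread before raising; the pool version can abort in-flight work with `pool.kill` once a failure is detected.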