Module: Webhookdb::Backfiller::Bulk

Instance Method Details

#conditional_upsert? ⇒ Boolean

Whether to apply the replicator's `_update_where_expr` to the upsert. Defaults to false, since most bulk upserting is backfill, which should only involve upserting new rows anyway.

Returns:

  • (Boolean)


# File 'lib/webhookdb/backfiller.rb', line 74

def conditional_upsert? = false

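#dry_run? ⇒ Boolean

When true, flush_pending_inserts returns without writing anything to the database. Defaults to false.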

Returns:

  • (Boolean)


# File 'lib/webhookdb/backfiller.rb', line 76

def dry_run? = false

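#flush_pending_inserts ⇒ Object

Upserts all pending rows with a single insert_conflict/multi_insert call, then clears pending_inserts. Does nothing when dry_run? is true or when nothing is pending.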

# File 'lib/webhookdb/backfiller.rb', line 103

def flush_pending_inserts
  return if self.dry_run?
  return if self.pending_inserts.empty?
  rows_to_insert = self.pending_inserts.values
  update_where_expr = self.update_where_expr
  update_expr = self.upsert_update_expr(rows_to_insert.first)
  self.upserting_replicator.admin_dataset(timeout: :fast) do |ds|
    insert_ds = ds.insert_conflict(
      target: self.upserting_replicator._upsert_conflict_target,
      update: update_expr,
      update_where: update_where_expr,
    )
    insert_ds.multi_insert(rows_to_insert)
  end
  self.pending_inserts.clear
end
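The insert_conflict/multi_insert call asks PostgreSQL for a multi-row INSERT ... ON CONFLICT (target) DO UPDATE SET ... WHERE .... As a rough sketch of those semantics, the plain-Ruby model below applies a batch of rows to an in-memory table: rows with a new key are inserted, and conflicting rows are updated only if the optional WHERE guard passes. The simulate_upsert helper, the version-based guard, and the use of lambdas for expressions are hypothetical; real expressions are Sequel objects evaluated by the database, and PostgreSQL also rejects a batch that hits the same key twice, which this model does not check.

```ruby
# Hypothetical in-memory model of a multi-row
#   INSERT ... ON CONFLICT (target) DO UPDATE SET ... WHERE ...
# `table` maps conflict-target values to stored rows.
# Each `update` lambda and the `update_where` lambda receive
# (existing_row, excluded_row), mirroring PostgreSQL's EXCLUDED pseudo-table.
def simulate_upsert(table, rows, target:, update:, update_where: nil)
  rows.each do |excluded|
    key = excluded.fetch(target)
    existing = table[key]
    if existing.nil?
      # No conflict: plain insert.
      table[key] = excluded.dup
    elsif update_where.nil? || update_where.call(existing, excluded)
      # Conflict, and there is no guard or the guard passes: update in place.
      update.each { |col, expr| existing[col] = expr.call(existing, excluded) }
    end
    # Otherwise the conflicting row is left unchanged.
  end
  table
end

table = { "a" => { "id" => "a", "name" => "old", "version" => 2 } }
rows = [
  { "id" => "a", "name" => "stale", "version" => 1 },
  { "id" => "b", "name" => "new", "version" => 1 }
]
simulate_upsert(
  table, rows,
  target: "id",
  # SET name = excluded.name, version = excluded.version
  update: {
    "name" => ->(_existing, excl) { excl["name"] },
    "version" => ->(_existing, excl) { excl["version"] }
  },
  # Hypothetical guard: only overwrite with a newer version.
  update_where: ->(existing, excl) { existing["version"] < excl["version"] }
)
```

Here row "a" keeps its stored values because the guard fails, while row "b" is inserted. With update_where: nil (the Bulk default, since conditional_upsert? is false), row "a" would be overwritten.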

#handle_item(body) ⇒ Array(String, Hash), Array(nil)

Adds the item to the pending upserts and flushes them once upsert_page_size rows are pending. Returns the remote key and the row being upserted, or [nil, nil] when the replicator produces no row for the item.

Returns:

  • (Array(String, Hash), Array(nil))


# File 'lib/webhookdb/backfiller.rb', line 81

def handle_item(body)
  self.prepare_body(body)
  inserting = self.upserting_replicator.upsert_webhook_body(body, upsert: false)
  return nil, nil if inserting.nil?
  k = inserting.fetch(self.remote_key_column_name)
  self.pending_inserts[k] = inserting
  self.flush_pending_inserts if self.pending_inserts.size >= self.upsert_page_size
  return k, inserting
end
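Because pending_inserts is a Hash keyed by the remote key, two items with the same key in one page collapse into one row, with the later item winning. That plausibly matters for a multi-row upsert, since PostgreSQL rejects a statement that would update the same row twice ("ON CONFLICT DO UPDATE command cannot affect row a second time"), though the source does not state this as the motivation. The sketch below uses a hypothetical MiniBulk class that records flushed pages instead of writing to a database, and assumes the remote key is an "id" field.

```ruby
# Hypothetical stand-in for the batching in handle_item/flush_pending_inserts.
# It records each flushed page instead of writing to a database.
class MiniBulk
  attr_reader :flushed_pages

  def initialize(page_size)
    @page_size = page_size
    @pending = {}
    @flushed_pages = []
  end

  def handle_item(row)
    key = row.fetch("id") # assumed remote key column
    # Same key within a page: the later row replaces the earlier one.
    @pending[key] = row
    flush if @pending.size >= @page_size
    [key, row]
  end

  def flush
    return if @pending.empty?
    @flushed_pages << @pending.values
    @pending.clear
  end
end

bulk = MiniBulk.new(2)
bulk.handle_item({ "id" => "a", "v" => 1 })
bulk.handle_item({ "id" => "a", "v" => 2 }) # replaces "a"; still 1 pending row
bulk.handle_item({ "id" => "b", "v" => 1 }) # 2 pending rows, so the page flushes
bulk.handle_item({ "id" => "c", "v" => 1 })
bulk.flush # handle_item only flushes full pages, so the tail needs an explicit flush
```

The first flushed page holds the second "a" row and "b"; the final partial page holds only "c".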

#pending_inserts ⇒ Object



# File 'lib/webhookdb/backfiller.rb', line 70

def pending_inserts = @pending_inserts ||= {}

#prepare_body(_body) ⇒ Object



# File 'lib/webhookdb/backfiller.rb', line 66

def prepare_body(_body) = raise(NotImplementedError, "add/remove keys from body before upsert")
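handle_item calls prepare_body(body) and then keeps using body, ignoring the return value, so an implementation must modify the body in place. A minimal sketch, assuming a hypothetical ExampleBackfiller whose API items carry a "_links" field to drop and need a "source" field added (both keys are made up for illustration):

```ruby
# Hypothetical prepare_body implementation. handle_item ignores the return
# value, so the body hash must be mutated in place.
class ExampleBackfiller
  def prepare_body(body)
    # Drop a field the replicator should not store (hypothetical key).
    body.delete("_links")
    # Add a field the replicator expects (hypothetical key and value).
    body["source"] = "backfill"
  end
end

body = { "id" => "abc", "_links" => { "self" => "/items/abc" } }
ExampleBackfiller.new.prepare_body(body)
# body now contains only "id" and "source"
```

Returning a new hash instead (for example, body.except("_links")) would have no effect here, because the caller never looks at the return value.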

#remote_key_column_name ⇒ Object



# File 'lib/webhookdb/backfiller.rb', line 68

def remote_key_column_name = @remote_key_column_name ||= self.upserting_replicator._remote_key_column.name

#update_where_expr ⇒ Object

Return the conditional update expression. By default this is:

  • nil if conditional_upsert? is false.

  • the replicator's _update_where_expr if conditional_upsert? is true.

A subclass can override this if it needs a specific conditional update expression in certain cases (this should be rare).



# File 'lib/webhookdb/backfiller.rb', line 97

def update_where_expr = self.conditional_upsert? ? self.upserting_replicator._update_where_expr : nil
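The defaulting between conditional_upsert? and update_where_expr can be seen in isolation with a stand-in replicator. FakeReplicator, DefaultBackfiller, and ConditionalBackfiller are hypothetical, and the placeholder string returned by _update_where_expr stands in for whatever Sequel expression a real replicator builds.

```ruby
# Hypothetical replicator exposing only the method update_where_expr needs.
# The returned string is a placeholder for a real Sequel expression.
class FakeReplicator
  def _update_where_expr = "stored.updated_at < excluded.updated_at"
end

# Mirrors the Bulk defaults for conditional_upsert? and update_where_expr.
class DefaultBackfiller
  def upserting_replicator = @upserting_replicator ||= FakeReplicator.new
  def conditional_upsert? = false
  def update_where_expr = self.conditional_upsert? ? self.upserting_replicator._update_where_expr : nil
end

# A backfiller that may overwrite existing rows opts in to the guard.
class ConditionalBackfiller < DefaultBackfiller
  def conditional_upsert? = true
end

DefaultBackfiller.new.update_where_expr     # nil: conflicting rows are always updated
ConditionalBackfiller.new.update_where_expr # the replicator's guard expression
```

Overriding conditional_upsert? is the usual switch; overriding update_where_expr itself is only needed when a backfiller wants a guard different from the replicator's.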

#upsert_page_size ⇒ Object



# File 'lib/webhookdb/backfiller.rb', line 65

def upsert_page_size = raise(NotImplementedError, "how many items should be upserted at a time")

#upsert_update_expr(first_inserting_row) ⇒ Object

The upsert `UPDATE` expression, calculated from the first row of a multi-row upsert. Defaults to the replicator's _upsert_update_expr, but may need to be overridden in rare cases.



# File 'lib/webhookdb/backfiller.rb', line 101

def upsert_update_expr(first_inserting_row) = self.upserting_replicator._upsert_update_expr(first_inserting_row)
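Because the `UPDATE` expression is derived from the first row only, it implicitly assumes every row in the page has the same columns, as a single multi-row INSERT has one column list. The sketch below builds a hypothetical `SET col = excluded.col` mapping from the first row's keys and shows an override that preserves one column on conflict. The build_update_expr helper, the DefaultBulk and KeepCreatedAtBulk classes, the :id conflict target, and the string form of the expressions are illustrations, not the replicator's actual _upsert_update_expr.

```ruby
# Hypothetical helper: SET col = excluded.col for every column of the first
# row except the conflict target. Strings stand in for Sequel expressions.
def build_update_expr(first_row, conflict_target)
  first_row.keys
           .reject { |col| col == conflict_target }
           .to_h { |col| [col, "excluded.#{col}"] }
end

class DefaultBulk
  def upsert_update_expr(first_row) = build_update_expr(first_row, :id)
end

# Hypothetical override: keep the originally stored created_at on conflict.
class KeepCreatedAtBulk < DefaultBulk
  def upsert_update_expr(first_row) = super(first_row).except(:created_at)
end

row = { id: "a", name: "x", created_at: "2024-01-01" }
DefaultBulk.new.upsert_update_expr(row)       # SET name = excluded.name, created_at = excluded.created_at
KeepCreatedAtBulk.new.upsert_update_expr(row) # SET name = excluded.name
```

Hash#except requires Ruby 3.0 or later, which the endless method definitions in this module already assume.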

#upserting_replicator ⇒ Object



# File 'lib/webhookdb/backfiller.rb', line 67

def upserting_replicator = raise(NotImplementedError, "the replicator being upserted")
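An including class is expected to supply upsert_page_size, prepare_body, and upserting_replicator. The sketch below mirrors those three abstract hooks in a hypothetical AbstractHooks module and compares a class that forgets to implement them with one that does; IncompleteBackfiller, CompleteBackfiller, the page size of 500, and the "etag" key are all made up for illustration.

```ruby
# Hypothetical module mirroring the three abstract hooks.
module AbstractHooks
  def upsert_page_size = raise(NotImplementedError, "how many items should be upserted at a time")
  def prepare_body(_body) = raise(NotImplementedError, "add/remove keys from body before upsert")
  def upserting_replicator = raise(NotImplementedError, "the replicator being upserted")
end

# Forgets to implement the hooks.
class IncompleteBackfiller
  include AbstractHooks
end

# Implements every hook; the replicator is any placeholder object here.
class CompleteBackfiller
  include AbstractHooks

  def initialize(replicator)
    @replicator = replicator
  end

  def upsert_page_size = 500
  def prepare_body(body) = body.delete("etag")
  def upserting_replicator = @replicator
end

begin
  IncompleteBackfiller.new.upsert_page_size
rescue NotImplementedError => e
  puts "missing hook: #{e.message}" # prints "missing hook: how many items should be upserted at a time"
end
```

NotImplementedError inherits from ScriptError, not StandardError, so a bare `rescue` will not catch it; the rescue clause has to name the class explicitly.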