Class: Webhookdb::Replicator::Base

Inherits:
Object
Includes:
Appydays::Loggable, DBAdapter::ColumnTypes
Defined in:
lib/webhookdb/replicator/base.rb

Defined Under Namespace

Classes: CredentialVerificationResult, ServiceBackfiller

Constant Summary

MAX_INDEX_NAME_LENGTH = 63

Constants included from DBAdapter::ColumnTypes

DBAdapter::ColumnTypes::BIGINT, DBAdapter::ColumnTypes::BIGINT_ARRAY, DBAdapter::ColumnTypes::BOOLEAN, DBAdapter::ColumnTypes::COLUMN_TYPES, DBAdapter::ColumnTypes::DATE, DBAdapter::ColumnTypes::DECIMAL, DBAdapter::ColumnTypes::DOUBLE, DBAdapter::ColumnTypes::FLOAT, DBAdapter::ColumnTypes::INTEGER, DBAdapter::ColumnTypes::INTEGER_ARRAY, DBAdapter::ColumnTypes::OBJECT, DBAdapter::ColumnTypes::TEXT, DBAdapter::ColumnTypes::TEXT_ARRAY, DBAdapter::ColumnTypes::TIMESTAMP, DBAdapter::ColumnTypes::UUID

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(service_integration) ⇒ Base

Returns a new instance of Base.



# File 'lib/webhookdb/replicator/base.rb', line 31

def initialize(service_integration)
  @service_integration = service_integration
end

Instance Attribute Details

#service_integration ⇒ Webhookdb::ServiceIntegration (readonly)



# File 'lib/webhookdb/replicator/base.rb', line 29

def service_integration
  @service_integration
end

Class Method Details

.chunked_row_update_bounds(max_pk, chunk_size: 1_000_000) ⇒ Object

Return an array of tuples used to split UPDATE queries, so locks are not held on the entire table while backfilling values for newly added columns. See ensure_all_columns_modification.

The returned chunks are like: [[0, 100], [100, 200], [200]], and are meant to be used like `0 < pk <= 100`, `100 < pk <= 200`, `pk > 200`.

Note that the final value in the array is a single-item tuple, used like `pk > chunks.last[0]`.



# File 'lib/webhookdb/replicator/base.rb', line 652

def self.chunked_row_update_bounds(max_pk, chunk_size: 1_000_000)
  result = []
  chunk_lower_pk = 0
  chunk_upper_pk = chunk_size
  while chunk_upper_pk <= max_pk
    # Get chunks like 0 < pk <= 100, 100 < pk <= 200, etc.
    # Each loop we increment by one chunk size, stopping before the chunk that would pass max_pk.
    # Ie if the chunk size is 100, and max_pk is 450, the final chunk here is 300-400.
    result << [chunk_lower_pk, chunk_upper_pk]
    chunk_lower_pk += chunk_size
    chunk_upper_pk += chunk_size
  end
  # Finally, one final chunk for all rows greater than our biggest chunk.
  # With a chunk size of 100 and a max_pk of 450, the loop's last chunk was 300-400,
  # so this single-element chunk handles 'pk > 400': our max_pk row at 450,
  # plus any rows written after max_pk was read.
  result << [chunk_lower_pk]
end
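The bounds translate directly into WHERE clauses. A minimal sketch in plain Ruby, building the SQL as strings for illustration (the table and column names here are hypothetical; WebhookDB itself issues these through Sequel):

```ruby
# Same bounds logic as above, reimplemented standalone.
def chunked_row_update_bounds(max_pk, chunk_size: 1_000_000)
  result = []
  lower = 0
  upper = chunk_size
  while upper <= max_pk
    result << [lower, upper]
    lower += chunk_size
    upper += chunk_size
  end
  # Catch-all chunk for rows above the last full chunk.
  result << [lower]
end

bounds = chunked_row_update_bounds(450, chunk_size: 100)
# => [[0, 100], [100, 200], [200, 300], [300, 400], [400]]

statements = bounds.map do |lower, upper|
  cond = upper ? "pk > #{lower} AND pk <= #{upper}" : "pk > #{lower}"
  "UPDATE my_table SET new_col = old_col WHERE #{cond}"
end
```

Each statement touches only one chunk of rows, so no single UPDATE holds a lock on the whole table.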

.descriptor ⇒ Webhookdb::Replicator::Descriptor

This method is abstract.

Return the descriptor for this service.

Returns:

Raises:

  • (NotImplementedError)


# File 'lib/webhookdb/replicator/base.rb', line 24

def self.descriptor
  raise NotImplementedError, "#{self.name}: must return a descriptor that is used for registration purposes"
end

Instance Method Details

#_any_subscriptions_to_notify? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/webhookdb/replicator/base.rb', line 801

def _any_subscriptions_to_notify?
  return !self.service_integration.all_webhook_subscriptions_dataset.to_notify.empty?
end

#_backfill_state_change_fields ⇒ Object

If we support backfilling, these are the fields used for it. Override if other fields are used instead. There cannot be overlap between these and the webhook state change fields.



# File 'lib/webhookdb/replicator/base.rb', line 145

def _backfill_state_change_fields = ["backfill_key", "backfill_secret", "api_url"]

#_backfillers ⇒ Array<Webhookdb::Backfiller>

Return backfillers for the replicator. We must use an array for ‘data-based’ backfillers, like when we need to paginate for each row in another table.

By default, return a ServiceBackfiller, which will call _fetch_backfill_page on the receiver.

Returns:



# File 'lib/webhookdb/replicator/base.rb', line 1170

def _backfillers
  return [ServiceBackfiller.new(self)]
end

#_clear_backfill_information ⇒ Object



# File 'lib/webhookdb/replicator/base.rb', line 314

def _clear_backfill_information
  self.service_integration.set(api_url: "", backfill_key: "", backfill_secret: "")
end

#_clear_webook_information ⇒ Object



# File 'lib/webhookdb/replicator/base.rb', line 301

def _clear_webook_information
  self.service_integration.set(webhook_secret: "")
end

#_coalesce_excluded_on_update(update, column_names) ⇒ Object

Have a column set itself only on insert or if nil.

Given the payload to DO UPDATE, mutate it so that the column names included in `column_names` use what is already in the table, falling back to what's being inserted. This new payload should be passed to the `update` kwarg of `insert_conflict`:

ds.insert_conflict(update: self._coalesce_excluded_on_update(payload, [:created_at])).insert(payload)

Parameters:

  • update (Hash)
  • column_names (Array<Symbol>)


# File 'lib/webhookdb/replicator/base.rb', line 957

def _coalesce_excluded_on_update(update, column_names)
  # Replace just the specific columns we're overriding.
  column_names.each do |c|
    update[c] = Sequel.function(:coalesce, self.qualified_table_sequel_identifier[c], Sequel[:excluded][c])
  end
  # Return the mutated payload so it can be passed directly to insert_conflict.
  return update
end

#_denormalized_columns ⇒ Array<Webhookdb::Replicator::Column>

When an integration needs denormalized columns, specify them here. Indices are created for each column. Modifiers can be used if columns need a default or other customization. See Webhookdb::Replicator::Column for more details about column fields.

Returns:



# File 'lib/webhookdb/replicator/base.rb', line 528

def _denormalized_columns
  return []
end
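A sketch of what an override might look like, for a hypothetical "Acme order" integration. The DenormColumn here is a stand-in Struct; the real Webhookdb::Replicator::Column constructor takes many more options (defaulters, converters, and so on):

```ruby
# Stand-in for Webhookdb::Replicator::Column, just enough for this sketch.
DenormColumn = Struct.new(:name, :type, :opts)
TEXT = :text
TIMESTAMP = :timestamp

class AcmeOrderReplicator
  # Each of these becomes its own indexed column alongside :data.
  def _denormalized_columns
    [
      DenormColumn.new(:customer_email, TEXT, {index: true}),
      DenormColumn.new(:updated_at, TIMESTAMP, {index: true}),
    ]
  end
end
```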

#_enqueue_backfill_jobs(incremental:, criteria: nil, recursive: true, enqueue: true) ⇒ Object



# File 'lib/webhookdb/replicator/base.rb', line 215

def _enqueue_backfill_jobs(incremental:, criteria: nil, recursive: true, enqueue: true)
  m = recursive ? :create_recursive : :create
  j = Webhookdb::BackfillJob.send(
    m,
    service_integration:,
    incremental:,
    criteria: criteria || {},
    created_by: Webhookdb.request_user_and_admin[0],
  )
  j.enqueue if enqueue
  return j
end

#_extra_index_specs ⇒ Array<Webhookdb::Replicator::IndexSpec>

Specs for multi-column indices. Every column they reference must be in denormalized_columns.

Returns:

  • (Array<Webhookdb::Replicator::IndexSpec>)


# File 'lib/webhookdb/replicator/base.rb', line 477

def _extra_index_specs
  return []
end

#_fetch_enrichment(resource, event, request) ⇒ *

Given the resource that is going to be inserted and an optional event, make an API call to enrich it with further data if needed. The result of this is passed to _prepare_for_insert.

Parameters:

Returns:

  • (*)


# File 'lib/webhookdb/replicator/base.rb', line 838

def _fetch_enrichment(resource, event, request)
  return nil
end

#_find_dependency_candidate(value) ⇒ Object

Parameters:

  • value (String)

Raises:



# File 'lib/webhookdb/replicator/base.rb', line 229

def _find_dependency_candidate(value)
  int_val = value.strip.blank? ? 1 : value.to_i
  idx = int_val - 1
  dep_candidates = self.service_integration.dependency_candidates
  raise Webhookdb::InvalidPrecondition, "no dependency candidates" if dep_candidates.empty?
  raise Webhookdb::InvalidInput, "'#{value}' is not a valid dependency" if
    idx.negative? || idx >= dep_candidates.length
  return dep_candidates[idx]
end

#_notify_dependents(inserting, changed) ⇒ Object

Parameters:

  • changed (Boolean)


# File 'lib/webhookdb/replicator/base.rb', line 795

def _notify_dependents(inserting, changed)
  self.service_integration.dependents.each do |d|
    d.replicator.on_dependency_webhook_upsert(self, inserting, changed:)
  end
end

#_parallel_backfill ⇒ Object

If this replicator supports backfilling in parallel (running multiple backfillers at a time), return the degree of parallelism (or nil if not running in parallel). We leave parallelism up to the replicator, not CPU count, since most work involves waiting on APIs to return.

NOTE: These threads are in addition to any worker threads, so it’s important to pay attention to memory use.



# File 'lib/webhookdb/replicator/base.rb', line 1158

def _parallel_backfill
  return nil
end

#_prepare_for_insert(resource, event, request, enrichment) ⇒ Hash

Return the hash that should be inserted into the database, based on the denormalized columns and data given.

Parameters:

Returns:

  • (Hash)


# File 'lib/webhookdb/replicator/base.rb', line 890

def _prepare_for_insert(resource, event, request, enrichment)
  h = [self._remote_key_column].concat(self._denormalized_columns).each_with_object({}) do |col, memo|
    value = col.to_ruby_value(resource:, event:, enrichment:, service_integration:)
    skip = value.nil? && col.skip_nil?
    memo[col.name] = value unless skip
  end
  return h
end

#_publish_rowupsert(row, check_for_subscriptions: true) ⇒ Object



# File 'lib/webhookdb/replicator/base.rb', line 805

def _publish_rowupsert(row, check_for_subscriptions: true)
  return unless check_for_subscriptions && self._any_subscriptions_to_notify?
  payload = [
    self.service_integration.id,
    {
      row:,
      external_id_column: self._remote_key_column.name,
      external_id: row[self._remote_key_column.name],
    },
  ]
  # We AVOID pubsub here because we do NOT want to go through the router
  # and audit logger for this.
  event = Amigo::Event.create("webhookdb.serviceintegration.rowupsert", payload.as_json)
  Webhookdb::Jobs::SendWebhook.perform_async(event.as_json)
end

#_remote_key_column ⇒ Webhookdb::Replicator::Column

This method is abstract.

Each integration needs a single remote key, like the Shopify order id for Shopify orders, or the sid for Twilio resources. This column must be unique for the table, like a primary key.

NOTE: Do not set index: true. The remote key column must always be unique, so it gets a unique index automatically.

Returns:

Raises:

  • (NotImplementedError)


# File 'lib/webhookdb/replicator/base.rb', line 518

def _remote_key_column
  raise NotImplementedError
end
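As a sketch, an override for a hypothetical API exposing a unique string id (the stand-in Struct below omits the real Column options, such as where to find the value in the payload):

```ruby
# Stand-in for Webhookdb::Replicator::Column, just enough for this sketch.
RemoteKeyColumn = Struct.new(:name, :type)
TEXT = :text

class AcmeOrderReplicator
  # Unique per row; the unique index is created automatically,
  # so index: true must not be set here.
  def _remote_key_column
    RemoteKeyColumn.new(:acme_id, TEXT)
  end
end
```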

#_resource_and_event(request) ⇒ Array<Hash,Array>?

This method is abstract.

Given a webhook/backfill item payload, return the resource hash, and an optional event hash. If 'body' is the resource itself, this method returns [body, nil]. If 'body' is an event, this method returns [body.resource-key, body]. Columns can check for whether there is an event and/or body when converting.

If this returns nil, the upsert is skipped.

For example, a Stripe customer payload is the bare resource hash (like {id: 'cus_123'}) when we backfill, but when handling a webhook it arrives wrapped in an event (like {type: 'event', data: {id: 'cus_123'}}).

Parameters:

Returns:

  • (Array<Hash,Array>, nil)

Raises:

  • (NotImplementedError)


# File 'lib/webhookdb/replicator/base.rb', line 879

def _resource_and_event(request)
  raise NotImplementedError
end
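As an illustration, a hypothetical API that wraps webhook payloads in a "type"/"data" event envelope could be unwrapped like this (a plain-hash sketch of what an override computes; the envelope shape is assumed):

```ruby
# Returns [resource, event]; event is nil when the body is the bare resource.
def resource_and_event(body)
  # Webhook delivery: the body is an event wrapping the resource.
  return [body["data"], body] if body["type"] == "event"
  # Backfill page item: the body is the resource itself.
  [body, nil]
end

resource_and_event({"id" => "cus_123"})
# => [{"id" => "cus_123"}, nil]
resource_and_event({"type" => "event", "data" => {"id" => "cus_123"}})
# => [{"id" => "cus_123"}, {"type" => "event", "data" => {"id" => "cus_123"}}]
```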

#_resource_to_data(resource, event, request, enrichment) ⇒ Hash

Given the resource, return the value for the :data column. Only needed in rare situations where fields should be stored on the row, but not in :data. To skip :data column updates, return nil.

Parameters:

Returns:

  • (Hash)


# File 'lib/webhookdb/replicator/base.rb', line 908

def _resource_to_data(resource, event, request, enrichment)
  return resource
end

#_store_enrichment_body? ⇒ Boolean

Use this to determine whether we should add an enrichment column in the create table modification to store the enrichment body.

Returns:

  • (Boolean)


# File 'lib/webhookdb/replicator/base.rb', line 340

def _store_enrichment_body?
  return false
end

#_timestamp_column_name ⇒ Symbol

This method is abstract.

The name of the timestamp column in the schema. This column is used primarily for conditional upserts (ie to know if a row has changed), but also as a general way of auditing changes.

Returns:

  • (Symbol)

Raises:

  • (NotImplementedError)


# File 'lib/webhookdb/replicator/base.rb', line 506

def _timestamp_column_name
  raise NotImplementedError
end

#_to_json(v) ⇒ Object

The NULL ASCII character (\u0000), when present in a string and then encoded into JSON (as the six-character sequence \u0000), is invalid in PG JSONB: its strings cannot contain NULLs. (Note that JSONB does not store the encoded string verbatim; it parses it into PG types, and a PG string cannot contain NULL since C strings are NULL-terminated.)

So we remove the encoded NULL from the JSON. For example, for the hash {"x" => "\u0000"}, calling #to_json gives us '{"x":"\u0000"}'; removing the encoded NULL gives us '{"x":""}'.

HOWEVER, if the encoded NULL is itself escaped, we MUST NOT remove it. For example, for the hash {"x" => "\\u0000"} (a string holding a literal backslash followed by "u0000", ie a JSON string containing another JSON string), #to_json gives us '{"x":"\\u0000"}'. That is valid for PG, because it is not a NULL: it is an escaped backslash followed by "u0000". If we removed the substring "\u0000" from it, we would end up with '{"x":"\"}', an invalid document.

So we remove only "\u0000" and leave "\\u0000" alone: the gsub below matches the two-backslash form first and replaces it with itself, and strips only the single-backslash form.



# File 'lib/webhookdb/replicator/base.rb', line 790

def _to_json(v)
  return v.to_json.gsub(/(\\\\u0000|\\u0000)/, {"\\\\u0000" => "\\\\u0000", "\\u0000" => ""})
end
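The gsub's behavior can be demonstrated in plain Ruby (the same expression as the method body, wrapped in a standalone function):

```ruby
require "json"

def scrub_encoded_nulls(v)
  v.to_json.gsub(/(\\\\u0000|\\u0000)/, "\\\\u0000" => "\\\\u0000", "\\u0000" => "")
end

# A real NUL character: the encoded \u0000 is stripped from the JSON.
scrub_encoded_nulls({"x" => "\u0000"})   # => '{"x":""}'
# A literal backslash followed by "u0000": encodes to \\u0000, which is
# matched by the first alternative and kept as-is, since it is valid for PG.
scrub_encoded_nulls({"x" => "\\u0000"})
```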

#_update_where_expr ⇒ Sequel::SQL::Expression

This method is abstract.

The argument for insert_conflict update_where clause. Used to conditionally update, like updating only if a row is newer than what’s stored. We must always have an ‘update where’ because we never want to overwrite with the same data as exists.

If an integration does not have any way to detect if a resource changed, it can compare data columns.

Examples:

With a meaningful timestamp

self.qualified_table_sequel_identifier[:updated_at] < Sequel[:excluded][:updated_at]

Without a meaningful timestamp

self.qualified_table_sequel_identifier[:data] !~ Sequel[:excluded][:data]

Returns:

  • (Sequel::SQL::Expression)

Raises:

  • (NotImplementedError)


# File 'lib/webhookdb/replicator/base.rb', line 858

def _update_where_expr
  raise NotImplementedError
end

#_upsert_conflict_target ⇒ Object

The target for ON CONFLICT. Usually the remote key column name, except if the remote id is a compound unique index, like for partitioned tables. Can be a symbol, array of symbols representing the column names, a Sequel.lit, etc. See Sequel::Dataset.insert_conflict :target option for details.



# File 'lib/webhookdb/replicator/base.rb', line 772

def _upsert_conflict_target = self._remote_key_column.name

#_upsert_update_expr(inserting, enrichment: nil) ⇒ Object

Given the hash that is passed to the Sequel insert (so contains all columns, including those from _prepare_for_insert), return the hash used for the insert_conflict(update:) keyword args.

Rather than sending over the literal values in the inserting statement (which is pretty verbose, like the large ‘data’ column), make a smaller statement by using ‘EXCLUDED’.

This can be overridden when the service requires different values for inserting vs. updating, such as when a column's update value must use the EXCLUDED table in the upsert expression.

Most commonly, the use case for this is when you want to provide a row a value, but ONLY on insert, or on update ONLY if the column is nil. In that case, pass the result of this base method to _coalesce_excluded_on_update (see that method for more details).

You can also use this method to merge :data columns together, for example: `Sequel.lit("#{self.service_integration.table_name}.data || excluded.data")`.

By default, this will use the same values for UPDATE as are used for INSERT, like ‘email = EXCLUDED.email` (the ’EXCLUDED’ row being the one that failed to insert).



# File 'lib/webhookdb/replicator/base.rb', line 934

def _upsert_update_expr(inserting, enrichment: nil)
  result = inserting.each_with_object({}) { |(c, _), h| h[c] = Sequel[:excluded][c] }
  return result
end
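The relationship between the default expression and _coalesce_excluded_on_update can be sketched without Sequel, using SQL strings in place of Sequel expressions (the table name is illustrative):

```ruby
inserting = {data: "{...}", email: "x@y.z", created_at: "2020-01-01T00:00:00Z"}

# Default behavior: every inserted column takes its value from the
# EXCLUDED pseudo-table (the row that failed to insert).
update = inserting.each_with_object({}) { |(c, _), h| h[c] = "EXCLUDED.#{c}" }

# _coalesce_excluded_on_update equivalent: keep the existing created_at
# when present, and only fall back to the inserted value.
update[:created_at] = "COALESCE(my_table.created_at, EXCLUDED.created_at)"
```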

#_upsert_webhook(request, upsert: true) ⇒ Array, Hash

Hook to be overridden, while still retaining top-level upsert_webhook functionality like error handling.

Parameters:

Returns:

  • (Array, Hash)

The inserted row, or an array of inserted rows if there were many.



# File 'lib/webhookdb/replicator/base.rb', line 728

def _upsert_webhook(request, upsert: true)
  resource_or_list, event = self._resource_and_event(request)
  return nil if resource_or_list.nil?
  if resource_or_list.is_a?(Array)
    unless event.nil?
      msg = "resource_and_event cannot return an array of resources with a non-nil event"
      raise Webhookdb::InvalidPostcondition, msg
    end
    return resource_or_list.map do |resource|
      self._upsert_webhook_single_resource(request, resource:, event:, upsert:)
    end
  end
  return self._upsert_webhook_single_resource(request, resource: resource_or_list, event:, upsert:)
end

#_upsert_webhook_single_resource(request, resource:, event:, upsert:) ⇒ Object



# File 'lib/webhookdb/replicator/base.rb', line 743

def _upsert_webhook_single_resource(request, resource:, event:, upsert:)
  enrichment = self._fetch_enrichment(resource, event, request)
  prepared = self._prepare_for_insert(resource, event, request, enrichment)
  raise Webhookdb::InvalidPostcondition if prepared.key?(:data)
  inserting = {}
  data_col_val = self._resource_to_data(resource, event, request, enrichment)
  inserting[:data] = self._to_json(data_col_val)
  inserting[:enrichment] = self._to_json(enrichment) if self._store_enrichment_body?
  inserting.merge!(prepared)
  return inserting unless upsert
  updating = self._upsert_update_expr(inserting, enrichment:)
  update_where = self._update_where_expr
  upserted_rows = self.admin_dataset(timeout: :fast) do |ds|
    ds.insert_conflict(
      target: self._upsert_conflict_target,
      update: updating,
      update_where:,
    ).insert(inserting)
  end
  row_changed = upserted_rows.present?
  self._notify_dependents(inserting, row_changed)
  self._publish_rowupsert(inserting) if row_changed
  return inserting
end

#_verify_backfill_err_msg ⇒ Object

Raises:

  • (NotImplementedError)


# File 'lib/webhookdb/replicator/base.rb', line 1057

def _verify_backfill_err_msg
  raise NotImplementedError, "each integration must provide an error message for unanticipated errors"
end

#_webhook_response(request) ⇒ Webhookdb::WebhookResponse

This method is abstract.

Return the response for the webhook. We must do this immediately in the endpoint itself, since verification may include info specific to the request content (it can even be whitespace sensitive).

Parameters:

  • request (Rack::Request)

Returns:

Raises:

  • (NotImplementedError)


# File 'lib/webhookdb/replicator/base.rb', line 134

def _webhook_response(request)
  raise NotImplementedError
end

#_webhook_state_change_fields ⇒ Object

If we support webhooks, these fields correspond to the webhook state machine. Override them if some other fields are also needed for webhooks.



# File 'lib/webhookdb/replicator/base.rb', line 140

def _webhook_state_change_fields = ["webhook_secret"]

#admin_dataset(**kw) ⇒ Sequel::Dataset

Yield to a dataset using the admin connection.

Returns:

  • (Sequel::Dataset)


# File 'lib/webhookdb/replicator/base.rb', line 966

def admin_dataset(**kw, &)
  self.with_dataset(self.service_integration.organization.admin_connection_url_raw, **kw, &)
end

#avoid_writes? ⇒ Boolean

Avoid writes under the following conditions:

  • A table lock is taken on the table

  • A vacuum is in progress on the table

Of course, in most situations we want to write anyway, but there are some cases (lower-priority replicators for example) where we can reschedule the job to happen in the future instead.

Returns:

  • (Boolean)


# File 'lib/webhookdb/replicator/base.rb', line 1280

def avoid_writes?
  # We will need to handle this differently when not under Postgres, but for now,
  # just assume Postgres.
  # Find the admin URL for the organization's server (NOT the organization admin url, it can't see system processes).
  # Then check for 1) vacuums in progress, 2) locks.
  self.service_integration.organization.readonly_connection do |db|
    count = db[:pg_locks].
      join(:pg_class, {oid: :relation}).
      join(:pg_namespace, {oid: :relnamespace}).
      where(
        locktype: "relation",
        nspname: self.service_integration.organization.replication_schema,
        relname: self.service_integration.table_name,
        mode: ["ShareUpdateExclusiveLock", "ExclusiveLock", "AccessExclusiveLock"],
      ).limit(1).count
    return true if count&.positive?
  end
  return false
end

#backfill(job) ⇒ Object

In order to backfill, we need to:

  • Iterate through pages of records from the external service

  • Upsert each record

The caveats/complexities are:

  • The backfill method should take care of retrying fetches for failed pages.

  • That means it needs to keep track of some pagination token.



# File 'lib/webhookdb/replicator/base.rb', line 1070

def backfill(job)
  raise Webhookdb::InvalidPrecondition, "job is for different service integration" unless
    job.service_integration === self.service_integration

  raise Webhookdb::InvariantViolation, "manual backfill not supported" unless self.descriptor.supports_backfill?

  sint = self.service_integration
  raise Webhookdb::Replicator::CredentialsMissing if
    sint.backfill_key.blank? && sint.backfill_secret.blank? && sint.depends_on.blank?
  last_backfilled = job.incremental? ? sint.last_backfilled_at : nil
  new_last_backfilled = Time.now
  job.update(started_at: Time.now)

  backfillers = self._backfillers(**job.criteria.symbolize_keys)
  begin
    if self._parallel_backfill && self._parallel_backfill > 1
      _do_parallel_backfill(backfillers, last_backfilled)
    else
      _do_serial_backfill(backfillers, last_backfilled)
    end
  rescue StandardError => e
    if self.on_backfill_error(e) == true
      job.update(finished_at: Time.now)
      return
    end
    raise e
  end

  sint.update(last_backfilled_at: new_last_backfilled) if job.incremental?
  job.update(finished_at: Time.now)
  job.enqueue_children
end

#backfill_not_supported_message ⇒ Object

When backfilling is not supported, this message is used. It can be overridden for custom explanations, or descriptor#documentation_url can be provided, which will use a default message. If no documentation is available, a fallback message is used.



# File 'lib/webhookdb/replicator/base.rb', line 277

def backfill_not_supported_message
  du = self.documentation_url
  if du.blank?
    msg = %(Sorry, you cannot backfill this integration. You may be looking for one of the following:

webhookdb integrations reset #{self.service_integration.table_name}
    )
    return msg
  end
  msg = %(Sorry, you cannot manually backfill this integration.
Please refer to the documentation at #{du}
for information on how to refresh data.)
  return msg
end

#calculate_and_backfill_state_machine(incremental:, criteria: nil, recursive: true, enqueue: true) ⇒ Array<Webhookdb::StateMachineStep, Webhookdb::BackfillJob>

Run calculate_backfill_state_machine. Then create and enqueue a new BackfillJob if it’s successful. Returns a tuple of the StateMachineStep and BackfillJob. If the BackfillJob is returned, the StateMachineStep was successful; otherwise no job is created and the second item is nil.

Returns:



# File 'lib/webhookdb/replicator/base.rb', line 265

def calculate_and_backfill_state_machine(incremental:, criteria: nil, recursive: true, enqueue: true)
  step = self.calculate_backfill_state_machine
  bfjob = nil
  bfjob = self._enqueue_backfill_jobs(incremental:, criteria:, recursive:, enqueue:) if step.successful?
  return step, bfjob
end

#calculate_backfill_state_machine ⇒ Webhookdb::Replicator::StateMachineStep

Return the state machine that is used when adding backfill support to an integration. Usually this sets one or both of the backfill key and secret.

Returns:

Raises:

  • (NotImplementedError)


# File 'lib/webhookdb/replicator/base.rb', line 254

def calculate_backfill_state_machine
  # This is a pure function that can be tested on its own--the endpoints just need to return a state machine step
  raise NotImplementedError
end

#calculate_dependency_state_machine_step(dependency_help:) ⇒ Object



# File 'lib/webhookdb/replicator/base.rb', line 1230

def calculate_dependency_state_machine_step(dependency_help:)
  raise Webhookdb::InvalidPrecondition, "#{self.descriptor.name} does not have a dependency" if
    self.class.descriptor.dependency_descriptor.nil?
  return nil if self.service_integration.depends_on_id
  step = Webhookdb::Replicator::StateMachineStep.new
  dep_descr = self.descriptor.dependency_descriptor
  candidates = self.service_integration.dependency_candidates
  if candidates.empty?
    step.output = %(This integration requires #{dep_descr.resource_name_plural} to sync.

You don't have any #{dep_descr.resource_name_singular} integrations yet. You can run:

webhookdb integrations create #{dep_descr.name}

to set one up. Then once that's complete, you can re-run:

webhookdb integrations create #{self.descriptor.name}

to keep going.
)
    step.error_code = "no_candidate_dependency"
    return step.completed
  end
  choice_lines = candidates.each_with_index.
    map { |si, idx| "#{idx + 1} - #{si.table_name}" }.
    join("\n")
  step.output = %(This integration requires #{dep_descr.resource_name_plural} to sync.
#{dependency_help.blank? ? '' : "\n#{dependency_help}\n"}
Enter the number for the #{dep_descr.resource_name_singular} integration you want to use,
or leave blank to choose the first option.

#{choice_lines}
)
  step.prompting("Parent integration number")
  step.post_to_url = self.service_integration.authed_api_path + "/transition/dependency_choice"
  return step
end

#calculate_preferred_create_state_machine ⇒ Webhookdb::Replicator::StateMachineStep

See preferred_create_state_machine_method. If we prefer backfilling, and it’s successful, we also want to enqueue jobs; that is, use calculate_and_backfill_state_machine, not just calculate_backfill_state_machine.



# File 'lib/webhookdb/replicator/base.rb', line 209

def calculate_preferred_create_state_machine
  m = self.preferred_create_state_machine_method
  return self.calculate_and_backfill_state_machine(incremental: true)[0] if m == :calculate_backfill_state_machine
  return self.calculate_webhook_state_machine
end

#calculate_webhook_state_machine ⇒ Webhookdb::Replicator::StateMachineStep

This method is abstract.

Return the state machine that is used when setting up this integration. Usually this entails providing the user the webhook url, and providing or asking for a webhook secret. In some cases, this can be a lot more complex though.

Returns:

Raises:

  • (NotImplementedError)


# File 'lib/webhookdb/replicator/base.rb', line 246

def calculate_webhook_state_machine
  raise NotImplementedError
end

#clear_backfill_information ⇒ Object

Remove all the information needed for backfilling from the integration, so that it can be re-entered.



# File 'lib/webhookdb/replicator/base.rb', line 306

def clear_backfill_information
  self._clear_backfill_information
  # If we don't support both webhooks and backfilling, we are safe to clear ALL fields
  # and get back into an initial state.
  self._clear_webook_information unless self.descriptor.supports_webhooks_and_backfill?
  self.service_integration.save_changes
end

#clear_webhook_information ⇒ Object

Remove all the information used in the initial creation of the integration, so that it can be re-entered.



# File 'lib/webhookdb/replicator/base.rb', line 293

def clear_webhook_information
  self._clear_webook_information
  # If we don't support both webhooks and backfilling, we are safe to clear ALL fields
  # and get back into an initial state.
  self._clear_backfill_information unless self.descriptor.supports_webhooks_and_backfill?
  self.service_integration.save_changes
end

#create_table(if_not_exists: false) ⇒ Object



# File 'lib/webhookdb/replicator/base.rb', line 344

def create_table(if_not_exists: false)
  cmd = self.create_table_modification(if_not_exists:)
  self.admin_dataset(timeout: :fast) do |ds|
    cmd.execute(ds.db)
  end
end

#create_table_modification(if_not_exists: false) ⇒ Webhookdb::Replicator::SchemaModification

Return the schema modification used to create the table where it does not exist.



# File 'lib/webhookdb/replicator/base.rb', line 353

def create_table_modification(if_not_exists: false)
  table = self.dbadapter_table
  columns = [self.primary_key_column, self.remote_key_column]
  columns.concat(self.storable_columns)
  # 'data' column should be last, since it's very large, we want to see other columns in psql/pgcli first
  columns << self.data_column
  adapter = Webhookdb::DBAdapter::PG.new
  result = Webhookdb::Replicator::SchemaModification.new
  create_table = adapter.create_table_sql(table, columns, if_not_exists:, partition: self.partitioning)
  result.transaction_statements << create_table
  result.transaction_statements.concat(self.create_table_partitions(adapter))
  self.indices(table).each do |dbindex|
    result.transaction_statements << adapter.create_index_sql(dbindex, concurrently: false)
  end
  result.application_database_statements << self.service_integration.ensure_sequence_sql if self.requires_sequence?
  return result
end

#create_table_partitions(adapter) ⇒ Object



# File 'lib/webhookdb/replicator/base.rb', line 385

def create_table_partitions(adapter)
  return [] unless self.partition?
  # We only need create_table partitions when we create the table.
  # Range partitions would be created on demand, when inserting rows and the partition doesn't exist.
  return [] unless self.partitioning.by == Webhookdb::DBAdapter::Partitioning::HASH

  max_partition = self.service_integration.partition_value
  raise Webhookdb::InvalidPrecondition, "partition value must be positive" unless max_partition.positive?
  stmts = (0...max_partition).map do |i|
    adapter.create_hash_partition_sql(self.dbadapter_table, max_partition, i)
  end
  return stmts
end

#data_column ⇒ Webhookdb::DBAdapter::Column



# File 'lib/webhookdb/replicator/base.rb', line 458

def data_column
  return Webhookdb::DBAdapter::Column.new(name: :data, type: OBJECT, nullable: false)
end

#dbadapter_tableWebhookdb::DBAdapter::Table

Return a DBAdapter table based on the schema_and_table_symbols.



# File 'lib/webhookdb/replicator/base.rb', line 99

def dbadapter_table
  sch, tbl = self.schema_and_table_symbols
  schema = Webhookdb::DBAdapter::Schema.new(name: sch)
  table = Webhookdb::DBAdapter::Table.new(name: tbl, schema:)
  return table
end

#denormalized_columnsArray<Webhookdb::DBAdapter::Column>

# File 'lib/webhookdb/replicator/base.rb', line 470

def denormalized_columns
  return self._denormalized_columns.map(&:to_dbadapter)
end

#descriptorWebhookdb::Replicator::Descriptor



# File 'lib/webhookdb/replicator/base.rb', line 36

def descriptor
  return @descriptor ||= self.class.descriptor
end

#dispatch_request_to(request) ⇒ Webhookdb::Replicator::Base

A given HTTP request may not be handled by the service integration it was sent to, for example where the service integration is part of some ‘root’ hierarchy. This method is called in the webhook endpoint, and should return the replicator used to handle the webhook request. The request is validated by the returned instance, and it is enqueued for processing.

By default, the service called by the webhook is the one we want to use, so return self.

Parameters:

  • request (Rack::Request)

# File 'lib/webhookdb/replicator/base.rb', line 693

def dispatch_request_to(request)
  return self
end

#documentation_urlObject



# File 'lib/webhookdb/replicator/base.rb', line 1061

def documentation_url = nil

#enqueue_sync_targetsObject

Some replicators support ‘instant sync’, because they are upserted en masse rather than row-by-row. That is, usually we run sync targets on a cron, because otherwise we’d need to run the sync target for every row. But if inserting is always done through backfilling, we know we have a useful set of results to sync, so we don’t need to wait for cron.



# File 'lib/webhookdb/replicator/base.rb', line 1011

def enqueue_sync_targets
  self.service_integration.sync_targets.each do |stgt|
    Webhookdb::Jobs::SyncTargetRunSync.perform_async(stgt.id)
  end
end

#enrichment_columnWebhookdb::DBAdapter::Column?

Column used to store enrichments. Return nil if the service does not use enrichments.



# File 'lib/webhookdb/replicator/base.rb', line 464

def enrichment_column
  return nil unless self._store_enrichment_body?
  return Webhookdb::DBAdapter::Column.new(name: :enrichment, type: OBJECT, nullable: true)
end

#ensure_all_columnsObject

We support adding columns to existing integrations without bumping the version; changing types, or removing/renaming columns, is not supported and should bump the version, or must be handled out-of-band (like deleting the integration then backfilling). To figure out which columns we need to add, we check which are currently defined, check which already exist in the database, and add denormalized columns and indices for those that are missing.



# File 'lib/webhookdb/replicator/base.rb', line 564

def ensure_all_columns
  modification = self.ensure_all_columns_modification
  return if modification.noop?
  self.admin_dataset(timeout: :slow_schema) do |ds|
    modification.execute(ds.db)
    # We need to clear cached columns on the data since we know we're adding more.
    # It's probably not a huge deal but may as well keep it in sync.
    ds.send(:clear_columns_cache)
  end
  self.readonly_dataset { |ds| ds.send(:clear_columns_cache) }
end

#ensure_all_columns_modificationWebhookdb::Replicator::SchemaModification



# File 'lib/webhookdb/replicator/base.rb', line 577

def ensure_all_columns_modification
  existing_cols, existing_indices, existing_partitions = nil
  max_pk = 0
  sint = self.service_integration
  self.admin_dataset do |ds|
    return self.create_table_modification unless ds.db.table_exists?(self.qualified_table_sequel_identifier)
    existing_cols = ds.columns.to_set
    existing_indices = ds.db[:pg_indexes].where(
      schemaname: sint.organization.replication_schema,
      tablename: sint.table_name,
    ).select_map(:indexname).to_set
    max_pk = ds.max(:pk) || 0
    existing_partitions = self.existing_partitions(ds.db)
  end
  adapter = Webhookdb::DBAdapter::PG.new
  table = self.dbadapter_table
  result = Webhookdb::Replicator::SchemaModification.new

  missing_columns = self._denormalized_columns.delete_if { |c| existing_cols.include?(c.name) }
  # Add missing columns
  missing_columns.each do |whcol|
    # Don't bother bulking the ADDs into a single ALTER TABLE, it won't really matter.
    result.transaction_statements << adapter.add_column_sql(table, whcol.to_dbadapter)
  end
  # Easier to handle this explicitly than use storage_columns, but it is a duplicated concept so be careful.
  if (enrich_col = self.enrichment_column) && !existing_cols.include?(enrich_col.name)
    result.transaction_statements << adapter.add_column_sql(table, enrich_col)
  end

  # Backfill values for new columns.
  if missing_columns.any?
    # We need to backfill values into the new column, but we don't want to lock the entire table
    # as we update each row. So we need to update in chunks of rows.
    # Chunk size should be large for speed (and sending over fewer queries), but small enough
    # to induce a viable delay if another query is updating the same row.
    # Note that the delay will only be for writes to those rows; reads will not block,
    # so something a bit longer should be ok.
    #
    # Note that at the point these UPDATEs are running, we have the new column AND the new code inserting
    # into that new column. We could in theory skip all the PKs that were added after this modification
    # started to run. However considering the number of rows in this window will always be relatively low
    # (though not absolutely low), and the SQL backfill operation should yield the same result
    # as the Ruby operation, this doesn't seem too important.
    result.nontransaction_statements.concat(missing_columns.filter_map(&:backfill_statement))
    update_expr = missing_columns.to_h { |c| [c.name, c.backfill_expr || c.to_sql_expr] }
    self.admin_dataset do |ds|
      chunks = Webhookdb::Replicator::Base.chunked_row_update_bounds(max_pk)
      chunks[...-1].each do |(lower, upper)|
        update_query = ds.where { pk > lower }.where { pk <= upper }.update_sql(update_expr)
        result.nontransaction_statements << update_query
      end
      final_update_query = ds.where { pk > chunks[-1][0] }.update_sql(update_expr)
      result.nontransaction_statements << final_update_query
    end
  end

  # Add missing indices. This should happen AFTER the UPDATE calls so the UPDATEs don't have to update indices.
  self.indices(table).map do |index|
    next if existing_indices.include?(index.name.to_s)
    result.nontransaction_statements.concat(
      adapter.create_index_sqls(index, concurrently: true, partitions: existing_partitions),
    )
  end

  result.application_database_statements << sint.ensure_sequence_sql if self.requires_sequence?
  return result
end
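The chunking described in the comments above splits the UPDATE into half-open `(lower, upper]` ranges, with a final single-element entry meaning `pk > lower`. A self-contained sketch consistent with the documented behavior of `chunked_row_update_bounds` (the chunk size and edge handling here are assumptions based on that documentation):

```ruby
# Sketch of the chunk-bounds calculation used to avoid long-held row locks:
# [[0, N], [N, 2N], ..., [last]] where the final entry means "pk > last".
def chunked_row_update_bounds(max_pk, chunk_size: 1_000_000)
  bounds = []
  lower = 0
  while lower + chunk_size < max_pk
    bounds << [lower, lower + chunk_size]
    lower += chunk_size
  end
  bounds << [lower] # final chunk: pk > lower, no upper bound
  bounds
end
```

A table with a max pk of 250 and a chunk size of 100 yields `[[0, 100], [100, 200], [200]]`, matching the `pk > lower`/`pk <= upper` predicates built in the method above.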

#existing_partitions(_db) ⇒ Array<Webhookdb::DBAdapter::Partition>

Return the partitions belonging to the table. Return an empty array if this replicator is not partitioned.

Raises:

  • (NotImplementedError)


# File 'lib/webhookdb/replicator/base.rb', line 380

def existing_partitions(_db)
  raise NotImplementedError if self.partition?
  return []
end

#find_dependent(service_name) ⇒ Webhookdb::ServiceIntegration?

Find a dependent service integration with the given service name. If none are found, return nil. If multiple are found, raise, as this should only be used for automatically managed integrations.

Parameters:

  • service_name (String, Array<String>)

# File 'lib/webhookdb/replicator/base.rb', line 323

def find_dependent(service_name)
  names = service_name.respond_to?(:to_ary) ? service_name : [service_name]
  sints = self.service_integration.dependents.filter { |si| names.include?(si.service_name) }
  raise Webhookdb::InvalidPrecondition, "there are multiple #{names.join('/')} integrations in dependents" if
    sints.length > 1
  return sints.first
end
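The String-or-Array handling above relies on `respond_to?(:to_ary)`. The same lookup can be sketched over plain service-name strings (simplified: the real method filters ServiceIntegration records and raises Webhookdb::InvalidPrecondition):

```ruby
# Sketch: accept one name or a list of names; nil when absent, error on ambiguity.
def find_dependent(dependent_names, service_name)
  names = service_name.respond_to?(:to_ary) ? service_name : [service_name]
  matches = dependent_names.filter { |n| names.include?(n) }
  raise "there are multiple #{names.join('/')} integrations in dependents" if matches.length > 1
  matches.first
end
```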

#find_dependent!(service_name) ⇒ Webhookdb::ServiceIntegration



# File 'lib/webhookdb/replicator/base.rb', line 332

def find_dependent!(service_name)
  sint = self.find_dependent(service_name)
  raise Webhookdb::InvalidPrecondition, "there is no #{service_name} integration in dependents" if sint.nil?
  return sint
end

#indices(table) ⇒ Array<Webhookdb::DBAdapter::Index>

# File 'lib/webhookdb/replicator/base.rb', line 533

def indices(table)
  dba_columns = [self.primary_key_column, self.remote_key_column]
  dba_columns.concat(self.storable_columns)
  dba_cols_by_name = dba_columns.index_by(&:name)

  result = []
  dba_columns.select(&:index?).each do |c|
    targets = [c]
    idx_name = self.index_name(targets)
    result << Webhookdb::DBAdapter::Index.new(name: idx_name.to_sym, table:, targets:, where: c.index_where)
  end
  self._extra_index_specs.each do |spec|
    targets = spec.columns.map { |n| dba_cols_by_name.fetch(n) }
    idx_name = self.index_name(targets, identifier: spec.identifier)
    result << Webhookdb::DBAdapter::Index.new(name: idx_name.to_sym, table:, targets:, where: spec.where)
  end
  index_names = result.map(&:name)
  if (dupes = index_names.find_all.with_index { |n, idx| idx != index_names.rindex(n) }).any?
    msg = "Duplicate index names detected. Use the 'name' attribute to differentiate: " +
      dupes.map(&:to_s).join(", ")
    raise Webhookdb::Replicator::BrokenSpecification, msg
  end

  return result
end
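The duplicate check above uses `rindex`: an index name is flagged when its position differs from the last position at which that name appears, so every occurrence of a duplicated name except the final one is collected. In isolation:

```ruby
# Sketch: flag earlier occurrences of duplicated names via rindex.
names = [:at, :begin_at, :at, :end_at]
dupes = names.find_all.with_index { |n, idx| idx != names.rindex(n) }
```

Here `dupes` is `[:at]`: the first `:at` sits at index 0 while `rindex` finds the value last at index 2.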

#on_backfill_error(e) ⇒ Object

Called when the #backfill method errors. This can do something like dispatch a developer alert. The handler must raise in order to stop the job from processing; if nothing is raised, the original exception will be raised instead. By default, this method noops, so the original exception is raised.

Parameters:

  • e (Exception)


# File 'lib/webhookdb/replicator/base.rb', line 1149

def on_backfill_error(e) = nil

#on_dependency_webhook_upsert(replicator, payload, changed:) ⇒ Object

Called when the upstream dependency upserts. In most cases, you can noop; but in some cases, you may want to update or fetch rows. One example would be a ‘db only’ integration, where values are taken from the parent service and added to this service’s table. We may want to upsert rows in our table whenever a row in our parent table changes.

Raises:

  • (NotImplementedError)


# File 'lib/webhookdb/replicator/base.rb', line 1226

def on_dependency_webhook_upsert(replicator, payload, changed:)
  raise NotImplementedError, "this must be overridden for replicators that have dependencies"
end

#partition?Boolean

True if the replicator uses partitioning.

Returns:

  • (Boolean)


# File 'lib/webhookdb/replicator/base.rb', line 372

def partition? = false

#partitioningWebhookdb::DBAdapter::Partitioning?

Non-nil only if partition? is true.



# File 'lib/webhookdb/replicator/base.rb', line 375

def partitioning = nil

#preferred_create_state_machine_methodSymbol

If the integration supports webhooks, then we want to do that on create. If it’s backfill only, then we fall back to that instead. Things like choosing dependencies are webhook-vs-backfill agnostic, so which machine we choose isn’t that important (but it does happen during creation).

Returns:

  • (Symbol)


# File 'lib/webhookdb/replicator/base.rb', line 201

def preferred_create_state_machine_method
  return self.descriptor.supports_webhooks? ? :calculate_webhook_state_machine : :calculate_backfill_state_machine
end

#preprocess_headers_for_logging(headers) ⇒ Object

In some cases, services may send us sensitive headers we do not want to log. This should be very rare but some services are designed really badly and send auth info in the webhook. Remove or obfuscate the passed header hash.



# File 'lib/webhookdb/replicator/base.rb', line 76

def preprocess_headers_for_logging(headers); end

#primary_key_columnWebhookdb::DBAdapter::Column



# File 'lib/webhookdb/replicator/base.rb', line 443

def primary_key_column
  return Webhookdb::DBAdapter::Column.new(name: :pk, type: BIGINT, pk: true)
end

#process_state_change(field, value, attr: nil) ⇒ Webhookdb::Replicator::StateMachineStep

Set the new service integration field and return the newly calculated state machine.

Subclasses can override this method and then super, to change the field or value.

Parameters:

  • field (String)

    Like ‘webhook_secret’, ‘backfill_key’, etc.

  • value (String)

    The value of the field.

  • attr (String) (defaults to: nil)

    Subclasses can pass in a custom field that does not correspond to a service integration column. When doing that, they must pass in attr, which is what will be set during the state change.

Returns:



# File 'lib/webhookdb/replicator/base.rb', line 159

def process_state_change(field, value, attr: nil)
  attr ||= field
  desc = self.descriptor
  value = value.strip if value.respond_to?(:strip)
  case field
    when *self._webhook_state_change_fields
      # If we don't support webhooks, then the backfill state machine may be using it.
      meth = desc.supports_webhooks? ? :calculate_webhook_state_machine : :calculate_backfill_state_machine
    when *self._backfill_state_change_fields
      # If we don't support backfilling, then the create state machine may be using them.
      meth = desc.supports_backfill? ? :calculate_backfill_state_machine : :calculate_webhook_state_machine
    when "dependency_choice"
      # Choose an upstream dependency for an integration.
      # See where this is used for more details.
      meth = self.preferred_create_state_machine_method
      value = self._find_dependency_candidate(value)
      attr = "depends_on"
    when "noop_create"
      # Use this to just recalculate the state machine,
      # not make any changes to the data.
      return self.calculate_preferred_create_state_machine
    else
      raise ArgumentError, "Field '#{field}' is not valid for a state change"
  end
  self.service_integration.db.transaction do
    self.service_integration.send(:"#{attr}=", value)
    self.service_integration.save_changes
    step = self.send(meth)
    if step.successful? && meth == :calculate_backfill_state_machine
      # If we are processing the backfill state machine, and we finish successfully,
      # we always want to start syncing.
      self._enqueue_backfill_jobs(incremental: true)
    end
    return step
  end
end
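The field-to-state-machine routing in the `case` above can be sketched independently of the ServiceIntegration model. The state-machine method names come from the code above; the keyword parameters are illustrative stand-ins for the descriptor and field lists:

```ruby
# Sketch: route a changed field to the state machine that consumes it,
# falling back to the other machine when the service lacks support.
def state_machine_method_for(field, webhook_fields:, backfill_fields:,
                             supports_webhooks:, supports_backfill:)
  case field
  when *webhook_fields
    supports_webhooks ? :calculate_webhook_state_machine : :calculate_backfill_state_machine
  when *backfill_fields
    supports_backfill ? :calculate_backfill_state_machine : :calculate_webhook_state_machine
  else
    raise ArgumentError, "Field '#{field}' is not valid for a state change"
  end
end
```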

#process_webhooks_synchronously?Boolean

Return true if the service should process webhooks in the actual endpoint, rather than asynchronously through the job system. This should ONLY be used where we have important order-of-operations in webhook processing and/or need to return data to the webhook sender.

NOTE: You MUST implement synchronous_processing_response_body if this returns true.

Returns:

  • (Boolean)


# File 'lib/webhookdb/replicator/base.rb', line 56

def process_webhooks_synchronously?
  return false
end

#qualified_table_sequel_identifier(schema: nil, table: nil) ⇒ Sequel::SQL::QualifiedIdentifier

Return a Sequel identifier using schema_and_table_symbols, or schema or table as overrides if given.

Returns:

  • (Sequel::SQL::QualifiedIdentifier)


# File 'lib/webhookdb/replicator/base.rb', line 92

def qualified_table_sequel_identifier(schema: nil, table: nil)
  sch, tbl = self.schema_and_table_symbols
  return Sequel[schema || sch][table || tbl]
end

#readonly_dataset(**kw) ⇒ Sequel::Dataset

Yield to a dataset using the readonly connection.

Returns:

  • (Sequel::Dataset)


# File 'lib/webhookdb/replicator/base.rb', line 972

def readonly_dataset(**kw, &)
  self.with_dataset(self.service_integration.organization.readonly_connection_url_raw, **kw, &)
end

#remote_key_columnWebhookdb::DBAdapter::Column



# File 'lib/webhookdb/replicator/base.rb', line 448

def remote_key_column
  c = self._remote_key_column
  if c.index?
    msg = "_remote_key_column index:true should not be set, since it automatically gets a unique index"
    Kernel.warn msg
  end
  return c.to_dbadapter(unique: true, nullable: false, index: false)
end

#requires_sequence?Boolean

Some integrations require sequences, like when upserting rows with numerical unique ids (if they were random values like UUIDs we could generate them and not use a sequence). In those cases, the integrations can mark themselves as requiring a sequence.

The sequence will be created in the *application database*, but it is used primarily when inserting rows into the *organization/replication database*. This is necessary because things like sequences are not possible to migrate when moving replication databases.

Returns:

  • (Boolean)


# File 'lib/webhookdb/replicator/base.rb', line 678

def requires_sequence?
  return false
end

#resource_name_pluralObject



# File 'lib/webhookdb/replicator/base.rb', line 44

def resource_name_plural
  return @resource_name_plural ||= self.descriptor.resource_name_plural
end

#resource_name_singularObject



# File 'lib/webhookdb/replicator/base.rb', line 40

def resource_name_singular
  return @resource_name_singular ||= self.descriptor.resource_name_singular
end

#schema_and_table_symbolsArray<Symbol>

Return a tuple of (schema, table) based on the organization’s replication schema, and the service integration’s table name.

Returns:

  • (Array<Symbol>)


# File 'lib/webhookdb/replicator/base.rb', line 82

def schema_and_table_symbols
  sch = self.service_integration.organization&.replication_schema&.to_sym || :public
  tbl = self.service_integration.table_name.to_sym
  return [sch, tbl]
end

#storable_columnsArray<Webhookdb::DBAdapter::Column>

Denormalized columns, plus the enrichment column if supported. Does not include the data or external id columns, though perhaps it should.

# File 'lib/webhookdb/replicator/base.rb', line 484

def storable_columns
  cols = self.denormalized_columns
  if (enr = self.enrichment_column)
    cols << enr
  end
  return cols
end

#synchronous_processing_response_body(upserted:, request:) ⇒ String

Called with the value that was inserted by synchronous processing. Takes the row values being upserted (the result of upsert_webhook) and the arguments used to upsert it (the arguments to upsert_webhook), and should return the body string to respond back with.

Returns:

  • (String)

Raises:

  • (NotImplementedError)


# File 'lib/webhookdb/replicator/base.rb', line 68

def synchronous_processing_response_body(upserted:, request:)
  return {message: "process synchronously"}.to_json if Webhookdb::Replicator.always_process_synchronously
  raise NotImplementedError, "must be implemented if process_webhooks_synchronously? is true"
end

#timestamp_columnWebhookdb::DBAdapter::Column

Column to use as the ‘timestamp’ for the row. This is usually some created or updated at timestamp.

Raises:

  • (NotImplementedError)


# File 'lib/webhookdb/replicator/base.rb', line 495

def timestamp_column
  got = self._denormalized_columns.find { |c| c.name == self._timestamp_column_name }
  raise NotImplementedError, "#{self.descriptor.name} has no timestamp column #{self._timestamp_column_name}" if
    got.nil?
  return got.to_dbadapter
end

#upsert_has_deps?Boolean

Return true if the integration requires making an API call to upsert. This puts the sync into a lower-priority queue so it is less likely to block other processing. This is usually true if enrichments are involved.

Returns:

  • (Boolean)


# File 'lib/webhookdb/replicator/base.rb', line 826

def upsert_has_deps?
  return false
end

#upsert_webhook(request, **kw) ⇒ Array, Hash

Upsert a webhook request into the database. Note this is a WebhookRequest, NOT a Rack::Request.

Returns:

  • (Array, Hash)

    Inserted rows, or array of inserted rows if many.



# File 'lib/webhookdb/replicator/base.rb', line 712

def upsert_webhook(request, **kw)
  return self._upsert_webhook(request, **kw)
rescue Amigo::Retry::Error
  # Do not log this since it's expected/handled by Amigo
  raise
rescue StandardError => e
  self.logger.error("upsert_webhook_error", {request: request.as_json}, e)
  raise
end
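The rescue pattern above (let retry-control exceptions pass untouched, log everything else before re-raising) can be shown standalone. `RetryControlError` here stands in for `Amigo::Retry::Error`, and the `logged` collector stands in for the structured logger; both names are illustrative:

```ruby
# Sketch: log-and-reraise, but stay quiet for retry-control errors
# that the job system expects to see and handle itself.
class RetryControlError < StandardError; end

def guarded_upsert(logged, &block)
  block.call
rescue RetryControlError
  raise # expected/handled upstream; do not log
rescue StandardError => e
  logged << e.message
  raise
end
```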

#upsert_webhook_body(body, **kw) ⇒ Array, Hash

Upsert a webhook using only a body. This is not valid for the rare integration that relies on request info, like when we have to take different action based on the request method.

Parameters:

  • body (Hash)

Returns:

  • (Array, Hash)

    Inserted rows, or array of inserted rows if many.



# File 'lib/webhookdb/replicator/base.rb', line 703

def upsert_webhook_body(body, **kw)
  return self.upsert_webhook(Webhookdb::Replicator::WebhookRequest.new(body:), **kw)
end

#verify_backfill_credentialsWebhookdb::CredentialVerificationResult

Try to verify backfill credentials by fetching the first page of items. Only relevant for integrations supporting backfilling.

If an error is received, return the result of `_verify_backfill_<http status>_err_msg` as the error message, if defined. So for example, a 401 will call the method _verify_backfill_401_err_msg if defined. If such a method is not defined, call and return _verify_backfill_err_msg.

Returns:

  • (Webhookdb::CredentialVerificationResult)


# File 'lib/webhookdb/replicator/base.rb', line 1030

def verify_backfill_credentials
  backfiller = self._backfillers.first
  if backfiller.nil?
    # If for some reason we do not have a backfiller,
    # we can't verify credentials. This should never happen in practice,
    # because we wouldn't call this method if the integration doesn't support it.
    raise "No backfiller available for #{self.service_integration.inspect}"
  end
  begin
    # begin backfill attempt but do not return backfill result
    backfiller.fetch_backfill_page(nil, last_backfilled: nil)
  rescue Webhookdb::Http::Error => e
    msg = if self.respond_to?(:"_verify_backfill_#{e.status}_err_msg")
            self.send(:"_verify_backfill_#{e.status}_err_msg")
          else
            self._verify_backfill_err_msg
          end
    return CredentialVerificationResult.new(verified: false, message: msg)
  rescue TypeError, NoMethodError => e
    # if we don't incur an HTTP error, but do incur an Error due to differences in the shapes of anticipated
    # response data in the `fetch_backfill_page` function, we can assume that the credentials are okay
    self.logger.info "verify_backfill_credentials_expected_failure", e
    return CredentialVerificationResult.new(verified: true, message: "")
  end
  return CredentialVerificationResult.new(verified: true, message: "")
end
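The status-specific fallback above is plain `respond_to?`/`send` dispatch on a dynamically built method name. A self-contained sketch, with invented messages for illustration:

```ruby
# Sketch: prefer a status-specific message method, fall back to the generic one.
class CredentialErrorMessages
  def message_for_status(status)
    meth = :"_verify_backfill_#{status}_err_msg"
    return send(meth) if respond_to?(meth, true)
    _verify_backfill_err_msg
  end

  private

  # Hypothetical messages; real replicators define their own.
  def _verify_backfill_401_err_msg
    "Those credentials were not accepted upstream."
  end

  def _verify_backfill_err_msg
    "Credential verification failed."
  end
end
```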

#webhook_endpointObject



# File 'lib/webhookdb/replicator/base.rb', line 1268

def webhook_endpoint
  return self._webhook_endpoint
end

#webhook_response(request) ⇒ Webhookdb::WebhookResponse

Given a Rack request, return the webhook response object. Usually this performs verification of the request based on the webhook secret configured on the service integration. Note that if skip_webhook_verification is true on the service integration, this method always returns 201.

Parameters:

  • request (Rack::Request)

Returns:



# File 'lib/webhookdb/replicator/base.rb', line 122

def webhook_response(request)
  return Webhookdb::WebhookResponse.ok(status: 201) if self.service_integration.skip_webhook_verification
  return self._webhook_response(request)
end

#with_advisory_lock(key) ⇒ Object

Run the given block with a (try) advisory lock taken on a combination of:

  • The table OID for this replicator

  • The given key

Note this establishes a new DB connection for the advisory lock; we have had issues with advisory locks on reused connections, and this is safer than having a lock that is never released.



# File 'lib/webhookdb/replicator/base.rb', line 991

def with_advisory_lock(key, &)
  url = self.service_integration.organization.admin_connection_url_raw
  got = nil
  Webhookdb::Dbutil.borrow_conn(url) do |conn|
    table_oid = conn.select(
      Sequel.function(:to_regclass, self.schema_and_table_symbols.join(".")).cast(:oid).as(:table_id),
    ).first[:table_id]
    self.logger.debug("taking_replicator_advisory_lock", table_oid:, key_id: key)
    Sequel::AdvisoryLock.new(conn, table_oid, key).with_lock? do
      got = yield
    end
  end
  return got
end