Class: Webhookdb::Replicator::FrontSignalwireMessageChannelAppV1

Inherits:
Base
  • Object
Includes:
DBAdapter::ColumnTypes
Defined in:
lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb

Overview

Front has a system of ‘channels’, but it is a challenge to use. This replicator leverages WebhookDB (and our existing Front app) to integrate Front and SignalWire messages, using a sort of two-way sync that implements the necessary Front channel contracts.

Note: In the future, we can abstract this to support other channels, with minimal changes.

We have the following concepts to keep in mind:

  • The front_message_v1 replicator stores ALL messages in Front (inbound and outbound).

  • The signalwire_message_v1 replicator stores ALL messages in SignalWire (inbound and outbound).

  • For two-way sync, we care that Outbound Front messages are turned into Outbound SignalWire messages, and Inbound SignalWire messages are turned into Inbound Front messages.

  • This means that, for the purpose of a two-way sync, this replicator can ‘enqueue’ deliveries by storing a row with either a Front message id (query Front for all outbound messages) or a SignalWire message id (query SignalWire for all inbound messages). When a row has both ids, it has been “delivered”, so to speak.

  • We can ignore inbound Front messages and outbound SignalWire messages (stored in their respective replicators), since those are created by this replicator.

This means that, rather than having to manage state between two event-based systems, we can converge to a correct state based on a given state. This is much easier (possible?) to reason about and test, and makes it possible to reuse code.
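To make the 'converge from a given state' idea concrete, here is a minimal sketch. It assumes rows are plain hashes carrying the `front_message_id` and `signalwire_sid` columns of this replicator; the `delivery_state` helper and the sample ids are hypothetical and not part of WebhookDB.

```ruby
# Hypothetical helper: classify a replication row by which ids it already has.
def delivery_state(row)
  front = row[:front_message_id]
  sw = row[:signalwire_sid]
  return :delivered if front && sw
  return :needs_signalwire_send if front # outbound Front message, SMS not sent yet
  return :needs_front_import if sw       # inbound SMS, not yet created in Front
  :invalid
end

# Illustrative rows only; the ids are made up.
rows = [
  {front_message_id: "msg_1", signalwire_sid: nil},
  {front_message_id: nil, signalwire_sid: "sw_1"},
  {front_message_id: "msg_9", signalwire_sid: "sw_2"},
]
states = rows.map { |r| delivery_state(r) }
```

Because the state is derived entirely from the row, the same check can run after any webhook or backfill without tracking which event arrived first.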

The order of operations is:

  • The channel description instructs the user to go to /v1/install/front_signalwire/setup.

  • This loads a terminal, showing instructions for how to set up (enabling the WebhookDB Front app, setting up SignalWire).

  • The state machine also asks for the phone number to use to send messages.

    • The phone number used to send messages is stored in the api_url.

  • The state machine prints out the API token to use in Front.

    • The API token is stored in the ‘webhookdb_api_key’ field, which is searchable.

  • The user is directed to Front, to install the WebhookDB SignalWire channel.

  • The user inputs their API token and connects the channel.

  • Front makes an ‘authorization’ request to /v1/install/front_signalwire/authorization.

    • This uses the API key to find the right front_signalwire_message_channel_app_v1 integration via the webhookdb_api_key field.

    • This stores the channel_id on the integration as the backfill_key (exposed as front_channel_id).

  • Front makes ‘message’ requests to /v1/install/front_signalwire/message/<opaque id>.

    • This upserts a DB row into the front_message_v1 replicator.

    • It also enqueues a backfill of this replicator.

  • Front can make a ‘delete’ request to /v1/install/front_signalwire/message/<opaque id>.

    • This deletes this service integration.

  • Because this replicator is a dependent of signalwire_message_v1 (see explanation below), whenever a SignalWire row is updated, this replicator will be triggered and enqueue a backfill.

  • When this replicator backfills, it will:

    • Look for inbound SMS, and upsert a row into this replication table.

    • Look for outbound Front messages, and upsert a row into this replication table.

    • Find replication table rows without a SignalWire id, and send an SMS.

    • Find replication table rows without a Front message id, and create a Front message using dev.frontapp.com/reference/sync-inbound-message
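The last two backfill steps can be sketched as a single pass that fills in whichever id is missing. This is an illustration only: `converge` is a hypothetical name, and the two lambdas are stand-ins for the real SignalWire and Front API calls, which are not shown here.

```ruby
# Counters let us observe how often the (stubbed) external calls happen.
sms_calls = 0
front_calls = 0
# Hypothetical stand-ins for sending an SMS and creating a Front message.
send_sms = lambda { |row| sms_calls += 1; "sw-#{row[:external_id]}" }
create_front_message = lambda { |row| front_calls += 1; "front-#{row[:external_id]}" }

# One convergence pass: each row gets the id it is missing, and nothing else changes.
def converge(rows, send_sms:, create_front_message:)
  rows.each do |row|
    row[:signalwire_sid] ||= send_sms.call(row) if row[:front_message_id]
    row[:front_message_id] ||= create_front_message.call(row) if row[:signalwire_sid]
  end
end

rows = [
  {external_id: "a", front_message_id: "msg_a", signalwire_sid: nil},
  {external_id: "b", front_message_id: nil, signalwire_sid: "sw_b"},
]
converge(rows, send_sms:, create_front_message:)
# A second pass finds nothing to do, so no further external calls are made.
converge(rows, send_sms:, create_front_message:)
```

Running the pass twice makes the same number of external calls as running it once, which is the property that makes repeated, procedurally enqueued backfills safe in this sketch.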

Defined Under Namespace

Classes: Backfiller

Constant Summary

Constants included from DBAdapter::ColumnTypes

DBAdapter::ColumnTypes::BIGINT, DBAdapter::ColumnTypes::BIGINT_ARRAY, DBAdapter::ColumnTypes::BOOLEAN, DBAdapter::ColumnTypes::COLUMN_TYPES, DBAdapter::ColumnTypes::DATE, DBAdapter::ColumnTypes::DECIMAL, DBAdapter::ColumnTypes::DOUBLE, DBAdapter::ColumnTypes::FLOAT, DBAdapter::ColumnTypes::INTEGER, DBAdapter::ColumnTypes::INTEGER_ARRAY, DBAdapter::ColumnTypes::OBJECT, DBAdapter::ColumnTypes::TEXT, DBAdapter::ColumnTypes::TEXT_ARRAY, DBAdapter::ColumnTypes::TIMESTAMP, DBAdapter::ColumnTypes::UUID

Constants inherited from Base

Base::MAX_INDEX_NAME_LENGTH

Instance Attribute Summary

Attributes inherited from Base

#service_integration

Class Method Summary

Instance Method Summary

Methods inherited from Base

#_any_subscriptions_to_notify?, #_backfill_state_change_fields, #_clear_backfill_information, #_clear_webook_information, #_coalesce_excluded_on_update, #_enqueue_backfill_jobs, #_extra_index_specs, #_fetch_enrichment, #_find_dependency_candidate, #_parallel_backfill, #_prepare_for_insert, #_publish_rowupsert, #_resource_to_data, #_store_enrichment_body?, #_to_json, #_upsert_conflict_target, #_upsert_update_expr, #_upsert_webhook, #_upsert_webhook_single_resource, #_verify_backfill_err_msg, #_webhook_state_change_fields, #admin_dataset, #avoid_writes?, #backfill, #backfill_not_supported_message, #calculate_and_backfill_state_machine, #calculate_dependency_state_machine_step, #calculate_preferred_create_state_machine, chunked_row_update_bounds, #clear_backfill_information, #create_table, #create_table_modification, #create_table_partitions, #data_column, #dbadapter_table, #denormalized_columns, #descriptor, #dispatch_request_to, #documentation_url, #enqueue_sync_targets, #enrichment_column, #ensure_all_columns, #ensure_all_columns_modification, #existing_partitions, #find_dependent, #find_dependent!, #indices, #initialize, #on_backfill_error, #partition?, #partitioning, #preferred_create_state_machine_method, #preprocess_headers_for_logging, #primary_key_column, #process_state_change, #qualified_table_sequel_identifier, #readonly_dataset, #remote_key_column, #requires_sequence?, #resource_name_plural, #resource_name_singular, #schema_and_table_symbols, #storable_columns, #timestamp_column, #upsert_has_deps?, #upsert_webhook, #upsert_webhook_body, #verify_backfill_credentials, #webhook_endpoint, #webhook_response, #with_advisory_lock

Constructor Details

This class inherits a constructor from Webhookdb::Replicator::Base

Class Method Details

.descriptor ⇒ Object



# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 65

def self.descriptor
  return Webhookdb::Replicator::Descriptor.new(
    name: "front_signalwire_message_channel_app_v1",
    ctor: self,
    feature_roles: [],
    resource_name_singular: "Front/SignalWire Message",
    dependency_descriptor: Webhookdb::Replicator::SignalwireMessageV1.descriptor,
    supports_webhooks: true,
    supports_backfill: true,
    api_docs_url: "https://dev.frontapp.com/docs/getting-started-with-partner-channels",
  )
end

Instance Method Details

#_backfillers ⇒ Object



# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 304

def _backfillers = [Backfiller.new(self)]

#_denormalized_columns ⇒ Object



# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 82

def _denormalized_columns
  return [
    Webhookdb::Replicator::Column.new(:signalwire_sid, TEXT, optional: true, index: true),
    Webhookdb::Replicator::Column.new(:front_message_id, TEXT, optional: true, index: true),
    Webhookdb::Replicator::Column.new(:external_conversation_id, TEXT, optional: true, index: true),
    Webhookdb::Replicator::Column.new(:row_updated_at, TIMESTAMP, defaulter: :now, optional: true, index: true),
    Webhookdb::Replicator::Column.new(:direction, TEXT),
    Webhookdb::Replicator::Column.new(:body, TEXT),
    Webhookdb::Replicator::Column.new(:sender, TEXT),
    Webhookdb::Replicator::Column.new(:recipient, TEXT),
  ]
end

#_front_recipient_phones(payload) ⇒ Object



# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 212

def _front_recipient_phones(payload)
  recipients = payload["recipients"].select { |r| r.fetch("role") == "to" }
  raise Webhookdb::InvariantViolation, "no recipient found in #{payload}" if recipients.empty?
  return recipients.map { |r| self.format_phone(r.fetch("handle")) }
end
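As a usage sketch, the method can be exercised with a small payload. Assumptions are labeled: the sample payload only includes the keys the method reads, `format_phone` here is a simplified stand-in for `Webhookdb::PhoneNumber.format_e164` (it assumes a US country code for 10-digit numbers), and `ArgumentError` replaces `Webhookdb::InvariantViolation` so the snippet runs on its own.

```ruby
# Simplified stand-in for Webhookdb::PhoneNumber.format_e164 (an assumption, not the real helper).
def format_phone(s)
  digits = s.gsub(/\D/, "")
  digits = "1#{digits}" if digits.length == 10
  "+#{digits}"
end

# Same logic as the method above, written as a free function.
def front_recipient_phones(payload)
  recipients = payload["recipients"].select { |r| r.fetch("role") == "to" }
  raise ArgumentError, "no recipient found in #{payload}" if recipients.empty?
  recipients.map { |r| format_phone(r.fetch("handle")) }
end

# Illustrative payload; only 'to' recipients are kept.
payload = {
  "recipients" => [
    {"role" => "from", "handle" => "+15550001111"},
    {"role" => "to", "handle" => "(555) 123-4567"},
    {"role" => "to", "handle" => "+1 555 987 6543"},
  ],
}
phones = front_recipient_phones(payload)
```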

#_notify_dependents(inserting, changed) ⇒ Object



# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 245

def _notify_dependents(inserting, changed)
  super
  return unless changed
  Webhookdb::BackfillJob.create_recursive(service_integration: self.service_integration, incremental: true).enqueue
end

#_remote_key_column ⇒ Object



# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 78

def _remote_key_column
  return Webhookdb::Replicator::Column.new(:external_id, TEXT)
end

#_resource_and_event(request) ⇒ Object



# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 176

def _resource_and_event(request)
  type = request.body["type"]
  is_signalwire = type.nil?
  return request.body, nil if is_signalwire

  # This ends up being called for 'authorization' and 'delete' messages too.
  # Those are handled in the webhook response body.
  is_message_type = ["message", "message_autoreply"].include?(type)
  return nil, nil unless is_message_type

  resource = request.body.dup
  payload = resource.fetch("payload")
  mid = if type == "message"
    payload.fetch("id")
  else
    replied_to_id = payload["_links"]["related"]["message_replied_to"].split("/").last
    "#{replied_to_id}_autoreply"
  end
  resource["front_message_id"] = mid
  resource["direction"] = "outbound"
  resource["body"] = payload.fetch("text")
  resource["sender"] = self.support_phone
  resources = self._front_recipient_phones(payload).map do |recipient|
    r = resource.dup
    r["recipient"] = recipient
    # The same message can go to multiple recipients, but we want to treat them as separate conversations.
    # That is, we CANNOT use signalwire/front to do 'group chats' since we don't want to
    # allow one user to send a message that is sent to other users (would be a spam vector).
    r["external_id"] = "#{mid}-#{recipient}"
    # Thread this message into the recipient's specific conversation, unlike email.
    r["external_conversation_id"] = recipient
    r
  end
  return resources, nil
end
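The id derivation and per-recipient fan-out above can be sketched in isolation. `front_message_key` is a hypothetical helper name, and the link URL and phone numbers are made-up sample values.

```ruby
# Use the payload id for 'message'; for 'message_autoreply', take the last path
# segment of the replied-to link and add a suffix, mirroring the method above.
def front_message_key(type, payload)
  return payload.fetch("id") if type == "message"
  replied_to_id = payload["_links"]["related"]["message_replied_to"].split("/").last
  "#{replied_to_id}_autoreply"
end

# Illustrative autoreply payload; the URL is a sample value.
autoreply = {"_links" => {"related" => {"message_replied_to" => "https://api2.frontapp.com/messages/msg_abc"}}}
mid = front_message_key("message_autoreply", autoreply)

# One row per recipient: the external id is unique per (message, recipient),
# and the conversation is keyed by the recipient's phone number.
rows = ["+15551234567", "+15559876543"].map do |recipient|
  {"external_id" => "#{mid}-#{recipient}", "external_conversation_id" => recipient}
end
```

Keying the conversation on the recipient alone means every message to the same number threads into one conversation, while the combined external id keeps each delivery distinct.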

#_timestamp_column_name ⇒ Object



# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 95

def _timestamp_column_name
  return :row_updated_at
end

#_update_where_expr ⇒ Object



# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 99

def _update_where_expr
  return (self.qualified_table_sequel_identifier[:signalwire_sid] =~ nil) |
      (self.qualified_table_sequel_identifier[:front_message_id] =~ nil)
end

#_webhook_response(request) ⇒ Object



# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 172

def _webhook_response(request)
  return Webhookdb::Front.webhook_response(request, Webhookdb::Front.signalwire_channel_app_secret)
end

#alert_async_failed_signalwire_send(sw_row) ⇒ Object

Send alerts for any undelivered or failed messages. The (outbound) message is already created in Front, but if the SignalWire message fails to send, we need to import a new message into Front as a reply explaining why the send failed.



# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 254

def alert_async_failed_signalwire_send(sw_row)
  idempotency_key = "fsmca-swfail-#{sw_row.fetch(:signalwire_id)}"
  idempotency = Webhookdb::Idempotency.once_ever.stored.using_seperate_connection.under_key(idempotency_key)
  idempotency.execute do
    # The 'sender' of this message is who the failed message is sent **to**
    sender = sw_row.fetch(:to)
    data = JSON.parse(sw_row.fetch(:data))
    external_id = sw_row.fetch(:signalwire_id)
    external_conversation_id = sender
    trunc_body = data.fetch("body", "")[..25]
    body = "SMS failed to send. Error (#{data['error_code'] || '-'}): #{data['error_message'] || '-'}\n#{trunc_body}"
    kwargs = {sender:, delivered_at: Time.now.to_i, body:, external_id:, external_conversation_id:}
    # The call to Front MUST be done in a job, since if it fails, we would not be able to retry.
    # The code is called after the signalwire payload is upserted and changes;
    # but if this fails, the row won't change again in the future,
    # so this code wouldn't be called again.
    # This is a general problem and should probably have a general solution,
    # but because of the external call, it is important to guard against it.
    Webhookdb::Jobs::FrontSignalwireMessageChannelSyncInbound.perform_async(
      self.service_integration.id, kwargs.as_json,
    )
  end
end
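The failure notice text built above can be previewed on its own. This sketch assumes a sample SignalWire `data` document with illustrative `body`, `error_code`, and `error_message` values; `failure_notice` is a hypothetical wrapper around the same string-building logic.

```ruby
require "json"

# Build the notice the same way as the method above.
# Note that [..25] keeps indices 0 through 25, so up to 26 characters of the original body.
def failure_notice(data)
  trunc_body = data.fetch("body", "")[..25]
  "SMS failed to send. Error (#{data['error_code'] || '-'}): #{data['error_message'] || '-'}\n#{trunc_body}"
end

# Illustrative values only.
data = JSON.parse(
  '{"body":"Your appointment is confirmed for Tuesday at 3pm.",' \
  '"error_code":"30003","error_message":"Unreachable destination handset"}',
)
notice = failure_notice(data)
# With no error details or body, the placeholders are used instead.
empty_notice = failure_notice({})
```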

#calculate_backfill_state_machine ⇒ Object



# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 131

def calculate_backfill_state_machine
  # The backfills here are not normal backfills, requested by the customer.
  # They are procedurally enqueued when we upsert data.
  # So just reuse the webhook state machine.
  return self.calculate_webhook_state_machine
end

#calculate_webhook_state_machine ⇒ Object



# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 107

def calculate_webhook_state_machine
  if (step = self.calculate_dependency_state_machine_step(dependency_help: ""))
    return step
  end
  step = Webhookdb::Replicator::StateMachineStep.new
  if self.service_integration.api_url.blank?
    step.output = %(This Front Channel will be linked to a specific number in SignalWire.
Choose the phone number to connect to Front.)
    return step.prompting("Phone number").api_url(self.service_integration)
  end
  self.service_integration.webhookdb_api_key ||= self.service_integration.new_api_key
  self.service_integration.save_changes
  step.output = %(Almost there! You can now finish installing the SignalWire Channel in Front.

1. In Front, go to Settings -> Company -> Channels (in the left nav), Connect a Channel,
 and choose the 'WebhookDB/SignalWire' channel.
2. In the 'Token' field, enter this API Key: #{self.service_integration.webhookdb_api_key}

If you need to find this key, you can run `webhookdb integrations info front_signalwire_message_channel_app_v1`.

All of this information can be found in the WebhookDB docs, at https://docs.webhookdb.com/guides/front-channel-signalwire/)
  return step.completed
end

#clear_webhook_information ⇒ Object



# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 138

def clear_webhook_information
  # We say we support backfill, so this won't get cleared normally.
  self._clear_backfill_information
  super
end

#format_phone(s) ⇒ Object



# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 104

def format_phone(s) = Webhookdb::PhoneNumber.format_e164(s)

#front_channel_id ⇒ Object



# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 166

def front_channel_id = self.service_integration.backfill_key

#front_channel_id=(c) ⇒ Object



# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 168

def front_channel_id=(c)
  self.service_integration.backfill_key = c
end

#on_dependency_webhook_upsert(_sw_replicator, sw_payload, changed:) ⇒ Object



# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 218

def on_dependency_webhook_upsert(_sw_replicator, sw_payload, changed:)
  return unless changed

  # If the signalwire message is failed, update the Front convo with a notification that the send failed
  failed_notifier_cutoff = Time.now - 4.days
  signalwire_send_failed = sw_payload.fetch(:date_updated) > failed_notifier_cutoff &&
    ["failed", "undelivered"].include?(sw_payload.fetch(:status)) &&
    sw_payload.fetch(:from) == self.support_phone
  self.alert_async_failed_signalwire_send(sw_payload) if signalwire_send_failed

  # If a message has come in from a user, insert a row so it'll be imported into Front
  signalwire_payload_inbound_to_support = sw_payload.fetch(:direction) == "inbound" &&
    sw_payload.fetch(:to) == self.support_phone
  return unless signalwire_payload_inbound_to_support

  body = JSON.parse(sw_payload.fetch(:data))
  body.merge!(
    "external_id" => sw_payload.fetch(:signalwire_id),
    "signalwire_sid" => sw_payload.fetch(:signalwire_id),
    "direction" => "inbound",
    "sender" => sw_payload.fetch(:from),
    "recipient" => self.support_phone,
    "external_conversation_id" => sw_payload.fetch(:from),
  )
  self.upsert_webhook_body(body)
end
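The 'send failed' condition above can be isolated as a predicate for testing. This sketch uses plain `Time` arithmetic instead of ActiveSupport's `4.days`; `send_failed?`, `FOUR_DAYS`, and the sample payloads are hypothetical.

```ruby
FOUR_DAYS = 4 * 24 * 60 * 60 # seconds

# True only for recent failed/undelivered messages sent from the support phone.
def send_failed?(sw_payload, support_phone:, now: Time.now)
  sw_payload.fetch(:date_updated) > now - FOUR_DAYS &&
    ["failed", "undelivered"].include?(sw_payload.fetch(:status)) &&
    sw_payload.fetch(:from) == support_phone
end

now = Time.utc(2024, 1, 10)
support = "+15550001111"
# Illustrative rows; only the keys the predicate reads are included.
recent_failure = {date_updated: now - 3600, status: "failed", from: support}
old_failure = {date_updated: now - (5 * 24 * 60 * 60), status: "failed", from: support}
delivered = {date_updated: now - 3600, status: "delivered", from: support}
other_sender = {date_updated: now - 3600, status: "undelivered", from: "+15559990000"}
```

The age cutoff means a failure older than four days does not trigger a notification, for example when old SignalWire rows are upserted again.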

#process_webhooks_synchronously? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 144

def process_webhooks_synchronously? = true

#support_phone ⇒ Object



# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 105

def support_phone = self.format_phone(self.service_integration.api_url)

#sync_front_inbound_message(sender:, delivered_at:, body:, external_id:, external_conversation_id:) ⇒ Object



# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 278

def sync_front_inbound_message(sender:, delivered_at:, body:, external_id:, external_conversation_id:)
  body = {
    sender: {handle: sender},
    body:,
    delivered_at:,
    metadata: {external_id:, external_conversation_id:},
  }
  token = JWT.encode(
    {
      iss: Webhookdb::Front.signalwire_channel_app_id,
      jti: Webhookdb::Front.channel_jwt_jti,
      sub: self.front_channel_id,
      exp: 10.seconds.from_now.to_i,
    },
    Webhookdb::Front.signalwire_channel_app_secret,
  )
  resp = Webhookdb::Http.post(
    "https://api2.frontapp.com/channels/#{self.front_channel_id}/inbound_messages",
    body,
    headers: {"Authorization" => "Bearer #{token}"},
    timeout: Webhookdb::Front.http_timeout,
    logger: self.logger,
  )
  return resp.parsed_response
end
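To show what the short-lived channel token looks like, here is a standard-library sketch of an HS256-signed JWT. It assumes HS256 is the algorithm in use (the method above calls `JWT.encode` from the jwt gem without naming one); `b64url` and `hs256_jwt` are hypothetical helpers, and all claim values and the secret are placeholders.

```ruby
require "json"
require "openssl"

# Base64url without padding, as used for JWT segments.
def b64url(bytes) = [bytes].pack("m0").tr("+/", "-_").delete("=")

# Sign header.payload with HMAC-SHA256 and append the signature segment.
def hs256_jwt(claims, secret)
  header = {alg: "HS256", typ: "JWT"}
  signing_input = "#{b64url(header.to_json)}.#{b64url(claims.to_json)}"
  signature = OpenSSL::HMAC.digest("SHA256", secret, signing_input)
  "#{signing_input}.#{b64url(signature)}"
end

# Placeholder claims mirroring the shape used above: issuer (app id), a jti,
# the channel id as subject, and an expiry 10 seconds in the future.
claims = {iss: "app_id_placeholder", jti: "jti_placeholder", sub: "cha_placeholder", exp: Time.now.to_i + 10}
token = hs256_jwt(claims, "secret_placeholder")
```

The 10-second expiry keeps each token usable only for the request it was minted for, so nothing long-lived needs to be stored.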

#synchronous_processing_response_body(upserted:, request:) ⇒ Object



# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 146

def synchronous_processing_response_body(upserted:, request:)
  case request.body["type"]
    when "authorization"
      self.front_channel_id = request.body.fetch("payload").fetch("channel_id")
      self.service_integration.save_changes
      return {type: "success", webhook_url: "#{Webhookdb.api_url}/v1/install/front_signalwire/channel"}.to_json
    when "delete"
      self.service_integration.destroy_self_and_all_dependents
      return "{}"
    when "message", "message_autoreply"
      return {
        type: "success",
        external_id: upserted.map { |r| r.fetch(:external_id) }.join(","),
        external_conversation_id: upserted.map { |r| r.fetch(:external_conversation_id) }.join(","),
      }.to_json
    else
      return "{}"
  end
end
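For the 'message' branch, the response joins the ids of all upserted rows. A small sketch with two illustrative rows (one per recipient of the same Front message) shows the resulting body; the row values are made up.

```ruby
require "json"

# Illustrative upserted rows for a single Front message sent to two recipients.
upserted = [
  {external_id: "msg_1-+15551234567", external_conversation_id: "+15551234567"},
  {external_id: "msg_1-+15559876543", external_conversation_id: "+15559876543"},
]

# Same construction as the 'message' branch above.
response_body = {
  type: "success",
  external_id: upserted.map { |r| r.fetch(:external_id) }.join(","),
  external_conversation_id: upserted.map { |r| r.fetch(:external_conversation_id) }.join(","),
}.to_json
parsed = JSON.parse(response_body)
```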