Class: Webhookdb::Replicator::FrontSignalwireMessageChannelAppV1

Inherits:
Base
  • Object
Includes:
DBAdapter::ColumnTypes
Defined in:
lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb

Overview

Front has a system of ‘channels’, but it is a challenge to use. This replicator leverages WebhookDB (and our existing Front app) to integrate Front and SignalWire messages, using a sort of two-way sync that implements the necessary Front channel contracts.

Note: In the future, we can abstract this to support other channels, with minimal changes.

We have the following concepts to keep in mind:

  • The front_message_v1 replicator stores ALL messages in Front (inbound and outbound).

  • The signalwire_message_v1 replicator stores ALL messages in SignalWire (inbound and outbound).

  • For two-way sync, we care that Outbound Front messages are turned into Outbound SignalWire messages, and Inbound SignalWire messages are turned into Inbound Front messages.

  • This means that, for the purpose of a two-way sync, this replicator can ‘enqueue’ deliveries by storing a row with either a Front message id (query Front for all outbound messages), or a SignalWire message id (query SignalWire for all inbound messages). When a row has both ids, it means it has been “delivered”, so to speak.

  • We can ignore inbound Front messages and outbound SignalWire messages (stored in their respective replicators), since those are created by this replicator.

This means that, rather than having to manage state between two event-based systems, we can converge to a correct state based on a given state. This is much easier (possible?) to reason about and test, and makes it possible to reuse code.
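The "delivered when both ids are present" rule can be sketched in plain Ruby. This is a minimal illustration, not the replicator's code: rows are plain hashes, and the `delivery_state` helper and the symbols it returns are hypothetical names. Only the `signalwire_sid` and `front_message_id` column names come from this class.

```ruby
# Hypothetical helper: classify a replication row by which ids it has.
# Only the column names come from the replicator; the rest is illustrative.
def delivery_state(row)
  has_sw = !row[:signalwire_sid].nil?
  has_front = !row[:front_message_id].nil?
  return :delivered if has_sw && has_front
  return :needs_sms if has_front           # outbound Front message awaiting an SMS
  return :needs_front_message if has_sw    # inbound SMS awaiting a Front message
  :invalid
end

rows = [
  {external_id: "msg_1", front_message_id: "msg_1", signalwire_sid: nil},
  {external_id: "sw_1", front_message_id: nil, signalwire_sid: "sw_1"},
  {external_id: "msg_2", front_message_id: "msg_2", signalwire_sid: "sw_2"},
]
states = rows.map { |r| delivery_state(r) }
```

Because the state of a row is derived entirely from its stored ids, the classification can be recomputed at any time without tracking events.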

The order of operations is:

  • The channel description instructs the user to go to /v1/install/front_signalwire/setup.

  • This loads a terminal, showing instructions for how to set up (enabling the WebhookDB Front app, setting up SignalWire).

  • The state machine also asks for the phone number to use to send messages.

    • The phone number used to send messages is stored in the api_url.

  • The state machine prints out the API token to use in Front.

    • The API token is stored in the ‘webhookdb_api_key’ field, which is searchable.

  • The user is directed to Front, to install the WebhookDB SignalWire channel.

  • The user inputs their API token and connects the channel.

  • Front makes an ‘authorization’ request to /v1/install/front_signalwire/authorization.

    • This uses the API key to find the right front_signalwire_message_channel_app_v1 integration via the webhookdb_api_key field.

    • This stores the channel_id on the integration as the backfill_key (the api_url already holds the phone number).

  • Front makes ‘message’ requests to /v1/install/front_signalwire/message/<opaque id>.

    • This upserts a DB row into the front_message_v1 replicator.

    • It also enqueues a backfill of this replicator.

  • Front can make a ‘delete’ request to /v1/install/front_signalwire/message/<opaque id>.

    • This deletes this service integration.

  • Because this replicator is a dependent of signalwire_message_v1 (see explanation below), whenever a signalwire row is updated, this replicator will be triggered and enqueue a backfill.

  • When this replicator backfills, it will:

    • Look for inbound SMS, and upsert a row into this replication table.

    • Look for outbound Front messages, and upsert a row into this replication table.

    • Find replication table rows without a signalwire id, and send an SMS.

    • Find replication table rows without a Front message id, and create a Front message using dev.frontapp.com/reference/sync-inbound-message
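The last two backfill steps amount to a convergence pass over the replication table. The following is a rough sketch under stated assumptions: rows are plain hashes rather than database rows, and `send_sms` and `create_front_message` are hypothetical callables standing in for the SignalWire and Front API calls. It is not the Backfiller's implementation.

```ruby
# Illustrative convergence pass over replication rows (plain hashes).
# send_sms and create_front_message are hypothetical stand-ins for the
# SignalWire and Front API calls; each returns the id assigned by the remote.
def converge(rows, send_sms:, create_front_message:)
  rows.each do |row|
    if row[:signalwire_sid].nil?
      row[:signalwire_sid] = send_sms.call(row)
    elsif row[:front_message_id].nil?
      row[:front_message_id] = create_front_message.call(row)
    end
  end
  rows
end

rows = [
  {front_message_id: "msg_1", signalwire_sid: nil, body: "hi"},
  {front_message_id: nil, signalwire_sid: "sw_1", body: "hello"},
]
sent = []
converge(
  rows,
  send_sms: ->(row) { sent << row[:body]; "sw_new" },
  create_front_message: ->(_row) { "msg_new" },
)
# A second pass does nothing, since every row now has both ids.
converge(
  rows,
  send_sms: ->(_row) { raise "unexpected send" },
  create_front_message: ->(_row) { raise "unexpected create" },
)
```

The second call shows the property the overview relies on: once every row has both ids, repeating the pass has no effect.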

Defined Under Namespace

Classes: Backfiller

Constant Summary

Constants included from DBAdapter::ColumnTypes

DBAdapter::ColumnTypes::BIGINT, DBAdapter::ColumnTypes::BIGINT_ARRAY, DBAdapter::ColumnTypes::BOOLEAN, DBAdapter::ColumnTypes::COLUMN_TYPES, DBAdapter::ColumnTypes::DATE, DBAdapter::ColumnTypes::DECIMAL, DBAdapter::ColumnTypes::DOUBLE, DBAdapter::ColumnTypes::FLOAT, DBAdapter::ColumnTypes::INTEGER, DBAdapter::ColumnTypes::INTEGER_ARRAY, DBAdapter::ColumnTypes::OBJECT, DBAdapter::ColumnTypes::TEXT, DBAdapter::ColumnTypes::TEXT_ARRAY, DBAdapter::ColumnTypes::TIMESTAMP, DBAdapter::ColumnTypes::UUID

Constants inherited from Base

Base::MAX_INDEX_NAME_LENGTH

Instance Attribute Summary

Attributes inherited from Base

#service_integration

Class Method Summary

Instance Method Summary

Methods inherited from Base

#_any_subscriptions_to_notify?, #_backfill_state_change_fields, #_clear_backfill_information, #_clear_webook_information, #_coalesce_excluded_on_update, #_enqueue_backfill_jobs, #_extra_index_specs, #_fetch_enrichment, #_find_dependency_candidate, #_parallel_backfill, #_prepare_for_insert, #_publish_rowupsert, #_resource_to_data, #_store_enrichment_body?, #_to_json, #_upsert_update_expr, #_upsert_webhook, #_verify_backfill_err_msg, #_webhook_state_change_fields, #admin_dataset, #backfill, #backfill_not_supported_message, #calculate_and_backfill_state_machine, #calculate_dependency_state_machine_step, #calculate_preferred_create_state_machine, chunked_row_update_bounds, #clear_backfill_information, #create_table, #create_table_modification, #data_column, #dbadapter_table, #denormalized_columns, #descriptor, #dispatch_request_to, #documentation_url, #enqueue_sync_targets, #enrichment_column, #ensure_all_columns, #ensure_all_columns_modification, #find_dependent, #find_dependent!, #indices, #initialize, #preferred_create_state_machine_method, #preprocess_headers_for_logging, #primary_key_column, #process_state_change, #qualified_table_sequel_identifier, #readonly_dataset, #remote_key_column, #requires_sequence?, #resource_name_plural, #resource_name_singular, #schema_and_table_symbols, #storable_columns, #timestamp_column, #upsert_has_deps?, #upsert_webhook, #upsert_webhook_body, #verify_backfill_credentials, #webhook_endpoint, #webhook_response

Constructor Details

This class inherits a constructor from Webhookdb::Replicator::Base

Class Method Details

.descriptor ⇒ Object

# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 63

def self.descriptor
  return Webhookdb::Replicator::Descriptor.new(
    name: "front_signalwire_message_channel_app_v1",
    ctor: self,
    feature_roles: [],
    resource_name_singular: "Front/SignalWire Message",
    dependency_descriptor: Webhookdb::Replicator::SignalwireMessageV1.descriptor,
    supports_webhooks: true,
    supports_backfill: true,
    api_docs_url: "https://dev.frontapp.com/docs/getting-started-with-partner-channels",
  )
end

Instance Method Details

#_backfillers ⇒ Object

# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 232

def _backfillers = [Backfiller.new(self)]

#_denormalized_columns ⇒ Object

# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 80

def _denormalized_columns
  return [
    Webhookdb::Replicator::Column.new(:signalwire_sid, TEXT, optional: true, index: true),
    Webhookdb::Replicator::Column.new(:front_message_id, TEXT, optional: true, index: true),
    Webhookdb::Replicator::Column.new(:external_conversation_id, TEXT, optional: true, index: true),
    Webhookdb::Replicator::Column.new(:row_updated_at, TIMESTAMP, defaulter: :now, optional: true, index: true),
    Webhookdb::Replicator::Column.new(:direction, TEXT),
    Webhookdb::Replicator::Column.new(:body, TEXT),
    Webhookdb::Replicator::Column.new(:sender, TEXT),
    Webhookdb::Replicator::Column.new(:recipient, TEXT),
  ]
end

#_front_recipient_phone(payload) ⇒ Object



# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 204

def _front_recipient_phone(payload)
  recipient = payload["recipients"].find { |r| r.fetch("role") == "to" }
  raise Webhookdb::InvariantViolation, "no recipient found in #{payload}" if recipient.nil?
  return self.format_phone(recipient.fetch("handle"))
end
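As a quick illustration of the lookup above, here is a standalone sketch. The payload shape is inferred from the method body, the phone numbers are made up, and `front_recipient_handle` is a hypothetical name. The real method additionally normalizes the handle with `format_phone` and raises `Webhookdb::InvariantViolation` rather than `ArgumentError`.

```ruby
# Standalone version of the recipient lookup. The payload shape is inferred
# from the method above; E.164 normalization is intentionally left out.
def front_recipient_handle(payload)
  recipient = payload["recipients"].find { |r| r.fetch("role") == "to" }
  raise ArgumentError, "no recipient found in #{payload}" if recipient.nil?
  recipient.fetch("handle")
end

payload = {
  "recipients" => [
    {"role" => "from", "handle" => "+15550001111"},
    {"role" => "to", "handle" => "+15552223333"},
  ],
}
handle = front_recipient_handle(payload)
```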

#_notify_dependents(inserting, changed) ⇒ Object



# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 226

def _notify_dependents(inserting, changed)
  super
  return unless changed
  Webhookdb::BackfillJob.create_recursive(service_integration: self.service_integration, incremental: true).enqueue
end

#_remote_key_column ⇒ Object

# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 76

def _remote_key_column
  return Webhookdb::Replicator::Column.new(:external_id, TEXT)
end

#_resource_and_event(request) ⇒ Object



# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 174

def _resource_and_event(request)
  type = request.body["type"]
  is_signalwire = type.nil?
  return request.body, nil if is_signalwire

  # This ends up being called for 'authorization' and 'delete' messages too.
  # Those are handled in the webhook response body.
  is_message_type = ["message", "message_autoreply"].include?(type)
  return nil, nil unless is_message_type

  resource = request.body.dup
  payload = resource.fetch("payload")
  mid = if type == "message"
    payload.fetch("id")
  else
    replied_to_id = payload["_links"]["related"]["message_replied_to"].split("/").last
    "#{replied_to_id}_autoreply"
  end
  resource["front_message_id"] = mid
  # Use the Front ID to identify this outbound message.
  resource["external_id"] = mid
  resource["direction"] = "outbound"
  resource["body"] = payload.fetch("text")
  resource["sender"] = self.support_phone
  resource["recipient"] = self._front_recipient_phone(payload)
  # All messages get the same conversation with SMS/chat, unlike email.
  resource["external_conversation_id"] = resource["recipient"]
  return resource, nil
end
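The id derivation in the method above can be tried in isolation. This sketch assumes the payload shapes implied by the code; `front_external_id` is a hypothetical name and the link URL is a made-up example, not a real Front resource.

```ruby
# Derive the external id the same way _resource_and_event does.
# The link URL used below is a made-up example, not a real Front resource.
def front_external_id(type, payload)
  return payload.fetch("id") if type == "message"
  replied_to_id = payload["_links"]["related"]["message_replied_to"].split("/").last
  "#{replied_to_id}_autoreply"
end

message = {"id" => "msg_abc"}
autoreply = {
  "_links" => {"related" => {"message_replied_to" => "https://example.invalid/messages/msg_abc"}},
}
ids = [
  front_external_id("message", message),
  front_external_id("message_autoreply", autoreply),
]
```

Deriving the autoreply id from the message it replies to gives the autoreply a stable key, so a repeated request for the same autoreply maps to the same row.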

#_timestamp_column_name ⇒ Object

# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 93

def _timestamp_column_name
  return :row_updated_at
end

#_update_where_expr ⇒ Object

# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 97

def _update_where_expr
  return (self.qualified_table_sequel_identifier[:signalwire_sid] =~ nil) |
      (self.qualified_table_sequel_identifier[:front_message_id] =~ nil)
end

#_webhook_response(request) ⇒ Object



# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 170

def _webhook_response(request)
  return Webhookdb::Front.webhook_response(request, Webhookdb::Front.signalwire_channel_app_secret)
end

#calculate_backfill_state_machine ⇒ Object

# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 129

def calculate_backfill_state_machine
  # The backfills here are not normal backfills, requested by the customer.
  # They are procedurally enqueued when we upsert data.
  # So just reuse the webhook state machine.
  return self.calculate_webhook_state_machine
end

#calculate_webhook_state_machine ⇒ Object

# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 105

def calculate_webhook_state_machine
  if (step = self.calculate_dependency_state_machine_step(dependency_help: ""))
    return step
  end
  step = Webhookdb::Replicator::StateMachineStep.new
  if self.service_integration.api_url.blank?
    step.output = %(This Front Channel will be linked to a specific number in SignalWire.
Choose the phone number to connect to Front.)
    return step.prompting("Phone number").api_url(self.service_integration)
  end
  self.service_integration.webhookdb_api_key ||= self.service_integration.new_api_key
  self.service_integration.save_changes
  step.output = %(Almost there! You can now finish installing the SignalWire Channel in Front.

1. In Front, go to Settings -> Company -> Channels (in the left nav), Connect a Channel,
 and choose the 'WebhookDB/SignalWire' channel.
2. In the 'Token' field, enter this API Key: #{self.service_integration.webhookdb_api_key}

If you need to find this key, you can run `webhookdb integrations info front_signalwire_message_channel_app_v1`.

All of this information can be found in the WebhookDB docs, at https://docs.webhookdb.com/guides/front-channel-signalwire/)
  return step.completed
end
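The branching in the state machine above can be modeled in a few lines. This is a simplified sketch: the `Integration` struct, the `next_step` helper, the returned symbols, and the key generator are all hypothetical, and the dependency step checked first by the real method is left out.

```ruby
# Simplified model of the decision in calculate_webhook_state_machine.
# The Integration struct and the returned symbols are hypothetical.
Integration = Struct.new(:api_url, :webhookdb_api_key, keyword_init: true)

def next_step(integration, new_api_key: -> { "generated-key" })
  # No phone number yet: ask for it (it is stored in api_url).
  return :prompt_for_phone_number if integration.api_url.to_s.strip.empty?
  # Generate an API key only if one does not already exist.
  integration.webhookdb_api_key ||= new_api_key.call
  :completed
end

fresh = Integration.new(api_url: "", webhookdb_api_key: nil)
first = next_step(fresh)
fresh.api_url = "+15550001111"
second = next_step(fresh)
```

Using `||=` for the key means rerunning the state machine does not rotate a token that the user may already have pasted into Front.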

#clear_webhook_information ⇒ Object

# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 136

def clear_webhook_information
  # We say we support backfill, so this won't get cleared normally.
  self._clear_backfill_information
  super
end

#format_phone(s) ⇒ Object



# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 102

def format_phone(s) = Webhookdb::PhoneNumber.format_e164(s)

#front_channel_id ⇒ Object

# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 164

def front_channel_id = self.service_integration.backfill_key

#front_channel_id=(c) ⇒ Object



# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 166

def front_channel_id=(c)
  self.service_integration.backfill_key = c
end

#on_dependency_webhook_upsert(_replicator, payload, changed:) ⇒ Object



# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 210

def on_dependency_webhook_upsert(_replicator, payload, changed:)
  return unless changed
  return unless payload.fetch(:direction) == "inbound"
  return unless payload.fetch(:to) == self.support_phone
  body = JSON.parse(payload.fetch(:data))
  body.merge!(
    "external_id" => payload.fetch(:signalwire_id),
    "signalwire_sid" => payload.fetch(:signalwire_id),
    "direction" => "inbound",
    "sender" => payload.fetch(:from),
    "recipient" => self.support_phone,
    "external_conversation_id" => payload.fetch(:from),
  )
  self.upsert_webhook_body(body)
end
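To make the filtering and merge above concrete, here is a standalone sketch. The row keys mirror those fetched in the method; the sample values, the shape of the `data` JSON, and the `inbound_body` name are assumptions. The `changed` check is omitted for brevity.

```ruby
require "json"

# Standalone sketch of the filtering and merge in on_dependency_webhook_upsert.
# Returns nil where the real method returns early, and the merged body
# where the real method calls upsert_webhook_body.
def inbound_body(payload, support_phone:)
  return nil unless payload.fetch(:direction) == "inbound"
  return nil unless payload.fetch(:to) == support_phone
  JSON.parse(payload.fetch(:data)).merge(
    "external_id" => payload.fetch(:signalwire_id),
    "signalwire_sid" => payload.fetch(:signalwire_id),
    "direction" => "inbound",
    "sender" => payload.fetch(:from),
    "recipient" => support_phone,
    "external_conversation_id" => payload.fetch(:from),
  )
end

row = {
  direction: "inbound",
  to: "+15550001111",
  from: "+15552223333",
  signalwire_id: "sw_123",
  data: {"body" => "hello"}.to_json,
}
body = inbound_body(row, support_phone: "+15550001111")
ignored = inbound_body(row.merge(direction: "outbound-api"), support_phone: "+15550001111")
```

Note that the sender's phone number doubles as the conversation id, which matches the comment in `_resource_and_event` that all SMS messages with one number share a conversation.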

#process_webhooks_synchronously? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 142

def process_webhooks_synchronously? = true

#support_phone ⇒ Object

# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 103

def support_phone = self.format_phone(self.service_integration.api_url)

#synchronous_processing_response_body(upserted:, request:) ⇒ Object



# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 144

def synchronous_processing_response_body(upserted:, request:)
  case request.body["type"]
    when "authorization"
      self.front_channel_id = request.body.fetch("payload").fetch("channel_id")
      self.service_integration.save_changes
      return {type: "success", webhook_url: "#{Webhookdb.api_url}/v1/install/front_signalwire/channel"}.to_json
    when "delete"
      self.service_integration.destroy
      return "{}"
    when "message", "message_autoreply"
      return {
        type: "success",
        external_id: upserted.fetch(:external_id),
        external_conversation_id: upserted.fetch(:external_conversation_id),
      }.to_json
    else
      return "{}"
  end
end
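The response bodies produced for each request type can be previewed with a side-effect-free sketch. It assumes a placeholder base URL in place of `Webhookdb.api_url`, and it omits saving the channel id and destroying the integration; `response_body` is a hypothetical name.

```ruby
require "json"

# Simplified stand-in for synchronous_processing_response_body that leaves out
# the side effects (saving the channel id, destroying the integration).
# base_url is a placeholder for Webhookdb.api_url.
def response_body(type, upserted: nil, base_url: "https://api.example.invalid")
  case type
  when "authorization"
    {type: "success", webhook_url: "#{base_url}/v1/install/front_signalwire/channel"}.to_json
  when "message", "message_autoreply"
    {
      type: "success",
      external_id: upserted.fetch(:external_id),
      external_conversation_id: upserted.fetch(:external_conversation_id),
    }.to_json
  else
    "{}"
  end
end

auth = JSON.parse(response_body("authorization"))
msg = JSON.parse(
  response_body("message", upserted: {external_id: "msg_1", external_conversation_id: "+15552223333"}),
)
other = response_body("delete")
```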