Class: Webhookdb::Replicator::FrontSignalwireMessageChannelAppV1
Includes: DBAdapter::ColumnTypes
Defined in: lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb
Overview
Front has a system of ‘channels’ but it is a challenge to use. This replicator leverages WebhookDB (and our existing Front app) to integrate Front and SignalWire messages, using a sort of two-way sync that implements the necessary Front channel contracts.
Note: In the future, we can abstract this to support other channels, with minimal changes.
We have the following concepts to keep in mind:
- The front_message_v1 replicator stores ALL messages in Front (inbound and outbound).
- The signalwire_message_v1 replicator stores ALL messages in SignalWire (inbound and outbound).
- For two-way sync, we care that Outbound Front messages are turned into Outbound SignalWire messages, and Inbound SignalWire messages are turned into Inbound Front messages.
- This means that, for the purpose of a two-way sync, this replicator can ‘enqueue’ deliveries by storing a row with either a Front message id (query Front for all outbound messages), or SignalWire message id (query signalwire for all inbound messages). When a row has both ids, it means it has been “delivered”, so to speak.
- We can ignore inbound Front messages and outbound SignalWire messages (stored in their respective replicators), since those are created by this replicator.
This means that, rather than having to manage state between two event-based systems, we can converge to a correct state based on a given state. This is much easier (possible?) to reason about and test, and makes it possible to reuse code.
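As a rough illustration of converging from the current state, the sketch below classifies a row purely by which ids it holds. The `Row` struct and the `delivery_state` helper are hypothetical and not part of WebhookDB; only the column names `signalwire_sid` and `front_message_id` come from this replicator.

```ruby
# Hypothetical stand-in for a row in this replicator's table.
Row = Struct.new(:signalwire_sid, :front_message_id, keyword_init: true)

# Classify a row by which ids are present (illustrative helper, not a WebhookDB API).
def delivery_state(row)
  has_signalwire = !row.signalwire_sid.nil?
  has_front = !row.front_message_id.nil?
  return :delivered if has_signalwire && has_front
  # Only a Front id: an outbound Front message still waiting to be sent as an SMS.
  return :needs_sms if has_front
  # Only a SignalWire id: an inbound SMS still waiting to be created in Front.
  return :needs_front_message if has_signalwire
  return :invalid
end
```

Because the state is derived from the row alone, the same check can be rerun at any time without tracking which event triggered it.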
The order of operations is:
- The channel description instructs the user to go to /v1/install/front_signalwire/setup.
- This loads a terminal, showing instructions for how to set up (enabling the WebhookDB Front app, setting up SignalWire).
- The state machine also asks for the phone number to use to send messages.
  - The phone number used to send messages is stored in the api_url.
- The state machine prints out the API token to use in Front.
  - The API token is stored in the ‘webhookdb_api_key’ field, which is searchable.
- The user is directed to Front, to install the WebhookDB SignalWire channel.
- The user inputs their API token and connects the channel.
- Front makes an ‘authorization’ request to /v1/install/front_signalwire/authorization.
  - This uses the API key to find the right front_signalwire_message_channel_app_v1 integration via the webhookdb_api_key field.
  - This stores the channel_id on the integration as the backfill_key.
- Front makes ‘message’ requests to /v1/install/front_signalwire/message/<opaque id>.
  - This upserts a DB row into the front_message_v1 replicator.
  - It also enqueues a backfill of this replicator.
- Front can make a ‘delete’ request to /v1/install/front_signalwire/message/<opaque id>.
  - This deletes this service integration.
- Because this replicator is a dependent of signalwire_message_v1 (see explanation below), whenever a signalwire row is updated, this replicator will be triggered and enqueue a backfill.
- When this replicator backfills, it will:
  - Look for inbound SMS, and upsert a row into this replication table.
  - Look for outbound Front messages, and upsert a row into this replication table.
  - Find replication table rows without a signalwire id, and send an SMS.
  - Find replication table rows without a Front message id, and create a Front message using dev.frontapp.com/reference/sync-inbound-message
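The last two backfill steps can be sketched as a single pass over in-memory rows. This is a minimal sketch under stated assumptions, not the Backfiller's actual code: `converge` and its two callables are hypothetical names, and the real replicator works against the replication table and the SignalWire and Front APIs.

```ruby
# One convergence pass over rows (hashes keyed like the denormalized columns).
# send_sms and create_front_message are hypothetical callables that return
# the id assigned by the remote system.
def converge(rows, send_sms:, create_front_message:)
  rows.each do |row|
    if row[:signalwire_sid].nil?
      # Outbound Front message that has not been sent as an SMS yet.
      row[:signalwire_sid] = send_sms.call(row)
    elsif row[:front_message_id].nil?
      # Inbound SMS that has not been synced into Front yet.
      row[:front_message_id] = create_front_message.call(row)
    end
  end
  rows
end
```

Rows that already hold both ids are skipped, so running the pass again after a successful run makes no further remote calls.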
Defined Under Namespace
Classes: Backfiller
Constant Summary
Constants included from DBAdapter::ColumnTypes
DBAdapter::ColumnTypes::BIGINT, DBAdapter::ColumnTypes::BIGINT_ARRAY, DBAdapter::ColumnTypes::BOOLEAN, DBAdapter::ColumnTypes::COLUMN_TYPES, DBAdapter::ColumnTypes::DATE, DBAdapter::ColumnTypes::DECIMAL, DBAdapter::ColumnTypes::DOUBLE, DBAdapter::ColumnTypes::FLOAT, DBAdapter::ColumnTypes::INTEGER, DBAdapter::ColumnTypes::INTEGER_ARRAY, DBAdapter::ColumnTypes::OBJECT, DBAdapter::ColumnTypes::TEXT, DBAdapter::ColumnTypes::TEXT_ARRAY, DBAdapter::ColumnTypes::TIMESTAMP, DBAdapter::ColumnTypes::UUID
Constants inherited from Base
Instance Attribute Summary
Attributes inherited from Base
Class Method Summary
- .descriptor ⇒ Object
Instance Method Summary
- #_backfillers ⇒ Object
- #_denormalized_columns ⇒ Object
- #_front_recipient_phone(payload) ⇒ Object
- #_notify_dependents(inserting, changed) ⇒ Object
- #_remote_key_column ⇒ Object
- #_resource_and_event(request) ⇒ Object
- #_timestamp_column_name ⇒ Object
- #_update_where_expr ⇒ Object
- #_webhook_response(request) ⇒ Object
- #calculate_backfill_state_machine ⇒ Object
- #calculate_webhook_state_machine ⇒ Object
- #clear_webhook_information ⇒ Object
- #format_phone(s) ⇒ Object
- #front_channel_id ⇒ Object
- #front_channel_id=(c) ⇒ Object
- #on_dependency_webhook_upsert(_replicator, payload, changed:) ⇒ Object
- #process_webhooks_synchronously? ⇒ Boolean
- #support_phone ⇒ Object
- #synchronous_processing_response_body(upserted:, request:) ⇒ Object
Methods inherited from Base
#_any_subscriptions_to_notify?, #_backfill_state_change_fields, #_clear_backfill_information, #_clear_webook_information, #_coalesce_excluded_on_update, #_enqueue_backfill_jobs, #_extra_index_specs, #_fetch_enrichment, #_find_dependency_candidate, #_parallel_backfill, #_prepare_for_insert, #_publish_rowupsert, #_resource_to_data, #_store_enrichment_body?, #_to_json, #_upsert_update_expr, #_upsert_webhook, #_verify_backfill_err_msg, #_webhook_state_change_fields, #admin_dataset, #backfill, #backfill_not_supported_message, #calculate_and_backfill_state_machine, #calculate_dependency_state_machine_step, #calculate_preferred_create_state_machine, chunked_row_update_bounds, #clear_backfill_information, #create_table, #create_table_modification, #data_column, #dbadapter_table, #denormalized_columns, #descriptor, #dispatch_request_to, #documentation_url, #enqueue_sync_targets, #enrichment_column, #ensure_all_columns, #ensure_all_columns_modification, #find_dependent, #find_dependent!, #indices, #initialize, #preferred_create_state_machine_method, #preprocess_headers_for_logging, #primary_key_column, #process_state_change, #qualified_table_sequel_identifier, #readonly_dataset, #remote_key_column, #requires_sequence?, #resource_name_plural, #resource_name_singular, #schema_and_table_symbols, #storable_columns, #timestamp_column, #upsert_has_deps?, #upsert_webhook, #upsert_webhook_body, #verify_backfill_credentials, #webhook_endpoint, #webhook_response
Constructor Details
This class inherits a constructor from Webhookdb::Replicator::Base
Class Method Details
.descriptor ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 63
def self.descriptor
  return Webhookdb::Replicator::Descriptor.new(
    name: "front_signalwire_message_channel_app_v1",
    ctor: self,
    feature_roles: [],
    resource_name_singular: "Front/SignalWire Message",
    dependency_descriptor: Webhookdb::Replicator::SignalwireMessageV1.descriptor,
    supports_webhooks: true,
    supports_backfill: true,
    api_docs_url: "https://dev.frontapp.com/docs/getting-started-with-partner-channels",
  )
end
Instance Method Details
#_backfillers ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 232
def _backfillers = [Backfiller.new(self)]
#_denormalized_columns ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 80
def _denormalized_columns
  return [
    Webhookdb::Replicator::Column.new(:signalwire_sid, TEXT, optional: true, index: true),
    Webhookdb::Replicator::Column.new(:front_message_id, TEXT, optional: true, index: true),
    Webhookdb::Replicator::Column.new(:external_conversation_id, TEXT, optional: true, index: true),
    Webhookdb::Replicator::Column.new(:row_updated_at, TIMESTAMP, defaulter: :now, optional: true, index: true),
    Webhookdb::Replicator::Column.new(:direction, TEXT),
    Webhookdb::Replicator::Column.new(:body, TEXT),
    Webhookdb::Replicator::Column.new(:sender, TEXT),
    Webhookdb::Replicator::Column.new(:recipient, TEXT),
  ]
end
#_front_recipient_phone(payload) ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 204
def _front_recipient_phone(payload)
  recipient = payload["recipients"].find { |r| r.fetch("role") == "to" }
  raise Webhookdb::InvariantViolation, "no recipient found in #{payload}" if recipient.nil?
  return self.format_phone(recipient.fetch("handle"))
end
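To make the lookup concrete, here is a standalone sketch of the same recipient selection run against a made-up payload fragment. `recipient_handle` and `SAMPLE_PAYLOAD` are hypothetical, the phone numbers are placeholders, and the E.164 formatting step that the real method applies is left out.

```ruby
# Pick the handle of the recipient whose role is "to", as the method above does.
# Raises ArgumentError here; the real method raises Webhookdb::InvariantViolation.
def recipient_handle(payload)
  recipient = payload["recipients"].find { |r| r.fetch("role") == "to" }
  raise ArgumentError, "no recipient found in #{payload}" if recipient.nil?
  recipient.fetch("handle")
end

# Made-up payload fragment containing only the keys the lookup reads.
SAMPLE_PAYLOAD = {
  "recipients" => [
    {"role" => "from", "handle" => "+15555550100"},
    {"role" => "to", "handle" => "+15555550123"},
  ],
}.freeze

recipient_handle(SAMPLE_PAYLOAD)
```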
#_notify_dependents(inserting, changed) ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 226
def _notify_dependents(inserting, changed)
  super
  return unless changed
  Webhookdb::BackfillJob.create_recursive(service_integration: self.service_integration, incremental: true).enqueue
end
#_remote_key_column ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 76
def _remote_key_column
  return Webhookdb::Replicator::Column.new(:external_id, TEXT)
end
#_resource_and_event(request) ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 174
def _resource_and_event(request)
  type = request.body["type"]
  is_signalwire = type.nil?
  return request.body, nil if is_signalwire

  # This ends up being called for 'authorization' and 'delete' messages too.
  # Those are handled in the webhook response body.
  is_message = ["message", "message_autoreply"].include?(type)
  return nil, nil unless is_message

  resource = request.body.dup
  payload = resource.fetch("payload")
  mid = if type == "message"
    payload.fetch("id")
  else
    replied_to_id = payload["_links"]["related"]["message_replied_to"].split("/").last
    "#{replied_to_id}_autoreply"
  end
  resource["front_message_id"] = mid
  # Use the Front ID to identify this outbound message.
  resource["external_id"] = mid
  resource["direction"] = "outbound"
  resource["body"] = payload.fetch("text")
  resource["sender"] = self.support_phone
  resource["recipient"] = self._front_recipient_phone(payload)
  # All messages get the same conversation with SMS/chat, unlike email.
  resource["external_conversation_id"] = resource["recipient"]
  return resource, nil
end
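The id derivation is the least obvious part of this method, so here it is isolated as a small sketch. `front_message_id_for` is a hypothetical helper, and the link URL in the usage lines is a placeholder rather than a real Front API URL; only the key path and the `_autoreply` suffix come from the method above.

```ruby
# Derive the message id the way the method above does for each request type.
def front_message_id_for(type, payload)
  return payload.fetch("id") if type == "message"
  # Autoreplies are keyed off the message they reply to: take the last path
  # segment of the related link and add a suffix.
  replied_to_id = payload["_links"]["related"]["message_replied_to"].split("/").last
  "#{replied_to_id}_autoreply"
end

# Placeholder payloads containing only the keys that are read.
front_message_id_for("message", {"id" => "msg_abc123"})
front_message_id_for(
  "message_autoreply",
  {"_links" => {"related" => {"message_replied_to" => "https://example.invalid/messages/msg_abc123"}}},
)
```

The suffix keeps the autoreply's external_id distinct from that of the message it answers, so the two are stored as separate rows.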
#_timestamp_column_name ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 93
def _timestamp_column_name
  return :row_updated_at
end
#_update_where_expr ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 97
def _update_where_expr
  return (self.qualified_table_sequel_identifier[:signalwire_sid] =~ nil) |
      (self.qualified_table_sequel_identifier[:front_message_id] =~ nil)
end
#_webhook_response(request) ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 170
def _webhook_response(request)
  return Webhookdb::Front.webhook_response(request, Webhookdb::Front.signalwire_channel_app_secret)
end
#calculate_backfill_state_machine ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 129
def calculate_backfill_state_machine
  # The backfills here are not normal backfills, requested by the customer.
  # They are procedurally enqueued when we upsert data.
  # So just reuse the webhook state machine.
  return self.calculate_webhook_state_machine
end
#calculate_webhook_state_machine ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 105
def calculate_webhook_state_machine
  if (step = self.calculate_dependency_state_machine_step(dependency_help: ""))
    return step
  end
  step = Webhookdb::Replicator::StateMachineStep.new
  if self.service_integration.api_url.blank?
    step.output = %(This Front Channel will be linked to a specific number in SignalWire.
Choose the phone number to connect to Front.)
    return step.prompting("Phone number").api_url(self.service_integration)
  end
  self.service_integration.webhookdb_api_key ||= self.service_integration.new_api_key
  self.service_integration.save_changes
  step.output = %(Almost there! You can now finish installing the SignalWire Channel in Front.

1. In Front, go to Settings -> Company -> Channels (in the left nav),
   Connect a Channel, and choose the 'WebhookDB/SignalWire' channel.
2. In the 'Token' field, enter this API Key: #{self.service_integration.webhookdb_api_key}

If you need to find this key, you can run `webhookdb integrations info front_signalwire_message_channel_app_v1`.

All of this information can be found in the WebhookDB docs, at https://docs.webhookdb.com/guides/front-channel-signalwire/)
  return step.completed
end
#clear_webhook_information ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 136
def clear_webhook_information
  # We say we support backfill, so this won't get cleared normally.
  self._clear_backfill_information
  super
end
#format_phone(s) ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 102
def format_phone(s) = Webhookdb::PhoneNumber.format_e164(s)
#front_channel_id ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 164
def front_channel_id = self.service_integration.backfill_key
#front_channel_id=(c) ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 166
def front_channel_id=(c)
  self.service_integration.backfill_key = c
end
#on_dependency_webhook_upsert(_replicator, payload, changed:) ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 210
def on_dependency_webhook_upsert(_replicator, payload, changed:)
  return unless changed
  return unless payload.fetch(:direction) == "inbound"
  return unless payload.fetch(:to) == self.support_phone
  body = JSON.parse(payload.fetch(:data))
  body.merge!(
    "external_id" => payload.fetch(:signalwire_id),
    "signalwire_sid" => payload.fetch(:signalwire_id),
    "direction" => "inbound",
    "sender" => payload.fetch(:from),
    "recipient" => self.support_phone,
    "external_conversation_id" => payload.fetch(:from),
  )
  self.upsert_webhook_body(body)
end
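As a usage-style illustration, the sketch below applies the same filtering and mapping to a made-up signalwire_message_v1 row. `inbound_body` and `SAMPLE_ROW` are hypothetical, the row values are placeholders, and the function returns the body instead of upserting it as the real method does.

```ruby
require "json"

# Map a signalwire_message_v1 row (hash with symbol keys) to the body this
# replicator would upsert, or nil when the row is not an inbound message
# addressed to the configured support phone.
def inbound_body(row, support_phone:)
  return nil unless row.fetch(:direction) == "inbound"
  return nil unless row.fetch(:to) == support_phone
  JSON.parse(row.fetch(:data)).merge(
    "external_id" => row.fetch(:signalwire_id),
    "signalwire_sid" => row.fetch(:signalwire_id),
    "direction" => "inbound",
    "sender" => row.fetch(:from),
    "recipient" => support_phone,
    # SMS conversations are keyed by the customer's phone number.
    "external_conversation_id" => row.fetch(:from),
  )
end

# Made-up row; :data holds the raw message as a JSON string.
SAMPLE_ROW = {
  signalwire_id: "sw_123",
  direction: "inbound",
  from: "+15555550123",
  to: "+15555550100",
  data: '{"body":"hello"}',
}.freeze

inbound_body(SAMPLE_ROW, support_phone: "+15555550100")
```

Note that the resulting row has a `signalwire_sid` but no `front_message_id`, which is what marks it as still needing to be created in Front.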
#process_webhooks_synchronously? ⇒ Boolean
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 142
def process_webhooks_synchronously? = true
#support_phone ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 103
def support_phone = self.format_phone(self.service_integration.api_url)
#synchronous_processing_response_body(upserted:, request:) ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 144
def synchronous_processing_response_body(upserted:, request:)
  case request.body["type"]
    when "authorization"
      self.front_channel_id = request.body.fetch("payload").fetch("channel_id")
      self.service_integration.save_changes
      return {type: "success", webhook_url: "#{Webhookdb.api_url}/v1/install/front_signalwire/channel"}.to_json
    when "delete"
      self.service_integration.destroy
      return "{}"
    when "message", "message_autoreply"
      return {
        type: "success",
        external_id: upserted.fetch(:external_id),
        external_conversation_id: upserted.fetch(:external_conversation_id),
      }.to_json
    else
      return "{}"
  end
end