Class: Webhookdb::Replicator::FrontSignalwireMessageChannelAppV1
- Includes:
- DBAdapter::ColumnTypes
- Defined in:
- lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb
Overview
Front has a system of ‘channels’ but it is a challenge to use. This replicator leverages WebhookDB (and our existing Front app) to integrate Front and SignalWire messages, using a sort of two-way sync that implements the necessary Front channel contracts.
Note: In the future, we can abstract this to support other channels, with minimal changes.
We have the following concepts to keep in mind:
- The front_message_v1 replicator stores ALL messages in Front (inbound and outbound).
- The signalwire_message_v1 replicator stores ALL messages in SignalWire (inbound and outbound).
- For two-way sync, we care that Outbound Front messages are turned into Outbound SignalWire messages, and Inbound SignalWire messages are turned into Inbound Front messages.
- This means that, for the purpose of a two-way sync, this replicator can ‘enqueue’ deliveries by storing a row with either a Front message id (query Front for all outbound messages), or SignalWire message id (query SignalWire for all inbound messages). When a row has both ids, it means it has been “delivered”, so to speak.
- We can ignore inbound Front messages and outbound SignalWire messages (stored in their respective replicators), since those are created by this replicator.
This means that, rather than having to manage state between two event-based systems, we can converge to a correct state based on a given state. This is much easier (possible?) to reason about and test, and makes it possible to reuse code.
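The 'enqueue by missing id' idea can be sketched with plain hashes. This is only an illustration: `delivery_state` is a hypothetical helper, not part of the replicator, and the sample ids and phone numbers are made up. The column names `signalwire_sid` and `front_message_id` are the ones declared in `_denormalized_columns`.

```ruby
# Hypothetical helper: classify a replication row by which remote ids it has.
def delivery_state(row)
  has_sw = !row[:signalwire_sid].nil?
  has_front = !row[:front_message_id].nil?
  return :delivered if has_sw && has_front
  return :needs_sms if has_front            # outbound Front message, SMS not sent yet
  return :needs_front_message if has_sw     # inbound SMS, not imported into Front yet
  :invalid
end

rows = [
  {external_id: "msg_1-+15555550101", front_message_id: "msg_1", signalwire_sid: nil},
  {external_id: "sw_abc", front_message_id: nil, signalwire_sid: "sw_abc"},
  {external_id: "sw_def", front_message_id: "msg_2", signalwire_sid: "sw_def"},
]
states = rows.map { |r| delivery_state(r) }
```

Because the state is derived from the row alone, the same check gives the same answer no matter which event produced the row.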
The order of operations is:
- The channel description instructs the user to go to /v1/install/front_signalwire/setup.
- This loads a terminal, showing instructions for how to set up (enabling the WebhookDB Front app, setting up SignalWire).
- The state machine also asks for the phone number to use to send messages.
  - The phone number used to send messages is stored in the api_url.
- The state machine prints out the API token to use in Front.
  - The API token is stored in the ‘webhookdb_api_key’ field, which is searchable.
- The user is directed to Front, to install the WebhookDB SignalWire channel.
- The user inputs their API token and connects the channel.
- Front makes an ‘authorization’ request to /v1/install/front_signalwire/authorization.
  - This uses the API key to find the right front_signalwire_message_channel_app_v1 integration via the webhookdb_api_key field.
  - This stores the channel_id on the integration as the backfill_key.
- Front makes ‘message’ requests to /v1/install/front_signalwire/message/<opaque id>.
  - This upserts a DB row into the front_message_v1 replicator.
  - It also enqueues a backfill of this replicator.
- Front can make a ‘delete’ request to /v1/install/front_signalwire/message/<opaque id>.
  - This deletes this service integration and its dependents.
- Because this replicator is a dependent of signalwire_message_v1 (see explanation below), whenever a signalwire row is updated, this replicator will be triggered and enqueue a backfill.
- When this replicator backfills, it will:
  - Look for inbound SMS, and upsert a row into this replication table.
  - Look for outbound Front messages, and upsert a row into this replication table.
  - Find replication table rows without a signalwire id, and send an SMS.
  - Find replication table rows without a Front message id, and create a Front message using dev.frontapp.com/reference/sync-inbound-message
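The last two backfill steps can be sketched as a single idempotent pass over in-memory rows. This is an assumption-laden illustration, not the Backfiller's actual code: `converge`, `send_sms`, and `create_front_message` are hypothetical names, and the two lambdas stand in for the real SignalWire and Front API calls.

```ruby
# Hypothetical in-memory version of one backfill pass.
# send_sms and create_front_message stand in for the real API calls
# and must return the newly created remote id.
def converge(rows, send_sms:, create_front_message:)
  rows.each do |row|
    # Outbound rows missing a SignalWire id still need an SMS sent.
    row[:signalwire_sid] ||= send_sms.call(row) if row[:direction] == "outbound"
    # Inbound rows missing a Front message id still need to be imported into Front.
    row[:front_message_id] ||= create_front_message.call(row) if row[:direction] == "inbound"
  end
end

sent = []
imported = []
send_sms = lambda { |row| sent << row[:external_id]; "sw_#{sent.size}" }
create_front_message = lambda { |row| imported << row[:external_id]; "msg_#{imported.size}" }

rows = [
  {external_id: "msg_a-+15555550101", direction: "outbound", front_message_id: "msg_a", signalwire_sid: nil},
  {external_id: "sw_x", direction: "inbound", front_message_id: nil, signalwire_sid: "sw_x"},
]
converge(rows, send_sms: send_sms, create_front_message: create_front_message)
# A second pass finds nothing left to do, so no message is sent twice.
converge(rows, send_sms: send_sms, create_front_message: create_front_message)
```

Running the pass repeatedly is safe in this sketch because a row with both ids is skipped, which is the property that lets a backfill be enqueued freely after any upsert.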
Defined Under Namespace
Classes: Backfiller
Constant Summary
Constants included from DBAdapter::ColumnTypes
DBAdapter::ColumnTypes::BIGINT, DBAdapter::ColumnTypes::BIGINT_ARRAY, DBAdapter::ColumnTypes::BOOLEAN, DBAdapter::ColumnTypes::COLUMN_TYPES, DBAdapter::ColumnTypes::DATE, DBAdapter::ColumnTypes::DECIMAL, DBAdapter::ColumnTypes::DOUBLE, DBAdapter::ColumnTypes::FLOAT, DBAdapter::ColumnTypes::INTEGER, DBAdapter::ColumnTypes::INTEGER_ARRAY, DBAdapter::ColumnTypes::OBJECT, DBAdapter::ColumnTypes::TEXT, DBAdapter::ColumnTypes::TEXT_ARRAY, DBAdapter::ColumnTypes::TIMESTAMP, DBAdapter::ColumnTypes::UUID
Constants inherited from Base
Instance Attribute Summary
Attributes inherited from Base
Class Method Summary
Instance Method Summary
- #_backfillers ⇒ Object
- #_denormalized_columns ⇒ Object
- #_front_recipient_phones(payload) ⇒ Object
- #_notify_dependents(inserting, changed) ⇒ Object
- #_remote_key_column ⇒ Object
- #_resource_and_event(request) ⇒ Object
- #_timestamp_column_name ⇒ Object
- #_update_where_expr ⇒ Object
- #_webhook_response(request) ⇒ Object
- #alert_async_failed_signalwire_send(sw_row) ⇒ Object
  Send alerts for any undelivered or failed messages.
- #calculate_backfill_state_machine ⇒ Object
- #calculate_webhook_state_machine ⇒ Object
- #clear_webhook_information ⇒ Object
- #format_phone(s) ⇒ Object
- #front_channel_id ⇒ Object
- #front_channel_id=(c) ⇒ Object
- #on_dependency_webhook_upsert(_sw_replicator, sw_payload, changed:) ⇒ Object
- #process_webhooks_synchronously? ⇒ Boolean
- #support_phone ⇒ Object
- #sync_front_inbound_message(sender:, delivered_at:, body:, external_id:, external_conversation_id:) ⇒ Object
- #synchronous_processing_response_body(upserted:, request:) ⇒ Object
Methods inherited from Base
#_any_subscriptions_to_notify?, #_backfill_state_change_fields, #_clear_backfill_information, #_clear_webook_information, #_coalesce_excluded_on_update, #_enqueue_backfill_jobs, #_extra_index_specs, #_fetch_enrichment, #_find_dependency_candidate, #_parallel_backfill, #_prepare_for_insert, #_publish_rowupsert, #_resource_to_data, #_store_enrichment_body?, #_to_json, #_upsert_conflict_target, #_upsert_update_expr, #_upsert_webhook, #_upsert_webhook_single_resource, #_verify_backfill_err_msg, #_webhook_state_change_fields, #admin_dataset, #avoid_writes?, #backfill, #backfill_not_supported_message, #calculate_and_backfill_state_machine, #calculate_dependency_state_machine_step, #calculate_preferred_create_state_machine, chunked_row_update_bounds, #clear_backfill_information, #create_table, #create_table_modification, #create_table_partitions, #data_column, #dbadapter_table, #denormalized_columns, #descriptor, #dispatch_request_to, #documentation_url, #enqueue_sync_targets, #enrichment_column, #ensure_all_columns, #ensure_all_columns_modification, #existing_partitions, #find_dependent, #find_dependent!, #indices, #initialize, #on_backfill_error, #partition?, #partitioning, #preferred_create_state_machine_method, #preprocess_headers_for_logging, #primary_key_column, #process_state_change, #qualified_table_sequel_identifier, #readonly_dataset, #remote_key_column, #requires_sequence?, #resource_name_plural, #resource_name_singular, #schema_and_table_symbols, #storable_columns, #timestamp_column, #upsert_has_deps?, #upsert_webhook, #upsert_webhook_body, #verify_backfill_credentials, #webhook_endpoint, #webhook_response, #with_advisory_lock
Constructor Details
This class inherits a constructor from Webhookdb::Replicator::Base
Class Method Details
.descriptor ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 65
def self.descriptor
  return Webhookdb::Replicator::Descriptor.new(
    name: "front_signalwire_message_channel_app_v1",
    ctor: self,
    feature_roles: [],
    resource_name_singular: "Front/SignalWire Message",
    dependency_descriptor: Webhookdb::Replicator::SignalwireMessageV1.descriptor,
    supports_webhooks: true,
    supports_backfill: true,
    api_docs_url: "https://dev.frontapp.com/docs/getting-started-with-partner-channels",
  )
end
Instance Method Details
#_backfillers ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 304
def _backfillers = [Backfiller.new(self)]
#_denormalized_columns ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 82
def _denormalized_columns
  return [
    Webhookdb::Replicator::Column.new(:signalwire_sid, TEXT, optional: true, index: true),
    Webhookdb::Replicator::Column.new(:front_message_id, TEXT, optional: true, index: true),
    Webhookdb::Replicator::Column.new(:external_conversation_id, TEXT, optional: true, index: true),
    Webhookdb::Replicator::Column.new(:row_updated_at, TIMESTAMP, defaulter: :now, optional: true, index: true),
    Webhookdb::Replicator::Column.new(:direction, TEXT),
    Webhookdb::Replicator::Column.new(:body, TEXT),
    Webhookdb::Replicator::Column.new(:sender, TEXT),
    Webhookdb::Replicator::Column.new(:recipient, TEXT),
  ]
end
#_front_recipient_phones(payload) ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 212
def _front_recipient_phones(payload)
  recipients = payload["recipients"].select { |r| r.fetch("role") == "to" }
  raise Webhookdb::InvariantViolation, "no recipient found in #{payload}" if recipients.empty?
  return recipients.map { |r| self.format_phone(r.fetch("handle")) }
end
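A standalone sketch of the recipient filtering may help. It assumes the payload shape implied by the method (a `recipients` array of hashes with `role` and `handle`); the `format_phone` here is a naive, US-only stand-in for `Webhookdb::PhoneNumber.format_e164`, and `ArgumentError` replaces `Webhookdb::InvariantViolation` so the example runs on its own.

```ruby
# Naive stand-in for Webhookdb::PhoneNumber.format_e164.
# Assumes US numbers; for illustration only.
def format_phone(s)
  digits = s.gsub(/\D/, "")
  digits = "1#{digits}" if digits.size == 10
  "+#{digits}"
end

# Keep only the 'to' recipients and normalize their handles.
def front_recipient_phones(payload)
  recipients = payload["recipients"].select { |r| r.fetch("role") == "to" }
  raise ArgumentError, "no recipient found in #{payload}" if recipients.empty?
  recipients.map { |r| format_phone(r.fetch("handle")) }
end

payload = {
  "recipients" => [
    {"role" => "from", "handle" => "+15555550100"},
    {"role" => "to", "handle" => "(555) 555-0101"},
    {"role" => "to", "handle" => "+15555550102"},
  ],
}
phones = front_recipient_phones(payload)
```

The `from` entry is dropped and each remaining handle comes back in a single normalized form, so later comparisons against SignalWire numbers can be plain string equality.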
#_notify_dependents(inserting, changed) ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 245
def _notify_dependents(inserting, changed)
  super
  return unless changed
  Webhookdb::BackfillJob.create_recursive(service_integration: self.service_integration, incremental: true).enqueue
end
#_remote_key_column ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 78
def _remote_key_column
  return Webhookdb::Replicator::Column.new(:external_id, TEXT)
end
#_resource_and_event(request) ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 176
def _resource_and_event(request)
  type = request.body["type"]
  is_signalwire = type.nil?
  return request.body, nil if is_signalwire

  # This ends up being called for 'authorization' and 'delete' messages too.
  # Those are handled in the webhook response body.
  return nil, nil unless ["message", "message_autoreply"].include?(type)

  resource = request.body.dup
  payload = resource.fetch("payload")
  mid =
    if type == "message"
      payload.fetch("id")
    else
      replied_to_id = payload["_links"]["related"]["message_replied_to"].split("/").last
      "#{replied_to_id}_autoreply"
    end
  resource["front_message_id"] = mid
  resource["direction"] = "outbound"
  resource["body"] = payload.fetch("text")
  resource["sender"] = self.support_phone
  resources = self._front_recipient_phones(payload).map do |recipient|
    r = resource.dup
    r["recipient"] = recipient
    # The same message can go to multiple recipients, but we want to treat them as separate conversations.
    # That is, we CANNOT use signalwire/front to do 'group chats' since we don't want to
    # allow one user to send a message that is sent to other users (would be a spam vector).
    r["external_id"] = "#{mid}-#{recipient}"
    # Thread this message into the recipient's specific conversation, unlike email.
    r["external_conversation_id"] = recipient
    r
  end
  return resources, nil
end
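The id derivation and per-recipient fan-out in the method above can be tried in isolation. In this sketch `message_id_for` and `fan_out` are hypothetical helper names, and the link URL and phone numbers are invented sample data; only the string formats are taken from the method.

```ruby
# Derive the Front message id the same way the method above does:
# the payload id for a 'message', or '<replied-to id>_autoreply' otherwise.
def message_id_for(type, payload)
  return payload.fetch("id") if type == "message"
  replied_to = payload["_links"]["related"]["message_replied_to"].split("/").last
  "#{replied_to}_autoreply"
end

# One row per recipient, each keyed and threaded by the recipient's phone number.
def fan_out(mid, recipients)
  recipients.map do |recipient|
    {
      "front_message_id" => mid,
      "recipient" => recipient,
      "external_id" => "#{mid}-#{recipient}",
      "external_conversation_id" => recipient,
    }
  end
end

autoreply = {
  "_links" => {"related" => {"message_replied_to" => "https://example.invalid/messages/msg_123"}},
}
mid = message_id_for("message_autoreply", autoreply)
rows = fan_out(mid, ["+15555550101", "+15555550102"])
```

Each recipient gets its own `external_id` and its own conversation, which is what keeps a multi-recipient Front message from turning into a group chat.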
#_timestamp_column_name ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 95
def _timestamp_column_name
  return :row_updated_at
end
#_update_where_expr ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 99
def _update_where_expr
  return (self.qualified_table_sequel_identifier[:signalwire_sid] =~ nil) |
      (self.qualified_table_sequel_identifier[:front_message_id] =~ nil)
end
#_webhook_response(request) ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 172
def _webhook_response(request)
  return Webhookdb::Front.webhook_response(request, Webhookdb::Front.signalwire_channel_app_secret)
end
#alert_async_failed_signalwire_send(sw_row) ⇒ Object
Send alerts for any undelivered or failed messages. The (outbound) message is already created in Front, but if the Signalwire message fails to send, we need to import a new message into Front as a reply explaining why the message failed to send.
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 254
def alert_async_failed_signalwire_send(sw_row)
  idempotency_key = "fsmca-swfail-#{sw_row.fetch(:signalwire_id)}"
  idempotency = Webhookdb::Idempotency.once_ever.stored.using_seperate_connection.under_key(idempotency_key)
  idempotency.execute do
    # The 'sender' of this message is who the failed message is sent **to**
    sender = sw_row.fetch(:to)
    data = JSON.parse(sw_row.fetch(:data))
    external_id = sw_row.fetch(:signalwire_id)
    external_conversation_id = sender
    trunc_body = data.fetch("body", "")[..25]
    body = "SMS failed to send. Error (#{data['error_code'] || '-'}): #{data['error_message'] || '-'}\n#{trunc_body}"
    kwargs = {sender:, delivered_at: Time.now.to_i, body:, external_id:, external_conversation_id:}
    # The call to Front MUST be done in a job, since if it fails, we would not be able to retry.
    # The code is called after the signalwire payload is upserted and changes;
    # but if this fails, the row won't change again in the future,
    # so this code wouldn't be called again.
    # This is a general problem and should probably have a general solution,
    # but because of the external call, it is important to guard against it.
    Webhookdb::Jobs::FrontSignalwireMessageChannelSyncInbound.perform_async(
      self.service_integration.id, kwargs.as_json,
    )
  end
end
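To see what the failure notice looks like, the body construction can be pulled out into a small function. `failure_notice_body` is a hypothetical name, and the JSON below is invented sample data (including the error code); the string format and the `[..25]` truncation are taken from the method above.

```ruby
require "json"

# Build the text that is imported into Front when an SMS fails to send.
def failure_notice_body(data)
  # [..25] is an inclusive range, so it keeps at most 26 characters.
  trunc_body = data.fetch("body", "")[..25]
  "SMS failed to send. Error (#{data['error_code'] || '-'}): #{data['error_message'] || '-'}\n#{trunc_body}"
end

# Sample payload with no error_message, to show the '-' fallback.
data = JSON.parse('{"body": "Your appointment is confirmed for tomorrow at 9am.", "error_code": "30003"}')
notice = failure_notice_body(data)
```

A missing `error_code` or `error_message` is rendered as `-`, and a payload without a `body` simply yields an empty second line.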
#calculate_backfill_state_machine ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 131
def calculate_backfill_state_machine
  # The backfills here are not normal backfills, requested by the customer.
  # They are procedurally enqueued when we upsert data.
  # So just reuse the webhook state machine.
  return self.calculate_webhook_state_machine
end
#calculate_webhook_state_machine ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 107
def calculate_webhook_state_machine
  if (step = self.calculate_dependency_state_machine_step(dependency_help: ""))
    return step
  end
  step = Webhookdb::Replicator::StateMachineStep.new
  if self.service_integration.api_url.blank?
    step.output = %(This Front Channel will be linked to a specific number in SignalWire.
Choose the phone number to connect to Front.)
    return step.prompting("Phone number").api_url(self.service_integration)
  end
  self.service_integration.webhookdb_api_key ||= self.service_integration.new_api_key
  self.service_integration.save_changes
  step.output = %(Almost there! You can now finish installing the SignalWire Channel in Front.

1. In Front, go to Settings -> Company -> Channels (in the left nav), Connect a Channel, and choose the 'WebhookDB/SignalWire' channel.
2. In the 'Token' field, enter this API Key: #{self.service_integration.webhookdb_api_key}

If you need to find this key, you can run `webhookdb integrations info front_signalwire_message_channel_app_v1`.

All of this information can be found in the WebhookDB docs, at https://docs.webhookdb.com/guides/front-channel-signalwire/)
  return step.completed
end
#clear_webhook_information ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 138
def clear_webhook_information
  # We say we support backfill, so this won't get cleared normally.
  self._clear_backfill_information
  super
end
#format_phone(s) ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 104
def format_phone(s) = Webhookdb::PhoneNumber.format_e164(s)
#front_channel_id ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 166
def front_channel_id = self.service_integration.backfill_key
#front_channel_id=(c) ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 168
def front_channel_id=(c)
  self.service_integration.backfill_key = c
end
#on_dependency_webhook_upsert(_sw_replicator, sw_payload, changed:) ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 218
def on_dependency_webhook_upsert(_sw_replicator, sw_payload, changed:)
  return unless changed
  # If the signalwire message is failed, update the Front convo with a notification that the send failed
  failed_notifier_cutoff = Time.now - 4.days
  signalwire_send_failed = sw_payload.fetch(:date_updated) > failed_notifier_cutoff &&
    ["failed", "undelivered"].include?(sw_payload.fetch(:status)) &&
    sw_payload.fetch(:from) == self.support_phone
  self.alert_async_failed_signalwire_send(sw_payload) if signalwire_send_failed

  # If a message has come in from a user, insert a row so it'll be imported into Front
  signalwire_payload_inbound_to_support = sw_payload.fetch(:direction) == "inbound" &&
    sw_payload.fetch(:to) == self.support_phone
  return unless signalwire_payload_inbound_to_support
  body = JSON.parse(sw_payload.fetch(:data))
  body.merge!(
    "external_id" => sw_payload.fetch(:signalwire_id),
    "signalwire_sid" => sw_payload.fetch(:signalwire_id),
    "direction" => "inbound",
    "sender" => sw_payload.fetch(:from),
    "recipient" => self.support_phone,
    "external_conversation_id" => sw_payload.fetch(:from),
  )
  self.upsert_webhook_body(body)
end
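The two conditions checked by this method can be written as standalone predicates, which makes them easy to exercise with sample rows. The helper names, the `now:` parameter, and the sample rows are assumptions made for this sketch; the four-day window is expressed in plain seconds instead of ActiveSupport's `4.days`.

```ruby
# Four days in seconds; a plain-Ruby stand-in for 4.days.
FAILED_NOTIFIER_WINDOW = 4 * 24 * 60 * 60

# True for a recent failed/undelivered message sent from the support phone.
def send_failed?(sw_row, support_phone, now: Time.now)
  sw_row.fetch(:date_updated) > now - FAILED_NOTIFIER_WINDOW &&
    ["failed", "undelivered"].include?(sw_row.fetch(:status)) &&
    sw_row.fetch(:from) == support_phone
end

# True for a message a user sent to the support phone.
def inbound_to_support?(sw_row, support_phone)
  sw_row.fetch(:direction) == "inbound" && sw_row.fetch(:to) == support_phone
end

support = "+15555550100"
now = Time.utc(2024, 1, 10)
failed_row = {
  date_updated: Time.utc(2024, 1, 9), status: "undelivered",
  from: support, to: "+15555550101", direction: "outbound",
}
stale_row = failed_row.merge(date_updated: Time.utc(2024, 1, 1))
inbound_row = {
  date_updated: Time.utc(2024, 1, 9), status: "received",
  from: "+15555550101", to: support, direction: "inbound",
}
```

The cutoff means an old failure that is re-upserted (for example during a full backfill of SignalWire messages) does not produce a fresh alert in Front.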
#process_webhooks_synchronously? ⇒ Boolean
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 144
def process_webhooks_synchronously? = true
#support_phone ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 105
def support_phone = self.format_phone(self.service_integration.api_url)
#sync_front_inbound_message(sender:, delivered_at:, body:, external_id:, external_conversation_id:) ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 278
def sync_front_inbound_message(sender:, delivered_at:, body:, external_id:, external_conversation_id:)
  body = {
    sender: {handle: sender},
    body:,
    delivered_at:,
    metadata: {external_id:, external_conversation_id:},
  }
  token = JWT.encode(
    {
      iss: Webhookdb::Front.signalwire_channel_app_id,
      jti: Webhookdb::Front.channel_jwt_jti,
      sub: self.front_channel_id,
      exp: 10.seconds.from_now.to_i,
    },
    Webhookdb::Front.signalwire_channel_app_secret,
  )
  resp = Webhookdb::Http.post(
    "https://api2.frontapp.com/channels/#{self.front_channel_id}/inbound_messages",
    body,
    headers: {"Authorization" => "Bearer #{token}"},
    timeout: Webhookdb::Front.http_timeout,
    logger: self.logger,
  )
  return resp.parsed_response
end
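The method above signs a short-lived token with `JWT.encode` and no explicit algorithm, which in the ruby-jwt gem means HS256. As a rough, stdlib-only sketch of what such a token contains, the following builds one by hand. `b64url` and `encode_hs256` are hypothetical helpers written for this example, every claim value is a placeholder, and real code should keep using a JWT library.

```ruby
require "json"
require "openssl"
require "securerandom"

# Base64url without padding, the encoding JWT segments use.
def b64url(bytes)
  [bytes].pack("m0").tr("+/", "-_").delete("=")
end

# Hand-rolled HS256 JWT, for illustration only.
def encode_hs256(claims, secret)
  header = b64url(JSON.generate({alg: "HS256", typ: "JWT"}))
  payload = b64url(JSON.generate(claims))
  signing_input = "#{header}.#{payload}"
  signature = OpenSSL::HMAC.digest("SHA256", secret, signing_input)
  "#{signing_input}.#{b64url(signature)}"
end

# All values below are placeholders, not real Front credentials.
claims = {
  iss: "example_app_id",       # the channel app id
  jti: SecureRandom.hex(8),    # a unique token id
  sub: "cha_example",          # the Front channel id
  exp: Time.now.to_i + 10,     # expires ten seconds from now
}
token = encode_hs256(claims, "example_app_secret")
authorization_header = "Bearer #{token}"
```

The ten-second expiry keeps the token useful only for the single request it was created for.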
#synchronous_processing_response_body(upserted:, request:) ⇒ Object
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 146
def synchronous_processing_response_body(upserted:, request:)
  case request.body["type"]
    when "authorization"
      self.front_channel_id = request.body.fetch("payload").fetch("channel_id")
      self.service_integration.save_changes
      return {type: "success", webhook_url: "#{Webhookdb.api_url}/v1/install/front_signalwire/channel"}.to_json
    when "delete"
      self.service_integration.destroy_self_and_all_dependents
      return "{}"
    when "message", "message_autoreply"
      return {
        type: "success",
        external_id: upserted.map { |r| r.fetch(:external_id) }.join(","),
        external_conversation_id: upserted.map { |r| r.fetch(:external_conversation_id) }.join(","),
      }.to_json
    else
      return "{}"
  end
end
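For the `message` branch, the response body joins the ids of every upserted row with commas. A small sketch with invented rows shows the resulting JSON; `message_response_body` is a hypothetical name for the extracted logic.

```ruby
require "json"

# Build the synchronous response for a 'message' or 'message_autoreply' request.
def message_response_body(upserted)
  {
    type: "success",
    external_id: upserted.map { |r| r.fetch(:external_id) }.join(","),
    external_conversation_id: upserted.map { |r| r.fetch(:external_conversation_id) }.join(","),
  }.to_json
end

# Two rows, as produced when one Front message goes to two recipients.
upserted = [
  {external_id: "msg_1-+15555550101", external_conversation_id: "+15555550101"},
  {external_id: "msg_1-+15555550102", external_conversation_id: "+15555550102"},
]
response = JSON.parse(message_response_body(upserted))
```

With a single recipient the joined values are just that row's ids, so the common case returns one `external_id` and one `external_conversation_id`.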