Class: Webhookdb::Replicator::FrontSignalwireMessageChannelAppV1::Backfiller

Inherits:
Backfiller
  • Object
show all
Defined in:
lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb

Instance Method Summary

Methods inherited from Backfiller

#_fetch_backfill_page_with_retry, #backfill, do_retry_wait, #max_backfill_retry_attempts, #wait_for_retry_attempt

Constructor Details

#initialize(replicator) ⇒ Backfiller

Returns a new instance of Backfiller.



235
236
237
238
239
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 235

# Build a backfiller bound to the given replicator.
#
# Besides the replicator itself, this keeps a reference to the service
# integration that the replicator's own integration depends on
# (presumably the SignalWire integration, going by the variable name).
#
# @param replicator the replicator this backfiller works for; it must
#   respond to +service_integration+.
def initialize(replicator)
  super()
  integration = replicator.service_integration
  @replicator = replicator
  @signalwire_sint = integration.depends_on
end

Instance Method Details

#_sync_front_inbound(sender:, texted_at:, item:, body:) ⇒ Object



289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 289

# Push a single inbound text message into Front through the channel
# +inbound_messages+ endpoint and return the parsed response.
#
# The request is authorized with a bearer JWT signed with the channel app
# secret; its +exp+ claim is set 10 seconds in the future.
#
# @param sender the sender handle reported to Front.
# @param texted_at the time the text was sent; sent as integer +delivered_at+.
# @param item the row being synced; must contain +:external_id+ and
#   +:external_conversation_id+.
# @param body the message text.
# @return the parsed response of the HTTP call.
# @raise [KeyError] if +item+ lacks one of the required keys.
def _sync_front_inbound(sender:, texted_at:, item:, body:)
  handle = {handle: sender}
  delivered_at = texted_at.to_i
  metadata = {
    external_id: item.fetch(:external_id),
    external_conversation_id: item.fetch(:external_conversation_id),
  }
  payload = {sender: handle, body: body, delivered_at: delivered_at, metadata: metadata}

  claims = {
    iss: Webhookdb::Front.signalwire_channel_app_id,
    jti: Webhookdb::Front.channel_jwt_jti,
    sub: @replicator.front_channel_id,
    exp: 10.seconds.from_now.to_i,
  }
  bearer = JWT.encode(claims, Webhookdb::Front.signalwire_channel_app_secret)

  url = "https://api2.frontapp.com/channels/#{@replicator.front_channel_id}/inbound_messages"
  response = Webhookdb::Http.post(
    url,
    payload,
    headers: {"Authorization" => "Bearer #{bearer}"},
    timeout: Webhookdb::Front.http_timeout,
    logger: @replicator.logger,
  )
  response.parsed_response
end

#fetch_backfill_page(*) ⇒ Object



318
319
320
321
322
323
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 318

# Load every row that is still missing either its +signalwire_sid+ or its
# +front_message_id+.
#
# Any arguments are accepted and ignored.
#
# @return [Array] a two-element array: the matching rows, then +nil+
#   (presumably the "next page" token, meaning there is only one page --
#   confirm against the parent Backfiller contract).
def fetch_backfill_page(*)
  pending = @replicator.admin_dataset do |dataset|
    unsynced = Sequel[signalwire_sid: nil] | Sequel[front_message_id: nil]
    dataset.where(unsynced).all
  end
  [pending, nil]
end

#handle_item(item) ⇒ Object



241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 241

# Sync one stored row in whichever direction it still needs, then upsert it.
#
# A row must carry exactly one of +:front_message_id+ / +:signalwire_sid+:
# - no Front id: the text is pushed into Front via +_sync_front_inbound+ and
#   the returned "message_uid" is stored as +:front_message_id+;
# - Front id present: the message is sent as an SMS via
#   +Webhookdb::Signalwire.send_sms+ and the returned "sid" is stored as
#   +:signalwire_sid+.
# Rows older than +Webhookdb::Front.channel_sync_refreshness_cutoff+ seconds
# are not sent anywhere; the missing id is set to "skipped_due_to_age".
#
# The outbound call is wrapped in a stored, once-ever idempotency block keyed
# on the row's external id (presumably so a retried row is not delivered
# twice -- see Webhookdb::Idempotency for the exact guarantees).
#
# @param item [Hash] symbol-keyed row; mutated in place with the new id.
# @return the result of +@replicator.upsert_webhook_body+.
# @raise [Webhookdb::InvariantViolation] if both ids, or neither, are set.
def handle_item(item)
  front_message_id = item.fetch(:front_message_id)
  signalwire_sid = item.fetch(:signalwire_sid)
  has_front = front_message_id ? true : false
  has_signalwire = signalwire_sid ? true : false
  # Both set means nothing is left to sync; neither set means the row has no origin.
  if has_front == has_signalwire
    msg = "row should have a front id OR signalwire id, should not have been inserted, or selected: #{item}"
    raise Webhookdb::InvariantViolation, msg
  end

  from_phone = @replicator.format_phone(item.fetch(:sender))
  to_phone = @replicator.format_phone(item.fetch(:recipient))
  text = item.fetch(:body)
  once_key = "fsmca-fims-#{item.fetch(:external_id)}"
  once = Webhookdb::Idempotency.once_ever.stored.using_seperate_connection.under_key(once_key)

  if front_message_id.nil?
    # Row originated from SignalWire: deliver it to Front unless it is stale.
    texted_at = Time.parse(item.fetch(:data).fetch("date_created"))
    too_old = texted_at < Webhookdb::Front.channel_sync_refreshness_cutoff.seconds.ago
    item[:front_message_id] =
      if too_old
        "skipped_due_to_age"
      else
        front_result = once.execute do
          _sync_front_inbound(sender: from_phone, texted_at: texted_at, item: item, body: text)
        end
        front_result.fetch("message_uid")
      end
  else
    # Row originated from Front: send it out as an SMS unless it is stale.
    messaged_at = Time.at(item.fetch(:data).fetch("payload").fetch("created_at"))
    too_old = messaged_at < Webhookdb::Front.channel_sync_refreshness_cutoff.seconds.ago
    item[:signalwire_sid] =
      if too_old
        "skipped_due_to_age"
      else
        sms_result = once.execute do
          Webhookdb::Signalwire.send_sms(
            from: from_phone,
            to: to_phone,
            body: text,
            space_url: @signalwire_sint.api_url,
            project_id: @signalwire_sint.backfill_key,
            api_key: @signalwire_sint.backfill_secret,
            logger: @replicator.logger,
          )
        end
        sms_result.fetch("sid")
      end
  end

  @replicator.upsert_webhook_body(item.deep_stringify_keys)
end