Class: Webhookdb::Replicator::FrontSignalwireMessageChannelAppV1::Backfiller

Inherits:
Backfiller
  • Object
show all
Defined in:
lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb

Instance Method Summary collapse

Methods inherited from Backfiller

#_fetch_backfill_page_with_retry, #backfill, do_retry_wait, #max_backfill_retry_attempts, #wait_for_retry_attempt

Constructor Details

#initialize(replicator) ⇒ Backfiller

Returns a new instance of Backfiller.



307
308
309
310
311
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 307

def initialize(replicator)
  super()
  @replicator = replicator
  @signalwire_sint = replicator.service_integration.depends_on
end

Instance Method Details

#_send_sms(idempotency, from:, to:, body:) ⇒ Object



365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 365

def _send_sms(idempotency, from:, to:, body:)
  return idempotency.execute do
    Webhookdb::Signalwire.send_sms(
      from:,
      to:,
      body:,
      space_url: @signalwire_sint.api_url,
      project_id: @signalwire_sint.backfill_key,
      api_key: @signalwire_sint.backfill_secret,
      logger: @replicator.logger,
    )
  end
rescue Webhookdb::Http::Error => e
  response_body = e.body
  response_status = e.status
  request_url = e.uri.to_s
  @replicator.logger.warn("signalwire_send_sms_error",
                          response_body:, response_status:, request_url:, sms_from: from, sms_to: to,)
  code = begin
    # If this fails for whatever reason, or there is no 'code', re-raise the original error
    e.response.parsed_response["code"]
  rescue StandardError
    nil
  end
  # All known codes are for the integrator, not on the webhookdb code side.
  # https://developer.signalwire.com/guides/how-to-troubleshoot-common-messaging-issues
  raise e if code.nil?

  # Error handling note as of Jan 2025:
  # We are choosing to handle synchronous 'send sms' errors through the org alert system,
  # which will tell developers (not support agents) about the failure.
  # This is because, if this send fails, it will be retried later.
  # For example, if we sent a bulk message to 1000 customers,
  # and Signalwire was down and failed 500 sends, we would just retry the 500 sends.
  # We do NOT want to update the 500 failed conversations, and force support agents
  # to deal with the fallout of retrying a send only to those 500 people.
  message = Webhookdb::Messages::ErrorSignalwireSendSms.new(
    @replicator.service_integration,
    response_status:,
    response_body:,
    request_url:,
    request_method: "POST",
  )
  @replicator.service_integration.organization.alerting.dispatch_alert(message)
  return nil
end

#_sync_front_inbound(sender:, texted_at:, db_row:, body:) ⇒ Object



412
413
414
415
416
417
418
419
420
421
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 412

def _sync_front_inbound(sender:, texted_at:, db_row:, body:)
  body ||= "<no body>"
  return @replicator.sync_front_inbound_message(
    sender:,
    delivered_at: texted_at.to_i,
    body:,
    external_id: db_row.fetch(:external_id),
    external_conversation_id: db_row.fetch(:external_conversation_id),
  )
end

#fetch_backfill_pageObject



423
424
425
426
427
428
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 423

def fetch_backfill_page(*)
  rows = @replicator.admin_dataset do |ds|
    ds.where(Sequel[signalwire_sid: nil] | Sequel[front_message_id: nil]).all
  end
  return rows, nil
end

#handle_item(db_row) ⇒ Object



313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 313

def handle_item(db_row)
  front_id = db_row.fetch(:front_message_id)
  sw_id = db_row.fetch(:signalwire_sid)
  # This is sort of gross- we get the db row here, and need to re-update it with certain fields
  # as a result of the signalwire or front sync. To do that, we need to run the upsert on 'data',
  # but what's in 'data' is incomplete. So we use the db row to form a more fully complete 'data'.
  upserting_data = db_row.dup
  # Remove the columns that don't belong in 'data'
  upserting_data.delete(:pk)
  upserting_data.delete(:row_updated_at)
  # Splat the 'data' column into the row so it all gets put back into 'data'
  upserting_data.merge!(**upserting_data.delete(:data))
  if (front_id && sw_id) || (!front_id && !sw_id)
    msg = "row should have a front id OR signalwire id, should not have been inserted, or selected: #{db_row}"
    raise Webhookdb::InvariantViolation, msg
  end
  sender = @replicator.format_phone(db_row.fetch(:sender))
  recipient = @replicator.format_phone(db_row.fetch(:recipient))
  body = db_row.fetch(:body)
  idempotency_key = "fsmca-fims-#{db_row.fetch(:external_id)}"
  idempotency = Webhookdb::Idempotency.once_ever.stored.using_seperate_connection.under_key(idempotency_key)
  if front_id.nil?
    texted_at = Time.parse(db_row.fetch(:data).fetch("date_created"))
    if texted_at < Webhookdb::Front.channel_sync_refreshness_cutoff.seconds.ago
      # Do not sync old rows, just mark them synced
      upserting_data[:front_message_id] = "skipped_due_to_age"
    else
      # sync the message into Front
      front_response_body = idempotency.execute do
        self._sync_front_inbound(sender:, texted_at:, db_row:, body:)
      end
      upserting_data[:front_message_id] = front_response_body.fetch("message_uid")
    end
  else
    messaged_at = Time.at(db_row.fetch(:data).fetch("payload").fetch("created_at"))
    if messaged_at < Webhookdb::Front.channel_sync_refreshness_cutoff.seconds.ago
      # Do not sync old rows, just mark them synced
      upserting_data[:signalwire_sid] = "skipped_due_to_age"
    else
      # send the SMS via signalwire
      signalwire_resp = _send_sms(
        idempotency,
        from: sender,
        to: recipient,
        body:,
      )
      upserting_data[:signalwire_sid] = signalwire_resp.fetch("sid") if signalwire_resp
    end
  end
  @replicator.upsert_webhook_body(upserting_data.deep_stringify_keys)
end