Class: Webhookdb::Replicator::FrontSignalwireMessageChannelAppV1::Backfiller
- Inherits:
-
Backfiller
- Object
- Backfiller
- Webhookdb::Replicator::FrontSignalwireMessageChannelAppV1::Backfiller
- Defined in:
- lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb
Instance Method Summary
- #_sync_front_inbound(sender:, texted_at:, item:, body:) ⇒ Object
- #fetch_backfill_page ⇒ Object
- #handle_item(item) ⇒ Object
-
#initialize(replicator) ⇒ Backfiller
constructor
A new instance of Backfiller.
Methods inherited from Backfiller
#_fetch_backfill_page_with_retry, #backfill, do_retry_wait, #max_backfill_retry_attempts, #wait_for_retry_attempt
Constructor Details
#initialize(replicator) ⇒ Backfiller
Returns a new instance of Backfiller.
235 236 237 238 239 |
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 235

# Builds a backfiller bound to the given replicator.
#
# Besides the replicator itself, this caches the service integration that the
# replicator's own service integration depends on (presumably the SignalWire
# integration, judging by the ivar name -- confirm against the caller).
#
# @param replicator [Object] replicator exposing +service_integration+
def initialize(replicator)
  super()
  @signalwire_sint = replicator.service_integration.depends_on
  @replicator = replicator
end
Instance Method Details
#_sync_front_inbound(sender:, texted_at:, item:, body:) ⇒ Object
289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 |
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 289

# POSTs a single inbound message to the Front channel API for this
# replicator's channel and returns the parsed response.
#
# The request is authorized with a short-lived JWT (expiring 10 seconds from
# now) signed with the Front SignalWire channel app secret.
#
# @param sender [String] handle of the sender, sent as +sender.handle+
# @param texted_at [#to_i] time of the message, sent as integer +delivered_at+
# @param item [Hash] row providing +:external_id+ and +:external_conversation_id+
# @param body [String] message text
# @return [Object] +parsed_response+ of the HTTP response
# @raise [KeyError] if +item+ lacks one of the required keys
def _sync_front_inbound(sender:, texted_at:, item:, body:)
  payload = {
    sender: {handle: sender},
    body:,
    delivered_at: texted_at.to_i,
    metadata: {
      external_id: item.fetch(:external_id),
      external_conversation_id: item.fetch(:external_conversation_id),
    },
  }
  claims = {
    iss: Webhookdb::Front.signalwire_channel_app_id,
    jti: Webhookdb::Front.channel_jwt_jti,
    sub: @replicator.front_channel_id,
    exp: 10.seconds.from_now.to_i,
  }
  token = JWT.encode(claims, Webhookdb::Front.signalwire_channel_app_secret)
  url = "https://api2.frontapp.com/channels/#{@replicator.front_channel_id}/inbound_messages"
  response = Webhookdb::Http.post(
    url,
    payload,
    headers: {"Authorization" => "Bearer #{token}"},
    timeout: Webhookdb::Front.http_timeout,
    logger: @replicator.logger,
  )
  response.parsed_response
end
#fetch_backfill_page ⇒ Object
318 319 320 321 322 323 |
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 318

# Loads every row that is missing either its SignalWire sid or its Front
# message id, all in a single page.
#
# Any positional arguments are accepted and ignored.
#
# @return [Array] two-element array of +[rows, nil]+; the +nil+ is presumably
#   the "no further page" marker expected by the base backfiller -- confirm.
def fetch_backfill_page(*)
  missing_either_id = Sequel[signalwire_sid: nil] | Sequel[front_message_id: nil]
  rows = @replicator.admin_dataset { |ds| ds.where(missing_either_id).all }
  [rows, nil]
end
#handle_item(item) ⇒ Object
241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 |
# File 'lib/webhookdb/replicator/front_signalwire_message_channel_app_v1.rb', line 241

# Syncs one half-synced row to the side it is missing, then upserts the row.
#
# A row must carry exactly one of +:front_message_id+ / +:signalwire_sid+:
# * no Front id: the message is pushed into Front via +_sync_front_inbound+
#   and the returned +message_uid+ is stored as +:front_message_id+;
# * no SignalWire sid: the message is sent as an SMS via
#   +Webhookdb::Signalwire.send_sms+ and the returned +sid+ is stored as
#   +:signalwire_sid+.
#
# Rows older than +Webhookdb::Front.channel_sync_refreshness_cutoff+ seconds
# are not delivered; the missing id is set to "skipped_due_to_age" instead.
# Delivery runs inside an idempotency block keyed on the row's +:external_id+.
#
# @param item [Hash] symbol-keyed row; mutated in place with the new id
# @return [Object] whatever +@replicator.upsert_webhook_body+ returns
# @raise [Webhookdb::InvariantViolation] if both ids or neither id is present
# @raise [KeyError] if a required key is missing from +item+ or a response
def handle_item(item)
  front_id = item.fetch(:front_message_id)
  sw_id = item.fetch(:signalwire_sid)
  # Exactly one id may be set: both means nothing is left to sync,
  # neither means the row should never have been stored.
  if (front_id && sw_id) || (!front_id && !sw_id)
    msg = "row should have a front id OR signalwire id, should not have been inserted, or selected: #{item}"
    raise Webhookdb::InvariantViolation, msg
  end
  sender = @replicator.format_phone(item.fetch(:sender))
  recipient = @replicator.format_phone(item.fetch(:recipient))
  body = item.fetch(:body)
  idempotency_key = "fsmca-fims-#{item.fetch(:external_id)}"
  idempotency = Webhookdb::Idempotency.once_ever.stored.using_seperate_connection.under_key(idempotency_key)
  if front_id.nil?
    texted_at = Time.parse(item.fetch(:data).fetch("date_created"))
    if texted_at < Webhookdb::Front.channel_sync_refreshness_cutoff.seconds.ago
      # Do not sync old rows, just mark them synced
      item[:front_message_id] = "skipped_due_to_age"
    else
      # sync the message into Front
      front_response_body = idempotency.execute do
        self._sync_front_inbound(sender:, texted_at:, item:, body:)
      end
      item[:front_message_id] = front_response_body.fetch("message_uid")
    end
  else
    # The payload's created_at is passed to Time.at, i.e. read as epoch seconds.
    sent_at = Time.at(item.fetch(:data).fetch("payload").fetch("created_at"))
    if sent_at < Webhookdb::Front.channel_sync_refreshness_cutoff.seconds.ago
      # Do not sync old rows, just mark them synced
      item[:signalwire_sid] = "skipped_due_to_age"
    else
      # send the SMS via signalwire
      signalwire_resp = idempotency.execute do
        Webhookdb::Signalwire.send_sms(
          from: sender,
          to: recipient,
          body:,
          space_url: @signalwire_sint.api_url,
          project_id: @signalwire_sint.backfill_key,
          api_key: @signalwire_sint.backfill_secret,
          logger: @replicator.logger,
        )
      end
      item[:signalwire_sid] = signalwire_resp.fetch("sid")
    end
  end
  @replicator.upsert_webhook_body(item.deep_stringify_keys)
end