Class: Webhookdb::WebhookSubscription
- Inherits: Object
  - Object
  - Webhookdb::WebhookSubscription
- Defined in: lib/webhookdb/webhook_subscription.rb
Overview
Webhook subscriptions have a few parts:
- The WebhookSubscription itself (this model), which represents a user’s desire to receive all webhooks at a URL.
- The individual Delivery, which is a single ‘rowupsert’ event being delivered to a subscription. That is, if multiple rowupserts are done, there will be multiple Deliveries to a single Subscription, one for each rowupsert. Likewise, if a single rowupsert is done, but there are multiple Subscriptions, there will be multiple Deliveries, one to each Subscription.
- Async job that listens for rowupsert events and enqueues new deliveries.
- When a delivery is ‘enqueued’, it is created in the database, and then a sidekiq job is put into Redis. This sidekiq job operates OUTSIDE of our normal job system since we do not want to bother with audit logging or routing (enough history is in the DB already, though we could add it if needed).
- We attempt the delivery until it succeeds, or we run out of attempts. See #attempt_delivery.
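The fan-out described above (one Delivery per rowupsert per Subscription) can be sketched in memory as follows. This is only an illustration: `SketchSubscription`, `SketchDelivery`, and `fan_out` are hypothetical stand-ins, not the real Sequel models or the async job.

```ruby
# Hypothetical stand-ins for illustration only; the real models are
# database-backed and deliveries are enqueued through Sidekiq.
SketchSubscription = Struct.new(:id, :url)
SketchDelivery = Struct.new(:subscription_id, :payload)

# One delivery is created per (rowupsert, subscription) pair.
def fan_out(rowupserts, subscriptions)
  rowupserts.flat_map do |payload|
    subscriptions.map { |sub| SketchDelivery.new(sub.id, payload) }
  end
end

subs = [
  SketchSubscription.new(1, "https://a.example/hook"),
  SketchSubscription.new(2, "https://b.example/hook"),
]
upserts = [{external_id: "x1"}, {external_id: "x2"}, {external_id: "x3"}]

deliveries = fan_out(upserts, subs)
puts deliveries.size # 3 rowupserts x 2 subscriptions = 6 deliveries
```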
Defined Under Namespace
Classes: Delivery
Constant Summary
- TIMEOUT = 10.seconds
  Amount of time we wait for a response from the server.
- MAX_DELIVERY_ATTEMPTS = 25
  An individual delivery will be attempted this many times before giving up.
Class Method Summary
- .backoff_for_attempt(attempt) ⇒ Object
Instance Method Summary
- #_fatal(d, e) ⇒ Object
- #_retry(delivery, attempt) ⇒ Object
- #active? ⇒ Boolean
- #associated_id ⇒ Object
- #associated_type ⇒ Object
- #attempt_delivery(d) ⇒ Object
  Attempt to deliver the payload in d to the configured URL (see #deliver).
- #before_create ⇒ Object
  :Sequel Hooks:.
- #create_delivery(payload) ⇒ Object
- #deactivate(at: Time.now) ⇒ Object
- #deactivated? ⇒ Boolean
- #deliver(service_name:, table_name:, row:, external_id:, external_id_column:, headers: {}) ⇒ Object
  Deliver the webhook payload to the configured URL.
- #deliver_test_event(external_id: SecureRandom.hex(6)) ⇒ Object
- #enqueue_delivery(payload) ⇒ Object
  Create a new Delivery and enqueue it for async processing.
- #fetch_organization ⇒ Object
- #status ⇒ Object
Class Method Details
.backoff_for_attempt(attempt) ⇒ Object
    # File 'lib/webhookdb/webhook_subscription.rb', line 159
    def self.backoff_for_attempt(attempt)
      return 1 if attempt <= 1
      return attempt * 2 if attempt <= 10
      return attempt * 3 if attempt <= 20
      return attempt * 4
    end
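As a worked example of this schedule, the sketch below copies the backoff rule into a standalone method and lists the delay after each failed attempt. `MAX_ATTEMPTS` mirrors `MAX_DELIVERY_ATTEMPTS`; treating the delay as seconds is an assumption based on how Sidekiq's `perform_in` interprets a plain number.

```ruby
# Standalone copy of the backoff rule, for illustration only.
def backoff_for_attempt(attempt)
  return 1 if attempt <= 1
  return attempt * 2 if attempt <= 10
  return attempt * 3 if attempt <= 20
  attempt * 4
end

MAX_ATTEMPTS = 25 # mirrors MAX_DELIVERY_ATTEMPTS

# A retry is scheduled only after failed attempts 1..24;
# a failure on attempt 25 is fatal and schedules nothing.
delays = (1...MAX_ATTEMPTS).map { |attempt| backoff_for_attempt(attempt) }

puts delays.first(3).inspect # [1, 4, 6]
puts delays.last             # 96
puts delays.sum              # 934 (about 15.5 minutes if the unit is seconds)
```

Under that assumption, a delivery that never succeeds spends roughly a quarter of an hour in scheduled delays before the fatal alert, not counting request time or queue latency.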
Instance Method Details
#_fatal(d, e) ⇒ Object
    # File 'lib/webhookdb/webhook_subscription.rb', line 166
    def _fatal(d, e)
      Webhookdb::DeveloperAlert.new(
        subsystem: "Webhook Subscriptions",
        emoji: ":hook:",
        fallback: "Error delivering WebhookSubscription::Delivery[id: #{d.id}, subscription_id: #{self.id}]: #{e}",
        fields: [
          {title: "Org", value: self.fetch_organization.display_string, short: true},
          {title: "Creator", value: self.created_by&.email, short: true},
          {title: "Delivery", value: "#{d.id}, Subscription: #{self.id}, Attempts: #{d.attempt_count}"},
          {title: "URL", value: self.deliver_to_url, short: false},
          {title: "Exception", value: e.inspect, short: false},
        ],
      ).emit
    end
#_retry(delivery, attempt) ⇒ Object
    # File 'lib/webhookdb/webhook_subscription.rb', line 154
    def _retry(delivery, attempt)
      delay = self.class.backoff_for_attempt(attempt)
      Webhookdb::Jobs::WebhookSubscriptionDeliveryEvent.perform_in(delay, delivery.id)
    end
#active? ⇒ Boolean
    # File 'lib/webhookdb/webhook_subscription.rb', line 49
    def active?
      return !self.deactivated?
    end
#associated_id ⇒ Object
    # File 'lib/webhookdb/webhook_subscription.rb', line 187
    def associated_id
      return self.organization.key unless self.organization_id.nil?
      return self.service_integration.opaque_id unless self.service_integration_id.nil?
      return ""
    end
#associated_type ⇒ Object
    # File 'lib/webhookdb/webhook_subscription.rb', line 181
    def associated_type
      return "organization" unless self.organization_id.nil?
      return "service_integration" unless self.service_integration_id.nil?
      return ""
    end
#attempt_delivery(d) ⇒ Object
Attempt to deliver the payload in d to the configured URL (see #deliver). No-ops if the subscription is deactivated.
If the attempt succeeds, no further attempts are enqueued.
If the attempt fails, another async job to reattempt delivery will be enqueued for some time in the future based on the number of attempts. The timestamp and HTTP status are stored on the delivery for future analysis.
After too many failures, no more attempts will be enqueued. Instead, a developer alert is emitted.
In the future, we will support manually re-attempting delivery (success of which should clear deactivated subscriptions), and automatic deactivation (once some abandonment criteria have been met).
    # File 'lib/webhookdb/webhook_subscription.rb', line 127
    def attempt_delivery(d)
      return if self.deactivated?
      d.db.transaction do
        d.lock!
        attempt = d.attempt_count + 1
        begin
          r = self.deliver(**d.payload.symbolize_keys, headers: {"Whdb-Attempt" => attempt.to_s})
          d.add_attempt(status: r.code)
        rescue StandardError => e
          self.logger.error(
            "webhook_subscription_delivery_failure",
            error: e,
            webhook_subscription_id: self.id,
            webhook_subscription_delivery_id: d.id,
          )
          d.add_attempt(status: e.is_a?(Webhookdb::Http::Error) ? e.status : 0)
          if attempt < MAX_DELIVERY_ATTEMPTS
            self._retry(d, attempt)
          else
            self._fatal(d, e)
          end
        ensure
          d.save_changes
        end
      end
    end
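The retry-or-give-up flow of #attempt_delivery can be followed with a small in-memory simulation. This is a sketch under stated assumptions: `simulate_delivery`, `sim_backoff`, and `SIM_MAX_ATTEMPTS` are hypothetical names, and there is no database, HTTP request, or Sidekiq involved. Each loop iteration stands in for one job run.

```ruby
# Hypothetical simulation of the attempt/retry/fatal flow; illustration only.
SIM_MAX_ATTEMPTS = 25 # mirrors MAX_DELIVERY_ATTEMPTS

# Standalone copy of the backoff rule.
def sim_backoff(attempt)
  return 1 if attempt <= 1
  return attempt * 2 if attempt <= 10
  return attempt * 3 if attempt <= 20
  attempt * 4
end

# `fail_first` is how many leading attempts the fake endpoint rejects.
def simulate_delivery(fail_first:)
  attempt_count = 0
  waited = 0
  loop do
    attempt = attempt_count + 1
    attempt_count = attempt # every attempt is recorded, success or failure
    if attempt > fail_first
      return {outcome: :delivered, attempts: attempt_count, waited: waited}
    end
    unless attempt < SIM_MAX_ATTEMPTS
      # Out of attempts: the real code emits a developer alert here.
      return {outcome: :fatal, attempts: attempt_count, waited: waited}
    end
    waited += sim_backoff(attempt) # the real code schedules a retry job instead
  end
end

flaky = simulate_delivery(fail_first: 2)
dead = simulate_delivery(fail_first: 100)
puts "#{flaky[:outcome]} after #{flaky[:attempts]} attempts" # delivered after 3 attempts
puts "#{dead[:outcome]} after #{dead[:attempts]} attempts"   # fatal after 25 attempts
```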
#before_create ⇒ Object
:Sequel Hooks:
    # File 'lib/webhookdb/webhook_subscription.rb', line 197
    def before_create
      self[:opaque_id] ||= Webhookdb::Id.new_opaque_id("wsb")
    end
#create_delivery(payload) ⇒ Object
    # File 'lib/webhookdb/webhook_subscription.rb', line 101
    def create_delivery(payload)
      return Webhookdb::WebhookSubscription::Delivery.create(webhook_subscription: self, payload:)
    end
#deactivate(at: Time.now) ⇒ Object
    # File 'lib/webhookdb/webhook_subscription.rb', line 57
    def deactivate(at: Time.now)
      self.deactivated_at = at
      return self
    end
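As shown, #deactivate only assigns `deactivated_at` and returns `self`, so persisting the change appears to be left to the caller. The sketch below mirrors that behaviour with a hypothetical `FakeSubscription` class that has no persistence layer; it is an illustration, not the real model.

```ruby
# Hypothetical stand-in mirroring #deactivate, #deactivated?, #active?,
# and #status from the source; nothing here touches a database.
class FakeSubscription
  attr_accessor :deactivated_at

  def deactivated?
    !!deactivated_at
  end

  def active?
    !deactivated?
  end

  def status
    deactivated? ? "deactivated" : "active"
  end

  def deactivate(at: Time.now)
    self.deactivated_at = at
    self # returning self allows chaining, e.g. a save call on the real model
  end
end

sub = FakeSubscription.new
puts sub.status # active
puts sub.deactivate(at: Time.at(0)).status # deactivated
```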
#deactivated? ⇒ Boolean
    # File 'lib/webhookdb/webhook_subscription.rb', line 53
    def deactivated?
      return !!self.deactivated_at
    end
#deliver(service_name:, table_name:, row:, external_id:, external_id_column:, headers: {}) ⇒ Object
Deliver the webhook payload to the configured URL. This does NOT create or otherwise deal with WebhookSubscription::Delivery; it only performs the actual delivery request.
    # File 'lib/webhookdb/webhook_subscription.rb', line 73
    def deliver(service_name:, table_name:, row:, external_id:, external_id_column:, headers: {})
      body = {
        service_name:,
        table_name:,
        row:,
        external_id:,
        external_id_column:,
      }
      return Webhookdb::Http.post(
        self.deliver_to_url,
        body,
        headers: {"Whdb-Webhook-Secret" => self.webhook_secret}.merge(headers),
        timeout: TIMEOUT,
        logger: self.logger,
      )
    end
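The request that #deliver assembles can be illustrated with a standalone sketch. `build_request` is a hypothetical helper, and serializing the body as JSON is an assumption, since `Webhookdb::Http.post` is not shown here. The point to notice is that `.merge(headers)` lets caller-supplied headers take precedence over the default secret header when the keys collide.

```ruby
require "json"

# Hypothetical helper mirroring how #deliver builds headers and body.
# JSON encoding of the body is an assumption, not confirmed by the source.
def build_request(secret:, payload:, headers: {})
  {
    body: JSON.generate(payload),
    headers: {"Whdb-Webhook-Secret" => secret}.merge(headers),
  }
end

req = build_request(
  secret: "s3cret",
  payload: {
    service_name: "test service",
    table_name: "test_table_name",
    row: {id: 1},
    external_id: "abc",
    external_id_column: "external_id",
  },
  headers: {"Whdb-Attempt" => "3"},
)

puts req[:headers].keys.inspect # ["Whdb-Webhook-Secret", "Whdb-Attempt"]
puts req[:body]
```

A receiver would typically compare the `Whdb-Webhook-Secret` header against its stored secret before trusting the body.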
#deliver_test_event(external_id: SecureRandom.hex(6)) ⇒ Object
    # File 'lib/webhookdb/webhook_subscription.rb', line 90
    def deliver_test_event(external_id: SecureRandom.hex(6))
      return self.deliver(
        service_name: "test service",
        table_name: "test_table_name",
        external_id:,
        external_id_column: "external_id",
        row: {data: ["alpha", "beta", "charlie", "delta"]},
        headers: {"Whdb-Test-Event" => "1"},
      )
    end
#enqueue_delivery(payload) ⇒ Object
Create a new Delivery and enqueue it for async processing.
    # File 'lib/webhookdb/webhook_subscription.rb', line 106
    def enqueue_delivery(payload)
      delivery = self.create_delivery(payload)
      Webhookdb::Jobs::WebhookSubscriptionDeliveryEvent.perform_async(delivery.id)
      return delivery
    end
#fetch_organization ⇒ Object
    # File 'lib/webhookdb/webhook_subscription.rb', line 62
    def fetch_organization
      return self.organization || self.service_integration.organization
    end
#status ⇒ Object
    # File 'lib/webhookdb/webhook_subscription.rb', line 66
    def status
      return self.deactivated? ? "deactivated" : "active"
    end