Class: Webhookdb::WebhookSubscription

Inherits:
Object
  • Object
show all
Defined in:
lib/webhookdb/webhook_subscription.rb

Overview

Webhook subscriptions have a few parts:

  • The WebhookSubscription itself (this model), which represents a user’s desire to receive all webhooks at a URL.

  • The individual Delivery, which is a single ‘rowupsert’ event being delivered to a subscription. That is, if multiple rowupserts are done, there will be multiple Deliveries to a single Subscription, one for each rowupsert. Likewise, if a single rowupsert is done, but there are multiple Subscriptions, there will be multiple Deliveries, one to each Subscription.

  • Async job that listens for rowupsert events and enqueues new deliveries.

  • When a delivery is ‘enqueued’, it is created in the database, and then a sidekiq job is put into Redis. This sidekiq job operates OUTSIDE of our normal job system since we do not want to bother with audit logging or routing (enough history is in the DB already, though we could add it if needed).

  • We attempt the delivery until it succeeds, or we run out of attempts. See #attempt_delivery.

Defined Under Namespace

Classes: Delivery

Constant Summary collapse

TIMEOUT =

Amount of time we wait for a response from the server.

10.seconds
MAX_DELIVERY_ATTEMPTS =

An individual will be delivered this many times before giving up.

25

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.backoff_for_attempt(attempt) ⇒ Object



159
160
161
162
163
164
# File 'lib/webhookdb/webhook_subscription.rb', line 159

def self.backoff_for_attempt(attempt)
  return 1 if attempt <= 1
  return attempt * 2 if attempt <= 10
  return attempt * 3 if attempt <= 20
  return attempt * 4
end

Instance Method Details

#_fatal(d, e) ⇒ Object



166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/webhookdb/webhook_subscription.rb', line 166

def _fatal(d, e)
  Webhookdb::DeveloperAlert.new(
    subsystem: "Webhook Subscriptions",
    emoji: ":hook:",
    fallback: "Error delivering WebhookSubscription::Delivery[id: #{d.id}, subscription_id: #{self.id}]: #{e}",
    fields: [
      {title: "Org", value: self.fetch_organization.display_string, short: true},
      {title: "Creator", value: self.created_by&.email, short: true},
      {title: "Delivery", value: "#{d.id}, Subscription: #{self.id}, Attempts: #{d.attempt_count}"},
      {title: "URL", value: self.deliver_to_url, short: false},
      {title: "Exception", value: e.inspect, short: false},
    ],
  ).emit
end

#_retry(delivery, attempt) ⇒ Object



154
155
156
157
# File 'lib/webhookdb/webhook_subscription.rb', line 154

def _retry(delivery, attempt)
  delay = self.class.backoff_for_attempt(attempt)
  Webhookdb::Jobs::WebhookSubscriptionDeliveryEvent.perform_in(delay, delivery.id)
end

#active?Boolean

Returns:

  • (Boolean)


49
50
51
# File 'lib/webhookdb/webhook_subscription.rb', line 49

def active?
  return !self.deactivated?
end

#associated_idObject



187
188
189
190
191
# File 'lib/webhookdb/webhook_subscription.rb', line 187

def associated_id
  return self.organization.key unless self.organization_id.nil?
  return self.service_integration.opaque_id unless self.service_integration_id.nil?
  return ""
end

#associated_typeObject



181
182
183
184
185
# File 'lib/webhookdb/webhook_subscription.rb', line 181

def associated_type
  return "organization" unless self.organization_id.nil?
  return "service_integration" unless self.service_integration_id.nil?
  return ""
end

#attempt_delivery(d) ⇒ Object

Attempt to deliver the payload in d to the configured URL (see #deliver). Noops if the subscription is deactivated.

If the attempt succeeds, no attempts are enqueued.

If the attempt fails, another async job to reattempt delivery will be enqueued for some time in the future based on the number of attempts. The timestamp and http status are stored on the delivery for future analysis.

After too many failures, no more attempts will be enqueued. Instead, a developer alert is emitted.

In the future, we will support manually re-attempting delivery (success of which should clear deactivated subscriptions), and automatic deactivation (after some criteria of abandonment has been met).



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/webhookdb/webhook_subscription.rb', line 127

def attempt_delivery(d)
  return if self.deactivated?
  d.db.transaction do
    d.lock!
    attempt = d.attempt_count + 1
    begin
      r = self.deliver(**d.payload.symbolize_keys, headers: {"Whdb-Attempt" => attempt.to_s})
      d.add_attempt(status: r.code)
    rescue StandardError => e
      self.logger.error(
        "webhook_subscription_delivery_failure",
        error: e,
        webhook_subscription_id: self.id,
        webhook_subscription_delivery_id: d.id,
      )
      d.add_attempt(status: e.is_a?(Webhookdb::Http::Error) ? e.status : 0)
      if attempt < MAX_DELIVERY_ATTEMPTS
        self._retry(d, attempt)
      else
        self._fatal(d, e)
      end
    ensure
      d.save_changes
    end
  end
end

#before_createObject

:Sequel Hooks:



197
198
199
# File 'lib/webhookdb/webhook_subscription.rb', line 197

def before_create
  self[:opaque_id] ||= Webhookdb::Id.new_opaque_id("wsb")
end

#create_delivery(payload) ⇒ Object



101
102
103
# File 'lib/webhookdb/webhook_subscription.rb', line 101

def create_delivery(payload)
  return Webhookdb::WebhookSubscription::Delivery.create(webhook_subscription: self, payload:)
end

#deactivate(at: Time.now) ⇒ Object



57
58
59
60
# File 'lib/webhookdb/webhook_subscription.rb', line 57

def deactivate(at: Time.now)
  self.deactivated_at = at
  return self
end

#deactivated?Boolean

Returns:

  • (Boolean)


53
54
55
# File 'lib/webhookdb/webhook_subscription.rb', line 53

def deactivated?
  return !!self.deactivated_at
end

#deliver(service_name:, table_name:, row:, external_id:, external_id_column:, headers: {}) ⇒ Object

Deliver the webhook payload to the configured URL. This does NOT create or deal with WebhookSubscription::Delivery; it is for the actual delivering.



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/webhookdb/webhook_subscription.rb', line 73

def deliver(service_name:, table_name:, row:, external_id:, external_id_column:, headers: {})
  body = {
    service_name:,
    table_name:,
    row:,
    external_id:,
    external_id_column:,
  }
  return Webhookdb::Http.post(
    self.deliver_to_url,
    body,
    headers: {"Whdb-Webhook-Secret" => self.webhook_secret}.merge(headers),
    timeout: TIMEOUT,
    logger: self.logger,
  )
end

#deliver_test_event(external_id: SecureRandom.hex(6)) ⇒ Object



90
91
92
93
94
95
96
97
98
99
# File 'lib/webhookdb/webhook_subscription.rb', line 90

def deliver_test_event(external_id: SecureRandom.hex(6))
  return self.deliver(
    service_name: "test service",
    table_name: "test_table_name",
    external_id:,
    external_id_column: "external_id",
    row: {data: ["alpha", "beta", "charlie", "delta"]},
    headers: {"Whdb-Test-Event" => "1"},
  )
end

#enqueue_delivery(payload) ⇒ Object

Create a new Delivery and enqueue it for async processing.



106
107
108
109
110
# File 'lib/webhookdb/webhook_subscription.rb', line 106

def enqueue_delivery(payload)
  delivery = self.create_delivery(payload)
  Webhookdb::Jobs::WebhookSubscriptionDeliveryEvent.perform_async(delivery.id)
  return delivery
end

#fetch_organizationObject



62
63
64
# File 'lib/webhookdb/webhook_subscription.rb', line 62

def fetch_organization
  return self.organization || self.service_integration.organization
end

#statusObject



66
67
68
# File 'lib/webhookdb/webhook_subscription.rb', line 66

def status
  return self.deactivated? ? "deactivated" : "active"
end