Class: Hookshot

Inherits:
Object
Defined in:
lib/hookshot.rb,
lib/hookshot/version.rb

Constant Summary

PREFIX =
"hookshot"
NEW_JOBS_LIST =
"#{PREFIX}:jobs"
DELAYED_SET =
"#{PREFIX}:delayed"
FAILURES_LIST =
"#{PREFIX}:failures"
BLACKLIST =
"#{PREFIX}:throttle:blacklist"
WHITELIST =
"#{PREFIX}:throttle:whitelist"
FINAL_FAILURE =

Special sentinel value returned from `get_next_failure` when the job will no longer be retried.

-1
JOB_KEY_LIFETIME =

Jobs are retried for up to 48 hours. Though we delete the job info when hookshot is done with each job, job keys also expire after four days (86400 * 4 seconds, twice the retry window) to clean up any that have managed to stick around.

86400 * 4
FailureQueueEmpty =

Raised by `get_next_failure` if there are no pending failures.

Class.new(StandardError)
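`FailureQueueEmpty` is defined with the `Class.new(StandardError)` idiom, which creates an anonymous subclass of `StandardError` that takes its name from the constant it is assigned to. The sketch below reproduces the idiom in a hypothetical `Demo` class (not part of Hookshot) to show that such an error is raised and rescued like any named exception class.

```ruby
# Demo is a hypothetical stand-in for Hookshot; only the idiom is the same.
class Demo
  # Class.new(StandardError) builds an anonymous subclass of StandardError;
  # assigning it to a constant gives it the name "Demo::Empty".
  Empty = Class.new(StandardError)

  # Raises Demo::Empty when there is nothing to take, like get_next_failure.
  def self.pop_from(list)
    raise Empty if list.empty?
    list.shift
  end
end

result =
  begin
    Demo.pop_from([])
  rescue Demo::Empty
    :empty
  end

puts Demo::Empty.name # prints Demo::Empty
puts result           # prints empty
```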
VERSION =
"0.3.2"

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(redis) ⇒ Hookshot

Hookshot expects to be initialized with an instance of `Redis` provided by the `redis` rubygem.

Example:

require 'hookshot'
require 'redis'
hookshot = Hookshot.new(Redis.new(port: 6379, host: 'localhost'))


# File 'lib/hookshot.rb', line 37

def initialize(redis)
  @redis = redis
end

Instance Attribute Details

#redis ⇒ Object (readonly)

Returns the value of attribute redis.



# File 'lib/hookshot.rb', line 27

def redis
  @redis
end

Instance Method Details

#blacklist ⇒ Object

blacklist returns an array of currently-blacklisted domains.

Example:

hookshot.blacklist
#=> ["example.com"]


# File 'lib/hookshot.rb', line 203

def blacklist
  redis.smembers(BLACKLIST)
end

#blacklist!(domain) ⇒ Object

blacklist! adds a domain to the list of currently blacklisted domains. Any jobs submitted with a domain in this list will be automatically dropped by hookshot.

The format of the domain should be the full domain. The port should not be included if it is `80` or `443`. For example:

| URL                      | Domain           |
|--------------------------|------------------|
| http://example.com/post  | example.com      |
| https://example.com/post | example.com      |
| http://example.com:8000  | example.com:8000 |
| http://example.com:80    | example.com      |

Example:

hookshot.blacklist!("example.com")
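One way to derive the domain string from a webhook URL is to parse it with Ruby's standard `URI` library and drop the port when it is 80 or 443. `hookshot_domain` is a hypothetical helper written for this sketch, not a Hookshot method; the same domain format applies to `whitelist!`.

```ruby
require "uri"

# Hypothetical helper (not part of Hookshot): derive the domain string that
# blacklist!/whitelist! expect from a webhook URL. Ports 80 and 443 are
# omitted; any other port is kept as "host:port".
def hookshot_domain(url)
  uri = URI.parse(url)
  if [80, 443].include?(uri.port)
    uri.host
  else
    "#{uri.host}:#{uri.port}"
  end
end

[
  "http://example.com/post",
  "https://example.com/post",
  "http://example.com:8000",
  "http://example.com:80",
].each { |url| puts "#{url} -> #{hookshot_domain(url)}" }
```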


# File 'lib/hookshot.rb', line 172

def blacklist!(domain)
  redis.sadd(BLACKLIST, domain)
end

#enqueue(url:, headers:, context:, payload:, activate_at: nil) ⇒ Object

enqueue takes a URL, a hash of headers, a payload (request body), and a `context` value. It submits these values to hookshot for processing and returns the UUID generated for the job.

The `context` value can be any string, and will be returned to you via `get_next_failure` if the job can’t be successfully completed after about 48 hours of retries.

In Shopify, we pass our `WebhookSubscription` object ID for the `context` value, so that we can notify merchants when their webhooks are failing, and delete subscriptions that fail consistently.

`activate_at` is an optional parameter giving the Unix timestamp (in seconds) at which this job becomes active in hookshot; until then the job is held in the delayed set. You should normally call `enqueue_in` rather than pass `activate_at` explicitly.

Example:

hookshot.enqueue(
  url: 'http://localhost:8080/post',
  headers: {"X-My-Header" => "value"},
  context: "42",
  payload: "request body")
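To make the two code paths in `enqueue` concrete, the sketch below models them in memory: without `activate_at` the job id goes onto the pending list (`NEW_JOBS_LIST`), and with it the id is stored in the delayed set (`DELAYED_SET`) scored by its activation timestamp. `EnqueueModel` and its `promote_due` step are hypothetical; the source shown here only writes to Redis, and how the hookshot service later moves due jobs out of the delayed set is an assumption.

```ruby
# In-memory model of where enqueue puts a job id. Illustration only, not
# Hookshot code: an Array stands in for the NEW_JOBS_LIST Redis list and a
# Hash of id => score stands in for the DELAYED_SET sorted set.
class EnqueueModel
  attr_reader :pending, :delayed

  def initialize
    @pending = [] # LPUSH adds to the head of the list, modelled with unshift
    @delayed = {} # the ZADD score is the activate_at Unix timestamp
  end

  def enqueue(uuid, activate_at: nil)
    if activate_at
      @delayed[uuid] = activate_at
    else
      @pending.unshift(uuid)
    end
    uuid
  end

  # Assumed promotion step (hypothetical): move every delayed id whose
  # timestamp is <= now onto the pending list, earliest first.
  def promote_due(now)
    due = @delayed.select { |_id, at| at <= now }
                  .sort_by { |_id, at| at }
                  .map(&:first)
    due.each do |id|
      @delayed.delete(id)
      @pending.unshift(id)
    end
    due
  end
end

model = EnqueueModel.new
model.enqueue("a")
model.enqueue("b", activate_at: 1_700_000_060)
p model.pending                    # prints ["a"]
p model.promote_due(1_700_000_000) # prints []
p model.promote_due(1_700_000_060) # prints ["b"]
p model.pending                    # prints ["b", "a"]
```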


# File 'lib/hookshot.rb', line 63

def enqueue(url:, headers:, context:, payload:, activate_at: nil)
  uuid = SecureRandom.uuid
  redis.pipelined do
    redis.hmset(
      job_key(uuid),
      "url", url,
      "headers", serialize_headers(headers),
      "context", context,
      "payload", payload,
      "failures", 0)
    redis.expire(job_key(uuid), JOB_KEY_LIFETIME)
    if activate_at
      redis.zadd(DELAYED_SET, activate_at, uuid)
    else
      redis.lpush(NEW_JOBS_LIST, uuid)
    end
  end

  uuid
end

#enqueue_in(duration, url:, headers:, context:, payload:) ⇒ Object

enqueue_in calls enqueue with an `activate_at` timestamp `duration` seconds from now, to delay the job’s execution.

Example:

hookshot.enqueue_in(60, # seconds
  url: 'http://localhost:8080/post',
  headers: {"X-My-Header" => "value"},
  context: "42",
  payload: "request body")
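Because `activate_at` is an absolute Unix timestamp, `enqueue_in` converts its relative `duration` by adding it to the current time and truncating with `to_i`. The hypothetical `activate_at_for` helper below isolates that arithmetic and takes `now` as a parameter so the result is deterministic.

```ruby
# Mirrors the arithmetic in enqueue_in: add the delay in seconds to the
# current time and truncate to an integer Unix timestamp.
# activate_at_for is a hypothetical name used only for illustration.
def activate_at_for(duration, now: Time.now)
  (now + duration).to_i
end

now = Time.at(1_700_000_000)
puts activate_at_for(60, now: now)  # prints 1700000060
puts activate_at_for(1.5, now: now) # prints 1700000001 (fraction truncated)
```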


# File 'lib/hookshot.rb', line 94

def enqueue_in(duration, url:, headers:, context:, payload:)
  enqueue_time = (Time.now + duration).to_i
  enqueue(
    url: url,
    headers: headers,
    context: context,
    payload: payload,
    activate_at: enqueue_time)
end

#get_next_failure ⇒ Object

Jobs that fail many times in a row are reported back to the application. Specifically, the `context` value passed in via the `enqueue*` methods is returned.

`get_next_failure` returns two values: the number of failures so far for this job, and the `context` passed in with the job. If `nfailures` is equal to `Hookshot::FINAL_FAILURE`, the job will not be retried. Any other value means the job will still be retried; the entry is only an advisory, reported because the job has been failing for at least 24 hours.

This method is non-blocking: if there is no item present in the failures queue, it raises `FailureQueueEmpty`.

Example:

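loop do
  begin
    nfailures, context = hookshot.get_next_failure
  rescue Hookshot::FailureQueueEmpty
    break # nothing left to process
  end
  if nfailures == Hookshot::FINAL_FAILURE
    delete_webhook_subscription(context) # defined by your application
  end
end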

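Raises:

(FailureQueueEmpty) if there are no pending failures.
(RuntimeError) if hookshot returns a malformed failure line.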

# File 'lib/hookshot.rb', line 125

def get_next_failure
  # Non-blocking: LPOP returns nil immediately if the failures list is empty.
  line = redis.lpop(FAILURES_LIST)
  raise FailureQueueEmpty if line.nil?

  nfailures, failed_id = line.split('|', 2)

  if nfailures.to_i == 0 || failed_id.nil? || failed_id.empty?
    raise "Invalid line from hookshot: #{line}"
  end

  [nfailures.to_i, failed_id]
end
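The source above splits each raw failure line on the first `|` only (`split('|', 2)`), so a `context` value that itself contains `|` comes back intact. The standalone sketch below copies that parsing step into a hypothetical `parse_failure_line` method, with an explicit `nil` check so that a line without any `|` raises the same "Invalid line" error instead of a `NoMethodError`. The `<nfailures>|<context>` line format is inferred from the source.

```ruby
# Standalone copy of the parsing step in get_next_failure, for illustration.
# parse_failure_line is a hypothetical name. split("|", 2) splits on the
# first "|" only, so a context containing "|" is preserved.
def parse_failure_line(line)
  nfailures, context = line.split("|", 2)
  if nfailures.to_i == 0 || context.nil? || context.empty?
    raise "Invalid line from hookshot: #{line}"
  end
  [nfailures.to_i, context]
end

p parse_failure_line("12|42")      # prints [12, "42"]
p parse_failure_line("-1|42")      # prints [-1, "42"] (FINAL_FAILURE)
p parse_failure_line("3|sub|1234") # prints [3, "sub|1234"]
```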

#queue_stats ⇒ Object

Hookshot writes a lot of statistics to statsd/datadog, but to quickly check the current queue sizes, use `queue_stats`.

Example:

hookshot.queue_stats
# => { pending: 42, delayed: 42, failures: 42 }


# File 'lib/hookshot.rb', line 146

def queue_stats
  pending, delayed, failures = redis.pipelined do
    redis.llen NEW_JOBS_LIST
    redis.zcard DELAYED_SET
    redis.llen FAILURES_LIST
  end
  { pending: pending, delayed: delayed, failures: failures }
end
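The hash returned by `queue_stats` lends itself to a simple health check. The sketch below is a hypothetical `queue_warnings` helper; its thresholds are placeholders to tune for your own traffic, not values recommended by Hookshot.

```ruby
# Hypothetical monitoring helper built on the hash queue_stats returns.
# The default thresholds are placeholders, not Hookshot recommendations.
def queue_warnings(stats, max_pending: 10_000, max_failures: 100)
  warnings = []
  warnings << "pending backlog: #{stats[:pending]}" if stats[:pending] > max_pending
  warnings << "unread failures: #{stats[:failures]}" if stats[:failures] > max_failures
  warnings
end

p queue_warnings({ pending: 42, delayed: 42, failures: 42 })
# prints []
p queue_warnings({ pending: 20_000, delayed: 0, failures: 500 })
# prints ["pending backlog: 20000", "unread failures: 500"]
```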

#remove_blacklist(domain) ⇒ Object

remove_blacklist removes a currently-blacklisted domain from the blacklist. If the domain was not blacklisted, this method has no effect.

Example:

hookshot.remove_blacklist("google.com")


# File 'lib/hookshot.rb', line 223

def remove_blacklist(domain)
  redis.srem(BLACKLIST, domain)
end
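The blacklist is stored as a Redis set, so `blacklist!` and `remove_blacklist` are idempotent. The sketch below models this with Ruby's `Set` (an in-memory stand-in, not a Redis client): `add?` and `delete?` return `nil` when the set is left unchanged, just as re-adding a domain or removing one that was never blacklisted has no effect. `outcome` is a hypothetical helper for printing.

```ruby
require "set"

# In-memory stand-in for the BLACKLIST Redis set (illustrative, not Hookshot
# code). Set#add? and Set#delete? return nil when the set is unchanged.
def outcome(result)
  result.nil? ? "no change" : "changed"
end

blacklist = Set.new

puts outcome(blacklist.add?("example.com"))    # prints changed
puts outcome(blacklist.add?("example.com"))    # prints no change
puts outcome(blacklist.delete?("google.com"))  # prints no change
puts outcome(blacklist.delete?("example.com")) # prints changed
p blacklist.to_a                               # prints []
```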

#remove_whitelist(domain) ⇒ Object

remove_whitelist removes a currently-whitelisted domain from the whitelist. If the domain was not whitelisted, this method has no effect.

Example:

hookshot.remove_whitelist("google.com")


# File 'lib/hookshot.rb', line 233

def remove_whitelist(domain)
  redis.srem(WHITELIST, domain)
end

#whitelist ⇒ Object

whitelist returns an array of currently-whitelisted domains.

Example:

hookshot.whitelist
#=> ["example.com"]


# File 'lib/hookshot.rb', line 213

def whitelist
  redis.smembers(WHITELIST)
end

#whitelist!(domain) ⇒ Object

whitelist! adds a domain to the list of currently whitelisted domains. Normally, jobs are throttled to a maximum number of requests per second per domain. Whitelisted domains are granted a much higher initial rate.

The format of the domain should be the full domain. The port should not be included if it is `80` or `443`. For example:

| URL                      | Domain           |
|--------------------------|------------------|
| http://example.com/post  | example.com      |
| https://example.com/post | example.com      |
| http://example.com:8000  | example.com:8000 |
| http://example.com:80    | example.com      |

Example:

hookshot.whitelist!("google.com")


# File 'lib/hookshot.rb', line 193

def whitelist!(domain)
  redis.sadd(WHITELIST, domain)
end
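The per-domain throttling described for `whitelist!` can be pictured with a token bucket per domain. The sketch below is a swapped-in illustration, not Hookshot's algorithm: `DomainThrottle` and its rates are hypothetical, and it uses a fixed rate per domain, whereas the text above says only that whitelisted domains get a much higher initial rate, so any later rate adjustment is not modelled.

```ruby
# Illustrative per-domain token bucket. NOT Hookshot's throttling algorithm;
# the rates are made-up placeholders showing how a whitelisted domain could
# be granted a larger allowance than other domains.
class DomainThrottle
  def initialize(default_rate:, whitelisted_rate:, whitelist:)
    @default_rate = default_rate         # requests per second
    @whitelisted_rate = whitelisted_rate # requests per second
    @whitelist = whitelist
    @buckets = {} # domain => [tokens, time of last update]
  end

  # Returns true if a request to domain may be sent at time now (seconds).
  def allow?(domain, now)
    rate = @whitelist.include?(domain) ? @whitelisted_rate : @default_rate
    tokens, last = @buckets.fetch(domain, [rate.to_f, now])
    # Refill for the elapsed time, capped at one second's worth of tokens.
    tokens = [rate.to_f, tokens + (now - last) * rate].min
    if tokens >= 1
      @buckets[domain] = [tokens - 1, now]
      true
    else
      @buckets[domain] = [tokens, now]
      false
    end
  end
end

throttle = DomainThrottle.new(default_rate: 2, whitelisted_rate: 50, whitelist: ["google.com"])
sent_default = (1..10).count { throttle.allow?("example.com", 0) }
sent_whitelisted = (1..10).count { throttle.allow?("google.com", 0) }
puts sent_default     # prints 2
puts sent_whitelisted # prints 10
```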