Module: Amigo

Defined in:
lib/amigo.rb,
lib/amigo/job.rb,
lib/amigo/retry.rb,
lib/amigo/router.rb,
lib/amigo/version.rb,
lib/amigo/autoscaler.rb,
lib/amigo/audit_logger.rb,
lib/amigo/spec_helpers.rb,
lib/amigo/scheduled_job.rb,
lib/amigo/deprecated_jobs.rb,
lib/amigo/autoscaler/heroku.rb,
lib/amigo/queue_backoff_job.rb,
lib/amigo/semaphore_backoff_job.rb,
lib/amigo/semaphore_backoff_job.rb,
lib/amigo/rate_limited_error_handler.rb

Overview

Wrap another Sidekiq error handler so invoking it is rate limited.

Useful when wrapping a usage-based error reporter like Sentry, which can be hammered in the case of an issue like connectivity that causes all jobs and retries to fail. It is suggested that all errors are still reported to something like application logs, since entirely silencing errors can make debugging problems tricky.

Usage:

Sidekiq.configure_server do |config|
  config.error_handlers << Amigo::RateLimitedErrorHandler.new(
    Sentry::Sidekiq::ErrorHandler.new,
    sample_rate: ENV.fetch('ASYNC_ERROR_RATE_LIMITER_SAMPLE_RATE', '0.5').to_f,
    ttl: ENV.fetch('ASYNC_ERROR_RATE_LIMITER_TTL', '120').to_f,
  )
end

See notes about sample_rate and ttl, and fingerprint for how exceptions are fingerprinted for uniqueness.

Rate limiting is done in-memory so is unique across the entire process- threads/workers share rate limiting, but multiple processes do not. So if 2 processes have 10 threads each, the error handler would be invoked twice if they all error for the same reason.

Thread-based limiting (20 errors in the case above) or cross-process limiting (1 error in the case above) can be added in the future.

Defined Under Namespace

Modules: DeprecatedJobs, Job, QueueBackoffJob, Retry, ScheduledJob, SemaphoreBackoffJob, SpecHelpers Classes: AuditLogger, Autoscaler, Error, Event, RateLimitedErrorHandler, Router, StartSchedulerFailed

Constant Summary collapse

VERSION =
"1.6.0"

Class Attribute Summary collapse

Class Method Summary collapse

Class Attribute Details

.audit_logger_classObject

Returns the value of attribute audit_logger_class.



109
110
111
# File 'lib/amigo.rb', line 109

def audit_logger_class
  @audit_logger_class
end

.log_callbackObject

Proc called with [job, level, message, params]. By default, logs to the job’s logger (or Sidekiq’s if job is nil). If structured_logging is true, the message will be an ‘event’ string (like ‘registered_subscriber’) without any dynamic info. If structured_logging is false, the params will be rendered into the message so are suitable for unstructured logging. Also, the params will also have an :log_message key which will contain the original log message.



118
119
120
# File 'lib/amigo.rb', line 118

def log_callback
  @log_callback
end

.on_publish_errorObject

A single callback to be run when an event publication errors, almost always due to an error in a subscriber.

The callback receives the exception, the event being published, and the erroring subscriber.

If this is not set, errors from subscribers will be re-raised immediately, since broken subscribers usually indicate a broken application.

Note also that when an error occurs, Amigo.log is always called first. You do NOT need a callback that just logs and swallows the error. If all you want to do is log, and not propogate the error, you can use ‘Amigo.on_publish_error = proc {}`.



158
159
160
# File 'lib/amigo.rb', line 158

def on_publish_error
  @on_publish_error
end

.registered_jobsObject

Every subclass of Amigo::Job and Amigo::ScheduledJob goes here. It is used for routing and testing isolated jobs.



141
142
143
# File 'lib/amigo.rb', line 141

def registered_jobs
  @registered_jobs
end

.router_classObject

Returns the value of attribute router_class.



109
110
111
# File 'lib/amigo.rb', line 109

def router_class
  @router_class
end

.structured_loggingObject

Returns the value of attribute structured_logging.



109
110
111
# File 'lib/amigo.rb', line 109

def structured_logging
  @structured_logging
end

.subscribersObject

An Array of callbacks to be run when an event is published.



144
145
146
# File 'lib/amigo.rb', line 144

def subscribers
  @subscribers
end

.synchronous_modeObject

If true, perform event work synchronously rather than asynchronously. Only useful for testing.



137
138
139
# File 'lib/amigo.rb', line 137

def synchronous_mode
  @synchronous_mode
end

Class Method Details

._subscriber(event) ⇒ Object



212
213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/amigo.rb', line 212

def _subscriber(event)
  event_json = event.as_json
  begin
    self.audit_logger_class.perform_async(event_json)
  rescue StandardError => e
    # If the audit logger cannot perform, let's say because Redis is down,
    # we can run the job manually. This is pretty important for anything used for auditing;
    # it should be as resilient as possible.
    self.log(nil, :error, "amigo_audit_log_subscriber_error", error: e, event: event_json)
    self.audit_logger_class.new.perform(event_json)
  end
  self.router_class.perform_async(event_json)
end

.install_amigo_jobsObject

Install Amigo so that every publish will be sent to the AuditLogger job and will invoke the relevant jobs in registered_jobs via the Router job.



206
207
208
209
210
# File 'lib/amigo.rb', line 206

def install_amigo_jobs
  return self.register_subscriber do |ev|
    self._subscriber(ev)
  end
end

.log(job, level, message, params) ⇒ Object



125
126
127
128
129
130
131
132
133
# File 'lib/amigo.rb', line 125

def log(job, level, message, params)
  params ||= {}
  if !self.structured_logging && !params.empty?
    paramstr = params.map { |k, v| "#{k}=#{v}" }.join(" ")
    params[:log_message] = message
    message = "#{message} #{paramstr}"
  end
  self.log_callback[job, level, message, params]
end

.publish(eventname, *payload) ⇒ Object

Publish an event with the specified eventname and payload to any configured publishers.



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/amigo.rb', line 162

def publish(eventname, *payload)
  ev = Event.new(SecureRandom.uuid, eventname, payload)

  self.subscribers.to_a.each do |hook|
    hook.call(ev)
  rescue StandardError => e
    self.log(nil, :error, "amigo_subscriber_hook_error", error: e, hook: hook, event: ev&.as_json)
    raise e if self.on_publish_error.nil?
    if self.on_publish_error.respond_to?(:arity) && self.on_publish_error.arity == 1
      self.on_publish_error.call(e)
    else
      self.on_publish_error.call(e, ev, hook)
    end
  end
end

.register_job(job) ⇒ Object



226
227
228
229
# File 'lib/amigo.rb', line 226

def register_job(job)
  self.registered_jobs << job
  self.registered_jobs.uniq!
end

.register_subscriber(&block) ⇒ Object

Register a hook to be called when an event is sent. If a subscriber errors, on_publish_error is called with the exception, event, and subscriber.

Raises:

  • (LocalJumpError)


180
181
182
183
184
185
# File 'lib/amigo.rb', line 180

def register_subscriber(&block)
  raise LocalJumpError, "no block given" unless block
  self.log nil, :info, "amigo_installed_subscriber", block: block
  self.subscribers << block
  return block
end

.registered_event_jobsObject

Return an array of all Job subclasses that respond to event publishing (have patterns).



192
193
194
# File 'lib/amigo.rb', line 192

def registered_event_jobs
  return self.registered_jobs.select(&:event_job?)
end

.registered_scheduled_jobsObject

Return an array of all Job subclasses that are scheduled (have intervals).



197
198
199
# File 'lib/amigo.rb', line 197

def registered_scheduled_jobs
  return self.registered_jobs.select(&:scheduled_job?)
end

.reset_loggingObject



120
121
122
123
# File 'lib/amigo.rb', line 120

def reset_logging
  self.log_callback = ->(job, level, msg, _params) { (job || Sidekiq).logger.send(level, msg) }
  self.structured_logging = false
end

.start_scheduler(load_from_hash = Sidekiq::Cron::Job.method(:load_from_hash)) ⇒ Object

Start the scheduler. This should generally be run in the Sidekiq worker process, not a webserver process.



234
235
236
237
238
239
240
241
242
243
244
# File 'lib/amigo.rb', line 234

def start_scheduler(load_from_hash=Sidekiq::Cron::Job.method(:load_from_hash))
  hash = self.registered_scheduled_jobs.each_with_object({}) do |job, memo|
    self.log(nil, :info, "scheduling_job_cron", {job_name: job.name, job_cron: job.cron_expr})
    memo[job.name] = {
      "class" => job.name,
      "cron" => job.cron_expr,
    }
  end
  load_errs = load_from_hash.call(hash) || {}
  raise StartSchedulerFailed, "Errors loading sidekiq-cron jobs: %p" % [load_errs] unless load_errs.empty?
end

.unregister_subscriber(block_ref) ⇒ Object



187
188
189
# File 'lib/amigo.rb', line 187

def unregister_subscriber(block_ref)
  self.subscribers.delete(block_ref)
end