Module: Amigo

Defined in:
lib/amigo.rb,
lib/amigo/job.rb,
lib/amigo/retry.rb,
lib/amigo/router.rb,
lib/amigo/version.rb,
lib/amigo/autoscaler.rb,
lib/amigo/audit_logger.rb,
lib/amigo/spec_helpers.rb,
lib/amigo/scheduled_job.rb,
lib/amigo/deprecated_jobs.rb,
lib/amigo/memory_pressure.rb,
lib/amigo/threading_event.rb,
lib/amigo/queue_backoff_job.rb,
lib/amigo/semaphore_backoff_job.rb,
lib/amigo/autoscaler/handlers/log.rb,
lib/amigo/autoscaler/checkers/fake.rb,
lib/amigo/autoscaler/handlers/fake.rb,
lib/amigo/autoscaler/checkers/chain.rb,
lib/amigo/autoscaler/handlers/chain.rb,
lib/amigo/autoscaler/handlers/heroku.rb,
lib/amigo/autoscaler/handlers/sentry.rb,
lib/amigo/rate_limited_error_handler.rb,
lib/amigo/autoscaler/checkers/sidekiq.rb,
lib/amigo/autoscaler/checkers/web_latency.rb,
lib/amigo/autoscaler/checkers/puma_pool_usage.rb

Overview

Wrap another Sidekiq error handler so invoking it is rate limited.

Useful when wrapping a usage-based error reporter like Sentry, which can be hammered in the case of an issue like connectivity that causes all jobs and retries to fail. It is suggested that all errors are still reported to something like application logs, since entirely silencing errors can make debugging problems tricky.

Usage:

Sidekiq.configure_server do |config|
  config.error_handlers << Amigo::RateLimitedErrorHandler.new(
    Sentry::Sidekiq::ErrorHandler.new,
    sample_rate: ENV.fetch('ASYNC_ERROR_RATE_LIMITER_SAMPLE_RATE', '0.5').to_f,
    ttl: ENV.fetch('ASYNC_ERROR_RATE_LIMITER_TTL', '120').to_f,
  )
end

See notes about sample_rate and ttl, and fingerprint for how exceptions are fingerprinted for uniqueness.

Rate limiting is done in memory, so it is shared within a single process: threads/workers share rate limiting, but separate processes do not. For example, if 2 processes have 10 threads each and they all error for the same reason, the error handler would be invoked twice, once per process.

Thread-based limiting (20 errors in the case above) or cross-process limiting (1 error in the case above) can be added in the future.
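The process-local behavior described above can be illustrated with a minimal sketch. It is not the implementation of Amigo::RateLimitedErrorHandler: the class name SketchRateLimiter, its one-argument call signature, and the class-plus-message fingerprint are all assumptions made for illustration, and sample_rate is not modeled at all. Two limiter instances stand in for two processes with 10 threads each.

```ruby
# Hypothetical sketch of per-process, in-memory rate limiting.
# SketchRateLimiter is an illustrative name, not part of Amigo.
class SketchRateLimiter
  def initialize(wrapped, ttl:)
    @wrapped = wrapped   # the handler being rate limited
    @ttl = ttl           # seconds during which repeats are suppressed
    @last_seen = {}      # fingerprint => monotonic time of last report
    @mutex = Mutex.new   # threads in one process share this state
  end

  # Assumed fingerprint: exception class plus message.
  def fingerprint(ex)
    [ex.class.name, ex.message].join(":")
  end

  # Returns true if the wrapped handler was invoked.
  def call(ex)
    now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    key = fingerprint(ex)
    allowed = @mutex.synchronize do
      last = @last_seen[key]
      if last.nil? || now - last >= @ttl
        @last_seen[key] = now
        true
      else
        false
      end
    end
    @wrapped.call(ex) if allowed
    allowed
  end
end

reported = Queue.new
# Each limiter instance stands in for one process.
processes = Array.new(2) do
  SketchRateLimiter.new(->(ex) { reported << ex.message }, ttl: 120)
end
threads = processes.flat_map do |limiter|
  Array.new(10) { Thread.new { limiter.call(RuntimeError.new("redis down")) } }
end
threads.each(&:join)
puts reported.size
```

Because the state lives in each instance, 20 failing threads produce one report per simulated process rather than 20 or 1.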

Defined Under Namespace

Modules: DeprecatedJobs, Job, QueueBackoffJob, Retry, ScheduledJob, SemaphoreBackoffJob, SpecHelpers

Classes: AuditLogger, Autoscaler, Error, Event, MemoryPressure, RateLimitedErrorHandler, Router, StartSchedulerFailed, ThreadingEvent

Constant Summary

VERSION = "1.14.0"

Class Attribute Details

.audit_logger_class ⇒ Object

Returns the value of attribute audit_logger_class.



# File 'lib/amigo.rb', line 108

def audit_logger_class
  @audit_logger_class
end

.log_callback ⇒ Object

Proc called with [job, level, message, params]. By default, it logs to the job’s logger (or Sidekiq’s if job is nil). If structured_logging is true, the message will be an ‘event’ string (like ‘registered_subscriber’) without any dynamic info. If structured_logging is false, the params will be rendered into the message so it is suitable for unstructured logging; in that case the params will also have a :log_message key containing the original log message.



# File 'lib/amigo.rb', line 117

def log_callback
  @log_callback
end
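As an illustration of the [job, level, message, params] contract, here is a minimal sketch of a callback that emits one JSON object per log call. The name json_log_callback, the JSON layout, and the StringIO-backed logger are assumptions for the example, not part of Amigo; only the four-argument signature comes from the description above.

```ruby
require "json"
require "logger"
require "stringio"

# Capture output in memory so the example is self-contained.
io = StringIO.new
logger = Logger.new(io)
logger.formatter = ->(_severity, _time, _progname, msg) { "#{msg}\n" }

# Hypothetical callback: serializes the event name and params as JSON.
json_log_callback = lambda do |job, level, message, params|
  payload = {event: message, job: job&.class&.name}.merge(params || {})
  logger.public_send(level, JSON.generate(payload))
end

# With the gem loaded, this would presumably be wired up as:
#   Amigo.structured_logging = true
#   Amigo.log_callback = json_log_callback
json_log_callback[nil, :info, "registered_subscriber", {block: "app.rb:10"}]
puts io.string
```

A callback like this pairs naturally with structured_logging set to true, since the message then stays a stable event name and the dynamic values remain in params.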

.on_publish_error ⇒ Object

A single callback to be run when an event publication errors, almost always due to an error in a subscriber.

The callback receives the exception, the event being published, and the erroring subscriber.

If this is not set, errors from subscribers will be re-raised immediately, since broken subscribers usually indicate a broken application.

Note also that when an error occurs, Amigo.log is always called first. You do NOT need a callback that just logs and swallows the error. If all you want to do is log, and not propagate the error, you can use `Amigo.on_publish_error = proc {}`.



# File 'lib/amigo.rb', line 157

def on_publish_error
  @on_publish_error
end

.registered_jobs ⇒ Object

Every subclass of Amigo::Job and Amigo::ScheduledJob goes here. It is used for routing and testing isolated jobs.



# File 'lib/amigo.rb', line 140

def registered_jobs
  @registered_jobs
end

.router_class ⇒ Object

Returns the value of attribute router_class.



# File 'lib/amigo.rb', line 108

def router_class
  @router_class
end

.structured_logging ⇒ Object

Returns the value of attribute structured_logging.



# File 'lib/amigo.rb', line 108

def structured_logging
  @structured_logging
end

.subscribers ⇒ Object

An Array of callbacks to be run when an event is published.



# File 'lib/amigo.rb', line 143

def subscribers
  @subscribers
end

.synchronous_mode ⇒ Object

If true, perform event work synchronously rather than asynchronously. Only useful for testing.



# File 'lib/amigo.rb', line 136

def synchronous_mode
  @synchronous_mode
end

Class Method Details

._subscriber(event) ⇒ Object



# File 'lib/amigo.rb', line 223

def _subscriber(event)
  event_json = event.as_json
  begin
    self.audit_logger_class.perform_async(event_json)
  rescue StandardError => e
    # If the audit logger cannot perform, let's say because Redis is down,
    # we can run the job manually. This is pretty important for anything used for auditing;
    # it should be as resilient as possible.
    self.log(nil, :error, "amigo_audit_log_subscriber_error", error: e, event: event_json)
    self.audit_logger_class.new.perform(event_json)
  end
  self.router_class.perform_async(event_json)
end
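The enqueue-then-fall-back pattern in the listing above can be tried in isolation with a minimal sketch. SketchAuditJob and enqueue_with_inline_fallback are hypothetical names, and the job's perform_async always raises here to simulate an unavailable queue; nothing in this block calls Amigo or Sidekiq.

```ruby
# Hypothetical job whose async enqueue always fails, as if Redis were down.
class SketchAuditJob
  @performed = []

  class << self
    attr_reader :performed

    def perform_async(_payload)
      raise IOError, "queue unavailable"
    end
  end

  def perform(payload)
    self.class.performed << payload
  end
end

# Try to enqueue; if that fails, run the job inline so the record is not lost.
def enqueue_with_inline_fallback(job_class, payload)
  job_class.perform_async(payload)
  :enqueued
rescue StandardError
  job_class.new.perform(payload)
  :ran_inline
end

result = enqueue_with_inline_fallback(SketchAuditJob, {"name" => "user.created"})
puts result
```

The trade-off is that the inline run blocks the publisher, which is usually acceptable for audit records that must not be dropped.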

.install_amigo_jobs ⇒ Object

Install Amigo so that every publish will be sent to the AuditLogger job and will invoke the relevant jobs in registered_jobs via the Router job.



# File 'lib/amigo.rb', line 217

def install_amigo_jobs
  return self.register_subscriber do |ev|
    self._subscriber(ev)
  end
end

.log(job, level, message, params) ⇒ Object



# File 'lib/amigo.rb', line 124

def log(job, level, message, params)
  params ||= {}
  if !self.structured_logging && !params.empty?
    paramstr = params.map { |k, v| "#{k}=#{v}" }.join(" ")
    params[:log_message] = message
    message = "#{message} #{paramstr}"
  end
  self.log_callback[job, level, message, params]
end
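To make the unstructured rendering concrete, the following sketch reproduces the message-building step from the listing above as a standalone function. The name render_unstructured is hypothetical, and unlike the listing it copies params instead of mutating the caller's hash.

```ruby
# Hypothetical standalone version of the unstructured rendering step.
# Returns the rendered message and the params passed on to the callback.
def render_unstructured(message, params)
  params = (params || {}).dup
  return [message, params] if params.empty?

  paramstr = params.map { |k, v| "#{k}=#{v}" }.join(" ")
  params[:log_message] = message # keep the original event name available
  ["#{message} #{paramstr}", params]
end

msg, out_params = render_unstructured(
  "scheduling_job_cron",
  {job_name: "Nightly", job_cron: "0 3 * * *"},
)
puts msg
puts out_params[:log_message]
```

Note that :log_message is added after the param string is built, so it never appears in the rendered message itself.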

.publish(eventname, *payload) ⇒ Object

Publish an event with the specified eventname and payload to all registered subscribers.



# File 'lib/amigo.rb', line 161

def publish(eventname, *payload)
  ev = Event.new(SecureRandom.uuid, eventname, payload)

  self.subscribers.to_a.each do |hook|
    hook.call(ev)
  rescue StandardError => e
    self.log(
      nil,
      :error,
      "amigo_subscriber_hook_error",
      error: e, hook: _block_repr(hook), event: ev&.as_json,
    )
    raise e if self.on_publish_error.nil?
    if self.on_publish_error.respond_to?(:arity) && self.on_publish_error.arity == 1
      self.on_publish_error.call(e)
    else
      self.on_publish_error.call(e, ev, hook)
    end
  end
end
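The arity check at the end of the listing above decides how many arguments the error callback receives. A minimal sketch of that dispatch, using the hypothetical helper name dispatch_publish_error and plain strings in place of real events and hooks:

```ruby
# Hypothetical helper mirroring the arity-based dispatch in the listing.
def dispatch_publish_error(callback, error, event, hook)
  if callback.respond_to?(:arity) && callback.arity == 1
    callback.call(error)
  else
    callback.call(error, event, hook)
  end
end

seen = []
one_arg = ->(e) { seen << [:one, e.message] }
three_arg = ->(e, ev, hook) { seen << [:three, e.message, ev, hook] }
swallow = proc {} # arity 0; a non-lambda proc ignores the extra arguments

err = RuntimeError.new("boom")
dispatch_publish_error(one_arg, err, "event", "hook")
dispatch_publish_error(three_arg, err, "event", "hook")
dispatch_publish_error(swallow, err, "event", "hook")
p seen
```

One consequence worth noting: a lambda taking exactly two arguments would raise ArgumentError under this dispatch, because lambdas enforce their arity while non-lambda procs do not.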

.register_job(job) ⇒ Object



# File 'lib/amigo.rb', line 237

def register_job(job)
  self.registered_jobs << job
  self.registered_jobs.uniq!
end

.register_subscriber(&block) ⇒ Object

Register a hook to be called when an event is sent. If a subscriber errors, on_publish_error is called with the exception, event, and subscriber.

Raises:

  • (LocalJumpError)


# File 'lib/amigo.rb', line 184

def register_subscriber(&block)
  raise LocalJumpError, "no block given" unless block
  self.log nil, :info, "amigo_installed_subscriber", block: _block_repr(block)
  self.subscribers << block
  return block
end

.registered_event_jobs ⇒ Object

Return an array of all Job subclasses that respond to event publishing (have patterns).



# File 'lib/amigo.rb', line 203

def registered_event_jobs
  return self.registered_jobs.select(&:event_job?)
end

.registered_scheduled_jobs ⇒ Object

Return an array of all Job subclasses that are scheduled (have intervals).



# File 'lib/amigo.rb', line 208

def registered_scheduled_jobs
  return self.registered_jobs.select(&:scheduled_job?)
end

.reset_logging ⇒ Object



# File 'lib/amigo.rb', line 119

def reset_logging
  self.log_callback = ->(job, level, msg, _params) { (job || Sidekiq).logger.send(level, msg) }
  self.structured_logging = false
end

.start_scheduler(load_from_hash = Sidekiq::Cron::Job.method(:load_from_hash)) ⇒ Object

Start the scheduler. This should generally be run in the Sidekiq worker process, not a webserver process.



# File 'lib/amigo.rb', line 245

def start_scheduler(load_from_hash=Sidekiq::Cron::Job.method(:load_from_hash))
  hash = self.registered_scheduled_jobs.each_with_object({}) do |job, memo|
    self.log(nil, :info, "scheduling_job_cron", {job_name: job.name, job_cron: job.cron_expr})
    memo[job.name] = {
      "class" => job.name,
      "cron" => job.cron_expr,
    }
  end
  load_errs = load_from_hash.call(hash) || {}
  raise StartSchedulerFailed, "Errors loading sidekiq-cron jobs: %p" % [load_errs] unless load_errs.empty?
end
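Since load_from_hash is an injectable parameter, the hash handed to the loader can be inspected without sidekiq-cron. The sketch below rebuilds that hash from stand-in jobs and passes it to a fake loader; SketchJob, the job names, and fake_loader are hypothetical, and the loader is assumed to return a hash of errors (empty on success), as the listing's use of load_errs.empty? suggests.

```ruby
# Hypothetical stand-in for a scheduled job class exposing name and cron_expr.
SketchJob = Struct.new(:name, :cron_expr)

jobs = [
  SketchJob.new("Jobs::Nightly", "0 3 * * *"),
  SketchJob.new("Jobs::Hourly", "0 * * * *"),
]

# Same shape as the hash built in the listing above.
cron_hash = jobs.each_with_object({}) do |job, memo|
  memo[job.name] = {"class" => job.name, "cron" => job.cron_expr}
end

# Fake loader: records what it was given and reports no errors.
received = nil
fake_loader = lambda do |hash|
  received = hash
  {}
end

load_errs = fake_loader.call(cron_hash) || {}
raise "Errors loading jobs: %p" % [load_errs] unless load_errs.empty?
p received
```

In a spec with the gem loaded, a loader like this could presumably be passed as Amigo.start_scheduler(fake_loader) to assert on the schedule without touching Redis.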

.unregister_subscriber(block_ref) ⇒ Object



# File 'lib/amigo.rb', line 198

def unregister_subscriber(block_ref)
  self.subscribers.delete(block_ref)
end
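Unregistering relies on passing back the same block object that register_subscriber returned. The lifecycle can be sketched with a small stand-in; SketchBus is a hypothetical module that mimics only the subscriber list, without logging, error handling, or Event objects.

```ruby
# Hypothetical stand-in for the subscriber list; not the Amigo module itself.
module SketchBus
  @subscribers = []

  class << self
    attr_reader :subscribers

    def register_subscriber(&block)
      raise LocalJumpError, "no block given" unless block
      subscribers << block
      block # the caller keeps this reference for unregistering
    end

    def unregister_subscriber(block_ref)
      subscribers.delete(block_ref)
    end

    def publish(name, *payload)
      subscribers.to_a.each { |hook| hook.call([name, payload]) }
    end
  end
end

received = []
ref = SketchBus.register_subscriber { |ev| received << ev }
SketchBus.publish("user.created", 42)
SketchBus.unregister_subscriber(ref)
SketchBus.publish("user.deleted", 42) # no subscribers remain
p received
```

Keeping the returned reference matters because a new block with identical code is a different object and would not match on delete.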