Module: Amigo
- Defined in:
- lib/amigo.rb,
lib/amigo/job.rb,
lib/amigo/retry.rb,
lib/amigo/router.rb,
lib/amigo/version.rb,
lib/amigo/autoscaler.rb,
lib/amigo/audit_logger.rb,
lib/amigo/spec_helpers.rb,
lib/amigo/scheduled_job.rb,
lib/amigo/deprecated_jobs.rb,
lib/amigo/memory_pressure.rb,
lib/amigo/threading_event.rb,
lib/amigo/queue_backoff_job.rb,
lib/amigo/semaphore_backoff_job.rb,
lib/amigo/autoscaler/handlers/log.rb,
lib/amigo/autoscaler/checkers/fake.rb,
lib/amigo/autoscaler/handlers/fake.rb,
lib/amigo/autoscaler/checkers/chain.rb,
lib/amigo/autoscaler/handlers/chain.rb,
lib/amigo/autoscaler/handlers/heroku.rb,
lib/amigo/autoscaler/handlers/sentry.rb,
lib/amigo/rate_limited_error_handler.rb,
lib/amigo/autoscaler/checkers/sidekiq.rb,
lib/amigo/autoscaler/checkers/web_latency.rb,
lib/amigo/autoscaler/checkers/puma_pool_usage.rb
Overview
Wrap another Sidekiq error handler so invoking it is rate limited.
Useful when wrapping a usage-based error reporter like Sentry, which can be hammered in the case of an issue like connectivity that causes all jobs and retries to fail. It is suggested that all errors are still reported to something like application logs, since entirely silencing errors can make debugging problems tricky.
Usage:
Sidekiq.configure_server do |config|
  config.error_handlers << Amigo::RateLimitedErrorHandler.new(
    Sentry::Sidekiq::ErrorHandler.new,
    sample_rate: ENV.fetch('ASYNC_ERROR_RATE_LIMITER_SAMPLE_RATE', '0.5').to_f,
    ttl: ENV.fetch('ASYNC_ERROR_RATE_LIMITER_TTL', '120').to_f,
  )
end
See notes about sample_rate and ttl, and fingerprint for how exceptions are fingerprinted for uniqueness.
Rate limiting is done in memory, so the limit applies per process: all threads/workers in a process share one limiter, but separate processes do not share anything. So if 2 processes have 10 threads each and every thread errors for the same reason, the error handler would be invoked twice, once per process.
Thread-based limiting (20 errors in the case above) or cross-process limiting (1 error in the case above) can be added in the future.
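To make the per-process behavior concrete, here is a minimal sketch of in-memory, TTL-based limiting. It is not Amigo's implementation: the class name TtlLimitedHandler, the class-plus-message fingerprint, and the injectable clock are assumptions made for illustration, and sample_rate is left out entirely.

```ruby
# Hypothetical sketch, not Amigo::RateLimitedErrorHandler itself.
class TtlLimitedHandler
  def initialize(wrapped, ttl:, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @wrapped = wrapped
    @ttl = ttl
    @clock = clock
    @last_reported = {} # fingerprint => time the error was last forwarded
    @mutex = Mutex.new  # one table and one lock shared by every thread in the process
  end

  # Forward the error to the wrapped handler at most once per ttl seconds
  # per fingerprint. Returns true if the error was forwarded.
  def call(error, *rest)
    fingerprint = "#{error.class}:#{error.message}" # assumed stand-in fingerprint
    now = @clock.call
    forward = @mutex.synchronize do
      last = @last_reported[fingerprint]
      fresh = last.nil? || (now - last) >= @ttl
      @last_reported[fingerprint] = now if fresh
      fresh
    end
    @wrapped.call(error, *rest) if forward
    forward
  end
end

reported = []
handler = TtlLimitedHandler.new(->(error, *_rest) { reported << error }, ttl: 120)

# Ten threads in one process fail for the same reason: one report goes out.
10.times.map { Thread.new { handler.call(RuntimeError.new("redis down")) } }.each(&:join)
puts reported.size # prints 1
```

Because the table lives in process memory, a second process running the same code would keep its own table and forward its own single report, which matches the "invoked twice" case described above.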
Defined Under Namespace
Modules: DeprecatedJobs, Job, QueueBackoffJob, Retry, ScheduledJob, SemaphoreBackoffJob, SpecHelpers
Classes: AuditLogger, Autoscaler, Error, Event, MemoryPressure, RateLimitedErrorHandler, Router, StartSchedulerFailed, ThreadingEvent
Constant Summary
- VERSION = "1.14.0"
Class Attribute Summary
- .audit_logger_class ⇒ Object
  Returns the value of attribute audit_logger_class.
- .log_callback ⇒ Object
  Proc called with [job, level, message, params].
- .on_publish_error ⇒ Object
  A single callback to be run when an event publication errors, almost always due to an error in a subscriber.
- .registered_jobs ⇒ Object
  Every subclass of Amigo::Job and Amigo::ScheduledJob goes here.
- .router_class ⇒ Object
  Returns the value of attribute router_class.
- .structured_logging ⇒ Object
  Returns the value of attribute structured_logging.
- .subscribers ⇒ Object
  An Array of callbacks to be run when an event is published.
- .synchronous_mode ⇒ Object
  If true, perform event work synchronously rather than asynchronously.
Class Method Summary
- ._subscriber(event) ⇒ Object
- .install_amigo_jobs ⇒ Object
  Install Amigo so that every publish will be sent to the AuditLogger job and will invoke the relevant jobs in registered_jobs via the Router job.
- .log(job, level, message, params) ⇒ Object
- .publish(eventname, *payload) ⇒ Object
  Publish an event with the specified eventname and payload to any configured publishers.
- .register_job(job) ⇒ Object
- .register_subscriber(&block) ⇒ Object
  Register a hook to be called when an event is sent.
- .registered_event_jobs ⇒ Object
  Return an array of all Job subclasses that respond to event publishing (have patterns).
- .registered_scheduled_jobs ⇒ Object
  Return an array of all Job subclasses that are scheduled (have intervals).
- .reset_logging ⇒ Object
- .start_scheduler(load_from_hash = Sidekiq::Cron::Job.method(:load_from_hash)) ⇒ Object
  Start the scheduler.
- .unregister_subscriber(block_ref) ⇒ Object
Class Attribute Details
.audit_logger_class ⇒ Object
Returns the value of attribute audit_logger_class.
# File 'lib/amigo.rb', line 108
def audit_logger_class
  @audit_logger_class
end
.log_callback ⇒ Object
Proc called with [job, level, message, params]. By default, logs to the job’s logger (or Sidekiq’s if job is nil). If structured_logging is true, the message will be an ‘event’ string (like ‘registered_subscriber’) without any dynamic info. If structured_logging is false, the params will be rendered into the message, so it is suitable for unstructured logging; the params will also have a :log_message key containing the original log message.
# File 'lib/amigo.rb', line 117
def log_callback
  @log_callback
end
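The difference between the two logging modes is easier to see with concrete values. The following is a standalone sketch of the rendering described above; render_log and its structured: keyword are illustrative names, not part of Amigo, and the sketch copies params rather than mutating the caller's hash.

```ruby
# Standalone sketch; render_log is a hypothetical helper, not an Amigo method.
def render_log(message, params, structured:)
  params = (params || {}).dup
  if !structured && !params.empty?
    # Render params as key=value pairs before adding :log_message,
    # so the original message is not repeated in the rendered string.
    paramstr = params.map { |k, v| "#{k}=#{v}" }.join(" ")
    params[:log_message] = message
    message = "#{message} #{paramstr}"
  end
  [message, params]
end

structured_msg, structured_params = render_log("registered_subscriber", {block: "worker.rb:10"}, structured: true)
plain_msg, plain_params = render_log("registered_subscriber", {block: "worker.rb:10"}, structured: false)

puts structured_msg # prints registered_subscriber
puts plain_msg      # prints registered_subscriber block=worker.rb:10
```

In the unstructured case plain_params[:log_message] holds the original "registered_subscriber" string, so a callback that wants the bare event name can still get at it.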
.on_publish_error ⇒ Object
A single callback to be run when an event publication errors, almost always due to an error in a subscriber.
The callback receives the exception, the event being published, and the erroring subscriber.
If this is not set, errors from subscribers will be re-raised immediately, since broken subscribers usually indicate a broken application.
Note also that when an error occurs, Amigo.log is always called first. You do NOT need a callback that just logs and swallows the error. If all you want to do is log, and not propagate the error, you can use `Amigo.on_publish_error = proc {}`.
# File 'lib/amigo.rb', line 157
def on_publish_error
  @on_publish_error
end
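The source of publish on this page chooses how to invoke the callback based on its arity. The sketch below reproduces only that dispatch in isolation so the three common callback shapes can be compared; call_publish_error, the hash standing in for an event, and the empty proc standing in for a subscriber are all assumptions for illustration.

```ruby
# Standalone sketch of the arity check; call_publish_error is a hypothetical name.
def call_publish_error(callback, error, event, hook)
  if callback.respond_to?(:arity) && callback.arity == 1
    callback.call(error)              # callbacks that take only the exception
  else
    callback.call(error, event, hook) # everything else gets all three arguments
  end
end

error = RuntimeError.new("subscriber failed")
event = {name: "user.created"} # stand-in for an Amigo::Event
hook = proc {}                 # stand-in for the erroring subscriber

only_error = ->(e) { "got #{e.message}" }
everything = ->(e, ev, _hook) { "got #{e.message} for #{ev[:name]}" }
swallow = proc {} # arity 0: a proc silently ignores the extra arguments

puts call_publish_error(only_error, error, event, hook) # prints got subscriber failed
puts call_publish_error(everything, error, event, hook) # prints got subscriber failed for user.created
p call_publish_error(swallow, error, event, hook)       # prints nil
```

A zero-argument lambda (`-> {}`) would raise ArgumentError under this dispatch because lambdas check their argument count, which is presumably why the swallow-everything example uses `proc {}`.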
.registered_jobs ⇒ Object
Every subclass of Amigo::Job and Amigo::ScheduledJob goes here. It is used for routing and testing isolated jobs.
# File 'lib/amigo.rb', line 140
def registered_jobs
  @registered_jobs
end
.router_class ⇒ Object
Returns the value of attribute router_class.
# File 'lib/amigo.rb', line 108
def router_class
  @router_class
end
.structured_logging ⇒ Object
Returns the value of attribute structured_logging.
# File 'lib/amigo.rb', line 108
def structured_logging
  @structured_logging
end
.subscribers ⇒ Object
An Array of callbacks to be run when an event is published.
# File 'lib/amigo.rb', line 143
def subscribers
  @subscribers
end
.synchronous_mode ⇒ Object
If true, perform event work synchronously rather than asynchronously. Only useful for testing.
# File 'lib/amigo.rb', line 136
def synchronous_mode
  @synchronous_mode
end
Class Method Details
._subscriber(event) ⇒ Object
# File 'lib/amigo.rb', line 223
def _subscriber(event)
  event_json = event.as_json
  begin
    self.audit_logger_class.perform_async(event_json)
  rescue StandardError => e
    # If the audit logger cannot perform, let's say because Redis is down,
    # we can run the job manually. This is pretty important for anything used for auditing;
    # it should be as resilient as possible.
    self.log(nil, :error, "amigo_audit_log_subscriber_error", error: e, event: event_json)
    self.audit_logger_class.new.perform(event_json)
  end
  self.router_class.perform_async(event_json)
end
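The fallback in _subscriber (enqueue asynchronously, and run the job inline if enqueueing fails) can be tried out without Sidekiq or Redis. This is a minimal sketch under stated assumptions: FakeAuditJob, its queue_available switch, and the audit helper are hypothetical stand-ins, not Amigo classes.

```ruby
# Hypothetical stand-in for an audit job; not Amigo::AuditLogger.
class FakeAuditJob
  @performed = []

  class << self
    attr_reader :performed
    attr_accessor :queue_available

    # Simulates enqueueing; fails when the (pretend) queue backend is down.
    def perform_async(payload)
      raise IOError, "queue unavailable" unless queue_available
      performed << [:async, payload]
    end
  end

  # Runs the job in the calling thread.
  def perform(payload)
    self.class.performed << [:inline, payload]
  end
end

# Try the async path first; on any StandardError, perform the job inline.
def audit(job_class, payload)
  job_class.perform_async(payload)
rescue StandardError
  job_class.new.perform(payload)
end

FakeAuditJob.queue_available = true
audit(FakeAuditJob, {"name" => "user.created"})
FakeAuditJob.queue_available = false
audit(FakeAuditJob, {"name" => "user.deleted"})

p FakeAuditJob.performed.map(&:first) # prints [:async, :inline]
```

Note that in the real method only the audit job gets this treatment; the router's perform_async call sits outside the begin/rescue, so a failure there still propagates to the caller.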
.install_amigo_jobs ⇒ Object
Install Amigo so that every publish will be sent to the AuditLogger job and will invoke the relevant jobs in registered_jobs via the Router job.
# File 'lib/amigo.rb', line 217
def install_amigo_jobs
  return self.register_subscriber do |ev|
    self._subscriber(ev)
  end
end
.log(job, level, message, params) ⇒ Object
# File 'lib/amigo.rb', line 124
def log(job, level, message, params)
  params ||= {}
  if !self.structured_logging && !params.empty?
    paramstr = params.map { |k, v| "#{k}=#{v}" }.join(" ")
    params[:log_message] = message
    message = "#{message} #{paramstr}"
  end
  self.log_callback[job, level, message, params]
end
.publish(eventname, *payload) ⇒ Object
Publish an event with the specified eventname and payload to any configured publishers.
# File 'lib/amigo.rb', line 161
def publish(eventname, *payload)
  ev = Event.new(SecureRandom.uuid, eventname, payload)
  self.subscribers.to_a.each do |hook|
    hook.call(ev)
  rescue StandardError => e
    self.log(
      nil,
      :error,
      "amigo_subscriber_hook_error",
      error: e, hook: _block_repr(hook), event: ev&.as_json,
    )
    raise e if self.on_publish_error.nil?
    if self.on_publish_error.respond_to?(:arity) && self.on_publish_error.arity == 1
      self.on_publish_error.call(e)
    else
      self.on_publish_error.call(e, ev, hook)
    end
  end
end
.register_job(job) ⇒ Object
# File 'lib/amigo.rb', line 237
def register_job(job)
  self.registered_jobs << job
  self.registered_jobs.uniq!
end
.register_subscriber(&block) ⇒ Object
Register a hook to be called when an event is sent. If a subscriber errors, on_publish_error is called with the exception, event, and subscriber.
# File 'lib/amigo.rb', line 184
def register_subscriber(&block)
  raise LocalJumpError, "no block given" unless block
  self.log nil, :info, "amigo_installed_subscriber", block: _block_repr(block)
  self.subscribers << block
  return block
end
.registered_event_jobs ⇒ Object
Return an array of all Job subclasses that respond to event publishing (have patterns).
# File 'lib/amigo.rb', line 203
def registered_event_jobs
  return self.registered_jobs.select(&:event_job?)
end
.registered_scheduled_jobs ⇒ Object
Return an array of all Job subclasses that are scheduled (have intervals).
# File 'lib/amigo.rb', line 208
def registered_scheduled_jobs
  return self.registered_jobs.select(&:scheduled_job?)
end
.reset_logging ⇒ Object
# File 'lib/amigo.rb', line 119
def reset_logging
  self.log_callback = ->(job, level, msg, _params) { (job || Sidekiq).logger.send(level, msg) }
  self.structured_logging = false
end
.start_scheduler(load_from_hash = Sidekiq::Cron::Job.method(:load_from_hash)) ⇒ Object
Start the scheduler. This should generally be run in the Sidekiq worker process, not a webserver process.
# File 'lib/amigo.rb', line 245
def start_scheduler(load_from_hash=Sidekiq::Cron::Job.method(:load_from_hash))
  hash = self.registered_scheduled_jobs.each_with_object({}) do |job, memo|
    self.log(nil, :info, "scheduling_job_cron", {job_name: job.name, job_cron: job.cron_expr})
    memo[job.name] = {
      "class" => job.name,
      "cron" => job.cron_expr,
    }
  end
  load_errs = load_from_hash.call(hash) || {}
  raise StartSchedulerFailed, "Errors loading sidekiq-cron jobs: %p" % [load_errs] unless load_errs.empty?
end
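Because load_from_hash is an ordinary callable parameter, the schedule hash can be inspected by passing a stand-in loader instead of the sidekiq-cron one. The sketch below rebuilds that flow outside Amigo; FakeScheduledJob, build_cron_hash, load_schedule, and the use of ArgumentError in place of StartSchedulerFailed are all assumptions for illustration.

```ruby
# Hypothetical job stand-in exposing the two readers the scheduler uses.
FakeScheduledJob = Struct.new(:name, :cron_expr)

# Build the name => {"class", "cron"} hash in the shape passed to the loader.
def build_cron_hash(jobs)
  jobs.each_with_object({}) do |job, memo|
    memo[job.name] = {"class" => job.name, "cron" => job.cron_expr}
  end
end

# Call the loader and treat any non-empty result as a hash of load errors.
def load_schedule(jobs, load_from_hash)
  load_errs = load_from_hash.call(build_cron_hash(jobs)) || {}
  raise ArgumentError, "Errors loading cron jobs: %p" % [load_errs] unless load_errs.empty?
  true
end

jobs = [FakeScheduledJob.new("Jobs::Cleanup", "0 3 * * *")]

# A capturing loader: records the hash it was given and reports no errors.
loaded = nil
load_schedule(jobs, ->(hash) { loaded = hash; {} })
puts loaded["Jobs::Cleanup"]["cron"] # prints 0 3 * * *
```

In a spec, the same idea would presumably look like `Amigo.start_scheduler(->(hash) { ... })`, with the lambda returning an empty hash on success or a hash of errors to exercise the StartSchedulerFailed path.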
.unregister_subscriber(block_ref) ⇒ Object
# File 'lib/amigo.rb', line 198
def unregister_subscriber(block_ref)
  self.subscribers.delete(block_ref)
end
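Since unregister_subscriber deletes by reference, the value returned from register_subscriber is what needs to be kept around. The following standalone sketch shows that contract with a tiny registry; MiniBus is a hypothetical module written for this example and omits Amigo's logging and event objects.

```ruby
# Standalone sketch of the register/unregister contract; not Amigo itself.
module MiniBus
  @subscribers = []

  class << self
    attr_reader :subscribers

    # Store the block and hand it back so the caller can unregister it later.
    def register_subscriber(&block)
      raise LocalJumpError, "no block given" unless block
      subscribers << block
      block
    end

    # Remove the exact block object that was registered.
    def unregister_subscriber(block_ref)
      subscribers.delete(block_ref)
    end
  end
end

seen = []
subscription = MiniBus.register_subscriber { |ev| seen << ev }

MiniBus.subscribers.each { |hook| hook.call("user.created") }
MiniBus.unregister_subscriber(subscription)
MiniBus.subscribers.each { |hook| hook.call("user.deleted") }

p seen # prints ["user.created"]
```

Passing a freshly written block with the same body to unregister_subscriber would not remove anything, because it is a different Proc object.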