Class: ActiveJob::StructuredEventSubscriber
- Inherits:
- ActiveSupport::StructuredEventSubscriber
- Inheritance chain: Object → ActiveSupport::StructuredEventSubscriber → ActiveJob::StructuredEventSubscriber
- Defined in:
- lib/active_job/structured_event_subscriber.rb
Overview
:nodoc:
Instance Method Summary
- #discard(event) ⇒ Object
- #enqueue(event) ⇒ Object
- #enqueue_all(event) ⇒ Object
- #enqueue_at(event) ⇒ Object
- #enqueue_retry(event) ⇒ Object
- #interrupt(event) ⇒ Object
- #perform(event) ⇒ Object
- #perform_start(event) ⇒ Object
- #resume(event) ⇒ Object
- #retry_stopped(event) ⇒ Object
- #step(event) ⇒ Object
- #step_skipped(event) ⇒ Object
- #step_started(event) ⇒ Object
Instance Method Details
#discard(event) ⇒ Object
137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/active_job/structured_event_subscriber.rb', line 137
#
# Emits an "active_job.discarded" event describing the job and the error
# found in the notification payload.
#
# @param event [#payload] notification whose payload is read at :job and :error
# @return [Object] whatever emit_event returns
def discard(event)
  job = event.payload[:job]
  exception = event.payload[:error]

  emit_event("active_job.discarded",
    job_class: job.class.name,
    job_id: job.job_id,
    exception_class: exception.class.name,
    exception_message: exception.message
  )
end
#enqueue(event) ⇒ Object
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/active_job/structured_event_subscriber.rb', line 7
#
# Emits an "active_job.enqueued" event for the job in the notification.
#
# Exception details are added only when an exception is present, and the job
# arguments only when the job class answers true to log_arguments?.
#
# @param event [#payload] notification whose payload is read at :job,
#   :adapter, :exception_object and :aborted
# @return [Object] whatever emit_event returns
def enqueue(event)
  job = event.payload[:job]
  adapter = event.payload[:adapter]
  # Prefer the exception carried by the event; otherwise use the one recorded on the job.
  exception = event.payload[:exception_object] || job.enqueue_error

  payload = {
    job_class: job.class.name,
    job_id: job.job_id,
    queue: job.queue_name,
    adapter: ActiveJob.adapter_name(adapter),
    aborted: event.payload[:aborted],
  }

  if exception
    payload[:exception_class] = exception.class.name
    payload[:exception_message] = exception.message
  end

  payload[:arguments] = job.arguments if job.class.log_arguments?

  emit_event("active_job.enqueued", payload)
end
#enqueue_all(event) ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/active_job/structured_event_subscriber.rb', line 56
#
# Emits an "active_job.bulk_enqueued" event summarising a batch of jobs:
# total, enqueued and failed counts, plus a tally of enqueued job class names.
#
# @param event [#payload] notification whose payload is read at :jobs,
#   :adapter and :enqueued_count (coerced with to_i)
# @return [Object] whatever emit_event returns
def enqueue_all(event)
  jobs = event.payload[:jobs]
  adapter = event.payload[:adapter]
  job_count = jobs.size
  enqueued_count = event.payload[:enqueued_count].to_i
  # Computed once: when every job was enqueued, individual jobs are not asked.
  all_enqueued = job_count == enqueued_count

  enqueued_classes = jobs.filter_map do |job|
    job.class.name if all_enqueued || job.successfully_enqueued?
  end.tally

  emit_event("active_job.bulk_enqueued",
    adapter: ActiveJob.adapter_name(adapter),
    job_count: job_count,
    enqueued_count: enqueued_count,
    failed_enqueue_count: job_count - enqueued_count,
    enqueued_classes: enqueued_classes
  )
end
#enqueue_at(event) ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/active_job/structured_event_subscriber.rb', line 31
#
# Emits an "active_job.enqueued_at" event for the job in the notification,
# including the job's scheduled_at value.
#
# Exception details are added only when an exception is present, and the job
# arguments only when the job class answers true to log_arguments?.
#
# @param event [#payload] notification whose payload is read at :job,
#   :adapter, :exception_object and :aborted
# @return [Object] whatever emit_event returns
def enqueue_at(event)
  job = event.payload[:job]
  adapter = event.payload[:adapter]
  # Prefer the exception carried by the event; otherwise use the one recorded on the job.
  exception = event.payload[:exception_object] || job.enqueue_error

  payload = {
    job_class: job.class.name,
    job_id: job.job_id,
    queue: job.queue_name,
    scheduled_at: job.scheduled_at,
    adapter: ActiveJob.adapter_name(adapter),
    aborted: event.payload[:aborted],
  }

  if exception
    payload[:exception_class] = exception.class.name
    payload[:exception_message] = exception.message
  end

  payload[:arguments] = job.arguments if job.class.log_arguments?

  emit_event("active_job.enqueued_at", payload)
end
#enqueue_retry(event) ⇒ Object
109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/active_job/structured_event_subscriber.rb', line 109
#
# Emits an "active_job.retry_scheduled" event with the job's execution count,
# the wait (coerced with to_i) and, when an error is present, its class name
# and message.
#
# @param event [#payload] notification whose payload is read at :job,
#   :error and :wait
# @return [Object] whatever emit_event returns
def enqueue_retry(event)
  job = event.payload[:job]
  exception = event.payload[:error]
  wait = event.payload[:wait]

  emit_event("active_job.retry_scheduled",
    job_class: job.class.name,
    job_id: job.job_id,
    executions: job.executions,
    wait_seconds: wait.to_i,
    # The error may be absent, so both fields fall back to nil.
    exception_class: exception&.class&.name,
    exception_message: exception&.message
  )
end
#interrupt(event) ⇒ Object
149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/active_job/structured_event_subscriber.rb', line 149
#
# Emits an "active_job.interrupt" event carrying the job's class name and id
# together with the :description and :reason values from the payload.
#
# @param event [#payload] notification whose payload is read at :job,
#   :description and :reason
# @return [Object] whatever emit_event returns
def interrupt(event)
  details = event.payload
  job = details[:job]

  emit_event("active_job.interrupt",
    job_class: job.class.name,
    job_id: job.job_id,
    description: details[:description],
    reason: details[:reason]
  )
end
#perform(event) ⇒ Object
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/active_job/structured_event_subscriber.rb', line 87
#
# Emits an "active_job.completed" event for the job in the notification,
# including the event duration rounded to two decimal places. When an
# exception is present its class name, message and backtrace are added.
#
# @param event [#payload, #duration] notification whose payload is read at
#   :job, :exception_object, :adapter and :aborted
# @return [Object] whatever emit_event returns
def perform(event)
  job = event.payload[:job]
  exception = event.payload[:exception_object]
  adapter = event.payload[:adapter]

  payload = {
    job_class: job.class.name,
    job_id: job.job_id,
    queue: job.queue_name,
    adapter: ActiveJob.adapter_name(adapter),
    aborted: event.payload[:aborted],
    duration: event.duration.round(2),
  }

  if exception
    payload[:exception_class] = exception.class.name
    payload[:exception_message] = exception.message
    payload[:exception_backtrace] = exception.backtrace
  end

  emit_event("active_job.completed", payload)
end
#perform_start(event) ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/active_job/structured_event_subscriber.rb', line 73
#
# Emits an "active_job.started" event for the job in the notification.
#
# enqueued_at is reported as a UTC ISO 8601 string with nine fractional
# digits, or nil when the job has no enqueued_at. Job arguments are included
# only when the job class answers true to log_arguments?.
#
# @param event [#payload] notification whose payload is read at :job
# @return [Object] whatever emit_event returns
def perform_start(event)
  job = event.payload[:job]

  payload = {
    job_class: job.class.name,
    job_id: job.job_id,
    queue: job.queue_name,
    enqueued_at: job.enqueued_at&.utc&.iso8601(9),
  }

  payload[:arguments] = job.arguments if job.class.log_arguments?

  emit_event("active_job.started", payload)
end
#resume(event) ⇒ Object
162 163 164 165 166 167 168 169 170 171 |
# File 'lib/active_job/structured_event_subscriber.rb', line 162
#
# Emits an "active_job.resume" event carrying the job's class name and id
# together with the :description value from the payload.
#
# @param event [#payload] notification whose payload is read at :job and
#   :description
# @return [Object] whatever emit_event returns
def resume(event)
  details = event.payload
  job = details[:job]

  emit_event("active_job.resume",
    job_class: job.class.name,
    job_id: job.job_id,
    description: details[:description]
  )
end
#retry_stopped(event) ⇒ Object
124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/active_job/structured_event_subscriber.rb', line 124
#
# Emits an "active_job.retry_stopped" event with the job's execution count
# and the class name and message of the error found in the payload.
#
# @param event [#payload] notification whose payload is read at :job and :error
# @return [Object] whatever emit_event returns
def retry_stopped(event)
  job = event.payload[:job]
  exception = event.payload[:error]

  emit_event("active_job.retry_stopped",
    job_class: job.class.name,
    job_id: job.job_id,
    executions: job.executions,
    exception_class: exception.class.name,
    exception_message: exception.message
  )
end
#step(event) ⇒ Object
197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 |
# File 'lib/active_job/structured_event_subscriber.rb', line 197
#
# Emits an "active_job.step" event with the step's name and cursor, the
# :interrupted value from the payload and the event duration rounded to two
# decimal places. When an exception is present its class name and message
# are added.
#
# @param event [#payload, #duration] notification whose payload is read at
#   :job, :step, :exception_object and :interrupted
# @return [Object] whatever emit_event returns
def step(event)
  job = event.payload[:job]
  step = event.payload[:step]
  exception = event.payload[:exception_object]

  payload = {
    job_class: job.class.name,
    job_id: job.job_id,
    step: step.name,
    cursor: step.cursor,
    interrupted: event.payload[:interrupted],
    duration: event.duration.round(2),
  }

  if exception
    payload[:exception_class] = exception.class.name
    payload[:exception_message] = exception.message
  end

  emit_event("active_job.step", payload)
end
#step_skipped(event) ⇒ Object
173 174 175 176 177 178 179 180 181 182 |
# File 'lib/active_job/structured_event_subscriber.rb', line 173
#
# Emits an "active_job.step_skipped" event carrying the job's class name and
# id and the name of the step found in the payload.
#
# @param event [#payload] notification whose payload is read at :job and :step
# @return [Object] whatever emit_event returns
def step_skipped(event)
  details = event.payload
  job = details[:job]

  emit_event("active_job.step_skipped",
    job_class: job.class.name,
    job_id: job.job_id,
    step: details[:step].name
  )
end
#step_started(event) ⇒ Object
184 185 186 187 188 189 190 191 192 193 194 195 |
# File 'lib/active_job/structured_event_subscriber.rb', line 184
#
# Emits an "active_job.step_started" event carrying the job's class name and
# id plus the step's name, cursor and resumed? flag.
#
# @param event [#payload] notification whose payload is read at :job and :step
# @return [Object] whatever emit_event returns
def step_started(event)
  details = event.payload
  job = details[:job]
  current_step = details[:step]

  emit_event("active_job.step_started",
    job_class: job.class.name,
    job_id: job.job_id,
    step: current_step.name,
    cursor: current_step.cursor,
    resumed: current_step.resumed?
  )
end