Class: ActiveJob::StructuredEventSubscriber

Inherits:
ActiveSupport::StructuredEventSubscriber
  • Object
show all
Defined in:
lib/active_job/structured_event_subscriber.rb

Overview

:nodoc:

Instance Method Summary collapse

Instance Method Details

#discard(event) ⇒ Object



137
138
139
140
141
142
143
144
145
146
147
# File 'lib/active_job/structured_event_subscriber.rb', line 137

def discard(event)
  job = event.payload[:job]
  exception = event.payload[:error]

  emit_event("active_job.discarded",
    job_class: job.class.name,
    job_id: job.job_id,
    exception_class: exception.class.name,
    exception_message: exception.message
  )
end

#enqueue(event) ⇒ Object



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/active_job/structured_event_subscriber.rb', line 7

def enqueue(event)
  job = event.payload[:job]
  adapter = event.payload[:adapter]
  exception = event.payload[:exception_object] || job.enqueue_error
  payload = {
    job_class: job.class.name,
    job_id: job.job_id,
    queue: job.queue_name,
    adapter: ActiveJob.adapter_name(adapter),
    aborted: event.payload[:aborted],
  }

  if exception
    payload[:exception_class] = exception.class.name
    payload[:exception_message] = exception.message
  end

  if job.class.log_arguments?
    payload[:arguments] = job.arguments
  end

  emit_event("active_job.enqueued", payload)
end

#enqueue_all(event) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/active_job/structured_event_subscriber.rb', line 56

def enqueue_all(event)
  jobs = event.payload[:jobs]
  adapter = event.payload[:adapter]
  enqueued_count = event.payload[:enqueued_count].to_i
  failed_count = jobs.size - enqueued_count

  emit_event("active_job.bulk_enqueued",
    adapter: ActiveJob.adapter_name(adapter),
    job_count: jobs.size,
    enqueued_count: enqueued_count,
    failed_enqueue_count: failed_count,
    enqueued_classes: jobs.filter_map do |job|
      job.class.name if jobs.count == enqueued_count || job.successfully_enqueued?
    end.tally
  )
end

#enqueue_at(event) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/active_job/structured_event_subscriber.rb', line 31

def enqueue_at(event)
  job = event.payload[:job]
  adapter = event.payload[:adapter]
  exception = event.payload[:exception_object] || job.enqueue_error
  payload = {
    job_class: job.class.name,
    job_id: job.job_id,
    queue: job.queue_name,
    scheduled_at: job.scheduled_at,
    adapter: ActiveJob.adapter_name(adapter),
    aborted: event.payload[:aborted],
  }

  if exception
    payload[:exception_class] = exception.class.name
    payload[:exception_message] = exception.message
  end

  if job.class.log_arguments?
    payload[:arguments] = job.arguments
  end

  emit_event("active_job.enqueued_at", payload)
end

#enqueue_retry(event) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/active_job/structured_event_subscriber.rb', line 109

def enqueue_retry(event)
  job = event.payload[:job]
  exception = event.payload[:error]
  wait = event.payload[:wait]

  emit_event("active_job.retry_scheduled",
    job_class: job.class.name,
    job_id: job.job_id,
    executions: job.executions,
    wait_seconds: wait.to_i,
    exception_class: exception&.class&.name,
    exception_message: exception&.message
  )
end

#interrupt(event) ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/active_job/structured_event_subscriber.rb', line 149

def interrupt(event)
  job = event.payload[:job]
  description = event.payload[:description]
  reason = event.payload[:reason]

  emit_event("active_job.interrupt",
    job_class: job.class.name,
    job_id: job.job_id,
    description: description,
    reason: reason,
  )
end

#perform(event) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/active_job/structured_event_subscriber.rb', line 87

def perform(event)
  job = event.payload[:job]
  exception = event.payload[:exception_object]
  adapter = event.payload[:adapter]
  payload = {
    job_class: job.class.name,
    job_id: job.job_id,
    queue: job.queue_name,
    adapter: ActiveJob.adapter_name(adapter),
    aborted: event.payload[:aborted],
    duration: event.duration.round(2),
  }

  if exception
    payload[:exception_class] = exception.class.name
    payload[:exception_message] = exception.message
    payload[:exception_backtrace] = exception.backtrace
  end

  emit_event("active_job.completed", payload)
end

#perform_start(event) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/active_job/structured_event_subscriber.rb', line 73

def perform_start(event)
  job = event.payload[:job]
  payload = {
    job_class: job.class.name,
    job_id: job.job_id,
    queue: job.queue_name,
    enqueued_at: job.enqueued_at&.utc&.iso8601(9),
  }
  if job.class.log_arguments?
    payload[:arguments] = job.arguments
  end
  emit_event("active_job.started", payload)
end

#resume(event) ⇒ Object



162
163
164
165
166
167
168
169
170
171
# File 'lib/active_job/structured_event_subscriber.rb', line 162

def resume(event)
  job = event.payload[:job]
  description = event.payload[:description]

  emit_event("active_job.resume",
    job_class: job.class.name,
    job_id: job.job_id,
    description: description,
  )
end

#retry_stopped(event) ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/active_job/structured_event_subscriber.rb', line 124

def retry_stopped(event)
  job = event.payload[:job]
  exception = event.payload[:error]

  emit_event("active_job.retry_stopped",
    job_class: job.class.name,
    job_id: job.job_id,
    executions: job.executions,
    exception_class: exception.class.name,
    exception_message: exception.message
  )
end

#step(event) ⇒ Object



197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/active_job/structured_event_subscriber.rb', line 197

def step(event)
  job = event.payload[:job]
  step = event.payload[:step]
  exception = event.payload[:exception_object]
  payload = {
    job_class: job.class.name,
    job_id: job.job_id,
    step: step.name,
    cursor: step.cursor,
    interrupted: event.payload[:interrupted],
    duration: event.duration.round(2),
  }

  if exception
    payload[:exception_class] = exception.class.name
    payload[:exception_message] = exception.message
  end

  emit_event("active_job.step", payload)
end

#step_skipped(event) ⇒ Object



173
174
175
176
177
178
179
180
181
182
# File 'lib/active_job/structured_event_subscriber.rb', line 173

def step_skipped(event)
  job = event.payload[:job]
  step = event.payload[:step]

  emit_event("active_job.step_skipped",
    job_class: job.class.name,
    job_id: job.job_id,
    step: step.name,
  )
end

#step_started(event) ⇒ Object



184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/active_job/structured_event_subscriber.rb', line 184

def step_started(event)
  job = event.payload[:job]
  step = event.payload[:step]

  emit_event("active_job.step_started",
    job_class: job.class.name,
    job_id: job.job_id,
    step: step.name,
    cursor: step.cursor,
    resumed: step.resumed?,
  )
end