Class: ScoutApm::BackgroundJobIntegrations::SidekiqMiddleware

Inherits:
Object
  • Object
show all
Defined in:
lib/scout_apm/background_job_integrations/sidekiq.rb

Overview

We insert this middleware into the Sidekiq stack, to capture each job, and time them.

Constant Summary collapse

UNKNOWN_CLASS_PLACEHOLDER =
'UnknownJob'.freeze
ACTIVE_JOB_KLASS =

This name was changed in Sidekiq 8

if sidekiq_version_8?
  'Sidekiq::ActiveJob::Wrapper'.freeze
else
  'ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper'.freeze
end
DELAYED_WRAPPER_KLASS =
'Sidekiq::Extensions::DelayedClass'.freeze

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.sidekiq_version_8?Boolean

Returns:

  • (Boolean)


75
76
77
78
79
80
81
# File 'lib/scout_apm/background_job_integrations/sidekiq.rb', line 75

def self.sidekiq_version_8?
  if defined?(::Sidekiq::VERSION)
    ::Sidekiq::VERSION.to_i >= 8
  else
    false
  end
end

Instance Method Details

#add_context!(msg, class_name) ⇒ Object



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/scout_apm/background_job_integrations/sidekiq.rb', line 139

def add_context!(msg, class_name)
  return if class_name == UNKNOWN_CLASS_PLACEHOLDER
  
  klass = class_name.constantize rescue nil
  return if klass.nil?

  # Only allow required and optional parameters, as others aren't fully supported by Sidekiq by default.
  # This also keeps it easy in terms of the canonical signature of parameters.
  allowed_parameter_types = [:req, :opt]

  known_parameters =
    klass.instance_method(:perform).parameters.each_with_object([]) do |(type, name), acc|
      acc << name if allowed_parameter_types.include?(type)
    end

  return if known_parameters.empty?

  job_args = if msg["class"] == ACTIVE_JOB_KLASS
      arguments = msg.fetch('args', [])
      # Don't think this can actually happen. With perform_all_later, 
      # it appears we go through this middleware individually (even with multiples of the same job type).
      return if arguments.length > 1

      arguments.first.fetch('arguments', [])
    else
      msg.fetch('args', [])
    end

  # Reduce known parameters to just the ones that are present in the job arguments (excluding non altered optional params)
  known_parameters = known_parameters[0...job_args.length]

  ScoutApm::Context.add(filter_params(known_parameters.zip(job_args).to_h))
end

#call(_worker, msg, queue) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/scout_apm/background_job_integrations/sidekiq.rb', line 52

def call(_worker, msg, queue)
  req = ScoutApm::RequestManager.lookup
  req.annotate_request(:queue_latency => latency(msg))
  class_name = job_class(msg)

  add_context!(msg, class_name) if capture_job_args?

  begin
    req.start_layer(ScoutApm::Layer.new('Queue', queue))
    started_queue = true
    req.start_layer(ScoutApm::Layer.new('Job', class_name))
    started_job = true

    yield
  rescue
    req.error!
    raise
  ensure
    req.stop_layer if started_job
    req.stop_layer if started_queue
  end
end

#capture_job_args?Boolean

Returns:

  • (Boolean)


135
136
137
# File 'lib/scout_apm/background_job_integrations/sidekiq.rb', line 135

def capture_job_args?
  ScoutApm::Agent.instance.context.config.value("job_params_capture")
end

#filter_key?(key) ⇒ Boolean

Check, if a key should be filtered

Returns:

  • (Boolean)


218
219
220
221
222
# File 'lib/scout_apm/background_job_integrations/sidekiq.rb', line 218

def filter_key?(key)
  params_to_filter.any? do |filter|
    key.to_s == filter.to_s # key.to_s.include?(filter.to_s)
  end
end

#filter_params(params) ⇒ Object

Replaces parameter values with a string / set in config file



196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# File 'lib/scout_apm/background_job_integrations/sidekiq.rb', line 196

def filter_params(params)
  return params unless filtered_params_config

  params.each do |k, v|
    if filter_key?(k)
      params[k] = "[FILTERED]"
      next
    end

    if filter_value?(v)
      params[k] = "[UNSUPPORTED TYPE]"
    end
  end

  params
end

#filter_value?(value) ⇒ Boolean

Returns:

  • (Boolean)


213
214
215
# File 'lib/scout_apm/background_job_integrations/sidekiq.rb', line 213

def filter_value?(value)
  !ScoutApm::Context::VALID_TYPES.any? { |klass| value.is_a?(klass) }
end

#filtered_params_configObject

TODO: Flip this over to use a new class like filtered exceptions? Some shared logic between this and the error service.



230
231
232
# File 'lib/scout_apm/background_job_integrations/sidekiq.rb', line 230

def filtered_params_config
  ScoutApm::Agent.instance.context.config.value("job_filtered_params")
end

#job_class(msg) ⇒ Object

Capturing the class name is a little tricky, since we need to handle several cases:

  1. ActiveJob, with the class in the key ‘wrapped’

  2. ActiveJob, but the ‘wrapped’ key is wrong (due to YAJL serializing weirdly), find it in args.job_class

  3. DelayedJob wrapper, deserializing using YAML into the real object, which can be introspected

  4. No wrapper, just sidekiq’s class



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/scout_apm/background_job_integrations/sidekiq.rb', line 98

def job_class(msg)
  job_class = msg.fetch('class', UNKNOWN_CLASS_PLACEHOLDER)

  if job_class == ACTIVE_JOB_KLASS && msg.key?('wrapped') && msg['wrapped'].is_a?(String)
    begin
      job_class = msg['wrapped'].to_s
    rescue
      ACTIVE_JOB_KLASS
    end
  elsif job_class == ACTIVE_JOB_KLASS && msg.try(:[], 'args').try(:[], 'job_class')
    begin
      job_class = msg['args']['job_class'].to_s
    rescue
      ACTIVE_JOB_KLASS
    end
  elsif job_class == DELAYED_WRAPPER_KLASS
    begin
      # Extract the info out of the wrapper
      yml = msg['args'].first
      deserialized_args = YAML.load(yml)
      klass, method, *rest = deserialized_args

      # If this is an instance of a class, get the class itself
      # Prevents instances from coming through named like "#<Foo:0x007ffd7a9dd8a0>"
      klass = klass.class unless klass.is_a? Module

      job_class = [klass, method].map(&:to_s).join(".")
    rescue
      DELAYED_WRAPPER_KLASS
    end
  end

  job_class
rescue
  UNKNOWN_CLASS_PLACEHOLDER
end

#latency(msg, time = Time.now.to_f) ⇒ Object



173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/scout_apm/background_job_integrations/sidekiq.rb', line 173

def latency(msg, time = Time.now.to_f)
  created_at = msg['enqueued_at'] || msg['created_at']
  if created_at
    # Sidekiq 8+ uses milliseconds, older versions use seconds.
    # Do it this way because downstream expects seconds.
    if self.class.sidekiq_version_8?
      # Convert milliseconds to seconds for consistency.
      (time - (created_at.to_f / 1000.0))
    else
      (time - created_at)
    end
  else
    0
  end
rescue
  0
end

#params_to_filterObject



224
225
226
# File 'lib/scout_apm/background_job_integrations/sidekiq.rb', line 224

def params_to_filter
  @params_to_filter ||= filtered_params_config + rails_filtered_params
end

#rails_filtered_paramsObject



234
235
236
237
238
239
# File 'lib/scout_apm/background_job_integrations/sidekiq.rb', line 234

def rails_filtered_params
  return [] unless defined?(Rails)
  Rails.configuration.filter_parameters
rescue 
  []
end