Class: ScoutApm::BackgroundJobIntegrations::SidekiqMiddleware
- Inherits: Object
  - Object
  - ScoutApm::BackgroundJobIntegrations::SidekiqMiddleware
- Defined in: lib/scout_apm/background_job_integrations/sidekiq.rb
Overview
This middleware is inserted into the Sidekiq stack to capture each job and time it.
Constant Summary
- UNKNOWN_CLASS_PLACEHOLDER =
'UnknownJob'.freeze
- ACTIVE_JOB_KLASS =
This name was changed in Sidekiq 8.
if sidekiq_version_8?
  'Sidekiq::ActiveJob::Wrapper'.freeze
else
  'ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper'.freeze
end
- DELAYED_WRAPPER_KLASS =
'Sidekiq::Extensions::DelayedClass'.freeze
Class Method Summary
- .sidekiq_version_8? ⇒ Boolean
Instance Method Summary
- #add_context!(msg, class_name) ⇒ Object
- #call(_worker, msg, queue) ⇒ Object
- #capture_job_args? ⇒ Boolean
- #filter_key?(key) ⇒ Boolean
  Checks whether a key should be filtered.
- #filter_params(params) ⇒ Object
  Replaces the values of filtered parameters (keys set in the config file) with the string "[FILTERED]".
- #filter_value?(value) ⇒ Boolean
- #filtered_params_config ⇒ Object
  TODO: Flip this over to use a new class like filtered exceptions? Some shared logic between this and the error service.
- #job_class(msg) ⇒ Object
  Capturing the class name is a little tricky, since we need to handle several cases.
- #latency(msg, time = Time.now.to_f) ⇒ Object
- #params_to_filter ⇒ Object
- #rails_filtered_params ⇒ Object
Class Method Details
.sidekiq_version_8? ⇒ Boolean
# File 'lib/scout_apm/background_job_integrations/sidekiq.rb', line 75
def self.sidekiq_version_8?
  if defined?(::Sidekiq::VERSION)
    ::Sidekiq::VERSION.to_i >= 8
  else
    false
  end
end
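The comparison above works because `String#to_i` parses the leading integer of a string and ignores everything after the first non-numeric character, so a version string yields its major version. A minimal sketch of that behaviour; the helper names and version strings are hypothetical and not part of the agent:

```ruby
# Hypothetical helper: extract the major version from a version string.
def major_version(version_string)
  # String#to_i reads leading digits only, so "8.0.1" becomes 8.
  version_string.to_i
end

# Hypothetical helper mirroring the ">= 8" comparison in sidekiq_version_8?.
def version_8_or_later?(version_string)
  major_version(version_string) >= 8
end

puts version_8_or_later?("8.0.1")
puts version_8_or_later?("7.3.9")
```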
Instance Method Details
#add_context!(msg, class_name) ⇒ Object
# File 'lib/scout_apm/background_job_integrations/sidekiq.rb', line 139
def add_context!(msg, class_name)
  return if class_name == UNKNOWN_CLASS_PLACEHOLDER

  klass = class_name.constantize rescue nil
  return if klass.nil?

  # Only allow required and optional parameters, as others aren't fully supported by Sidekiq by default.
  # This also keeps it easy in terms of the canonical signature of parameters.
  allowed_parameter_types = [:req, :opt]

  known_parameters = klass.instance_method(:perform).parameters.each_with_object([]) do |(type, name), acc|
    acc << name if allowed_parameter_types.include?(type)
  end

  return if known_parameters.empty?

  job_args = if msg["class"] == ACTIVE_JOB_KLASS
    arguments = msg.fetch('args', [])

    # Don't think this can actually happen. With perform_all_later,
    # it appears we go through this middleware individually (even with multiples of the same job type).
    return if arguments.length > 1

    arguments.first.fetch('arguments', [])
  else
    msg.fetch('args', [])
  end

  # Reduce known parameters to just the ones that are present in the job arguments (excluding non altered optional params)
  known_parameters = known_parameters[0...job_args.length]

  ScoutApm::Context.add(filter_params(known_parameters.zip(job_args).to_h))
end
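The core of this method is pairing the positional parameter names of `perform` with the arguments that were actually enqueued. The sketch below isolates that step with plain Ruby. `ExampleJob`, `ALLOWED_PARAMETER_TYPES`, and `named_job_args` are hypothetical names introduced for illustration; nothing here touches Sidekiq or ScoutApm.

```ruby
# Hypothetical job class, used only for illustration.
class ExampleJob
  def perform(user_id, locale = "en", *rest, notify: false)
  end
end

# Same idea as the method above: keep required and optional positionals only.
ALLOWED_PARAMETER_TYPES = [:req, :opt].freeze

# Pair the positional parameter names of #perform with the supplied arguments.
def named_job_args(klass, job_args)
  names = klass.instance_method(:perform).parameters.each_with_object([]) do |(type, name), acc|
    acc << name if ALLOWED_PARAMETER_TYPES.include?(type)
  end

  # Drop optional parameters that were left at their defaults.
  names = names[0...job_args.length]

  names.zip(job_args).to_h
end

p named_job_args(ExampleJob, [42])
p named_job_args(ExampleJob, [42, "fr"])
```

Because `zip` is driven by the list of names, any surplus arguments (for example those absorbed by a splat) are left out of the resulting hash.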
#call(_worker, msg, queue) ⇒ Object
# File 'lib/scout_apm/background_job_integrations/sidekiq.rb', line 52
def call(_worker, msg, queue)
  req = ScoutApm::RequestManager.lookup
  req.annotate_request(:queue_latency => latency(msg))

  class_name = job_class(msg)

  add_context!(msg, class_name) if capture_job_args?

  begin
    req.start_layer(ScoutApm::Layer.new('Queue', queue))
    started_queue = true
    req.start_layer(ScoutApm::Layer.new('Job', class_name))
    started_job = true

    yield
  rescue
    req.error!
    raise
  ensure
    req.stop_layer if started_job
    req.stop_layer if started_queue
  end
end
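The `begin`/`rescue`/`ensure` structure guarantees that every layer that was started is also stopped, even when the job raises, and that the error is recorded before being re-raised. A minimal sketch of that control flow follows. `RecordingRequest` and `run_with_layers` are hypothetical stand-ins that only record events; they are not the agent's request or layer classes.

```ruby
# Hypothetical stand-in for the tracked request; it only records events.
class RecordingRequest
  attr_reader :events

  def initialize
    @events = []
  end

  def start_layer(name)
    @events << [:start, name]
  end

  def stop_layer
    @events << [:stop]
  end

  def error!
    @events << [:error]
  end
end

# Start a queue layer and a job layer, run the block, and always unwind.
def run_with_layers(req, queue, class_name)
  begin
    req.start_layer("Queue/#{queue}")
    started_queue = true
    req.start_layer("Job/#{class_name}")
    started_job = true

    yield
  rescue
    req.error!
    raise
  ensure
    # Only stop the layers that were actually started.
    req.stop_layer if started_job
    req.stop_layer if started_queue
  end
end

req = RecordingRequest.new
begin
  run_with_layers(req, "default", "ExampleJob") { raise "boom" }
rescue RuntimeError
  # The error is re-raised after being recorded.
end
req.events.each { |event| p event }
```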
#capture_job_args? ⇒ Boolean
# File 'lib/scout_apm/background_job_integrations/sidekiq.rb', line 135
def capture_job_args?
  ScoutApm::Agent.instance.context.config.value("job_params_capture")
end
#filter_key?(key) ⇒ Boolean
Checks whether a key should be filtered.
# File 'lib/scout_apm/background_job_integrations/sidekiq.rb', line 218
def filter_key?(key)
  params_to_filter.any? do |filter|
    key.to_s == filter.to_s # key.to_s.include?(filter.to_s)
  end
end
#filter_params(params) ⇒ Object
Replaces the values of filtered parameters (keys set in the config file) with the string "[FILTERED]".
# File 'lib/scout_apm/background_job_integrations/sidekiq.rb', line 196
def filter_params(params)
  return params unless filtered_params_config

  params.each do |k, v|
    if filter_key?(k)
      params[k] = "[FILTERED]"
      next
    end

    if filter_value?(v)
      params[k] = "[UNSUPPORTED TYPE]"
    end
  end

  params
end
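The filtering rules can be tried in isolation: keys that exactly match an entry in the filter list are masked, and values of unsupported types are replaced with a marker. The sketch below is a simplified illustration. `FILTERED_KEYS`, `SUPPORTED_TYPES`, and `filter_job_params` are hypothetical, the lists are assumed values rather than the agent's configuration or `ScoutApm::Context::VALID_TYPES`, and unlike the method above it returns a new hash instead of modifying its argument.

```ruby
# Assumed values for illustration only; the real lists come from the agent
# configuration, Rails filter parameters, and ScoutApm::Context::VALID_TYPES.
FILTERED_KEYS = ["password", :token].freeze
SUPPORTED_TYPES = [String, Symbol, Numeric, TrueClass, FalseClass, NilClass].freeze

# Return a copy of params with filtered keys masked and unsupported values flagged.
def filter_job_params(params)
  params.each_with_object({}) do |(key, value), filtered|
    filtered[key] =
      if FILTERED_KEYS.any? { |filter| key.to_s == filter.to_s }
        "[FILTERED]"
      elsif SUPPORTED_TYPES.none? { |klass| value.is_a?(klass) }
        "[UNSUPPORTED TYPE]"
      else
        value
      end
  end
end

p filter_job_params({ password: "hunter2", user_id: 7, tags: ["a", "b"] })
```

Matching is exact, so a key such as `password_hint` is not masked by a `password` filter.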
#filter_value?(value) ⇒ Boolean
# File 'lib/scout_apm/background_job_integrations/sidekiq.rb', line 213
def filter_value?(value)
  !ScoutApm::Context::VALID_TYPES.any? { |klass| value.is_a?(klass) }
end
#filtered_params_config ⇒ Object
TODO: Flip this over to use a new class like filtered exceptions? Some shared logic between this and the error service.
# File 'lib/scout_apm/background_job_integrations/sidekiq.rb', line 230
def filtered_params_config
  ScoutApm::Agent.instance.context.config.value("job_filtered_params")
end
#job_class(msg) ⇒ Object
Capturing the class name is a little tricky, since we need to handle several cases:
1. ActiveJob, with the class in the key ‘wrapped’
2. ActiveJob, but the ‘wrapped’ key is wrong (due to YAJL serializing weirdly), find it in args.job_class
3. DelayedJob wrapper, deserializing using YAML into the real object, which can be introspected
4. No wrapper, just Sidekiq’s class
# File 'lib/scout_apm/background_job_integrations/sidekiq.rb', line 98
def job_class(msg)
  job_class = msg.fetch('class', UNKNOWN_CLASS_PLACEHOLDER)

  if job_class == ACTIVE_JOB_KLASS && msg.key?('wrapped') && msg['wrapped'].is_a?(String)
    begin
      job_class = msg['wrapped'].to_s
    rescue
      ACTIVE_JOB_KLASS
    end
  elsif job_class == ACTIVE_JOB_KLASS && msg.try(:[], 'args').try(:[], 'job_class')
    begin
      job_class = msg['args']['job_class'].to_s
    rescue
      ACTIVE_JOB_KLASS
    end
  elsif job_class == DELAYED_WRAPPER_KLASS
    begin
      # Extract the info out of the wrapper
      yml = msg['args'].first
      deserialized_args = YAML.load(yml)
      klass, method, *rest = deserialized_args

      # If this is an instance of a class, get the class itself
      # Prevents instances from coming through named like "#<Foo:0x007ffd7a9dd8a0>"
      klass = klass.class unless klass.is_a? Module

      job_class = [klass, method].map(&:to_s).join(".")
    rescue
      DELAYED_WRAPPER_KLASS
    end
  end

  job_class
rescue
  UNKNOWN_CLASS_PLACEHOLDER
end
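To make the simplest cases concrete, the sketch below resolves a name for an ActiveJob-wrapped message with a `wrapped` key, for a plain Sidekiq message, and for a message with no class at all. It deliberately leaves out the `args.job_class` fallback and the delayed-extension branch. `simple_job_class`, the constant names, and the message hashes are hypothetical and only approximate the shape of a real Sidekiq payload.

```ruby
# Values copied from the constants documented above (Sidekiq 8 wrapper name).
UNKNOWN = "UnknownJob"
ACTIVE_JOB_WRAPPER = "Sidekiq::ActiveJob::Wrapper"

# Simplified name resolution covering the wrapped, plain, and unknown cases.
def simple_job_class(msg)
  job_class = msg.fetch("class", UNKNOWN)

  if job_class == ACTIVE_JOB_WRAPPER && msg["wrapped"].is_a?(String)
    # ActiveJob: the real job class is stored under 'wrapped'.
    msg["wrapped"]
  else
    # Plain Sidekiq job, or no class information at all.
    job_class
  end
end

# Invented example messages.
plain   = { "class" => "HardWorker", "args" => [1] }
wrapped = { "class" => ACTIVE_JOB_WRAPPER, "wrapped" => "ExampleMailerJob", "args" => [{}] }

puts simple_job_class(plain)
puts simple_job_class(wrapped)
puts simple_job_class({})
```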
#latency(msg, time = Time.now.to_f) ⇒ Object
# File 'lib/scout_apm/background_job_integrations/sidekiq.rb', line 173
def latency(msg, time = Time.now.to_f)
  created_at = msg['enqueued_at'] || msg['created_at']
  if created_at
    # Sidekiq 8+ uses milliseconds, older versions use seconds.
    # Do it this way because downstream expects seconds.
    if self.class.sidekiq_version_8?
      # Convert milliseconds to seconds for consistency.
      (time - (created_at.to_f / 1000.0))
    else
      (time - created_at)
    end
  else
    0
  end
rescue
  0
end
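The unit handling comes down to a single division: a millisecond timestamp is divided by 1000 before being subtracted from the current time in seconds. A small worked sketch with made-up timestamps follows. `queue_latency` and its `milliseconds:` keyword are hypothetical and stand in for the version check used by the method above.

```ruby
# Hypothetical helper: latency in seconds between enqueue time and "now".
# `milliseconds:` says which unit the enqueue timestamp uses.
def queue_latency(enqueued_at, now, milliseconds:)
  return 0 unless enqueued_at

  enqueued_seconds = milliseconds ? enqueued_at.to_f / 1000.0 : enqueued_at.to_f
  now - enqueued_seconds
end

now = 1_700_000_002.5

# Millisecond timestamp, as described for Sidekiq 8+.
puts queue_latency(1_700_000_000_000, now, milliseconds: true)

# Second-based timestamp, as described for older versions.
puts queue_latency(1_700_000_000.0, now, milliseconds: false)
```

Both calls describe the same enqueue instant, so they yield the same latency in seconds.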
#params_to_filter ⇒ Object
# File 'lib/scout_apm/background_job_integrations/sidekiq.rb', line 224
def params_to_filter
  @params_to_filter ||= filtered_params_config + rails_filtered_params
end
#rails_filtered_params ⇒ Object
# File 'lib/scout_apm/background_job_integrations/sidekiq.rb', line 234
def rails_filtered_params
  return [] unless defined?(Rails)
  Rails.configuration.filter_parameters
rescue
  []
end