Module: Sneakers::Worker

Includes:
Concerns::Logging, Concerns::Metrics, ErrorReporter
Defined in:
lib/sneakers/worker.rb

Defined Under Namespace

Modules: ClassMethods

Constant Summary collapse

Classes =
[]

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from ErrorReporter

#worker_error

Instance Attribute Details

#id ⇒ Object (readonly)

Returns the value of attribute id



6
7
8
# File 'lib/sneakers/worker.rb', line 6

# Read-only accessor for the +id+ attribute.
#
# @return [Object] the current value of +@id+
def id; @id; end

#opts ⇒ Object (readonly)

Returns the value of attribute opts



6
7
8
# File 'lib/sneakers/worker.rb', line 6

# Read-only accessor for the +opts+ attribute.
#
# @return [Object] the current value of +@opts+
def opts; @opts; end

#queue ⇒ Object (readonly)

Returns the value of attribute queue



6
7
8
# File 'lib/sneakers/worker.rb', line 6

# Read-only accessor for the +queue+ attribute.
#
# @return [Object] the current value of +@queue+
def queue; @queue; end

Class Method Details

.included(base) ⇒ Object



119
120
121
122
# File 'lib/sneakers/worker.rb', line 119

# Inclusion hook: extends the including +base+ with ClassMethods and, when
# +base+ is a Class (not a plain Module), records it in the Classes list.
#
# @param base [Module] the class or module that included this module
# @return [Array, nil] Classes when +base+ was appended, otherwise nil
def self.included(base)
  base.extend(ClassMethods)
  return unless base.is_a?(Class)

  Classes << base
end

Instance Method Details

#ack! ⇒ Object



33
# File 'lib/sneakers/worker.rb', line 33

# Result marker a worker can return; #process_work acknowledges the
# message when the work result equals this symbol.
#
# @return [Symbol] always +:ack+
def ack!
  :ack
end

#do_work(delivery_info, metadata, msg, handler) ⇒ Object



44
45
46
47
48
49
50
# File 'lib/sneakers/worker.rb', line 44

# Logs the incoming message at trace level and posts a job to the worker's
# pool that runs #process_work with the same four arguments.
#
# @param delivery_info [Object] forwarded unchanged to #process_work
# @param metadata [Object] forwarded unchanged to #process_work
# @param msg [Object] the message; its +inspect+ form is written to the trace log
# @param handler [Object] forwarded unchanged to #process_work
# @return [Object] whatever +@pool.post+ returns
def do_work(delivery_info, metadata, msg, handler)
  worker_trace "Working off: #{msg.inspect}"

  @pool.post do
    process_work(delivery_info, metadata, msg, handler)
  end
end

#initialize(queue = nil, pool = nil, opts = {}) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/sneakers/worker.rb', line 14

# Builds a worker from an optional queue, an optional pool and an options hash.
#
# Options are layered: the caller's +opts+ are overridden by the class-level
# +queue_opts+, and the result overrides Sneakers::CONFIG.
#
# @param queue [Object, nil] queue to use; when nil a Sneakers::Queue is
#   created from the class-level queue name and the merged options
# @param pool [Object, nil] pool to use; when nil a Concurrent::FixedThreadPool
#   is created, sized by +:threads+ (falling back to the configured default)
# @param opts [Hash] per-instance options
def initialize(queue = nil, pool = nil, opts = {})
  worker_class = self.class
  merged = opts.merge(worker_class.queue_opts || {})
  queue_name = worker_class.queue_name
  settings = Sneakers::CONFIG.merge(merged)

  @should_ack = settings[:ack]
  @pool = pool || Concurrent::FixedThreadPool.new(
    settings[:threads] || Sneakers::Configuration::DEFAULTS[:threads]
  )
  # Remember once whether this worker implements the richer work_with_params hook.
  @call_with_params = respond_to?(:work_with_params)
  @content_type = settings[:content_type]

  @queue = queue || Sneakers::Queue.new(queue_name, settings)

  @opts = settings
  @id = Utils.make_worker_id(queue_name)
end

#log_msg(msg) ⇒ Object

Construct a log message with some standard prefix for this worker



109
110
111
# File 'lib/sneakers/worker.rb', line 109

# Construct a log message with some standard prefix for this worker.
#
# The prefix is made of four bracketed parts, in order: the worker id, the
# current thread, the queue name and the queue options.
#
# @param msg [Object] text appended after the prefix
# @return [String] the prefixed log line
def log_msg(msg)
  prefix = [@id, Thread.current, @queue.name, @queue.opts].map { |part| "[#{part}]" }.join
  "#{prefix} #{msg}"
end

#process_work(delivery_info, metadata, msg, handler) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/sneakers/worker.rb', line 52

# Runs the worker's job for a single message and reports the outcome.
#
# The message is deserialized using the worker's configured content type or,
# when none is configured, the +:content_type+ entry of +metadata+ (if
# +metadata+ is present). The deserialized message is then handed to
# +work_with_params+ or +work+, depending on which the worker implements.
# Any StandardError or ScriptError raised along the way is reported through
# +worker_error+ and turns the result into +:error+.
#
# When acknowledgements are enabled, the result selects the handler call:
# +:ack+ -> acknowledge, +:error+ -> error, +:reject+ -> reject,
# +:requeue+ -> reject with +true+ as the extra argument, anything else -> noop.
#
# @param delivery_info [Object] passed to the work method and the handler
# @param metadata [Object, nil] passed to the work method and the handler;
#   indexed with +:content_type+ when the worker has no content type of its own
# @param msg [Object] raw message handed to ContentType.deserialize
# @param handler [Object] receives acknowledge/error/reject/noop
# @return [Object] the result of the final metrics increment
def process_work(delivery_info, metadata, msg, handler)
  res = nil
  error = nil

  begin
    metrics.increment("work.#{self.class.name}.started")
    metrics.timing("work.#{self.class.name}.time") do
      # The worker-level content type wins; metadata is only a fallback and may be nil.
      deserialized_msg = ContentType.deserialize(msg, @content_type || metadata && metadata[:content_type])
      if @call_with_params
        res = work_with_params(deserialized_msg, delivery_info, metadata)
      else
        res = work(deserialized_msg)
      end
    end
  rescue StandardError, ScriptError => ex
    res = :error
    error = ex
    worker_error(ex, log_msg: log_msg(msg), class: self.class.name,
                 message: msg, delivery_info: delivery_info, metadata: metadata)
  end

  if @should_ack

    if res == :ack
      # note to future-self. never acknowledge multiple (multiple=true) messages under threads.
      handler.acknowledge(delivery_info, metadata, msg)
    elsif res == :error
      handler.error(delivery_info, metadata, msg, error)
    elsif res == :reject
      handler.reject(delivery_info, metadata, msg)
    elsif res == :requeue
      handler.reject(delivery_info, metadata, msg, true)
    else
      handler.noop(delivery_info, metadata, msg)
    end
    metrics.increment("work.#{self.class.name}.handled.#{res || 'noop'}")
  end

  metrics.increment("work.#{self.class.name}.ended")
end

#publish(msg, opts) ⇒ Object



37
38
39
40
41
42
# File 'lib/sneakers/worker.rb', line 37

# Serializes +msg+ and publishes it on this worker's queue exchange.
#
# The routing key is +opts[:routing_key]+ when given, otherwise
# +opts[:to_queue]+. The +:to_queue+ entry is never forwarded to the
# exchange. Nothing is published when no routing key can be determined.
#
# The caller's hash is left untouched: a copy is modified and forwarded, so
# frozen or reused option hashes are safe to pass.
#
# @param msg [Object] payload handed to Sneakers::ContentType.serialize
# @param opts [Hash] publish options; +:to_queue+, +:routing_key+ and
#   +:content_type+ are read here, the rest is passed through to the exchange
# @return [Object, nil] the exchange's publish result, or nil when skipped
def publish(msg, opts)
  # Copy first: deleting :to_queue / defaulting :routing_key must not leak to the caller.
  publish_opts = opts.dup
  to_queue = publish_opts.delete(:to_queue)
  publish_opts[:routing_key] ||= to_queue
  return unless publish_opts[:routing_key]

  @queue.exchange.publish(
    Sneakers::ContentType.serialize(msg, publish_opts[:content_type]),
    publish_opts
  )
end

#reject! ⇒ Object



34
# File 'lib/sneakers/worker.rb', line 34

# Result marker a worker can return; #process_work rejects the message
# when the work result equals this symbol.
#
# @return [Symbol] always +:reject+
def reject!
  :reject
end

#requeue! ⇒ Object



35
# File 'lib/sneakers/worker.rb', line 35

# Result marker a worker can return; #process_work calls the handler's
# reject with an extra +true+ argument when the work result equals this symbol.
#
# @return [Symbol] always +:requeue+
def requeue!
  :requeue
end

#run ⇒ Object



102
103
104
105
106
# File 'lib/sneakers/worker.rb', line 102

# Starts the worker: subscribes it to its queue, tracing before and after.
#
# @return [Object] the result of the final trace call
def run
  worker_trace('New worker: subscribing.')
  @queue.subscribe(self)
  worker_trace("New worker: I'm alive.")
end

#stop ⇒ Object



93
94
95
96
97
98
99
100
# File 'lib/sneakers/worker.rb', line 93

# Stops the worker: unsubscribes from the queue, then shuts the pool down
# and waits for it to terminate, tracing each step.
#
# @return [Object] the result of the final trace call
def stop
  worker_trace('Stopping worker: unsubscribing.')
  @queue.unsubscribe

  worker_trace('Stopping worker: shutting down thread pool.')
  @pool.shutdown
  @pool.wait_for_termination

  worker_trace("Stopping worker: I'm gone.")
end

#worker_trace(msg) ⇒ Object



113
114
115
# File 'lib/sneakers/worker.rb', line 113

# Writes +msg+ to the logger at debug level, prefixed via #log_msg.
#
# @param msg [Object] text to trace
# @return [Object] whatever the logger's +debug+ returns
def worker_trace(msg)
  sink = logger
  sink.debug(log_msg(msg))
end