Class: Shoryuken::Processor

Inherits:
Object
  • Object
show all
Includes:
Celluloid, Util
Defined in:
lib/shoryuken/processor.rb

Defined Under Namespace

Classes: MessageVisibilityExtender

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Util

#elapsed, #fire_event, #logger, #unparse_queues, #watchdog, #worker_name

Constructor Details

#initialize(manager) ⇒ Processor

Returns a new instance of Processor


8
9
10
# File 'lib/shoryuken/processor.rb', line 8

# Builds a processor bound to the given manager.
#
# @param manager [Object] stored in +@manager+; #process later calls
#   +async.real_thread+ and +async.processor_done+ on it
def initialize(manager)
  @manager = manager
end

Instance Attribute Details

#proxy_id ⇒ Object

Returns the value of attribute proxy_id


12
13
14
# File 'lib/shoryuken/processor.rb', line 12

# Reader for the +@proxy_id+ instance variable.
#
# @return [Object] the current value of +@proxy_id+ (nil if it was never assigned)
def proxy_id
  @proxy_id
end

Instance Method Details

#process(queue, sqs_msg) ⇒ Object


14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/shoryuken/processor.rb', line 14

# Runs one SQS message through the worker registered for its queue.
#
# Steps, in order:
# 1. asynchronously reports this processor's proxy_id and current thread to the manager;
# 2. looks up the worker via Shoryuken.worker_registry;
# 3. obtains a timer from auto_visibility_timeout (may be falsy);
# 4. extracts the body and calls worker.perform inside the worker class's
#    server middleware chain;
# 5. asynchronously tells the manager this processor is done.
#
# The timer, when present, is cancelled whether or not steps 4-5 raise.
# Exceptions are not rescued here; they propagate to the caller.
# NOTE(review): the timer presumably relates to MessageVisibilityExtender —
# confirm against auto_visibility_timeout.
#
# @param queue [Object] queue identifier, passed through to the registry,
#   the middleware chain and the manager
# @param sqs_msg [Object] the message handed to the worker
# @return [Object] whatever +@manager.async.processor_done+ returns
def process(queue, sqs_msg)
  @manager.async.real_thread(proxy_id, Thread.current)

  worker = Shoryuken.worker_registry.fetch_worker(queue, sqs_msg)
  worker_class = worker.class
  visibility_timer = auto_visibility_timeout(queue, sqs_msg, worker_class)

  begin
    payload = get_body(worker_class, sqs_msg)
    worker_class.server_middleware.invoke(worker, queue, sqs_msg, payload) { worker.perform(sqs_msg, payload) }

    # Only reached when the middleware chain and the worker finished without raising.
    @manager.async.processor_done(queue, current_actor)
  ensure
    # A falsy timer means there is nothing to cancel.
    visibility_timer.cancel if visibility_timer
  end
end