Class: Toiler::Actor::Processor

Inherits:
Concurrent::Actor::RestartingContext
  • Object
show all
Includes:
Utils::ActorLogging
Defined in:
lib/toiler/actor/processor.rb

Overview

Responsible for processing sqs messages and notifying Fetcher when done

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue) ⇒ Processor

Returns a new instance of Processor.



15
16
17
18
19
20
21
22
23
# File 'lib/toiler/actor/processor.rb', line 15

def initialize(queue)
  super()

  @queue = queue
  @worker_class = Toiler.worker_class_registry[queue]
  @executing = false
  @thread = nil
  init_options
end

Instance Attribute Details

#body_parserObject (readonly)

Returns the value of attribute body_parser.



12
13
14
# File 'lib/toiler/actor/processor.rb', line 12

def body_parser
  @body_parser
end

#executingObject (readonly)

Returns the value of attribute executing.



12
13
14
# File 'lib/toiler/actor/processor.rb', line 12

def executing
  @executing
end

#queueObject (readonly)

Returns the value of attribute queue.



12
13
14
# File 'lib/toiler/actor/processor.rb', line 12

def queue
  @queue
end

#threadObject (readonly)

Returns the value of attribute thread.



12
13
14
# File 'lib/toiler/actor/processor.rb', line 12

def thread
  @thread
end

#worker_classObject (readonly)

Returns the value of attribute worker_class.



12
13
14
# File 'lib/toiler/actor/processor.rb', line 12

def worker_class
  @worker_class
end

Instance Method Details

#default_executorObject



25
26
27
# File 'lib/toiler/actor/processor.rb', line 25

def default_executor
  Concurrent.global_io_executor
end

#executing?Boolean

Returns:

  • (Boolean)


41
42
43
# File 'lib/toiler/actor/processor.rb', line 41

def executing?
  @executing
end

#fetcherObject



29
30
31
# File 'lib/toiler/actor/processor.rb', line 29

def fetcher
  @fetcher ||= Toiler.fetcher @queue
end

#on_message(msg) ⇒ Object



33
34
35
36
37
38
39
# File 'lib/toiler/actor/processor.rb', line 33

def on_message(msg)
  method, *args = msg
  send(method, *args)
rescue StandardError, SystemStackError => e
  # if clients misbehave and cause a stack level too deep exception, we should be able to recover
  error "Processor #{@queue} failed processing, reason: #{e.class}: #{e.message}\n#{e.backtrace.join("\n")}"
end