Class: Toiler::Actor::Fetcher

Inherits:
Concurrent::Actor::RestartingContext
  • Object
show all
Includes:
Utils::ActorLogging
Defined in:
lib/toiler/actor/fetcher.rb

Overview

Actor that polls for messages only when processors are ready; otherwise it stays idle.

Constant Summary collapse

FETCH_LIMIT =
10

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Utils::ActorLogging

#debug, #error, #fatal, #info, #warn

Constructor Details

#initialize(queue, client, count) ⇒ Fetcher

Returns a new instance of Fetcher.



15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/toiler/actor/fetcher.rb', line 15

# Sets up the fetcher's state for one queue and queues the first poll.
#
# Logs a debug line before and after the setup, wraps the queue in a
# Toiler::Aws::Queue, caches that queue's visibility timeout, looks up
# whether the registered worker class is a batch worker, and finally sends
# itself a :poll_messages message.
#
# @param queue [Object] queue identifier; used as the key into
#   Toiler.worker_class_registry and passed on to Toiler::Aws::Queue
#   (presumably the queue name as a String - confirm with callers)
# @param client [Object] client handed to Toiler::Aws::Queue
# @param count [Object] stored as both the concurrency and the initial
#   number of free processors (presumably an Integer - confirm)
def initialize(queue, client, count)
  debug "Initializing Fetcher for queue #{queue}..."

  # Plain bookkeeping that involves no collaborators.
  @concurrency = @free_processors = count
  @waiting_messages = 0
  @executing = false

  # Collaborator lookups, kept in their original relative order.
  @queue = Toiler::Aws::Queue.new(queue, client)
  @wait = Toiler.options[:wait] || 60
  @batch = Toiler.worker_class_registry[queue].batch?
  @visibility_timeout = @queue.visibility_timeout

  debug "Finished initializing Fetcher for queue #{queue}"
  tell(:poll_messages)
end

Instance Attribute Details

#concurrency ⇒ Object

Returns the value of attribute concurrency.



12
13
14
# File 'lib/toiler/actor/fetcher.rb', line 12

# Reader for @concurrency, which #initialize sets to its +count+ argument.
#
# @return [Object] the stored concurrency value (presumably an Integer
#   processor count - confirm with callers)
def concurrency
  @concurrency
end

#executing ⇒ Object

Returns the value of attribute executing.



12
13
14
# File 'lib/toiler/actor/fetcher.rb', line 12

# Reader for the @executing flag. The flag starts out false in #initialize,
# is set to true when #on_message begins handling a message, and is reset
# to false when #on_message finishes.
#
# @return [Boolean] current value of the flag
def executing
  @executing
end

#free_processors ⇒ Object

Returns the value of attribute free_processors.



12
13
14
# File 'lib/toiler/actor/fetcher.rb', line 12

# Reader for @free_processors, which #initialize seeds with its +count+
# argument.
# NOTE(review): presumably adjusted elsewhere as processors take and finish
# work - the code that does so is not visible here; confirm.
#
# @return [Object] the stored value (presumably an Integer - confirm)
def free_processors
  @free_processors
end

#queue ⇒ Object

Returns the value of attribute queue.



12
13
14
# File 'lib/toiler/actor/fetcher.rb', line 12

# Reader for @queue, the Toiler::Aws::Queue built in #initialize from the
# +queue+ and +client+ arguments.
#
# @return [Toiler::Aws::Queue] the wrapped queue
def queue
  @queue
end

#visibility_timeout ⇒ Object

Returns the value of attribute visibility_timeout.



12
13
14
# File 'lib/toiler/actor/fetcher.rb', line 12

# Reader for @visibility_timeout, the value read from the queue's
# +visibility_timeout+ during #initialize and cached on the fetcher.
#
# @return [Object] the cached timeout (presumably a number of seconds -
#   confirm against Toiler::Aws::Queue)
def visibility_timeout
  @visibility_timeout
end

#wait ⇒ Object

Returns the value of attribute wait.



12
13
14
# File 'lib/toiler/actor/fetcher.rb', line 12

# Reader for @wait, which #initialize takes from Toiler.options[:wait] and
# defaults to 60 when that option is nil or false.
#
# @return [Object] the stored wait value (presumably a duration in
#   seconds - confirm where it is consumed)
def wait
  @wait
end

#waiting_messages ⇒ Object

Returns the value of attribute waiting_messages.



12
13
14
# File 'lib/toiler/actor/fetcher.rb', line 12

# Reader for @waiting_messages, a counter that #initialize starts at 0.
# NOTE(review): what increments or decrements it is not visible here.
#
# @return [Integer] the current counter value
def waiting_messages
  @waiting_messages
end

Instance Method Details

#default_executor ⇒ Object



29
30
31
# File 'lib/toiler/actor/fetcher.rb', line 29

# Returns concurrent-ruby's global fast executor.
# NOTE(review): presumably overrides the actor-context hook that chooses
# which executor this actor's work is scheduled on - confirm against
# Concurrent::Actor::AbstractContext.
#
# @return [Object] the value of Concurrent.global_fast_executor
def default_executor
  Concurrent.global_fast_executor
end

#executing? ⇒ Boolean

Returns:

  • (Boolean)


44
45
46
# File 'lib/toiler/actor/fetcher.rb', line 44

# Predicate form of the @executing flag.
#
# @return [Boolean] true while #on_message is handling a message, false
#   otherwise (the flag is initialised to false in #initialize)
def executing?
  @executing
end

#on_message(msg) ⇒ Object



33
34
35
36
37
38
39
40
41
42
# File 'lib/toiler/actor/fetcher.rb', line 33

# Entry point for every message delivered to this actor.
#
# The message is read as +[method_name, *arguments]+ (a bare symbol such as
# +:poll_messages+ works too, with no arguments) and dispatched to the
# method of that name on this fetcher. The fetcher is flagged as executing
# for the duration of the call; the flag is always cleared afterwards.
#
# Any StandardError or SystemStackError raised by the handler is logged via
# +error+ and swallowed instead of being propagated to the caller.
#
# @param msg [Symbol, Array] method name, optionally followed by arguments
# @return [Object] the handler's return value, or whatever the +error+
#   logging call returns when the handler raised
def on_message(msg)
  @executing = true
  handler, *args = msg
  send(handler, *args)
rescue StandardError, SystemStackError => e
  # SystemStackError is rescued as well: if we misbehave and cause a
  # "stack level too deep" exception, we should still be able to recover.
  # Array() tolerates exceptions that report a nil backtrace, so the logging
  # itself cannot raise from inside this handler and mask the real failure.
  trace = Array(e.backtrace).join("\n")
  error "Fetcher #{queue.name} raised exception #{e.class}: #{e.message}\n#{trace}"
ensure
  @executing = false
end