Class: Toiler::Actor::Fetcher

Inherits:
Concurrent::Actor::RestartingContext
  • Object
show all
Includes:
Utils::ActorLogging
Defined in:
lib/toiler/actor/fetcher.rb

Overview

Actor that pulls messages only when processors are ready; otherwise it stays idle.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Utils::ActorLogging

#debug, #error, #fatal, #info, #warn

Constructor Details

#initialize(queue_name, count, provider) ⇒ Fetcher

Returns a new instance of Fetcher.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/toiler/actor/fetcher.rb', line 17

# Builds a Fetcher, records its initial bookkeeping state and queues the
# first +:pull_messages+ message to itself.
#
# @param queue_name [Object] queue identifier, forwarded to +init_queue+
# @param count [Object] stored as both the concurrency limit and the initial
#   number of free processors
# @param provider [Object] provider identifier, forwarded to +init_queue+
def initialize(queue_name, count, provider)
  super()

  debug "Initializing Fetcher for queue #{queue_name} and provider #{provider}..."

  # Configured wait, defaulting to 60 when the option is missing or falsy.
  @wait = Toiler.options[:wait] || 60

  # Every processor starts out free, so both counters begin at +count+.
  @concurrency = @free_processors = count
  @waiting_messages = 0
  @scheduled_task = nil
  @executing = false

  init_queue(queue_name, provider)
  debug "Finished initializing Fetcher for queue #{queue_name} and provider #{provider}..."

  # Kick off the first pull by messaging ourselves.
  tell :pull_messages
end

Instance Attribute Details

#ack_deadline ⇒ Object (readonly)

Returns the value of attribute ack_deadline.



13
14
15
# File 'lib/toiler/actor/fetcher.rb', line 13

# Reader for the +@ack_deadline+ instance variable.
#
# @return [Object] the current value of +@ack_deadline+
# NOTE(review): +@ack_deadline+ is not assigned in the constructor body shown
# on this page; presumably +init_queue+ sets it — confirm.
def ack_deadline
  @ack_deadline
end

#concurrency ⇒ Object (readonly)

Returns the value of attribute concurrency.



13
14
15
# File 'lib/toiler/actor/fetcher.rb', line 13

# Reader for the +@concurrency+ instance variable, which the constructor sets
# to its +count+ argument.
#
# @return [Object] the current value of +@concurrency+
def concurrency
  @concurrency
end

#executing ⇒ Object (readonly)

Returns the value of attribute executing.



13
14
15
# File 'lib/toiler/actor/fetcher.rb', line 13

# Reader for the +@executing+ flag. The constructor initializes it to +false+;
# +on_message+ sets it to +true+ on entry and back to +false+ in its +ensure+.
#
# @return [Object] the current value of +@executing+
def executing
  @executing
end

#free_processors ⇒ Object (readonly)

Returns the value of attribute free_processors.



13
14
15
# File 'lib/toiler/actor/fetcher.rb', line 13

# Reader for the +@free_processors+ instance variable, which the constructor
# initializes to its +count+ argument.
#
# @return [Object] the current value of +@free_processors+
def free_processors
  @free_processors
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



13
14
15
# File 'lib/toiler/actor/fetcher.rb', line 13

# Reader for the +@queue+ instance variable.
#
# @return [Object] the current value of +@queue+
# NOTE(review): +@queue+ is not assigned in the constructor body shown on this
# page; presumably +init_queue+ sets it — confirm.
def queue
  @queue
end

#scheduled_task ⇒ Object (readonly)

Returns the value of attribute scheduled_task.



13
14
15
# File 'lib/toiler/actor/fetcher.rb', line 13

# Reader for the +@scheduled_task+ instance variable, which the constructor
# initializes to +nil+.
#
# @return [Object, nil] the current value of +@scheduled_task+
def scheduled_task
  @scheduled_task
end

#wait ⇒ Object (readonly)

Returns the value of attribute wait.



13
14
15
# File 'lib/toiler/actor/fetcher.rb', line 13

# Reader for the +@wait+ instance variable. The constructor sets it to
# +Toiler.options[:wait]+, falling back to 60 when that option is nil or false.
#
# @return [Object] the current value of +@wait+
# NOTE(review): the unit of this value is not visible here — presumably
# seconds; confirm against where it is consumed.
def wait
  @wait
end

#waiting_messages ⇒ Object (readonly)

Returns the value of attribute waiting_messages.



13
14
15
# File 'lib/toiler/actor/fetcher.rb', line 13

# Reader for the +@waiting_messages+ instance variable, which the constructor
# initializes to 0.
#
# @return [Object] the current value of +@waiting_messages+
def waiting_messages
  @waiting_messages
end

Instance Method Details

#default_executor ⇒ Object



32
33
34
# File 'lib/toiler/actor/fetcher.rb', line 32

# Selects the executor for this actor.
#
# @return [Object] whatever +Concurrent.global_fast_executor+ returns
# NOTE(review): presumably this overrides the executor hook of the
# Concurrent::Actor context this class inherits from — confirm.
def default_executor
  Concurrent.global_fast_executor
end

#executing? ⇒ Boolean

Returns:

  • (Boolean)


47
48
49
# File 'lib/toiler/actor/fetcher.rb', line 47

# Predicate form of the +@executing+ flag. +on_message+ sets the flag to
# +true+ on entry and resets it to +false+ in its +ensure+ clause; the
# constructor initializes it to +false+.
#
# @return [Boolean] the current value of +@executing+
def executing?
  @executing
end

#on_message(msg) ⇒ Object



36
37
38
39
40
41
42
43
44
45
# File 'lib/toiler/actor/fetcher.rb', line 36

# Dispatches an incoming actor message to the method it names.
#
# The message is destructured as +[method, *args]+ and invoked with +send+,
# so a bare Symbol calls that method with no arguments. +@executing+ is
# +true+ for the duration of the call and always reset to +false+ afterwards.
#
# Errors raised by the dispatched method are logged through +error+ instead
# of propagating. The handler itself must never raise, so it tolerates a
# missing +@queue+ and an exception without a backtrace.
#
# @param msg [Symbol, Array] method name, optionally followed by its arguments
# @return [Object] the dispatched method's result, or the result of the
#   +error+ logging call when the method raised
def on_message(msg)
  @executing = true
  method, *args = msg
  send(method, *args)
rescue StandardError, SystemStackError => e
  # if we misbehave and cause a stack level too deep exception, we should be able to recover
  queue_name = @queue && @queue.name
  # Exception#backtrace is nil for exceptions that carry no backtrace.
  backtrace = Array(e.backtrace).join("\n")
  error "Fetcher #{queue_name} raised exception #{e.class}: #{e.message}\n#{backtrace}"
ensure
  @executing = false
end