Class: Toro::Manager

Inherits:
Object
  • Object
show all
Includes:
Actor, ActorManager
Defined in:
lib/toro/manager.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from ActorManager

#actors, #register_actor

Methods included from Actor

included

Constructor Details

#initialize(options = {}) ⇒ Manager

Returns a new instance of Manager.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/toro/manager.rb', line 10

def initialize(options={})
  defaults = {
    concurrency: 1,
    queues: [Toro.options[:default_queue]],
  }
  options = defaults.merge(options)
  @queues = options[:queues]
  @threads = {}
  @ready = options[:concurrency].times.map do
    processor = Processor.new_link(current_actor)
    processor.proxy_id = processor.object_id
    processor
  end
  @busy = []
  @is_done = false
  @fetcher = Fetcher.new({ manager: current_actor, queues: options[:queues] })
  @listener = Listener.new({ queues: @queues, fetcher: @fetcher, manager: current_actor })
end

Instance Attribute Details

#busyObject (readonly)

Returns the value of attribute busy.



8
9
10
# File 'lib/toro/manager.rb', line 8

def busy
  @busy
end

#readyObject (readonly)

Returns the value of attribute ready.



8
9
10
# File 'lib/toro/manager.rb', line 8

def ready
  @ready
end

Instance Method Details

#assign(job) ⇒ Object



52
53
54
55
56
57
# File 'lib/toro/manager.rb', line 52

def assign(job)
  raise 'No processors ready' if !is_ready?
  processor = @ready.pop
  @busy << processor
  processor.async.process(job)
end

#clean_up_for_graceful_shutdownObject



69
70
71
72
73
74
75
76
77
# File 'lib/toro/manager.rb', line 69

def clean_up_for_graceful_shutdown
  if @busy.empty?
    shutdown
    return true
  end

  after(Toro.options[:graceful_shutdown_time]) { clean_up_for_graceful_shutdown }
  false
end

#dispatchObject



63
64
65
66
67
# File 'lib/toro/manager.rb', line 63

def dispatch
  raise "No processors, cannot continue!" if @ready.empty? && @busy.empty?
  raise "No ready processor!?" if @ready.empty?
  @fetcher.async.fetch
end

#hard_shutdown_in(delay) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/toro/manager.rb', line 79

def hard_shutdown_in(delay)
  Toro.logger.info "Pausing up to #{delay} seconds to allow workers to finish..."

  after(delay) do
    # We've reached the timeout and we still have busy processors.
    # They must die but their messages shall live on.
    Toro.logger.warn "Terminating #{@busy.size} busy worker threads"

    requeue

    @busy.each do |processor|
      if processor.alive? && thread = @threads.delete(processor.object_id)
        thread.raise Shutdown
      end
    end

    signal_shutdown
  end
end

#heartbeatObject



119
120
121
122
123
124
125
# File 'lib/toro/manager.rb', line 119

def heartbeat
  return if stopped?

  after(5) do
    heartbeat
  end
end

#is_ready?Boolean

Returns:

  • (Boolean)


59
60
61
# File 'lib/toro/manager.rb', line 59

def is_ready?
  !@ready.empty?
end

#processor_complete(processor) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
# File 'lib/toro/manager.rb', line 127

def processor_complete(processor)
  @threads.delete(processor.object_id)
  @busy.delete(processor)
  if stopped?
    processor.terminate if processor.alive?
    shutdown if @busy.empty?
  else
    @ready << processor if processor.alive?
    dispatch
  end
end

#requeueObject



104
105
106
107
108
109
# File 'lib/toro/manager.rb', line 104

def requeue
  Toro::Database.with_connection do
    Job.where(status: 'running', started_by: Toro.process_identity).
      update_all(status: 'queued', started_by: nil, started_at: nil)
  end
end

#set_thread(proxy_id, thread) ⇒ Object



115
116
117
# File 'lib/toro/manager.rb', line 115

def set_thread(proxy_id, thread)
  @threads[proxy_id] = thread
end

#shutdownObject



99
100
101
102
# File 'lib/toro/manager.rb', line 99

def shutdown
  requeue
  signal_shutdown
end

#signal_shutdownObject



111
112
113
# File 'lib/toro/manager.rb', line 111

def signal_shutdown
  after(0) { signal(:shutdown) }
end

#startObject



29
30
31
32
33
34
# File 'lib/toro/manager.rb', line 29

def start
  @is_done = false
  @listener.async.start
  @ready.each { dispatch }
  heartbeat
end

#stopObject



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/toro/manager.rb', line 36

def stop
  @is_done = true

  Toro.logger.debug "Shutting down #{@ready.size} quiet workers"
  @ready.each { |processor| processor.terminate if processor.alive? }
  @ready.clear
  @fetcher.terminate if @fetcher.alive?
  if @listener.alive?
    actors[:listener].stop if actors[:listener]
    @listener.terminate
  end
  return if clean_up_for_graceful_shutdown

  hard_shutdown_in(Toro.options[:hard_shutdown_time])
end

#stopped?Boolean

Returns:

  • (Boolean)


139
140
141
# File 'lib/toro/manager.rb', line 139

def stopped?
  @is_done
end