Class: Sqskiq::Manager

Inherits:
Object
  • Object
show all
Includes:
Celluloid, SignalHandler
Defined in:
lib/sqskiq/manager.rb

Instance Method Summary collapse

Methods included from SignalHandler

#shutting_down, #subscribe_for_shutdown

Constructor Details

#initialize(empty_queue_throttle) ⇒ Manager

Returns a new instance of Manager.



12
13
14
15
# File 'lib/sqskiq/manager.rb', line 12

# Stores the empty-queue throttle value and subscribes this manager for
# shutdown handling.
#
# @param empty_queue_throttle [Object] value that #throttle hands back while
#   the most recent fetch came back empty (presumably a delay in seconds --
#   TODO confirm against the caller)
def initialize(empty_queue_throttle)
  @empty_queue_throttle = empty_queue_throttle
  # Listed on the docs page under "Methods included from SignalHandler".
  subscribe_for_shutdown      
end

Instance Method Details

#batch_done(messages) ⇒ Object



30
31
32
33
# File 'lib/sqskiq/manager.rb', line 30

# Called once a batch of messages has been handled: asks the deleter to
# delete them asynchronously, then schedules one more fetch.
#
# @param messages [Object] the messages passed on to the deleter
# @return [Object] whatever #new_fetch returns
def batch_done(messages)
  async_deleter = @deleter.async
  async_deleter.delete(messages)

  # One batch finished, so request exactly one replacement fetch.
  new_fetch(1)
end

#bootstrap ⇒ Object



17
18
19
20
21
22
23
# File 'lib/sqskiq/manager.rb', line 17

# Looks up the :fetcher, :batcher and :deleter entries in Celluloid::Actor,
# keeps them in instance variables, and schedules the initial fetches.
#
# @return [Object] whatever #new_fetch returns
def bootstrap
  registry = Celluloid::Actor

  @fetcher = registry[:fetcher]
  @batcher = registry[:batcher]
  @deleter = registry[:deleter]

  # One fetch per unit of @fetcher.size (presumably the fetcher pool size --
  # TODO confirm).
  initial_fetches = @fetcher.size
  new_fetch(initial_fetches)
end

#fetch_done(messages) ⇒ Object



25
26
27
28
# File 'lib/sqskiq/manager.rb', line 25

# Records whether the latest fetch came back empty and, unless a shutdown is
# in progress, passes the messages to the batcher asynchronously.
#
# @param messages [#empty?] the fetched messages
# @return [Object, nil] nil when shutting down, otherwise the result of the
#   async process call
def fetch_done(messages)
  # Remembered so #throttle can slow down polling of an empty queue.
  @empty_queue = messages.empty?
  return if @shutting_down

  @batcher.async.process(messages)
end

#new_fetch(num) ⇒ Object



35
36
37
38
39
# File 'lib/sqskiq/manager.rb', line 35

# Schedules +num+ asynchronous fetches to start after the current throttle
# delay. Fetches are skipped once a shutdown is in progress.
#
# @param num [Integer] how many fetch calls to issue
# @return [Object] whatever +after+ returns
def new_fetch(num)
  delay = throttle

  after(delay) do
    num.times do
      # Checked on every iteration, at the time the delayed block runs.
      next if @shutting_down

      @fetcher.async.fetch
    end
  end
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


41
42
43
# File 'lib/sqskiq/manager.rb', line 41

# Tells whether the manager still counts as running. It is considered
# stopped only when a shutdown is in progress and both the deleter and the
# batcher report a busy_size of zero.
#
# @return [Boolean]
def running?
  !@shutting_down || @deleter.busy_size != 0 || @batcher.busy_size != 0
end

#throttle ⇒ Object



45
46
47
# File 'lib/sqskiq/manager.rb', line 45

# Delay to apply before the next round of fetches.
#
# @return [Object] the configured empty-queue throttle while @empty_queue is
#   truthy, otherwise 0
def throttle
  return 0 unless @empty_queue

  @empty_queue_throttle
end