Module: Sqskiq
- Defined in:
- lib/sqskiq.rb,
lib/sqskiq/aws.rb,
lib/sqskiq/fetch.rb,
lib/sqskiq/delete.rb,
lib/sqskiq/worker.rb,
lib/sqskiq/manager.rb,
lib/sqskiq/process.rb,
lib/sqskiq/batch_process.rb,
lib/sqskiq/signal_handler.rb
Defined Under Namespace
Modules: AWS, SignalHandler, Worker
Classes: BatchProcessor, Deleter, Fetcher, Manager, Processor
Class Method Summary
collapse
Class Method Details
.aws_access_key_id=(value) ⇒ Object
68
69
70
|
# File 'lib/sqskiq.rb', line 68
def self.aws_access_key_id=(value)
@aws_access_key_id = value
end
|
.aws_secret_access_key=(value) ⇒ Object
72
73
74
|
# File 'lib/sqskiq.rb', line 72
def self.aws_secret_access_key=(value)
@aws_secret_access_key = value
end
|
.bootstrap(options, worker_class) ⇒ Object
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
# File 'lib/sqskiq.rb', line 10
def self.bootstrap(options, worker_class)
params = [ @aws_access_key_id, @aws_secret_access_key, options[:queue_name] ]
configured_pool_sizes = pool_sizes(options)
Celluloid::Actor[:manager] = @manager = Manager.new
Celluloid::Actor[:fetcher] = @fetcher = Fetcher.pool(:size => configured_pool_sizes[:num_fetchers], :args => params)
Celluloid::Actor[:processor] = @processor = Processor.pool(:size => configured_pool_sizes[:num_workers], :args => worker_class)
Celluloid::Actor[:batch_processor] = @batch_processor = BatchProcessor.pool(:size => configured_pool_sizes[:num_batches])
Celluloid::Actor[:deleter] = @deleter = Deleter.pool(:size => configured_pool_sizes[:num_deleters], :args => params)
configure_signal_listeners
@manager.bootstrap
while @manager.running? do
sleep 2
end
@fetcher.__shutdown__
@batch_processor.__shutdown__
@processor.__shutdown__
@deleter.__shutdown__
@manager.terminate
end
|
64
65
66
|
# File 'lib/sqskiq.rb', line 64
def self.configure
yield self
end
|
36
37
38
39
40
41
42
43
44
|
# File 'lib/sqskiq.rb', line 36
def self.configure_signal_listeners
['SIGTERM', 'TERM', 'SIGINT'].each do |signal|
trap(signal) do
@manager.publish('SIGTERM')
@batch_processor.publish('SIGTERM')
@processor.publish('SIGTERM')
end
end
end
|
.pool_sizes(options) ⇒ Object
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
# File 'lib/sqskiq.rb', line 46
def self.pool_sizes(options)
num_workers = (options[:processors].nil? || options[:processors].to_i < 2)? 20 : options[:processors]
num_fetchers = num_workers / 10
num_fetchers = num_fetchers + 1 if num_workers % 10 > 0
num_fetchers = 2 if num_fetchers < 2
num_deleters = num_batches = num_fetchers
{ num_workers: num_workers, num_fetchers: num_fetchers, num_batches: num_batches, num_deleters: num_deleters }
end
|