Module: Sqskiq
Defined in:
- lib/sqskiq.rb
- lib/sqskiq/aws.rb
- lib/sqskiq/fetch.rb
- lib/sqskiq/delete.rb
- lib/sqskiq/worker.rb
- lib/sqskiq/manager.rb
- lib/sqskiq/process.rb
- lib/sqskiq/batch_process.rb
- lib/sqskiq/signal_handler.rb
Defined Under Namespace
Modules: AWS, SignalHandler, Worker
Classes: BatchProcessor, Deleter, Fetcher, Manager, Processor
Class Method Summary
- .aws_access_key_id=(value) ⇒ Object
- .aws_secret_access_key=(value) ⇒ Object
- .bootstrap(worker_config, worker_class) ⇒ Object
  Configures and starts the actor system.
- .configure {|_self| ... } ⇒ Object
- .configure_signal_listeners ⇒ Object
  Subscribes actors to system signals. On receiving a signal, each actor should run the appropriate code to exit cleanly.
- .valid_config_from(worker_config) ⇒ Object
  Checks the provided configuration and adds defaults for any values not specified.
Class Method Details
.aws_access_key_id=(value) ⇒ Object
    # File 'lib/sqskiq.rb', line 69

    def self.aws_access_key_id=(value)
      @aws_access_key_id = value
    end
.aws_secret_access_key=(value) ⇒ Object
    # File 'lib/sqskiq.rb', line 73

    def self.aws_secret_access_key=(value)
      @aws_secret_access_key = value
    end
.bootstrap(worker_config, worker_class) ⇒ Object
Configures and starts the actor system.
    # File 'lib/sqskiq.rb', line 12

    def self.bootstrap(worker_config, worker_class)
      config = valid_config_from(worker_config)
      credentials = [ @aws_access_key_id, @aws_secret_access_key, config[:queue_name] ]

      Celluloid::Actor[:manager] = @manager = Manager.new(config[:empty_queue_throttle])
      Celluloid::Actor[:fetcher] = @fetcher = Fetcher.pool(:size => config[:num_fetchers], :args => credentials)
      Celluloid::Actor[:deleter] = @deleter = Deleter.pool(:size => config[:num_deleters], :args => credentials)
      Celluloid::Actor[:processor] = @processor = Processor.pool(:size => config[:num_workers], :args => worker_class)
      Celluloid::Actor[:batcher] = @batcher = BatchProcessor.pool(:size => config[:num_batches])

      configure_signal_listeners

      @manager.bootstrap
      while @manager.running? do
        sleep 2
      end

      @manager.terminate
    end
.configure {|_self| ... } ⇒ Object
    # File 'lib/sqskiq.rb', line 65

    def self.configure
      yield self
    end
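Because `.configure` simply yields the module itself, the two credential setters can be called inside the block. The following is a minimal sketch of that pattern, not the library's code: `SqskiqSketch` is a stand-in module that mirrors only these three class methods so the example runs without Celluloid, the `credentials` reader is a hypothetical helper added for inspection, and the key values are placeholders.

```ruby
# Stand-in for the real Sqskiq module (assumption: only the configure block
# and the two credential setters are reproduced here).
module SqskiqSketch
  class << self
    # Same effect as the explicit setters: store the value in a module-level
    # instance variable.
    attr_writer :aws_access_key_id, :aws_secret_access_key

    # Yields the module so callers can invoke the setters on it.
    def configure
      yield self
    end

    # Hypothetical reader, present only to make the result observable.
    def credentials
      [@aws_access_key_id, @aws_secret_access_key]
    end
  end
end

SqskiqSketch.configure do |config|
  config.aws_access_key_id = 'example-key-id'        # placeholder value
  config.aws_secret_access_key = 'example-secret'    # placeholder value
end
```

With the real module the call would presumably read `Sqskiq.configure do |config| ... end`, typically placed in application start-up code before `.bootstrap` runs, since `.bootstrap` reads both instance variables when building the credentials array.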
.configure_signal_listeners ⇒ Object
Subscribes actors to system signals. On receiving a signal, each actor should run the appropriate code to exit cleanly.
    # File 'lib/sqskiq.rb', line 34

    def self.configure_signal_listeners
      ['SIGTERM', 'TERM', 'SIGINT'].each do |signal|
        trap(signal) do
          @manager.publish('SIGTERM')
          @batcher.publish('SIGTERM')
          @processor.publish('SIGTERM')
        end
      end
    end
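The trap-and-forward pattern used here can be tried in isolation. The sketch below is an illustration under assumptions, not the library's actors: `Listener` is a hypothetical plain object standing in for a Celluloid actor, and its `publish` method only records what it was sent. As in the method above, every trapped signal is forwarded to the listeners as the single string 'SIGTERM'. It relies on POSIX signals, so it is not expected to work on Windows.

```ruby
# Hypothetical stand-in for an actor that reacts to a published signal.
class Listener
  attr_reader :received

  def initialize
    @received = []
  end

  # Records the signal name; a real actor would begin shutting down here.
  def publish(signal)
    @received << signal
  end
end

listeners = [Listener.new, Listener.new]

# Forward both TERM and INT to every listener as 'SIGTERM'.
%w[TERM INT].each do |signal|
  Signal.trap(signal) do
    listeners.each { |listener| listener.publish('SIGTERM') }
  end
end

# Send TERM to the current process, then give the handler time to run.
Process.kill('TERM', Process.pid)
sleep 0.1
```

Note that 'SIGTERM' and 'TERM' name the same signal in Ruby, so the sketch lists each signal once.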
.valid_config_from(worker_config) ⇒ Object
Checks the provided configuration and adds defaults for any values not specified.
    # File 'lib/sqskiq.rb', line 47

    def self.valid_config_from(worker_config)
      num_workers = (worker_config[:processors].nil? || worker_config[:processors].to_i < 2)? 20 : worker_config[:processors]

      # messy code due to celluloid pool constraint of 2 as min pool size: see spec for better understanding
      num_fetchers = num_workers / 10
      num_fetchers = num_fetchers + 1 if num_workers % 10 > 0
      num_fetchers = 2 if num_fetchers < 2
      num_deleters = num_batches = num_fetchers

      {
        num_workers: num_workers,
        num_fetchers: num_fetchers,
        num_batches: num_batches,
        num_deleters: num_deleters,
        queue_name: worker_config[:queue_name],
        empty_queue_throttle: worker_config[:empty_queue_throttle] || 0
      }
    end
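The pool-sizing arithmetic in this method can be worked through on its own. The sketch below restates it in a hypothetical helper, `pool_sizes`, which is not part of the library and assumes `processors` is an Integer or nil: a missing value or a value below 2 falls back to 20 workers, there is one fetcher per 10 workers rounded up, the fetcher count is never below 2, and the deleter and batch pools share the fetcher count.

```ruby
# Hypothetical helper restating the sizing rules of valid_config_from.
# Assumes processors is an Integer or nil.
def pool_sizes(processors)
  workers = (processors.nil? || processors < 2) ? 20 : processors

  # Ceiling division: one fetcher per 10 workers, rounded up.
  fetchers = (workers + 9) / 10
  # The source comment cites a minimum pool size of 2.
  fetchers = 2 if fetchers < 2

  { num_workers: workers, num_fetchers: fetchers,
    num_batches: fetchers, num_deleters: fetchers }
end

pool_sizes(nil)[:num_workers]   # 20 (default)
pool_sizes(5)[:num_fetchers]    # 2  (1 rounded up to the minimum)
pool_sizes(25)[:num_fetchers]   # 3  (25 / 10 rounded up)
pool_sizes(100)[:num_fetchers]  # 10
```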