Class: Sqspoller::SqsPoller
- Inherits:
-
Object
- Object
- Sqspoller::SqsPoller
- Defined in:
- lib/sqspoller/sqs_poller.rb
Class Method Summary collapse
- .daemonize(filename) ⇒ Object
- .initialize_worker(worker_configuration, total_poller_threads) ⇒ Object
- .start_poller(filename, queue_config_name, access_key_id, secret_access_key, region, log_filename = nil) ⇒ Object
- .start_poller_with_config(config, queue_config_name, access_key_id, secret_access_key, region) ⇒ Object
- .sym(map) ⇒ Object
Class Method Details
.daemonize(filename) ⇒ Object
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/sqspoller/sqs_poller.rb', line 22 def daemonize(filename) raise 'Must run as root' if Process.euid != 0 raise 'First fork failed' if (pid = fork) == -1 exit unless pid.nil? Process.setsid raise 'Second fork failed' if (pid = fork) == -1 exit unless pid.nil? puts "Daemon pid: #{Process.pid}" # Or save it somewhere, etc. Dir.chdir '/' File.umask 0000 STDIN.reopen filename STDOUT.reopen '/dev/null', 'a' STDERR.reopen STDOUT end |
.initialize_worker(worker_configuration, total_poller_threads) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/sqspoller/sqs_poller.rb', line 83 def initialize_worker(worker_configuration, total_poller_threads) worker_thread_count = worker_configuration[:concurrency] worker_task = worker_configuration[:worker_class].split('::').inject(Object) {|o,c| o.const_get c}.new(worker_configuration) waiting_tasks_ratio = worker_configuration[:waiting_tasks_ratio] waiting_tasks_ratio = 1 if waiting_tasks_ratio.nil? if worker_thread_count.nil? = MessageDelegator.new total_poller_threads, waiting_tasks_ratio, worker_task else = MessageDelegator.new worker_thread_count, waiting_tasks_ratio, worker_task end return end |
.start_poller(filename, queue_config_name, access_key_id, secret_access_key, region, log_filename = nil) ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/sqspoller/sqs_poller.rb', line 65 def start_poller(filename, queue_config_name, access_key_id, secret_access_key, region, log_filename=nil) puts "Starting poller" config = YAML.load(ERB.new(IO.read(filename)).result) config = sym(config) if log_filename.nil? || log_filename.empty? puts "Did not receive log file name" fork do Process.daemon start_poller_with_config config, queue_config_name, access_key_id, secret_access_key, region end else puts "Did receive log file name" daemonize log_filename start_poller_with_config config, queue_config_name, access_key_id, secret_access_key, region end end |
.start_poller_with_config(config, queue_config_name, access_key_id, secret_access_key, region) ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/sqspoller/sqs_poller.rb', line 41 def start_poller_with_config(config, queue_config_name, access_key_id, secret_access_key, region) puts "Started poller method" @logger = Logger.new(STDOUT) total_poller_threads = 0 qcs = [] queues_config = config[queue_config_name] || config[queue_config_name.to_sym] queues_config.keys.each { |queue| total_poller_threads += queues_config[queue][:polling_threads] } = initialize_worker config[:worker_configuration], total_poller_threads queues_config.keys.each { |queue| @logger.info "Creating QueueController object for queue: #{queue}" qc = QueueController.new queue, queues_config[queue][:polling_threads], , access_key_id, secret_access_key, region qcs << qc } qcs.each { |qc| qc.start } qcs.each{ |qc| qc.threads.each { |thread| thread.join } } end |
.sym(map) ⇒ Object
15 16 17 18 19 20 |
# File 'lib/sqspoller/sqs_poller.rb', line 15 def sym(map) if map.class == Hash map = map.inject({}){|memo,(k,v)| memo[k.to_sym] = sym(v); memo} end return map end |