Class: BackgroundProxy::Worker
- Inherits: Object
- Hierarchy: Object → BackgroundProxy::Worker
- Defined in:
- lib/background_proxy/worker.rb
Constant Summary
- SLEEP_INTERVAL = 5
Instance Method Summary
- #async(method, *args) ⇒ Object
- #check_background_queue(thread_num = 0) ⇒ Object
- #current_thread_count ⇒ Object
- #current_threads ⇒ Object
- #initialize(client, options = {}) ⇒ Worker — constructor; a new instance of Worker.
- #log(level, msg) ⇒ Object
- #spawn_threads!(num_threads) ⇒ Object
- #stop! ⇒ Object
- #stopped? ⇒ Boolean
Constructor Details
#initialize(client, options = {}) ⇒ Worker
Returns a new instance of Worker.
5 6 7 8 9 10 11 12 13 14 |
# File 'lib/background_proxy/worker.rb', line 5

# Builds a worker that queues method calls for +client+ and starts the
# background threads that execute them.
#
# @param client [Object] receiver of the queued method calls
# @param options [Hash] optional settings
# @option options [Object] :logger object that receives +log(level, msg)+;
#   nothing is logged when it is absent
# @option options [Integer] :threads number of worker threads to spawn
#   (defaults to 3)
def initialize(client, options = {})
  @queue = Queue.new
  @client = client
  @logger = options[:logger]
  spawn_threads!(options.fetch(:threads, 3))

  # Keep draining at process exit so jobs still queued are not dropped.
  at_exit do
    check_background_queue until @queue.empty?
  end
end
Instance Method Details
#async(method, *args) ⇒ Object
32 33 34 35 36 37 38 |
# File 'lib/background_proxy/worker.rb', line 32

# Queues a call to +method+ (with +args+) for later execution against the
# client; the call itself is performed by #check_background_queue.
#
# @param method [Symbol, String] name of the client method to invoke
# @param args [Array] arguments forwarded to that method
def async(method, *args)
  log :debug, "async run: #{method}, #{args}"

  @queue.push({ method: method, args: args })
end
#check_background_queue(thread_num = 0) ⇒ Object
55 56 57 58 59 60 61 62 63 |
# File 'lib/background_proxy/worker.rb', line 55

# Runs queued jobs against the client until the queue is empty.
#
# A job that raises is reported and skipped, so the remaining jobs still run
# and the calling thread keeps working.
#
# @param thread_num [Integer] label of the calling worker; used only in the
#   debug log line
def check_background_queue(thread_num = 0)
  log :debug, "Checking background queue on thread #{thread_num} (#{current_thread_count} active)"

  until @queue.empty?
    begin
      # Non-blocking pop: another thread may have emptied the queue since the
      # emptiness check, in which case Queue#pop raises ThreadError.
      job = @queue.pop(true)
    rescue ThreadError
      next
    end

    begin
      @client.send(job[:method], *job[:args])
    rescue StandardError => e
      # One failing job must not abort the loop and strand the jobs behind it.
      message = "Background job #{job[:method]} failed: #{e.class}: #{e.message}"
      # Fall back to stderr so failures stay visible when no logger is set.
      @logger ? log(:error, message) : warn(message)
    end
  end
end
#current_thread_count ⇒ Object
20 21 22 |
# File 'lib/background_proxy/worker.rb', line 20

# Counts the threads in Thread.list that are tagged as belonging to this
# worker.
#
# @return [Integer] number of threads whose +:background_logger+ thread-local
#   equals this object's +object_id+
def current_thread_count
  Thread.list.count { |thread| thread[:background_logger] == object_id }
end
#current_threads ⇒ Object
16 17 18 |
# File 'lib/background_proxy/worker.rb', line 16

# Lists the threads in Thread.list that are tagged as belonging to this
# worker.
#
# @return [Array<Thread>] threads whose +:background_logger+ thread-local
#   equals this object's +object_id+
def current_threads
  Thread.list.select { |thread| thread[:background_logger] == object_id }
end
#log(level, msg) ⇒ Object
65 66 67 68 |
# File 'lib/background_proxy/worker.rb', line 65

# Forwards a message to the configured logger; does nothing when no logger
# was supplied.
#
# @param level [Symbol] severity passed through to the logger unchanged
# @param msg [String] message passed through to the logger unchanged
# @return [Object, nil] the logger's return value, or nil without a logger
def log(level, msg)
  return unless @logger

  @logger.log(level, msg)
end
#spawn_threads!(num_threads) ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/background_proxy/worker.rb', line 40

# Starts +num_threads+ background threads. Each one tags itself with this
# worker's +object_id+, then alternates between draining the queue and
# sleeping until the worker is stopped.
#
# @param num_threads [Integer] how many worker threads to start
# @return [Integer] +num_threads+ (the result of Integer#times)
def spawn_threads!(num_threads)
  num_threads.times do |thread_num|
    log :debug, "Spawning background worker thread #{thread_num}."

    Thread.new do
      # The tag is what #current_threads and #current_thread_count look for.
      Thread.current[:background_logger] = object_id

      until stopped?
        check_background_queue(thread_num)
        # Random whole-second pause below SLEEP_INTERVAL between passes.
        sleep rand(SLEEP_INTERVAL)
      end
    end
  end
end
#stop! ⇒ Object
28 29 30 |
# File 'lib/background_proxy/worker.rb', line 28

# Flags the worker as stopped. Worker threads check this flag between queue
# passes; the queue itself is left untouched.
#
# @return [true]
def stop!
  @stopped = true
end
#stopped? ⇒ Boolean
24 25 26 |
# File 'lib/background_proxy/worker.rb', line 24

# Reports whether the stop flag has been set.
#
# @return [Boolean] true once +@stopped+ holds a truthy value, false otherwise
def stopped?
  !!@stopped
end