Module: GirlFriday
Defined in:
- lib/girl_friday/server.rb
- lib/girl_friday.rb
- lib/girl_friday/batch.rb
- lib/girl_friday/version.rb
- lib/girl_friday/work_queue.rb
- lib/girl_friday/persistence.rb
- lib/girl_friday/error_handler.rb
Overview
QUEUE1 = GirlFriday::Queue.new('ham_cannon', :size => 15) do |msg|
  puts msg
end

QUEUE2 = GirlFriday::Queue.new('image_crawler', :size => 5) do |msg|
  puts msg
end
Defined Under Namespace
Modules: ErrorHandler, Store
Classes: Batch, Server, WorkQueue
Constant Summary
- VERSION = "0.11.2"
- Queue = WorkQueue
Class Method Summary
- .add_queue(ref) ⇒ Object
- .queues ⇒ Object
- .remove_queue(ref) ⇒ Object
- .shutdown!(timeout = 30) ⇒ Object
  Notify girl_friday to shut down ASAP.
- .status ⇒ Object
Class Method Details
.add_queue(ref) ⇒ Object
# File 'lib/girl_friday.rb', line 24
def self.add_queue(ref)
  @lock.synchronize do
    @queues ||= []
    @queues.reject! { |q| !q.weakref_alive? }
    @queues << ref
  end
end
.queues ⇒ Object
# File 'lib/girl_friday.rb', line 38
def self.queues
  @queues || []
end
.remove_queue(ref) ⇒ Object
# File 'lib/girl_friday.rb', line 32
def self.remove_queue(ref)
  @lock.synchronize do
    @queues.delete ref
  end
end
.shutdown!(timeout = 30) ⇒ Object
Notify girl_friday to shut down ASAP. Workers will not pick up any new work; any new work pushed onto the queues is placed on the backlog (and persisted). This method blocks until all queues are quiet or the timeout (in seconds) has passed, and returns the number of queues that had not yet gone quiet at that point.
Note that shutdown! only affects queues that exist when it is called. A queue created afterwards behaves as normal.
# File 'lib/girl_friday.rb', line 60
def self.shutdown!(timeout=30)
  qs = queues.select { |q| q.weakref_alive? }
  count = qs.size

  if count > 0
    m = Mutex.new
    var = ConditionVariable.new

    qs.each do |q|
      next if !q.weakref_alive?
      begin
        q.__getobj__.shutdown do |queue|
          m.synchronize do
            count -= 1
            var.signal if count == 0
          end
        end
      rescue WeakRef::RefError
        m.synchronize do
          count -= 1
          var.signal if count == 0
        end
      end
    end

    m.synchronize do
      var.wait(m, timeout) if count != 0
    end
  end
  count
end
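The blocking behavior of shutdown! rests on a countdown guarded by a Mutex and a ConditionVariable: each queue decrements a shared counter when it goes quiet, and the caller waits until the counter reaches zero or the timeout expires. The following is a minimal, self-contained sketch of that pattern using only the Ruby standard library. It is not girl_friday code: the helper name `wait_for_all` and the lambdas standing in for queues are hypothetical, and unlike the source above it re-checks the counter in a loop against a deadline, as a guard against spurious wakeups.

```ruby
# Hypothetical helper (not part of girl_friday). Runs each job in its own
# thread and blocks until all jobs have finished or `timeout` seconds have
# elapsed. Returns the number of jobs still running, which mirrors the
# count that shutdown! returns.
def wait_for_all(jobs, timeout)
  count = jobs.size
  return count if count.zero?

  m = Mutex.new
  var = ConditionVariable.new

  jobs.each do |job|
    Thread.new do
      job.call
      # Each finished job decrements the shared counter under the lock;
      # the last one to finish wakes the waiting caller.
      m.synchronize do
        count -= 1
        var.signal if count == 0
      end
    end
  end

  # Assumption of this sketch: wait against a monotonic deadline and
  # re-check the counter after every wakeup.
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  m.synchronize do
    while count != 0
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      break if remaining <= 0
      var.wait(m, remaining)
    end
    count
  end
end

# Both jobs finish well inside the timeout.
quick = wait_for_all([-> { sleep 0.01 }, -> { sleep 0.02 }], 2)

# One job outlives the timeout, so it is still counted as busy.
slow = wait_for_all([-> { sleep 0.01 }, -> { sleep 5 }], 0.2)

puts "quick: #{quick} still busy, slow: #{slow} still busy"
```

A nonzero return value here corresponds to shutdown! timing out before every queue went quiet, so a caller can use it to decide whether to log a warning or wait again.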
.status ⇒ Object
# File 'lib/girl_friday.rb', line 42
def self.status
  queues.inject({}) do |memo, queue|
    begin
      memo = memo.merge(queue.__getobj__.status)
    rescue WeakRef::RefError
    end
    memo
  end
end