Module: GirlFriday

Defined in:
lib/girl_friday/server.rb,
lib/girl_friday.rb,
lib/girl_friday/batch.rb,
lib/girl_friday/version.rb,
lib/girl_friday/work_queue.rb,
lib/girl_friday/persistence.rb,
lib/girl_friday/error_handler.rb

Overview

QUEUE1 = GirlFriday::Queue.new('ham_cannon', :size => 15) do |msg|
  puts msg
end

QUEUE2 = GirlFriday::Queue.new('image_crawler', :size => 5) do |msg|
  puts msg
end

Defined Under Namespace

Modules: ErrorHandler, Store

Classes: Batch, Server, WorkQueue

Constant Summary collapse

VERSION =
"0.11.2"
Queue =
WorkQueue

Class Method Summary collapse

Class Method Details

.add_queue(ref) ⇒ Object



24
25
26
27
28
29
30
# File 'lib/girl_friday.rb', line 24

# Registers a queue reference in the module-level registry (@queues).
#
# The registry is created on first use. Before +ref+ is appended, every
# entry whose +weakref_alive?+ is falsy is dropped, so stale references do
# not accumulate. The whole update runs while holding @lock.
#
# ref - the reference to register.
#
# Returns the registry array.
def self.add_queue(ref)
  @lock.synchronize do
    registry = (@queues ||= [])
    registry.keep_if { |entry| entry.weakref_alive? }
    registry.push(ref)
  end
end

.queues ⇒ Object



38
39
40
# File 'lib/girl_friday.rb', line 38

# Returns the registry of queue references (@queues), or a new empty array
# when no registry has been created yet.
def self.queues
  return @queues if @queues

  []
end

.remove_queue(ref) ⇒ Object



32
33
34
35
36
# File 'lib/girl_friday.rb', line 32

# Removes a queue reference from the module-level registry (@queues).
#
# The deletion runs while holding @lock. When no registry has been created
# yet (@queues is still nil) the call is a no-op rather than raising
# NoMethodError, matching how add_queue and queues tolerate an unset registry.
#
# ref - the reference to remove.
#
# Returns the removed entry, or nil when nothing was removed.
def self.remove_queue(ref)
  @lock.synchronize do
    @queues.delete(ref) if @queues
  end
end

.shutdown!(timeout = 30) ⇒ Object

Notify girl_friday to shutdown ASAP. Workers will not pick up any new work; any new work pushed onto the queues will be pushed onto the backlog (and persisted). This method will block until all queues are quiet or the timeout has passed.

Note that shutdown! only affects queues that already exist when it is called. A queue created afterwards will behave as normal.



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/girl_friday.rb', line 60

# Asks every live registered queue to shut down and blocks until all of them
# have reported completion or the timeout has elapsed.
#
# timeout - maximum number of seconds to wait (default 30). Passing nil
#           waits without a time limit.
#
# Returns the number of queues that had not reported completion by the time
# the method returned (0 when every queue finished).
def self.shutdown!(timeout=30)
  qs = queues.select { |q| q.weakref_alive? }
  count = qs.size

  if count > 0
    m = Mutex.new
    var = ConditionVariable.new

    # Marks one queue as finished and wakes the waiter once none remain.
    mark_done = lambda do
      m.synchronize do
        count -= 1
        var.signal if count == 0
      end
    end

    qs.each do |q|
      begin
        # No separate liveness re-check here: a reference that died after the
        # select above raises RefError and is counted as finished below.
        # Skipping it without decrementing would leave count stuck above zero
        # and force the caller to sit through the whole timeout.
        q.__getobj__.shutdown { |_queue| mark_done.call }
      rescue WeakRef::RefError
        mark_done.call
      end
    end

    deadline = timeout && (Time.now + timeout)
    m.synchronize do
      # Loop because a condition-variable wait may return before it has been
      # signalled; re-check count and only wait for the time that is left.
      while count != 0
        if deadline
          remaining = deadline - Time.now
          break if remaining <= 0
          var.wait(m, remaining)
        else
          var.wait(m)
        end
      end
    end
  end
  count
end

.status ⇒ Object



42
43
44
45
46
47
48
49
50
# File 'lib/girl_friday.rb', line 42

# Builds one hash combining the status of every registered queue.
#
# Each reference is dereferenced with __getobj__ and the hash returned by its
# +status+ is merged into the result in registry order, so a later queue
# overwrites an earlier one on a key clash. References that raise
# WeakRef::RefError are skipped.
#
# Returns the combined hash (empty when there are no queues).
def self.status
  queues.each_with_object({}) do |ref, combined|
    begin
      combined.update(ref.__getobj__.status)
    rescue WeakRef::RefError
      # The referenced queue is gone; it contributes nothing.
    end
  end
end