Class: GirlFriday::WorkQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/girl_friday/work_queue.rb

Defined Under Namespace

Classes: Ready, Shutdown, Work

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, options = {}, &block) ⇒ WorkQueue

Returns a new instance of WorkQueue.

Raises:

  • (ArgumentError)


9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/girl_friday/work_queue.rb', line 9

# Builds a work queue around the given processor block and registers it
# with GirlFriday.
#
# @param name [#to_s] queue name; kept as a String in @name
# @param options [Hash] settings read here: :size (default 5),
#   :error_handler (class or list of classes, default ErrorHandler.default),
#   :store (default Store::InMemory) and :store_config (default {})
# @yield the processor block; it is stored in @processor
# @raise [ArgumentError] when no block is supplied
def initialize(name, options={}, &block)
  unless block_given?
    raise ArgumentError, "#{self.class.name} requires a block"
  end

  @name = name.to_s
  @size = options[:size] || 5
  @processor = block

  # Accept a single handler class or a list of them; instantiate each one.
  handler_classes = Array(options[:error_handler] || ErrorHandler.default)
  @error_handlers = handler_classes.map { |handler_class| handler_class.new }

  # Lifecycle flags and worker bookkeeping.
  @shutdown = false
  @shutting_down = false
  @busy_workers = []
  @ready_workers = nil

  # Creation timestamp (epoch seconds) and running counters.
  @created_at = Time.now.to_i
  @total_processed = 0
  @total_errors = 0
  @total_queued = 0

  # The store receives the name exactly as passed in, not the String copy.
  store_class = options[:store] || Store::InMemory
  store_config = options[:store_config] || {}
  @persister = store_class.new(name, store_config)

  # Only a weak reference to this queue is handed to GirlFriday, and only
  # after #start has run.
  @weakref = WeakRef.new(self)
  start
  GirlFriday.add_queue(@weakref)
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



8
9
10
# File 'lib/girl_friday/work_queue.rb', line 8

# Reader for the queue's name.
#
# @return [String] the value held in @name (the constructor stores name.to_s)
def name
  @name
end

Class Method Details

.immediate! ⇒ Object



28
29
30
31
# File 'lib/girl_friday/work_queue.rb', line 28

# Points both public push entry points (#push and #<<) at
# #push_immediately, so pushed work goes straight to the processor
# instead of being handed to the supervisor.
#
# @return [Object] whatever the final alias_method call returns
def self.immediate!
  inline_target = :push_immediately
  alias_method(:push, inline_target)
  alias_method(:<<, inline_target)
end

.queue! ⇒ Object



33
34
35
36
# File 'lib/girl_friday/work_queue.rb', line 33

# Points both public push entry points (#push and #<<) at #push_async,
# so pushed work is handed to the supervisor.
#
# @return [Object] whatever the final alias_method call returns
def self.queue!
  async_target = :push_async
  alias_method(:push, async_target)
  alias_method(:<<, async_target)
end

Instance Method Details

#push_async(work, &block) ⇒ Object Also known as: push, <<



44
45
46
# File 'lib/girl_friday/work_queue.rb', line 44

# Wraps the work item and its optional callback in a Work message and
# sends that message to the supervisor.
#
# @param work [Object] the unit of work to enqueue
# @param block [Proc, nil] optional callback carried along with the work
# @return [Object] the result of @supervisor#<<
def push_async(work, &block)
  message = Work[work, block]
  @supervisor << message
end

#push_immediately(work, &block) ⇒ Object



38
39
40
41
42
# File 'lib/girl_friday/work_queue.rb', line 38

# Runs the processor on the work item right away, in the calling thread.
#
# @param work [Object] the unit of work handed to @processor
# @yield [result] optional callback that receives the processor's result
# @return [Object] the callback's return value when a block is given,
#   otherwise the processor's result
def push_immediately(work, &block)
  outcome = @processor.call(work)
  block ? yield(outcome) : outcome
end

#shutdown(&block) ⇒ Object



74
75
76
77
78
# File 'lib/girl_friday/work_queue.rb', line 74

# Asks the supervisor to shut the queue down by sending it a Shutdown
# message that carries the optional callback.
#
# @param block [Proc, nil] optional callback wrapped in the Shutdown message
# @return [Object] the result of @supervisor#<<
def shutdown(&block)
  # The caller's thread must not touch runtime state directly; that is
  # the Supervisor thread's job, so we only enqueue a request.
  request = Shutdown[block]
  @supervisor << request
end

#status ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/girl_friday/work_queue.rb', line 50

# Snapshot of the queue's counters and worker pool.
#
# @return [Hash] a one-entry hash mapping the queue name to a hash with
#   the keys :pid, :pool_size, :ready, :busy, :backlog, :total_queued,
#   :total_processed, :total_errors, :uptime and :created_at
def status
  details = {}
  details[:pid] = Process.pid
  details[:pool_size] = @size
  # @ready_workers may still be nil; report that as zero ready workers.
  details[:ready] = @ready_workers ? @ready_workers.size : 0
  details[:busy] = @busy_workers.size
  details[:backlog] = @persister.size
  details[:total_queued] = @total_queued
  details[:total_processed] = @total_processed
  details[:total_errors] = @total_errors
  # Seconds elapsed since the creation timestamp.
  details[:uptime] = Time.now.to_i - @created_at
  details[:created_at] = @created_at

  { @name => details }
end

#wait_for_empty ⇒ Object

Busy wait for the queue to empty. Useful for testing.



68
69
70
71
72
# File 'lib/girl_friday/work_queue.rb', line 68

# Blocks the caller, polling every 0.1 seconds, until the persister
# reports a size of zero. Only @persister.size is consulted.
#
# @return [nil]
def wait_for_empty
  sleep(0.1) until @persister.size == 0
end

#working? ⇒ Boolean

Returns:

  • (Boolean)


80
81
82
# File 'lib/girl_friday/work_queue.rb', line 80

# Tells whether any worker is currently tracked as busy.
#
# @return [Boolean] true when @busy_workers holds at least one entry
def working?
  !@busy_workers.empty?
end