Class: BatchAny::Manager

Inherits:
Object
  • Object
show all
Defined in:
lib/batch_any/manager.rb

Defined Under Namespace

Classes: FiberError

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Manager

Returns a new instance of Manager.



14
15
16
17
# File 'lib/batch_any/manager.rb', line 14

# Builds an empty manager: no registered computations, no services waiting
# for a fetch, and no recorded errors.
#
# @exceptions is initialised here too, so the `exceptions` reader yields an
# empty array instead of nil when it is consulted before the first `run`.
def initialize
  @computations = []
  @awaiting_services = {}
  @exceptions = []
end

Instance Attribute Details

#exceptions ⇒ Object (readonly)

Returns the value of attribute exceptions.



12
13
14
# File 'lib/batch_any/manager.rb', line 12

# Reader for @exceptions, the collection that `run` rebuilds on each call and
# fills with one FiberError per computation whose resume raised.
#
# @return [Array<FiberError>] errors recorded by the most recent `run`
def exceptions
  @exceptions
end

Instance Method Details

#add_computation(&block) ⇒ Object



19
20
21
22
23
24
25
26
# File 'lib/batch_any/manager.rb', line 19

# Registers a unit of work with this manager.
#
# The block is wrapped in a Fiber and appended to @computations; nothing is
# executed here. When the fiber is first resumed it records this manager under
# Thread.current[:batch_any_manager] (Thread#[] storage is fiber-local in
# Ruby, so the entry belongs to that fiber) and then calls the block.
#
# @yield the work to perform inside the fiber
# @return [Fiber] the fiber that was registered
def add_computation(&block)
  manager = self
  computation = Fiber.new do
    Thread.current[:batch_any_manager] = manager
    block.call
  end
  @computations.push(computation)
  computation
end

#enqueue_item(item) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/batch_any/manager.rb', line 53

# Queues +item+ on a pending service and then suspends the calling fiber.
#
# Pending services are grouped in @awaiting_services by the item's
# service_class. The item is appended to the first pending service of that
# class whose can_serve?(item) is truthy; if none accepts it, a new service is
# created with service_class.new(item) and added to the group.
#
# The method ends with Fiber.yield, so it must be called from inside a fiber
# (Ruby raises ::FiberError when yielding from the root fiber). Control comes
# back here when that fiber is resumed again.
#
# @param item [#service_class] the item to hand to a service
# @return [Object] whatever value the fiber is resumed with
def enqueue_item(item)
  service_class = item.service_class
  # Single lookup that also creates the per-class list on first use.
  candidates = (@awaiting_services[service_class] ||= [])
  match = candidates.find { |candidate| candidate.can_serve?(item) }
  if match
    match.items << item
  else
    candidates << service_class.new(item)
  end
  Fiber.yield
end

#run ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/batch_any/manager.rb', line 28

# Drives every registered computation to completion.
#
# Each pass of the loop has two phases:
#   1. resume every fiber in @computations; an error raised by a resume is
#      wrapped in a FiberError (with the fiber) and collected, then
#      linear_keep_if! is asked to retain only the fibers that are still alive;
#   2. call fetch on every service currently held in @awaiting_services; an
#      error raised by fetch is stored on each of that service's items via
#      item.exception=, and the pending services are then cleared.
# The loop ends once @computations.any? is false.
#
# @return [Array<FiberError>] the errors collected during this call (the same
#   array that is stored in @exceptions)
def run
  @exceptions = []
  while @computations.any?
    # Phase 1: advance each computation by one step.
    @computations.each do |fiber|
      begin
        fiber.resume
      rescue => error
        @exceptions.push(FiberError.new(fiber, error))
      end
    end
    linear_keep_if!(@computations, &:alive?)

    # Phase 2: run the fetches that the suspended computations are waiting on.
    @awaiting_services.values.each do |pending|
      pending.each do |service|
        begin
          service.fetch
        rescue => error
          # Report the failure through the items rather than aborting the run.
          service.items.each { |item| item.exception = error }
        end
      end
    end
    @awaiting_services.clear
  end
  @exceptions
end