Module: QueueMap

Extended by:
QueueMap
Included in:
QueueMap
Defined in:
lib/queue_map.rb,
lib/queue_map/version.rb

Defined Under Namespace

Classes: Consumer

Constant Summary

# Lock taken around broker connection start-up and shutdown.
BUNNY_MUTEX = Mutex.new
# Fallback timeout handler: accepts the unanswered input item and yields nil.
DEFAULT_ON_TIMEOUT = lambda { |_item| nil }
# Version string of the library.
VERSION = "0.7"

Instance Attribute Summary

Instance Method Summary

Instance Attribute Details

#connection_info ⇒ Object

Returns the value of attribute connection_info.



13
14
15
# File 'lib/queue_map.rb', line 13

# Returns the value of the +connection_info+ attribute.
#
# new_bunny_connection merges this value (or an empty hash when it is unset)
# with <tt>:spec => '08'</tt> and hands the result to +Bunny.new+.
#
# @return [Object, nil] the stored connection settings, or nil when unset
def connection_info
  @connection_info
end

#consumer_base_path ⇒ Object



21
22
23
# File 'lib/queue_map.rb', line 21

# Returns the configured consumer base path. When nothing (or a falsy value)
# has been stored yet, "lib/consumers" is stored and returned.
#
# @return [Object] the stored base path; "lib/consumers" by default
def consumer_base_path
  @consumer_base_path = "lib/consumers" unless @consumer_base_path
  @consumer_base_path
end

#consumer_path ⇒ Object

Returns the value of attribute consumer_path.



10
11
12
# File 'lib/queue_map.rb', line 10

# Returns the value of the +consumer_path+ attribute.
#
# The +consumer+ method indexes this value with a consumer name
# (<tt>consumer_path[name]</tt>) and passes the result to
# +QueueMap::Consumer.from_file+, so it must respond to +[]+.
# NOTE(review): presumably a Hash (or similar) mapping consumer names to
# file paths -- confirm against the code that assigns it.
#
# @return [#[], nil] the stored lookup object, or nil when unset
def consumer_path
  @consumer_path
end

#mode ⇒ Object

Returns the value of attribute mode.



10
11
12
# File 'lib/queue_map.rb', line 10

# Returns the value of the +mode+ attribute.
#
# When this is +:test+, +map+ skips the broker and runs the work through
# +queue_map_internal+. +consumer+ also forwards it as the +:strategy+ option,
# substituting +:fork+ when it is unset.
#
# @return [Symbol, nil] the stored mode, or nil when unset
def mode
  @mode
end

Instance Method Details

#consumer(name) ⇒ Object



73
74
75
# File 'lib/queue_map.rb', line 73

# Looks up the consumer registered under +name+, loading it on first use.
#
# A cache miss (no entry, or a falsy entry) loads the consumer through
# +QueueMap::Consumer.from_file+ using the path found at
# <tt>consumer_path[name]</tt> and the current +mode+ as +:strategy+
# (+:fork+ when no mode is set). The loaded value is stored in +consumers+.
#
# @param name [Object] key identifying the consumer
# @return [Object] the cached or freshly loaded consumer
def consumer(name)
  registry = consumers
  cached = registry[name]
  return cached if cached

  registry[name] = QueueMap::Consumer.from_file(consumer_path[name], :strategy => mode || :fork)
end

#consumers ⇒ Object



69
70
71
# File 'lib/queue_map.rb', line 69

# Returns the cache of loaded consumers, creating an empty Hash the first
# time it is requested.
#
# @return [Hash] the same Hash instance on every call
def consumers
  @consumers = {} unless @consumers
  @consumers
end

#map(collection, name, options = {}) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/queue_map.rb', line 36

# Publishes every item of +collection+ to the queue +name+ and collects the
# workers' replies, returned in the same order as the input.
#
# When +mode+ is +:test+ no broker is contacted and the work is delegated to
# +queue_map_internal+.
#
# Otherwise each item is marshalled together with its index and the name of a
# private reply queue, then published. Replies are read from the reply queue
# until one message per item has arrived or the timeout elapses. Items with
# no reply are passed to the +:on_timeout+ handler, whose return value takes
# the item's place in the output.
#
# A reply whose result is +nil+ or +false+ counts as a real answer: the
# timeout handler is only invoked for indexes that received no reply at all.
#
# @param collection [#length, #[]] items to process, addressed by integer index
# @param name [#to_s] name of the work queue
# @param options [Hash]
# @option options [Numeric] :timeout (5) seconds to wait for replies
# @option options [#call] :on_timeout (DEFAULT_ON_TIMEOUT) invoked with the
#   original item for every index that received no reply
# @return [Array] one entry per input item, in input order
def map(collection, name, options = {})
  return queue_map_internal(collection, name) if mode == :test

  on_timeout = options[:on_timeout] || DEFAULT_ON_TIMEOUT

  with_bunny do |bunny|
    work_queue = bunny.queue(name.to_s)
    reply_queue_name = response_queue_name(name)
    reply_queue = bunny.queue(reply_queue_name, :durable => false, :exclusive => true, :auto_delete => true)

    (0...collection.length).each do |i|
      work_queue.publish(Marshal.dump({ :input => collection[i], :index => i, :response_queue => reply_queue_name }))
    end

    results = {}
    begin
      Timeout.timeout(options[:timeout] || 5) do
        reply_queue.subscribe(:message_max => collection.length) do |msg|
          # NOTE(review): Marshal.load can instantiate arbitrary objects; this
          # is only safe if everything able to publish to the broker is trusted.
          response = Marshal.load(msg)
          results[response[:index]] = response[:result]
        end
      end
    rescue Timeout::Error
      # Deliberate best effort: whatever arrived in time is kept and the
      # missing indexes are filled in by the timeout handler below.
    end

    (0...collection.length).map do |i|
      # Key presence, not truthiness, decides whether a reply arrived.
      results.fetch(i) { on_timeout.call(collection[i]) }
    end
  end
end

#new_bunny_connection ⇒ Object



77
78
79
80
81
82
83
# File 'lib/queue_map.rb', line 77

# Builds and starts a fresh Bunny connection while holding BUNNY_MUTEX.
#
# The connection options are the stored +@connection_info+ (an empty hash
# when unset) with +:spec+ forced to '08'.
#
# @return [Object] the started Bunny connection
def new_bunny_connection
  BUNNY_MUTEX.synchronize do
    settings = @connection_info || {}
    Bunny.new(settings.merge(:spec => '08')).tap { |connection| connection.start }
  end
end

#queue_map_internal(collection, name, *args) ⇒ Object



65
66
67
# File 'lib/queue_map.rb', line 65

# Runs the named consumer's worker over +collection+ in-process.
#
# @param collection [#map] items to process
# @param name [Object] consumer key, resolved through +consumer+
# @param args [Array] accepted but ignored
# @return [Array] the worker's result for each item, in order
def queue_map_internal(collection, name, *args)
  worker = consumer(name).worker_proc
  collection.map(&worker)
end

#response_queue_name(name) ⇒ Object



31
32
33
34
# File 'lib/queue_map.rb', line 31

# Generates a new reply-queue name for the work queue +name+.
#
# The name has the form "<name>_response_<unique_name>_<counter>"; the
# counter is kept in +@inc+ and grows by one on every call.
#
# @param name [#to_s] name of the work queue
# @return [String] the generated reply-queue name
def response_queue_name(name)
  sequence = @inc || 0
  suffix = unique_name
  @inc = sequence + 1
  "#{name}_response_#{suffix}_#{@inc}"
end

#unique_name ⇒ Object



17
18
19
# File 'lib/queue_map.rb', line 17

# Returns an identifier built once from the output of the +hostname+ command,
# the current process id and the microsecond part of the current time, joined
# with dashes. The value is memoized in +@unique_name+.
#
# @return [String] e.g. "myhost-1234-567890"
def unique_name
  return @unique_name if @unique_name

  host = `hostname`.chomp
  @unique_name = [host, Process.pid, Time.now.usec].join("-")
end

#with_bunny(&block) ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
# File 'lib/queue_map.rb', line 85

# Opens a new Bunny connection, yields it to the caller's block and always
# shuts it down afterwards.
#
# Shutdown holds BUNNY_MUTEX and is best effort: any StandardError raised by
# +stop+ or +close_connection+ is discarded so cleanup never hides the
# block's own result or exception.
#
# @yieldparam bunny [Object] the connection from +new_bunny_connection+
# @return [Object] whatever the block returns
def with_bunny(&block)
  connection = new_bunny_connection
  begin
    yield connection
  ensure
    BUNNY_MUTEX.synchronize do
      begin
        connection.stop
      rescue StandardError
        nil
      end

      begin
        connection.close_connection
      rescue StandardError
        nil
      end
    end
  end
end