Class: PipelineToolkit::Cucumber::AMQP

Inherits:
Object
  • Object
show all
Defined in:
lib/pipeline_toolkit/cucumber/amqp.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ AMQP

Returns a new instance of AMQP.



8
9
# File 'lib/pipeline_toolkit/cucumber/amqp.rb', line 8

# Builds an AMQP helper without any setup work: the constructor body is
# intentionally empty and assigns no instance state.
def initialize; end

Instance Method Details

#close ⇒ Object



20
21
22
# File 'lib/pipeline_toolkit/cucumber/amqp.rb', line 20

# Stops the client held in @mq.
#
# @return [Object] whatever the client's +stop+ call returns
def close
  client = @mq
  client.stop
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


16
17
18
# File 'lib/pipeline_toolkit/cucumber/amqp.rb', line 16

# Reports whether the client held in @mq is connected.
#
# When no client has been assigned yet (@mq is nil), this answers false
# instead of raising NoMethodError on nil, so the predicate is always safe
# to call.
#
# @return [Boolean] the client's own +connected?+ answer, or false when
#   there is no client
def connected?
  @mq ? @mq.connected? : false
end

#purge_queue(queue) ⇒ Object



24
25
26
# File 'lib/pipeline_toolkit/cucumber/amqp.rb', line 24

# Purges the named queue via the client held in @mq.
#
# @param queue [Object] queue identifier handed to the client's +queue+ lookup
# @return [Object] whatever the queue's +purge+ call returns
def purge_queue(queue)
  target = @mq.queue(queue)
  target.purge
end

#queue_size(queue) ⇒ Object



28
29
30
# File 'lib/pipeline_toolkit/cucumber/amqp.rb', line 28

# Looks up the named queue's status and returns its :message_count entry.
#
# @param queue [Object] queue identifier handed to the client's +queue+ lookup
# @return [Object] the value stored under :message_count in the queue status
def queue_size(queue)
  queue_status = @mq.queue(queue).status
  queue_status[:message_count]
end

#receive_messages(queue, number, timeout = 5) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/pipeline_toolkit/cucumber/amqp.rb', line 41

# Pops messages off the named queue until +number+ of them have been
# collected, decoding each payload with MessageCoder.
#
# An empty queue is signalled by a payload of :queue_empty; in that case the
# method waits briefly and polls again. The whole operation is bounded by
# +timeout+ seconds.
#
# @param queue [Object] queue identifier handed to the client's +queue+ lookup
# @param number [Integer] how many messages to collect
# @param timeout [Numeric] overall time limit in seconds (default 5)
# @return [Array] the decoded messages, in the order they were popped
# @raise [RuntimeError] when the time limit expires first; the message
#   reports how many messages had been received so far
def receive_messages(queue, number, timeout = 5)
  received_msgs = []
  Timeout.timeout(timeout) do
    source = @mq.queue(queue)
    # Poll with an explicit loop: a bare `retry` is only legal inside a
    # `rescue` clause, and counting collected messages (rather than loop
    # iterations) guarantees exactly +number+ messages are returned.
    while received_msgs.size < number
      payload = source.pop[:payload]
      if payload == :queue_empty
        # Back off only when there was nothing to read.
        sleep(0.1)
      else
        received_msgs << MessageCoder.decode(payload)
      end
    end
  end
  received_msgs
rescue Timeout::Error
  raise "Timed out waiting for AMQP messages. Received #{received_msgs.size}"
end

#send_messages(exchange, messages, exchange_type = :fanout) ⇒ Object



32
33
34
35
36
37
38
39
# File 'lib/pipeline_toolkit/cucumber/amqp.rb', line 32

# Publishes each message to an exchange looked up passively on the client
# held in @mq.
#
# A message that has a 'routing_key' entry is published with that value as
# the :key option; otherwise no options are passed. Each message's hash
# default is reset to nil before it is encoded with MessageCoder, so the
# caller's message objects are modified.
#
# @param exchange [Object] exchange identifier handed to the client
# @param messages [Enumerable] messages to publish; each must respond to
#   +has_key?+/+key?+, +[]+ and +default=+
# @param exchange_type [Symbol] exchange type passed as :type (default :fanout)
# @return [Object] +messages+, as returned by +each+
def send_messages(exchange, messages, exchange_type = :fanout)
  target = @mq.exchange(exchange, :passive => true, :type => exchange_type)
  messages.each do |msg|
    publish_opts = {}
    publish_opts[:key] = msg['routing_key'] if msg.key?('routing_key')
    msg.default = nil
    target.publish(MessageCoder.encode(msg), publish_opts)
  end
end

#start(host, port) ⇒ Object



11
12
13
14
# File 'lib/pipeline_toolkit/cucumber/amqp.rb', line 11

# Creates a Bunny client for the given host and port, keeps it in @mq, and
# starts it.
#
# @param host [Object] value passed to Bunny as :host
# @param port [Object] value passed to Bunny as :port
# @return [Object] whatever the client's +start+ call returns
def start(host, port)
  client = Bunny.new(:host => host, :port => port)
  @mq = client
  client.start
end