Class: Workling::Clients::AmqpExchangeClient

Inherits:
BrokerBase show all
Defined in:
lib/workling/clients/amqp_exchange_client.rb

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from BrokerBase

#dispatch

Methods inherited from Base

#dispatch, installed?, #logger

Class Method Details

.load ⇒ Object



8
9
10
11
12
13
14
15
16
17
# File 'lib/workling/clients/amqp_exchange_client.rb', line 8

# Loads the ruby amqp client library ('mq'), which the amqp runner needs.
#
# Raises WorklingError with installation instructions when the library
# cannot be required.
def self.load
  require 'mq'
rescue LoadError
  # Only a failed require is translated into a WorklingError. Rescuing
  # Exception here would also swallow interrupts and exit requests that
  # happen to be raised while the library is loading.
  raise WorklingError.new(
    "WORKLING: couldn't find the ruby amqp client - you need it for the amqp runner. " \
    "Install from github: gem sources -a http://gems.github.com/ && sudo gem install tmm1-amqp "
  )
end

Instance Method Details

#close ⇒ Object

No explicit close is needed: when the event loop terminates, the connection is closed anyway.



30
# File 'lib/workling/clients/amqp_exchange_client.rb', line 30

# Nothing to close explicitly: the connection goes away when the event
# loop terminates. Always returns true.
def close
  true
end

#connect ⇒ Object

starts the client.



20
21
22
23
24
25
26
# File 'lib/workling/clients/amqp_exchange_client.rb', line 20

# Starts the client by creating the MQ instance stored in @amq.
#
# Returns the new MQ instance.
# Raises WorklingError when MQ.new fails with a StandardError.
def connect
  @amq = MQ.new
rescue
  hint = "couldn't start amq client. if you're running this in a server environment, then make sure the server is evented (ie use thin or evented mongrel, not normal mongrel.)"
  raise WorklingError.new(hint)
end

#request(exchange_name, value) ⇒ Object

Publishes the message to the exchange, using the routing key given in the value's :routing_key entry.



47
48
49
50
51
52
53
54
# File 'lib/workling/clients/amqp_exchange_client.rb', line 47

# Publishes value, serialized as YAML, to the "amq.topic" topic exchange.
#
# exchange_name - currently ignored; the exchange is always "amq.topic".
#                 NOTE(review): confirm that overriding the argument is
#                 intended before honoring it.
# value         - Hash to publish. Its :routing_key entry is used as the
#                 routing key and is left out of the serialized message.
#
# The caller's hash is not modified: the routing key is removed from a
# shallow copy, so the same hash can be passed again with its key intact.
#
# Returns whatever the exchange's publish call returns.
def request(exchange_name, value)
  payload = value.dup
  key = payload.delete(:routing_key)
  msg = YAML.dump(payload)
  @amq.topic("amq.topic").publish(msg, :routing_key => key)
end

#retrieve(key) ⇒ Object

Returns the queue named by key, from which work can be retrieved.



41
42
43
# File 'lib/workling/clients/amqp_exchange_client.rb', line 41

# Looks up the queue named by key on the amq client and returns it.
def retrieve(key)
  queue = @amq.queue(key)
  queue
end

#subscribe(key) ⇒ Object

subscribe to a queue



33
34
35
36
37
38
# File 'lib/workling/clients/amqp_exchange_client.rb', line 33

# Subscribes to the queue named by key. Each message body is parsed as
# YAML and the resulting value is yielded to the caller's block; the
# message header is not used.
#
# NOTE(review): YAML.load deserializes whatever arrives on the queue;
# confirm that everything publishing to the broker is trusted.
def subscribe(key)
  queue = @amq.queue(key)
  queue.subscribe { |_header, body| yield YAML.load(body) }
end