Class: Workling::Clients::AmqpClient

Inherits:
BrokerBase show all
Defined in:
lib/workling/clients/amqp_client.rb

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from BrokerBase

#dispatch

Methods inherited from Base

#dispatch, installed?, #logger

Class Method Details

.load ⇒ Object



8
9
10
11
12
13
14
15
16
17
# File 'lib/workling/clients/amqp_client.rb', line 8

# Loads the ruby amqp client library ('mq').
#
# Only LoadError is translated into a WorklingError. Anything else raised
# while requiring the library (interrupts, exit requests, syntax errors in
# the gem, ...) propagates unchanged instead of being reported as a missing
# gem.
#
# @raise [WorklingError] if the 'mq' library cannot be loaded
def self.load
  require 'mq'
rescue LoadError
  raise WorklingError,
        "WORKLING: couldn't find the ruby amqp client - you need it for the amqp runner. " \
        "Install from github: gem sources -a http://gems.github.com/ && sudo gem install tmm1-amqp "
end

Instance Method Details

#close ⇒ Object

No explicit closing is needed: when the event loop terminates, the connection is closed anyway.



31
# File 'lib/workling/clients/amqp_client.rb', line 31

# Intentionally does nothing: no explicit close is performed here because
# the connection goes away when the event loop terminates.
#
# @return [true] always
def close
  true
end

#connect ⇒ Object

starts the client.



20
21
22
23
24
25
26
27
# File 'lib/workling/clients/amqp_client.rb', line 20

# Starts the client.
#
# Reads Workling.config[:amqp_options] (falling back to an empty hash),
# symbolizes its keys, hands the result to AMQP.start and wraps the returned
# connection in an MQ instance stored in @amq.
#
# @return [Object] the MQ instance assigned to @amq
# @raise [WorklingError] if any StandardError occurs while starting
def connect
  amqp_options = (Workling.config[:amqp_options] || {}).symbolize_keys
  @amq = MQ.new(AMQP.start(amqp_options))
rescue StandardError
  raise WorklingError.new("couldn't start amq client. if you're running this in a server environment, then make sure the server is evented (ie use thin or evented mongrel, not normal mongrel.)")
end

#request(key, value) ⇒ Object



43
# File 'lib/workling/clients/amqp_client.rb', line 43

# Publishes +value+, serialized with Marshal.dump, to the queue that
# queue_for(key) names.
#
# @param key [Object] passed to queue_for to obtain the queue name
# @param value [Object] payload; must be serializable by Marshal.dump
# @return [Object] whatever the queue's #publish returns
def request(key, value)
  target_queue = @amq.queue(queue_for(key))
  target_queue.publish(Marshal.dump(value))
end

#retrieve(key) ⇒ Object

request and retrieve work



42
# File 'lib/workling/clients/amqp_client.rb', line 42

# Looks up the queue that queue_for(key) names on the current @amq object.
#
# @param key [Object] passed to queue_for to obtain the queue name
# @return [Object] whatever @amq.queue returns for that name
def retrieve(key)
  queue_name = queue_for(key)
  @amq.queue(queue_name)
end

#subscribe(key) ⇒ Object

subscribe to a queue



34
35
36
37
38
39
# File 'lib/workling/clients/amqp_client.rb', line 34

# Subscribes to the queue that queue_for(key) names and yields each
# received message to the caller's block.
#
# Every message is first passed through Marshal.load; if that raises a
# StandardError the raw message is yielded unchanged.
#
# NOTE(review): Marshal.load can instantiate arbitrary objects - confirm
# that only trusted producers can publish to these queues.
#
# @param key [Object] passed to queue_for to obtain the queue name
# @yield [data] the unmarshalled message, or the raw message on failure
# @return [Object] whatever the queue's #subscribe returns
def subscribe(key)
  source_queue = @amq.queue(queue_for(key))
  source_queue.subscribe do |raw_message|
    decoded =
      begin
        Marshal.load(raw_message)
      rescue StandardError
        # Not a marshalled payload: hand the message over as received.
        raw_message
      end
    yield decoded
  end
end