Class: MassTransit::Amqp

Inherits:
Object
  • Object
show all
Defined in:
lib/masstransit/transports/amqp.rb

Overview

A wrapper class on top of AMQP that provides the standard MassTransit ‘transport’ API.

Instance Method Summary collapse

Instance Method Details

#bind(exchange) ⇒ Object

binds the queue to the exchange



38
39
40
41
42
# File 'lib/masstransit/transports/amqp.rb', line 38

# Binds this transport's queue (the name held in @queue) to an exchange.
#
# The exchange is requested from the client with :type => :fanout and
# :durable => true before the queue is bound to it.
#
# @param exchange [String] name of the exchange (presumably a String; confirm against callers)
# @return [Object] whatever the queue's #bind call returns
def bind(exchange)
  fanout = @client.exchange(exchange, :type => :fanout, :durable => true)
  @client.queue(@queue).bind(fanout)
end

#close ⇒ Object

closes the connection to the Amqp server



24
25
26
# File 'lib/masstransit/transports/amqp.rb', line 24

# Closes the connection held in @client.
#
# @return [Object] whatever the client's #close call returns
def close
  @client.close
end

#create_message(data, serializer) ⇒ Object

creates a transport ready message object



49
50
51
52
53
54
55
56
57
58
59
# File 'lib/masstransit/transports/amqp.rb', line 49

# Wraps +data+ in an Envelope that is ready to hand to the transport.
#
# The envelope's MessageType is the class name of +data+ with every "::"
# replaced by "."; its Message is the result of serializer.serialize(data).
#
# @param data [Object] the message object to wrap
# @param serializer [#serialize] object used to serialize +data+
# @return [Envelope] the populated envelope
def create_message(data, serializer)
  dotted_type = data.class.name.gsub("::", ".")

  Envelope.new.tap do |envelope|
    envelope.MessageType = dotted_type
    # the serialized payload needs to be a string for .net
    envelope.Message = serializer.serialize(data)
  end
end

#get_message(rmsg) ⇒ Object



61
62
63
# File 'lib/masstransit/transports/amqp.rb', line 61

# Extracts the payload from a received message.
#
# @param rmsg [#[]] received message; it is indexed with the :payload key
# @return [Object] the value stored under :payload
def get_message(rmsg)
  rmsg[:payload]
end

#monitor(&block) ⇒ Object



65
66
67
68
69
70
71
72
# File 'lib/masstransit/transports/amqp.rb', line 65

# Subscribes to this transport's queue (the name held in @queue) and hands
# every received message to the given block.
#
# The subscription is made with the fixed consumer tag 'testtag1' and a
# :timeout of 30 (units are defined by the client library).
#
# @yield [msg] each message delivered by the subscription
# @return [Object] whatever the queue's #subscribe call returns
def monitor(&block)
  # basic consume / pop loop
  source = @client.queue(@queue)
  source.subscribe(:consumer_tag => 'testtag1', :timeout => 30) { |received| block.call(received) }
end

#open(config) ⇒ Object

opens a connection to the Amqp server



9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/masstransit/transports/amqp.rb', line 9

# Opens a connection to the AMQP server described by +config+.
#
# A Bunny client is built from the config's server, port, user, password,
# vdir and insist values (with logging switched on), stored in @client, and
# started. The config's queue name is remembered in @queue.
#
# @param config [#server, #port, #user, #password, #vdir, #insist, #queue]
#   connection settings
# @return [Object] whatever the client's #start call returns
def open(config)
  connection = Bunny.new(
    :logging => true,
    :host    => config.server,
    :port    => config.port,
    :user    => config.user,
    :pass    => config.password,
    :vhost   => config.vdir,
    :insist  => config.insist
  )
  @client = connection
  @queue  = config.queue
  connection.start
end

#publish(exchange, data) ⇒ Object



79
80
81
82
# File 'lib/masstransit/transports/amqp.rb', line 79

# Publishes +data+ to the exchange called +exchange+.
#
# The exchange is requested from the client with :type => :fanout and
# :durable => true.
#
# @param exchange [String] name of the exchange (presumably a String; confirm against callers)
# @param data [Object] the payload handed to the exchange's #publish
# @return [Object] whatever the exchange's #publish call returns
def publish(exchange, data)
  @client.exchange(exchange, :type => :fanout, :durable => true).publish(data)
end

#queue_declare(name) ⇒ Object

declares a queue on the server



29
30
31
# File 'lib/masstransit/transports/amqp.rb', line 29

# Declares a queue by requesting the queue called +name+ from the client.
#
# @param name [String] queue name (presumably a String; confirm against callers)
# @return [Object] whatever @client.queue returns
def queue_declare(name)
  @client.queue(name)
end

#queue_delete(name) ⇒ Object



33
34
35
# File 'lib/masstransit/transports/amqp.rb', line 33

# Deletes the queue called +name+.
#
# The queue object is looked up through the client, in the same way
# queue_declare does, and then asked to delete itself.
#
# @param name [String] queue name (presumably a String; confirm against callers)
# @return [Object] whatever the queue's #delete call returns
def queue_delete(name)
  @client.queue(name).delete
end

#send(queue, data) ⇒ Object

publishes the message directly to the named queue



75
76
77
# File 'lib/masstransit/transports/amqp.rb', line 75

# Publishes +data+ through the queue called +queue+.
#
# NOTE: defining this method overrides Ruby's built-in Object#send on
# instances of this class; use __send__ when dynamic dispatch is needed.
#
# @param queue [String] name of the destination queue (presumably a String; confirm against callers)
# @param data [Object] the payload handed to the queue's #publish
# @return [Object] whatever the queue's #publish call returns
def send(queue, data)
  destination = @client.queue(queue)
  destination.publish(data)
end

#unbind(queue, exchange) ⇒ Object

unbinds the queue from the exchange



45
46
# File 'lib/masstransit/transports/amqp.rb', line 45

# Unbinds the queue called +queue+ from the exchange called +exchange+.
#
# The exchange is requested from the client with :type => :fanout and
# :durable => true before the queue is asked to unbind from it.
#
# @param queue [String] name of the queue to unbind (presumably a String; confirm against callers)
# @param exchange [String] name of the exchange to unbind from
# @return [Object] whatever the queue's #unbind call returns
def unbind(queue, exchange)
  fanout = @client.exchange(exchange, :type => :fanout, :durable => true)
  @client.queue(queue).unbind(fanout)
end