Class: MassTransit::Bus

Inherits:
Object
Defined in:
lib/masstransit/bus.rb

Overview

The bus abstracts the underlying transport, manages the subscription callbacks, and orchestrates the threading.

It works with three main queues: data, control, and poison.

Constructor Details

#initialize(conf) ⇒ Bus


# File 'lib/masstransit/bus.rb', line 7

def initialize(conf)
  @subscriptions = {}
  @serializer = conf.serializer
  @transport = conf.transport
  @queue = conf.queue
  
  @transport.open(conf)
end
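
The constructor only reads three attributes from conf and then opens the transport. A minimal sketch of that wiring, assuming a plain Struct as the configuration object; `Conf`, `FakeTransport` and `SketchBus` are hypothetical stand-ins, since the real configuration and transport classes are not shown on this page:

```ruby
# Hypothetical configuration object exposing the three readers the
# constructor uses: serializer, transport and queue.
Conf = Struct.new(:serializer, :transport, :queue)

# Hypothetical transport that records what it was opened with.
class FakeTransport
  attr_reader :opened_with

  # Mirrors the single call made by the constructor: transport.open(conf)
  def open(conf)
    @opened_with = conf
  end
end

# Trimmed copy of the constructor documented above.
class SketchBus
  def initialize(conf)
    @subscriptions = {}
    @serializer = conf.serializer
    @transport = conf.transport
    @queue = conf.queue

    @transport.open(conf)
  end
end

transport = FakeTransport.new
conf = Conf.new(:any_serializer, transport, 'orders')
SketchBus.new(conf)

# The transport receives the whole conf object, not just the queue name.
puts transport.opened_with.queue
```

Note that the transport is opened as a side effect of construction, so a bus cannot be built without a reachable transport.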

Instance Method Details

#close ⇒ Object


# File 'lib/masstransit/bus.rb', line 49

def close()
  
end

#consume(rmsg) ⇒ Object

Takes a RabbitMQ message, strips off the transport-level noise, deserializes the payload back into an envelope, and passes the envelope to #deliver.


# File 'lib/masstransit/bus.rb', line 65

def consume(rmsg)
  # Extract the raw payload (a string) from the RabbitMQ message
  data = @transport.get_message(rmsg)
  # Turn the payload back into an envelope and hand it to local consumers
  env = @serializer.deserialize(data)
  deliver(env)
end
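
The two steps that precede delivery can be tried in isolation. This is a sketch under stated assumptions: JSON is used as the wire format, and `FakeTransport`, `JsonSerializer` and the shape of `rmsg` are hypothetical. Only the `MessageType` and `Message` accessors come from the code on this page.

```ruby
require 'json'

# Envelope field names follow the accessors used on this page.
Envelope = Struct.new(:MessageType, :Message)

# Hypothetical transport: the "noise" is everything except :payload.
class FakeTransport
  def get_message(rmsg)
    rmsg[:payload]
  end
end

# Hypothetical serializer that rebuilds an envelope from a JSON string.
class JsonSerializer
  def deserialize(data)
    fields = JSON.parse(data)
    Envelope.new(fields['MessageType'], fields['Message'])
  end
end

# A made-up broker message: delivery metadata plus a string payload.
rmsg = {
  header: { delivery_tag: 1 },
  payload: JSON.generate('MessageType' => 'OrderPlaced', 'Message' => '{"id":42}')
}

data = FakeTransport.new.get_message(rmsg) # a JSON string
env = JsonSerializer.new.deserialize(data) # an Envelope

puts env.MessageType
```

In this sketch the inner message stays serialized inside the envelope, which matches #deliver deserializing `env.Message` separately.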

#deliver(env) ⇒ Object

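Delivers the envelope locally: every callback subscribed to the envelope's message type is called with the deserialized message.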


# File 'lib/masstransit/bus.rb', line 75

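def deliver(env)
  # Message types without subscribers have no consumers and are skipped
  consumers = @subscriptions[env.MessageType] || []
  consumers.each do |c|
    # Each consumer gets its own freshly deserialized message object
    obj = @serializer.deserialize(env.Message)
    c.call(obj)
  end
end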
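
The dispatch logic can be sketched without a broker. In this example the subscriptions hash is passed in explicitly and `JSON.parse` stands in for the serializer; both are assumptions made to keep the sketch self-contained.

```ruby
require 'json'

Envelope = Struct.new(:MessageType, :Message)

received = []

# Two callbacks registered for the same message type.
subscriptions = {
  'OrderPlaced' => [
    ->(msg) { received << "audit:#{msg['id']}" },
    ->(msg) { received << "email:#{msg['id']}" }
  ]
}

# Same shape as #deliver, with the state passed in as arguments.
def deliver(subscriptions, env)
  consumers = subscriptions.fetch(env.MessageType, [])
  consumers.each do |consumer|
    # A fresh object per consumer, so one callback cannot mutate
    # the message seen by the next one.
    consumer.call(JSON.parse(env.Message))
  end
end

deliver(subscriptions, Envelope.new('OrderPlaced', '{"id":42}'))
deliver(subscriptions, Envelope.new('Unknown', '{}')) # no subscribers, nothing happens

puts received.inspect
```

Callbacks run in subscription order on the calling thread, so a slow or raising callback holds up the ones after it.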

#publish(message) ⇒ Object

Publishes the message object to the RabbitMQ exchange whose name equals the message class name. This concept comes directly from .NET and should be adapted to a more Ruby-like style.


# File 'lib/masstransit/bus.rb', line 56

def publish(message)
  envelope = @transport.create_message(message, @serializer)
  data = @serializer.serialize(envelope)
  
  @transport.publish(envelope.MessageType, data) #exchange?
end
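
A sketch of the publish path, assuming JSON serialization and an envelope whose `MessageType` is the message's class name. `RecordingTransport`, `JsonSerializer` and `OrderPlaced` are hypothetical; in particular, how the real `create_message` fills the envelope is not shown on this page.

```ruby
require 'json'

Envelope = Struct.new(:MessageType, :Message)
OrderPlaced = Struct.new(:id)

# Hypothetical serializer: anything responding to to_h becomes JSON.
class JsonSerializer
  def serialize(obj)
    JSON.generate(obj.to_h)
  end
end

# Hypothetical transport that records publishes instead of sending them.
class RecordingTransport
  attr_reader :published

  def initialize
    @published = []
  end

  # Assumption: the envelope carries the class name and the serialized message.
  def create_message(message, serializer)
    Envelope.new(message.class.name, serializer.serialize(message))
  end

  def publish(exchange, data)
    @published << [exchange, data]
  end
end

serializer = JsonSerializer.new
transport = RecordingTransport.new

# Same three steps as #publish.
envelope = transport.create_message(OrderPlaced.new(42), serializer)
data = serializer.serialize(envelope)
transport.publish(envelope.MessageType, data)

exchange, wire = transport.published.first
puts exchange
puts wire
```

Under these assumptions the message is serialized twice: once into the envelope and once more when the envelope itself goes on the wire.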

#start ⇒ Object

Tells the bus to start listening for messages. This method blocks forever. Better Ctrl-C support still needs to be implemented.


# File 'lib/masstransit/bus.rb', line 42

def start()
  #start listening
  @transport.monitor do |rmsg|
    consume(rmsg)
  end
end
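
One possible shape for the missing Ctrl-C support is to rescue `Interrupt` around the blocking call and release the transport in an `ensure` clause. This is only a sketch: `FakeTransport` is hypothetical, its `monitor` raises `Interrupt` itself to simulate the key press, and a `close` method on the transport is an assumption (#close on the bus is currently empty).

```ruby
# Hypothetical transport: yields two messages, then simulates Ctrl-C.
class FakeTransport
  attr_reader :closed

  def initialize
    @closed = false
  end

  def monitor
    yield 'first'
    yield 'second'
    raise Interrupt, 'simulated Ctrl-C'
  end

  # Assumed cleanup hook; not part of the documented transport calls.
  def close
    @closed = true
  end
end

# Same blocking loop as #start, with interrupt handling around it.
def start(transport, handled)
  transport.monitor { |rmsg| handled << rmsg }
rescue Interrupt
  puts 'Interrupted, shutting down'
ensure
  # Runs on both normal return and interrupt.
  transport.close
end

transport = FakeTransport.new
handled = []
start(transport, handled)
puts handled.inspect
```

Ruby raises `Interrupt` in the main thread on SIGINT by default, so a rescue at this level is enough as long as the monitor loop runs on the main thread.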

#subscribe(message_name, &block) ⇒ Object

Registers an exchange in RabbitMQ for message_name and binds the queue to that exchange. It then adds the provided callback to the subscriptions for that message name.


# File 'lib/masstransit/bus.rb', line 19

def subscribe(message_name, &block)
  @subscriptions ||= {}
  # Append the callback to the consumer list for this message name
  (@subscriptions[message_name] ||= []) << block
  # Bind the queue to the exchange named after the message
  @transport.bind(message_name)
end
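
A usage sketch showing that several callbacks can be registered for one message name. `FakeTransport` and `SketchBus` are hypothetical stand-ins that keep only the parts of the bus needed here.

```ruby
# Hypothetical transport that records bind calls.
class FakeTransport
  attr_reader :bound

  def initialize
    @bound = []
  end

  def bind(message_name)
    @bound << message_name
  end
end

# Only the subscription bookkeeping of the bus.
class SketchBus
  attr_reader :subscriptions

  def initialize(transport)
    @subscriptions = {}
    @transport = transport
  end

  def subscribe(message_name, &block)
    (@subscriptions[message_name] ||= []) << block
    @transport.bind(message_name)
  end
end

transport = FakeTransport.new
bus = SketchBus.new(transport)

bus.subscribe('OrderPlaced') { |msg| puts "audit #{msg}" }
bus.subscribe('OrderPlaced') { |msg| puts "email #{msg}" }

puts bus.subscriptions['OrderPlaced'].size
puts transport.bound.inspect
```

Every call binds again, even when the message name is already subscribed, so the transport's bind needs to tolerate repeated calls for the same name.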

#unsubscribe(message_name) ⇒ Object

Unbinds the queue from the RabbitMQ exchange for message_name, then removes all callbacks subscribed to that message name.


# File 'lib/masstransit/bus.rb', line 31

def unsubscribe(message_name)
  @transport.unbind(message_name, @queue)

  # Hash#delete returns nil for a missing key, so no has-key check is needed
  @subscriptions.delete(message_name)
end
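
A short sketch of what unsubscribing does to the local state, including the case of a name that was never subscribed. `FakeTransport` and the `'orders'` queue name are hypothetical.

```ruby
# Hypothetical transport that records unbind calls.
class FakeTransport
  attr_reader :unbound

  def initialize
    @unbound = []
  end

  def unbind(message_name, queue)
    @unbound << [message_name, queue]
  end
end

transport = FakeTransport.new
subscriptions = { 'OrderPlaced' => [->(msg) { msg }, ->(msg) { msg }] }

# Same two steps as #unsubscribe, with the state captured by the lambda.
unsubscribe = lambda do |message_name|
  transport.unbind(message_name, 'orders')
  subscriptions.delete(message_name)
end

removed = unsubscribe.call('OrderPlaced')     # the two removed callbacks
missing = unsubscribe.call('NeverSubscribed') # nil, nothing to remove

puts removed.size
puts missing.inspect
puts transport.unbound.size
```

All callbacks for the name are dropped at once; there is no way to remove a single callback. The unbind call is also made for names that were never subscribed.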