Class: Factor::Runtime::MessageBus

Inherits:
Object
  • Object
show all
Defined in:
lib/runtime/message_bus.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(email, token) ⇒ MessageBus

Returns a new instance of MessageBus.



10
11
12
13
14
15
# File 'lib/runtime/message_bus.rb', line 10

def initialize(email,token)
  @host = "queue.factor.io"
  @vhost = email
  @username=email
  @token=token
end

Instance Attribute Details

#channelObject

Returns the value of attribute channel.



8
9
10
# File 'lib/runtime/message_bus.rb', line 8

def channel
  @channel
end

#connectionObject

Returns the value of attribute connection.



8
9
10
# File 'lib/runtime/message_bus.rb', line 8

def connection
  @connection
end

#exchangeObject

Returns the value of attribute exchange.



8
9
10
# File 'lib/runtime/message_bus.rb', line 8

def exchange
  @exchange
end

#hostObject

Returns the value of attribute host.



8
9
10
# File 'lib/runtime/message_bus.rb', line 8

def host
  @host
end

#queueObject

Returns the value of attribute queue.



8
9
10
# File 'lib/runtime/message_bus.rb', line 8

def queue
  @queue
end

#tokenObject

Returns the value of attribute token.



8
9
10
# File 'lib/runtime/message_bus.rb', line 8

def token
  @token
end

#usernameObject

Returns the value of attribute username.



8
9
10
# File 'lib/runtime/message_bus.rb', line 8

def username
  @username
end

#vhostObject

Returns the value of attribute vhost.



8
9
10
# File 'lib/runtime/message_bus.rb', line 8

def vhost
  @vhost
end

Instance Method Details

#closeObject



56
57
58
59
# File 'lib/runtime/message_bus.rb', line 56

def close
  @connection.close{ EventMachine.stop }

end

#listen(routing_key = "#", &code) ⇒ Object

creates a new queue to listen to the topic exchange



33
34
35
36
37
38
39
40
41
42
43
# File 'lib/runtime/message_bus.rb', line 33

def listen(routing_key="#",&code)
  queue_name=SecureRandom.hex
  @queue = @channel.queue(queue_name)
  @queue.bind(@exchange, :routing_key=>routing_key) # bind queue to the Exchange

  @queue.subscribe do |headers,payload|
    message = Message.new
    message.from_queue headers.routing_key, payload
    code.call(message)
  end
end

#send(message) ⇒ Object



45
46
47
# File 'lib/runtime/message_bus.rb', line 45

def send(message)
  @exchange.publish(message.payload,:routing_key => message.route)
end

#send_and_close(message) ⇒ Object



49
50
51
52
53
54
# File 'lib/runtime/message_bus.rb', line 49

def send_and_close(message)
  send(message)

  EM.add_timer(1, Proc.new { close})

end

#start(topic = "workflow", &code) ⇒ Object

Creates the connection and creates a topic exchange An exchange references a place to send messages to the exchange routes it to the queues based on the route_key



21
22
23
24
25
26
27
28
29
30
# File 'lib/runtime/message_bus.rb', line 21

def start(topic="workflow",&code)
  EventMachine.run do
    #connection_settings={:host=>@host,:user=>@username,:password=>@token,:vhost=>@vhost}
    connection_settings={:host=>@host}
    @connection = AMQP.connect(connection_settings)
    @channel  = AMQP::Channel.new(connection)
    @exchange = @channel.topic(topic,:auto_delete=>true) # new topic exchange
    code.call
  end
end