Module: Isono::AmqpClient

Included in:
Node
Defined in:
lib/isono/amqp_client.rb

Overview

AMQP Client module for master and agent

class Client

include Isono::AmqpClient

def logger()
end

end

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#amqp_clientObject (readonly)

Returns the value of attribute amqp_client.



50
51
52
# File 'lib/isono/amqp_client.rb', line 50

def amqp_client
  @amqp_client
end

#mqObject (readonly)

Returns the value of attribute mq.



50
51
52
# File 'lib/isono/amqp_client.rb', line 50

def mq
  @mq
end

Instance Method Details

#after_closeObject



140
141
# File 'lib/isono/amqp_client.rb', line 140

def after_close
end

#after_connectObject



134
135
# File 'lib/isono/amqp_client.rb', line 134

def after_connect
end

#amqObject



117
118
119
120
# File 'lib/isono/amqp_client.rb', line 117

def amq
  raise 'AMQP connection is not established yet' unless connected?
  Thread.current[:mq]
end

#amqp_server_uriObject



52
53
54
55
56
57
58
59
# File 'lib/isono/amqp_client.rb', line 52

def amqp_server_uri
  raise "The connection is not established yet." unless @amqp_client && connected?

  URI::AMQP.build(:host => @amqp_client.settings[:host],
                  :port => @amqp_client.settings[:port],
                  :path => @amqp_client.settings[:vhost]
                  )
end

#before_closeObject



137
138
# File 'lib/isono/amqp_client.rb', line 137

def before_close
end

#before_connectObject



131
132
# File 'lib/isono/amqp_client.rb', line 131

def before_connect
end

#close(&blk) ⇒ Object



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/isono/amqp_client.rb', line 143

def close(&blk)
  return unless connected?

  prepare_close {
    @amqp_client.close {
      begin
        on_close
        after_close
        blk.call if blk
      ensure
        @amqp_client = nil
        Thread.current[:mq] = nil
      end
    }
  }
end

#connect(broker_uri, *args, &blk) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/isono/amqp_client.rb', line 61

def connect(broker_uri, *args, &blk)
  raise "the connection is still alive for: #{amqp_server_uri}" if connected?

  broker_uri = URI.parse(broker_uri.to_s) unless broker_uri.is_a?(URI)
  default = ::AMQP.settings
  opts = {:host => broker_uri.host, 
    :port => broker_uri.port || default[:port],
    :vhost => broker_uri.vhost || default[:vhost],
    :user=>broker_uri.user || default[:user],
    :pass=>broker_uri.password ||default[:pass]
  }
  opts.merge!(args) if args.is_a?(Hash)

  prepare_connect {
    @amqp_client = ::AMQP.connect(opts)
    @amqp_client.instance_eval {
      def settings
        @settings
      end

      @on_disconnect = Proc.new do
        # This block will be executed when you start the Agent if the AMQP server has been stopped.
        Isono.at_disconnected.each do |blk|
          blk.call
        end
        blk.call(:error)
      end
    }
    @amqp_client.connection_status { |t|
      case t
      when :connected
        # here is tried also when reconnected
        on_connect
      when :disconnected
        # This block is executed if the AMQP server goes down during startup.
        on_disconnected
        Isono.at_disconnected.each do |blk|
          blk.call
        end
      end
    }
    # the block argument is called once at the initial connection.
    @amqp_client.callback {
      after_connect
      if blk
        blk.arity == 1 ? blk.call(self) : blk.call
      end
    }
  }
  self
end

#connected?Boolean

Returns:

  • (Boolean)


113
114
115
# File 'lib/isono/amqp_client.rb', line 113

def connected?
  !@amqp_client.nil? && @amqp_client.connected?
end

#create_channelObject

Note:

Do not have to close by user. Channel close is performed as part of connection close.

Create new AMQP channel object



164
165
166
# File 'lib/isono/amqp_client.rb', line 164

def create_channel
  AMQP::Channel.new(@amqp_client)
end

#on_closeObject



128
129
# File 'lib/isono/amqp_client.rb', line 128

def on_close
end

#on_connectObject



122
123
# File 'lib/isono/amqp_client.rb', line 122

def on_connect
end

#on_disconnectedObject



125
126
# File 'lib/isono/amqp_client.rb', line 125

def on_disconnected
end

#publish_to(exname, message, opts = {}) ⇒ void

This method returns an undefined value.

Publish a message to the designated exchange.

Examples:

Want to broadcast the data to all bound queues:

publish_to('topic exchange', 'data', :key=>'*')

Want to send the data to the specific queue(s):

publish_to('exchange name', 'group.1', 'data')

Parameters:

  • exname (String)

    The exchange name

  • message (String)

    Message body to be sent

  • opts (Hash) (defaults to: {})

    Options with the message. :key => ‘keyname’



180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/isono/amqp_client.rb', line 180

def publish_to(exname, message, opts={})
  EventMachine.schedule {
    ex = amq.exchanges[exname] || raise("Undefined exchange name : #{exname}")
    case ex.type
    when :topic
      unless opts.has_key? :key
        opts[:key] = '*'
      end
    end
    ex.publish(Serializer.instance.marshal(message), opts)
  }
end