Module: Isono::AmqpClient

Included in:
Node
Defined in:
lib/isono/amqp_client.rb

Overview

AMQP Client module for master and agent

# Example host class that mixes in the AMQP client behaviour.
class Client
  include Isono::AmqpClient

  # Placeholder for the logger the module relies on: the module's methods
  # call logger.info and logger.error, so a real host class should return
  # an object that responds to both.
  def logger; end
end

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#amqp_client ⇒ Object (readonly)

Returns the value of attribute amqp_client.



50
51
52
# File 'lib/isono/amqp_client.rb', line 50

# Reader for the connection handle held in @amqp_client.
#
# @return [Object, nil] the current value of @amqp_client; #connect assigns
#   it and #close resets it to nil.
def amqp_client; @amqp_client; end

#mq ⇒ Object (readonly)

Returns the value of attribute mq.



50
51
52
# File 'lib/isono/amqp_client.rb', line 50

# Reader for the value held in @mq.
#
# @return [Object, nil] the current value of @mq
def mq; @mq; end

Instance Method Details

#after_close ⇒ Object



145
146
# File 'lib/isono/amqp_client.rb', line 145

# Hook with an empty default body. #close and #on_disconnected call it
# right after #on_close, inside the connection's close callback.
# Override in the including class to react once the connection is closed.
def after_close; end

#after_connect ⇒ Object



139
140
# File 'lib/isono/amqp_client.rb', line 139

# Hook with an empty default body. #connect calls it from the connection's
# success callback, before the block given to #connect is invoked.
def after_connect; end

#amq ⇒ Object



109
110
111
112
# File 'lib/isono/amqp_client.rb', line 109

# Fetch the object stored under :mq for the current thread.
#
# @return [Object, nil] the value of Thread.current[:mq]
# @raise [RuntimeError] when #connected? is false
def amq
  return Thread.current[:mq] if connected?

  raise 'AMQP connection is not established yet'
end

#amqp_server_uri ⇒ Object



52
53
54
55
56
57
58
59
# File 'lib/isono/amqp_client.rb', line 52

# Build a URI describing the broker the current connection points at,
# using the :host, :port and :vhost entries of the connection's settings.
#
# @return [Object] the result of URI::AMQP.build
# @raise [RuntimeError] when there is no client or it is not connected
def amqp_server_uri
  unless @amqp_client && connected?
    raise "The connection is not established yet."
  end

  settings = @amqp_client.settings
  URI::AMQP.build(:host => settings[:host],
                  :port => settings[:port],
                  :path => settings[:vhost])
end

#before_close ⇒ Object



142
143
# File 'lib/isono/amqp_client.rb', line 142

# Hook with an empty default body.
# NOTE(review): no caller is visible in this section; presumably invoked
# by prepare_close before the connection is closed - confirm.
def before_close; end

#before_connect ⇒ Object



136
137
# File 'lib/isono/amqp_client.rb', line 136

# Hook with an empty default body.
# NOTE(review): no caller is visible in this section; presumably invoked
# by prepare_connect before the connection is opened - confirm.
def before_connect; end

#close(&blk) ⇒ Object



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/isono/amqp_client.rb', line 148

# Close the AMQP connection, if one is established.
#
# Does nothing when #connected? is false. Otherwise the close is wrapped
# in prepare_close; once the client reports it is closed, #on_close and
# #after_close run, followed by the optional block.
#
# @yield invoked with no arguments after the close hooks have run
# @return [Object, nil] nil when not connected, otherwise whatever
#   prepare_close returns
def close(&blk)
  return unless connected?

  prepare_close do
    @amqp_client.close do
      begin
        on_close
        after_close
        blk.call unless blk.nil?
      ensure
        # Always drop the references, even if a hook or the block raised.
        Thread.current[:mq] = nil
        @amqp_client = nil
      end
    end
  end
end

#connect(broker_uri, *args, &blk) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/isono/amqp_client.rb', line 61

# Open a connection to the AMQP broker and register the lifecycle hooks.
#
# @param broker_uri [URI, #to_s] broker location; anything that is not
#   already a URI is parsed from its string form. Missing port, vhost,
#   user and password fall back to ::AMQP.settings.
# @param args [Array] an optional trailing Hash of extra connection
#   options; its entries override the values derived from broker_uri.
# @yield [client] called once after the initial connection succeeds (with
#   self when the block takes exactly one argument). A one-argument block
#   is also called when the connection attempt fails.
# @return [self]
# @raise [RuntimeError] if a connection is already established
def connect(broker_uri, *args, &blk)
  raise "the connection is still alive for: #{amqp_server_uri}" if connected?

  broker_uri = URI.parse(broker_uri.to_s) unless broker_uri.is_a?(URI)
  default = ::AMQP.settings
  opts = {
    :host  => broker_uri.host,
    :port  => broker_uri.port || default[:port],
    :vhost => broker_uri.vhost || default[:vhost],
    :user  => broker_uri.user || default[:user],
    :pass  => broker_uri.password || default[:pass]
  }
  # args is a splat and therefore always an Array, so testing it against
  # Hash never matched. The caller's option Hash, if any, is its last
  # element.
  extra = args.last
  opts.merge!(extra) if extra.is_a?(Hash)

  prepare_connect {
    @amqp_client = ::AMQP.connect(opts)
    # Expose the client's @settings so amqp_server_uri can read them.
    @amqp_client.instance_eval {
      def settings
        @settings
      end
    }
    @amqp_client.connection_status { |t|
      case t
      when :connected
        # here is tried also when reconnected
        on_connect
      when :disconnected
        on_disconnected
      end
    }
    # the block argument is called once at the initial connection.
    @amqp_client.callback {
      after_connect
      if blk
        blk.arity == 1 ? blk.call(self) : blk.call
      end
    }
    @amqp_client.errback {
      # amqp_server_uri raises unless connected?, which is not expected to
      # hold after a failed attempt, so describe the target from opts.
      logger.error("Failed to connect to the broker: #{opts[:host]}:#{opts[:port]}#{opts[:vhost]}")
      blk.call(self) if blk && blk.arity == 1
    }
  }
  self
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


105
106
107
# File 'lib/isono/amqp_client.rb', line 105

# Report whether a client exists and says it is connected.
#
# @return [Boolean] false when @amqp_client is nil, otherwise the result
#   of @amqp_client.connected?
def connected?
  return false if @amqp_client.nil?

  @amqp_client.connected?
end

#create_channel ⇒ Object

Note:

The caller does not need to close the channel explicitly; channels are closed as part of closing the connection.

Create new AMQP channel object



169
170
171
# File 'lib/isono/amqp_client.rb', line 169

# Create a new AMQP channel on the current connection.
#
# @return [AMQP::Channel] a channel constructed with @amqp_client
def create_channel
  connection = @amqp_client
  AMQP::Channel.new(connection)
end

#on_close ⇒ Object



133
134
# File 'lib/isono/amqp_client.rb', line 133

# Hook with an empty default body. #close and #on_disconnected call it
# first inside the connection's close callback, before #after_close.
def on_close; end

#on_connect ⇒ Object



114
115
# File 'lib/isono/amqp_client.rb', line 114

# Hook with an empty default body. The connection_status handler that
# #connect registers calls it whenever the status becomes :connected,
# which per the comment there includes reconnects.
def on_connect; end

#on_disconnected ⇒ Object



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/isono/amqp_client.rb', line 117

# Handler for the :disconnected connection status (wired up in #connect).
#
# Logs the event, then closes the client inside prepare_close. Once the
# client reports it is closed, #on_close and #after_close run, the client
# references are dropped and the EventMachine reactor is asked to stop.
def on_disconnected
  logger.info("AMQP connection disconnected")
  prepare_close do
    @amqp_client.close do
      begin
        on_close
        after_close
      ensure
        # Runs even when a hook raised.
        Thread.current[:mq] = nil
        @amqp_client = nil
        # NOTE(review): whether EM.stop ever invokes the block it is given
        # (and therefore whether exit runs) is not visible here - confirm.
        EM.stop { exit }
      end
    end
  end
end

#publish_to(exname, message, opts = {}) ⇒ void

This method returns an undefined value.

Publish a message to the designated exchange.

Examples:

Want to broadcast the data to all bound queues:

publish_to('topic exchange', 'data', :key=>'*')

Want to send the data to the specific queue(s):

publish_to('exchange name', 'data', :key=>'group.1')

Parameters:

  • exname (String)

    The exchange name

  • message (String)

    Message body to be sent

  • opts (Hash) (defaults to: {})

    Options passed along with the message, e.g. :key => 'keyname'



185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/isono/amqp_client.rb', line 185

# Publish a message to the named exchange from the EventMachine reactor.
#
# The exchange is looked up on the channel returned by #amq when the
# scheduled block runs, and the message is serialized with
# Serializer.instance.marshal. For a :topic exchange, :key defaults to '*'
# when the caller did not supply one.
#
# @param exname [String] the exchange name
# @param message [Object] message body to be serialized and sent
# @param opts [Hash] publish options, e.g. :key => 'keyname'; the hash
#   passed in is not modified
# @return [void]
# @raise [RuntimeError] from within the scheduled block when no exchange
#   named exname is known to the channel
def publish_to(exname, message, opts={})
  # Take a private copy now: the block below may run later, and filling in
  # the default :key must not write into (or fail on a frozen) caller hash.
  opts = opts.dup
  EventMachine.schedule {
    ex = amq.exchanges[exname] || raise("Undefined exchange name : #{exname}")
    opts[:key] = '*' if ex.type == :topic && !opts.key?(:key)
    ex.publish(Serializer.instance.marshal(message), opts)
  }
end