Module: Isono::AmqpClient

Included in:
Node
Defined in:
lib/isono/amqp_client.rb

Overview

AMQP Client module for master and agent

class Client

include Isono::AmqpClient

def logger()
end

end

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#amqp_clientObject (readonly)

Returns the value of attribute amqp_client.



50
51
52
# File 'lib/isono/amqp_client.rb', line 50

def amqp_client
  @amqp_client
end

#mqObject (readonly)

Returns the value of attribute mq.



50
51
52
# File 'lib/isono/amqp_client.rb', line 50

def mq
  @mq
end

Instance Method Details

#after_closeObject



132
133
# File 'lib/isono/amqp_client.rb', line 132

def after_close
end

#after_connectObject



126
127
# File 'lib/isono/amqp_client.rb', line 126

def after_connect
end

#amqObject



109
110
111
112
# File 'lib/isono/amqp_client.rb', line 109

def amq
  raise 'AMQP connection is not established yet' unless connected?
  Thread.current[:mq]
end

#amqp_server_uriObject



52
53
54
55
56
57
58
59
# File 'lib/isono/amqp_client.rb', line 52

def amqp_server_uri
  raise "The connection is not established yet." unless @amqp_client && connected?

  URI::AMQP.build(:host => @amqp_client.settings[:host],
                  :port => @amqp_client.settings[:port],
                  :path => @amqp_client.settings[:vhost]
                  )
end

#before_closeObject



129
130
# File 'lib/isono/amqp_client.rb', line 129

def before_close
end

#before_connectObject



123
124
# File 'lib/isono/amqp_client.rb', line 123

def before_connect
end

#close(&blk) ⇒ Object



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/isono/amqp_client.rb', line 135

def close(&blk)
  return unless connected?

  prepare_close {
    @amqp_client.close {
      begin
        on_close
        after_close
        blk.call if blk
      ensure
        @amqp_client = nil
        Thread.current[:mq] = nil
      end
    }
  }
end

#connect(broker_uri, *args, &blk) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/isono/amqp_client.rb', line 61

def connect(broker_uri, *args, &blk)
  raise "the connection is still alive for: #{amqp_server_uri}" if connected?

  broker_uri = URI.parse(broker_uri.to_s) unless broker_uri.is_a?(URI)
  default = ::AMQP.settings
  opts = {:host => broker_uri.host, 
    :port => broker_uri.port || default[:port],
    :vhost => broker_uri.vhost || default[:vhost],
    :user=>broker_uri.user || default[:user],
    :pass=>broker_uri.password ||default[:pass]
  }
  opts.merge!(args) if args.is_a?(Hash)

  prepare_connect {
    @amqp_client = ::AMQP.connect(opts)
    @amqp_client.instance_eval {
      def settings
        @settings
      end
    }
    @amqp_client.connection_status { |t|
      case t
      when :connected
        # here is tried also when reconnected
        on_connect
      when :disconnected
        on_disconnected
      end
    }
    # the block argument is called once at the initial connection.
    @amqp_client.callback {
      after_connect
      if blk
        blk.arity == 1 ? blk.call(self) : blk.call
      end
    }
    @amqp_client.errback {
      logger.error("Failed to connect to the broker: #{amqp_server_uri}")
      blk.call(self) if blk && blk.arity == 1
    }
  }
  self
end

#connected?Boolean

Returns:

  • (Boolean)


105
106
107
# File 'lib/isono/amqp_client.rb', line 105

def connected?
  !@amqp_client.nil? && @amqp_client.connected?
end

#create_channelObject

Note:

Do not have to close by user. Channel close is performed as part of connection close.

Create new AMQP channel object



156
157
158
# File 'lib/isono/amqp_client.rb', line 156

def create_channel
  AMQP::Channel.new(@amqp_client)
end

#on_closeObject



120
121
# File 'lib/isono/amqp_client.rb', line 120

def on_close
end

#on_connectObject



114
115
# File 'lib/isono/amqp_client.rb', line 114

def on_connect
end

#on_disconnectedObject



117
118
# File 'lib/isono/amqp_client.rb', line 117

def on_disconnected
end

#publish_to(exname, message, opts = {}) ⇒ void

This method returns an undefined value.

Publish a message to the designated exchange.

Examples:

Want to broadcast the data to all bound queues:

publish_to('topic exchange', 'data', :key=>'*')

Want to send the data to the specific queue(s):

publish_to('exchange name', 'group.1', 'data')

Parameters:

  • exname (String)

    The exchange name

  • message (String)

    Message body to be sent

  • opts (Hash) (defaults to: {})

    Options with the message. :key => ‘keyname’



172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/isono/amqp_client.rb', line 172

def publish_to(exname, message, opts={})
  EventMachine.schedule {
    ex = amq.exchanges[exname] || raise("Undefined exchange name : #{exname}")
    case ex.type
    when :topic
      unless opts.has_key? :key
        opts[:key] = '*'
      end
    end
    ex.publish(Serializer.instance.marshal(message), opts)
  }
end