Module: Isono::AmqpClient

Included in:
Node
Defined in:
lib/isono/amqp_client.rb

Overview

AMQP Client module for master and agent

# Minimal host class showing how the module is mixed in.
class Client
  include Isono::AmqpClient

  # Placeholder logger accessor; as written it returns nil.
  # NOTE(review): connect's error callback calls logger.error, so a real
  # host class presumably returns a Logger-like object here — confirm.
  def logger; end
end

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#amqp_clientObject (readonly)

Returns the value of attribute amqp_client.



51
52
53
# File 'lib/isono/amqp_client.rb', line 51

# Reader for the @amqp_client instance variable.
#
# @return [Object, nil] whatever is currently stored in @amqp_client
def amqp_client; @amqp_client; end

#mqObject (readonly)

Returns the value of attribute mq.



51
52
53
# File 'lib/isono/amqp_client.rb', line 51

# Reader for the @mq instance variable.
#
# @return [Object, nil] whatever is currently stored in @mq
def mq; @mq; end

Instance Method Details

#after_closeObject



133
134
# File 'lib/isono/amqp_client.rb', line 133

# Empty hook; returns nil. close invokes it right after on_close, inside the
# block handed to @amqp_client.close. Including classes may override it.
def after_close; end

#after_connectObject



127
128
# File 'lib/isono/amqp_client.rb', line 127

# Empty hook; returns nil. connect invokes it from the success callback
# registered on @amqp_client, just before the caller's block is run.
def after_connect; end

#amqObject



110
111
112
113
# File 'lib/isono/amqp_client.rb', line 110

# Fetch the object stored under :mq in the current thread's locals.
#
# @return [Object, nil] the value of Thread.current[:mq]
# @raise [RuntimeError] when connected? reports false
def amq
  return Thread.current[:mq] if connected?

  raise 'AMQP connection is not established yet'
end

#amqp_server_uriObject



53
54
55
56
57
58
59
60
# File 'lib/isono/amqp_client.rb', line 53

# Build a URI::AMQP from the :host, :port and :vhost entries of the current
# client's settings (the vhost is passed as the URI path).
#
# @return [Object] the result of URI::AMQP.build
# @raise [RuntimeError] when @amqp_client is unset or connected? is false
def amqp_server_uri
  if !@amqp_client || !connected?
    raise "The connection is not established yet."
  end

  settings = @amqp_client.settings
  URI::AMQP.build(:host => settings[:host],
                  :port => settings[:port],
                  :path => settings[:vhost])
end

#before_closeObject



130
131
# File 'lib/isono/amqp_client.rb', line 130

# Empty hook; returns nil.
# NOTE(review): no caller is visible in this listing; presumably invoked by
# prepare_close before the connection is shut down — confirm.
def before_close; end

#before_connectObject



124
125
# File 'lib/isono/amqp_client.rb', line 124

# Empty hook; returns nil.
# NOTE(review): no caller is visible in this listing; presumably invoked by
# prepare_connect before the connection is opened — confirm.
def before_connect; end

#close(&blk) ⇒ Object



136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/isono/amqp_client.rb', line 136

# Close the current AMQP connection, running the close hooks.
#
# Does nothing (returns nil) when connected? is false. Otherwise, inside
# prepare_close, it asks @amqp_client to close and, in that call's block,
# runs on_close, after_close and then the optional caller block.
#
# @yield optional block called after the close hooks have run
# @return [Object, nil] whatever prepare_close returns, or nil when not connected
def close(&blk)
  return unless connected?

  prepare_close do
    @amqp_client.close do
      begin
        on_close
        after_close
        blk.call unless blk.nil?
      ensure
        # Always drop the references, even if a hook or the block raised.
        @amqp_client = nil
        Thread.current[:mq] = nil
      end
    end
  end
end

#connect(broker_uri, *args, &blk) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/isono/amqp_client.rb', line 62

# Open an AMQP connection to the broker and register the lifecycle hooks.
#
# @param broker_uri [URI, #to_s] broker location; anything that is not a URI
#   is converted with URI.parse(broker_uri.to_s)
# @param args [Array] every Hash found here is merged over the options
#   derived from broker_uri (host, port, vhost, user, pass)
# @yield [client] called once after the initial connection succeeds (with
#   self when the block takes exactly one argument); a one-argument block is
#   also called with self when the connection attempt fails
# @return [self]
# @raise [RuntimeError] when connected? is already true
def connect(broker_uri, *args, &blk)
  raise "the connection is still alive for: #{amqp_server_uri}" if connected?

  broker_uri = URI.parse(broker_uri.to_s) unless broker_uri.is_a?(URI)
  default = ::AMQP.settings
  opts = {:host => broker_uri.host,
    :port => broker_uri.port || default[:port],
    :vhost => broker_uri.vhost || default[:vhost],
    :user => broker_uri.user || default[:user],
    :pass => broker_uri.password || default[:pass]
  }
  # *args is always an Array, so option hashes have to be picked out of it;
  # testing args itself with is_a?(Hash) can never succeed.
  args.each { |extra| opts.merge!(extra) if extra.is_a?(Hash) }

  prepare_connect {
    @amqp_client = ::AMQP.connect(opts)
    # Expose the client's @settings so amqp_server_uri can read them.
    @amqp_client.instance_eval {
      def settings
        @settings
      end
    }
    @amqp_client.connection_status { |t|
      case t
      when :connected
        # This branch also runs on every reconnect.
        on_connect
      when :disconnected
        on_disconnected
      end
    }
    # The block argument is called once, at the initial connection.
    @amqp_client.callback {
      after_connect
      if blk
        blk.arity == 1 ? blk.call(self) : blk.call
      end
    }
    @amqp_client.errback {
      # amqp_server_uri raises while disconnected, which is always the case
      # here, so describe the target from opts instead (without credentials).
      target = "#{broker_uri.scheme}://#{opts[:host]}:#{opts[:port]}#{opts[:vhost]}"
      logger.error("Failed to connect to the broker: #{target}")
      blk.call(self) if blk && blk.arity == 1
    }
  }
  self
end

#connected?Boolean

Returns:

  • (Boolean)


106
107
108
# File 'lib/isono/amqp_client.rb', line 106

# Whether a client object exists and reports itself as connected.
#
# @return [Boolean] false when @amqp_client is nil, otherwise the result of
#   @amqp_client.connected?
def connected?
  @amqp_client.nil? ? false : @amqp_client.connected?
end

#create_channelObject

Note:

The caller does not need to close the channel; channels are closed as part of closing the connection.

Create new AMQP channel object



157
158
159
# File 'lib/isono/amqp_client.rb', line 157

# Create a new AMQP channel object on the current connection.
#
# @note The caller does not need to close the channel; per the module's
#   documentation, channels are closed as part of closing the connection.
# @return [MQ] a new MQ built from @amqp_client
def create_channel
  connection = @amqp_client
  MQ.new(connection)
end

#on_closeObject



121
122
# File 'lib/isono/amqp_client.rb', line 121

# Empty hook; returns nil. close invokes it first, before after_close,
# inside the block handed to @amqp_client.close.
def on_close; end

#on_connectObject



115
116
# File 'lib/isono/amqp_client.rb', line 115

# Empty hook; returns nil. connect invokes it whenever the connection status
# callback reports :connected, which the source notes includes reconnects.
def on_connect; end

#on_disconnectedObject



118
119
# File 'lib/isono/amqp_client.rb', line 118

# Empty hook; returns nil. connect invokes it whenever the connection status
# callback reports :disconnected.
def on_disconnected; end

#publish_to(exname, message, opts = {}) ⇒ void

This method returns an undefined value.

Publish a message to the designated exchange.

Examples:

Want to broadcast the data to all bound queues:

publish_to('topic exchange', 'data', :key=>'*')

To send the data only to specific queue(s), pass the key in the options:

publish_to('exchange name', 'data', :key=>'group.1')

Parameters:

  • exname (String)

    The exchange name

  • message (String)

    Message body to be sent

  • opts (Hash) (defaults to: {})

    Options passed along with the message, e.g. :key => 'keyname'. For a topic exchange, :key defaults to '*' when omitted.



173
174
175
176
177
178
179
180
181
182
183
184
# File 'lib/isono/amqp_client.rb', line 173

# Publish a message to the designated exchange.
#
# The work is handed to EventMachine.schedule. The exchange is looked up in
# amq.exchanges and the message is passed through Serializer.instance.marshal
# before publishing. For a :topic exchange, :key defaults to '*' when the
# caller did not supply one.
#
# @param exname [String] the exchange name
# @param message [Object] message body to be sent
# @param opts [Hash] options passed to the exchange's publish, e.g. :key => 'keyname'
# @return [void]
# @raise [RuntimeError] (inside the scheduled block) when exname is not a known exchange
def publish_to(exname, message, opts={})
  # Publish from a private copy: the block may run later, and the caller's
  # hash must not be modified when the default :key is filled in.
  publish_opts = opts.dup
  EventMachine.schedule {
    ex = amq.exchanges[exname] || raise("Undefined exchange name : #{exname}")
    if ex.type == :topic && !publish_opts.has_key?(:key)
      publish_opts[:key] = '*'
    end
    ex.publish(Serializer.instance.marshal(message), publish_opts)
  }
end