Module: Isono::AmqpClient
- Included in:
- Node
- Defined in:
- lib/isono/amqp_client.rb
Overview
Instance Attribute Summary collapse
-
#amqp_client ⇒ Object
readonly
Returns the value of attribute amqp_client.
-
#mq ⇒ Object
readonly
Returns the value of attribute mq.
Instance Method Summary collapse
- #after_close ⇒ Object
- #after_connect ⇒ Object
- #amq ⇒ Object
- #amqp_server_uri ⇒ Object
- #before_close ⇒ Object
- #before_connect ⇒ Object
- #close(&blk) ⇒ Object
- #connect(broker_uri, *args, &blk) ⇒ Object
- #connected? ⇒ Boolean
-
#create_channel ⇒ Object
Create new AMQP channel object.
- #on_close ⇒ Object
- #on_connect ⇒ Object
- #on_disconnected ⇒ Object
-
#publish_to(exname, message, opts = {}) ⇒ void
Publish a message to the designated exchange.
Instance Attribute Details
#amqp_client ⇒ Object (readonly)
Returns the value of attribute amqp_client.
50 51 52 |
# File 'lib/isono/amqp_client.rb', line 50 def amqp_client @amqp_client end |
#mq ⇒ Object (readonly)
Returns the value of attribute mq.
50 51 52 |
# File 'lib/isono/amqp_client.rb', line 50 def mq @mq end |
Instance Method Details
#after_close ⇒ Object
140 141 |
# File 'lib/isono/amqp_client.rb', line 140 def after_close end |
#after_connect ⇒ Object
134 135 |
# File 'lib/isono/amqp_client.rb', line 134 def after_connect end |
#amq ⇒ Object
117 118 119 120 |
# File 'lib/isono/amqp_client.rb', line 117 def amq raise 'AMQP connection is not established yet' unless connected? Thread.current[:mq] end |
#amqp_server_uri ⇒ Object
52 53 54 55 56 57 58 59 |
# File 'lib/isono/amqp_client.rb', line 52 def amqp_server_uri raise "The connection is not established yet." unless @amqp_client && connected? URI::AMQP.build(:host => @amqp_client.settings[:host], :port => @amqp_client.settings[:port], :path => @amqp_client.settings[:vhost] ) end |
#before_close ⇒ Object
137 138 |
# File 'lib/isono/amqp_client.rb', line 137 def before_close end |
#before_connect ⇒ Object
131 132 |
# File 'lib/isono/amqp_client.rb', line 131 def before_connect end |
#close(&blk) ⇒ Object
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/isono/amqp_client.rb', line 143 def close(&blk) return unless connected? prepare_close { @amqp_client.close { begin on_close after_close blk.call if blk ensure @amqp_client = nil Thread.current[:mq] = nil end } } end |
#connect(broker_uri, *args, &blk) ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/isono/amqp_client.rb', line 61 def connect(broker_uri, *args, &blk) raise "the connection is still alive for: #{amqp_server_uri}" if connected? broker_uri = URI.parse(broker_uri.to_s) unless broker_uri.is_a?(URI) default = ::AMQP.settings opts = {:host => broker_uri.host, :port => broker_uri.port || default[:port], :vhost => broker_uri.vhost || default[:vhost], :user=>broker_uri.user || default[:user], :pass=>broker_uri.password ||default[:pass] } opts.merge!(args) if args.is_a?(Hash) prepare_connect { @amqp_client = ::AMQP.connect(opts) @amqp_client.instance_eval { def settings @settings end @on_disconnect = Proc.new do # This block will be executed when you start the Agent if the AMQP server has been stopped. Isono.at_disconnected.each do |blk| blk.call end blk.call(:error) end } @amqp_client.connection_status { |t| case t when :connected # here is tried also when reconnected on_connect when :disconnected # This block is executed if the AMQP server goes down during startup. on_disconnected Isono.at_disconnected.each do |blk| blk.call end end } # the block argument is called once at the initial connection. @amqp_client.callback { after_connect if blk blk.arity == 1 ? blk.call(self) : blk.call end } } self end |
#connected? ⇒ Boolean
113 114 115 |
# File 'lib/isono/amqp_client.rb', line 113 def connected? !@amqp_client.nil? && @amqp_client.connected? end |
#create_channel ⇒ Object
Note:
Do not have to close by user. Channel close is performed as part of connection close.
Create new AMQP channel object
164 165 166 |
# File 'lib/isono/amqp_client.rb', line 164 def create_channel AMQP::Channel.new(@amqp_client) end |
#on_close ⇒ Object
128 129 |
# File 'lib/isono/amqp_client.rb', line 128 def on_close end |
#on_connect ⇒ Object
122 123 |
# File 'lib/isono/amqp_client.rb', line 122 def on_connect end |
#on_disconnected ⇒ Object
125 126 |
# File 'lib/isono/amqp_client.rb', line 125 def on_disconnected end |
#publish_to(exname, message, opts = {}) ⇒ void
This method returns an undefined value.
Publish a message to the designated exchange.
180 181 182 183 184 185 186 187 188 189 190 191 |
# File 'lib/isono/amqp_client.rb', line 180 def publish_to(exname, , opts={}) EventMachine.schedule { ex = amq.exchanges[exname] || raise("Undefined exchange name : #{exname}") case ex.type when :topic unless opts.has_key? :key opts[:key] = '*' end end ex.publish(Serializer.instance.marshal(), opts) } end |