Class: MarchHare::Session

Inherits:
Object
  • Object
show all
Defined in:
lib/march_hare/session.rb

Overview

Connection to a RabbitMQ node.

Used to open and close connections and open (create) new channels.

Defined Under Namespace

Classes: BlockBlockedUnblockedListener

Constant Summary collapse

DEFAULT_NETWORK_RECOVERY_INTERVAL =

Default reconnection interval for TCP connection failures, in seconds

5.0

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection_factory, opts = {}) ⇒ Session

Returns a new instance of Session.



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/march_hare/session.rb', line 81

# Creates a session around the given connection factory and opens the
# underlying connection right away.
#
# @param connection_factory [Object] factory kept in @cf for later use
# @param opts [Hash] session options
# @option opts [Object] :executor_factory used as-is when given; otherwise one
#   is built from opts
# @option opts [Boolean] :automatically_recover enables automatic recovery
# @option opts [Boolean] :automatic_recovery alternative spelling of
#   :automatically_recover
# @option opts [Numeric] :network_recovery_interval
#   (DEFAULT_NETWORK_RECOVERY_INTERVAL) delay used by automatic recovery
def initialize(connection_factory, opts = {})
  @cf = connection_factory
  # An executor cannot be restarted once it has been shut down, which is why
  # a factory is stored here instead of a single executor instance.
  @executor_factory = opts[:executor_factory] || build_executor_factory_from(opts)
  @connection = new_connection_impl
  @channels = JavaConcurrent::ConcurrentHashMap.new

  # Automatic recovery is on unless one of the two option spellings was
  # passed explicitly; in that case whichever value is truthy wins.
  primary = opts[:automatically_recover]
  secondary = opts[:automatic_recovery]
  @automatically_recover = (primary.nil? && secondary.nil?) || primary || secondary

  @network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL)
  @shutdown_hooks = []

  add_automatic_recovery_hook if @automatically_recover
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(selector, *args) ⇒ Object



261
262
263
# File 'lib/march_hare/session.rb', line 261

# Proxies any method this class does not define to the underlying
# connection object held in @connection.
#
# A block given by the caller is forwarded too; previously it was silently
# dropped, so proxied methods that expect a block never received it.
#
# @param selector [Symbol] name of the method to invoke on the connection
# @param args [Array] positional arguments passed through unchanged
# @return [Object] whatever the proxied method returns
def method_missing(selector, *args, &block)
  @connection.__send__(selector, *args, &block)
end

Instance Attribute Details

#channelsArray<MarchHare::Channel> (readonly)

Returns the channels opened on this connection (kept in a concurrent map keyed by channel number).

Returns:



77
78
79
# File 'lib/march_hare/session.rb', line 77

# Read-only accessor for the collection of channels registered on this
# session. The collection is the map populated by #register_channel, keyed
# by channel number.
#
# @return [Object] the live collection stored in @channels (not a copy)
def channels
  @channels
end

Class Method Details

.connect(options = {}) ⇒ Object

Connects to a RabbitMQ node.

Parameters:

  • options (Hash) (defaults to: {})

    Connection options

Options Hash (options):

  • :host (String) — default: "127.0.0.1"

    Hostname or IP address to connect to

  • :port (Integer) — default: 5672

    Port RabbitMQ listens on

  • :username (String) — default: "guest"

    Username

  • :password (String) — default: "guest"

    Password

  • :vhost (String) — default: "/"

    Virtual host to use

  • :requested_heartbeat (Integer) — default: 580

    Heartbeat timeout used. 0 means no heartbeat.

  • :tls (Boolean) — default: false

    Set to true to use TLS/SSL connection. This will switch port to 5671 by default.

  • :thread_factory (java.util.concurrent.ThreadFactory)

    Thread factory RabbitMQ Java client will use (useful in restricted PaaS platforms such as GAE)

See Also:



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/march_hare/session.rb', line 43

# Configures a ConnectionFactory from the options hash and builds a new
# session on top of it.
#
# Keys read directly here: :uri, :port, :ssl, :tls and :trust_manager. All
# other settings are extracted by helper methods defined elsewhere in the
# file (hostname_from, vhost_from, heartbeat_from, ...).
#
# @param options [Hash] connection options
# @return [Object] the result of new(factory, options)
def self.connect(options = {})
  factory = ConnectionFactory.new

  # Endpoint, timeout and credentials: each one is applied only when supplied.
  factory.uri = options[:uri] if options[:uri]
  factory.host = hostname_from(options) if include_host?(options)
  factory.port = options[:port].to_i if options[:port]
  factory.virtual_host = vhost_from(options) if include_vhost?(options)
  factory.connection_timeout = timeout_from(options) if include_timeout?(options)
  factory.username = username_from(options) if include_username?(options)
  factory.password = password_from(options) if include_password?(options)

  # The heartbeat is always assigned; the connection timeout may be assigned
  # a second time here, overriding the value set above.
  factory.requested_heartbeat = heartbeat_from(options)
  factory.connection_timeout = connection_timeout_from(options) if include_connection_timeout?(options)

  factory.thread_factory = thread_factory_from(options) if include_thread_factory?(options)
  factory.exception_handler = exception_handler_from(options) if include_exception_handler?(options)

  # :ssl takes precedence over :tls. `true` selects the factory's default
  # protocol; a String is passed through as the protocol, optionally with a
  # trust manager. Any other value leaves TLS untouched.
  tls = options[:ssl] || options[:tls]
  case tls
  when true
    factory.use_ssl_protocol
  when String
    trust_manager = options[:trust_manager]
    if trust_manager
      factory.use_ssl_protocol(tls, trust_manager)
    else
      factory.use_ssl_protocol(tls)
    end
  end

  new(factory, options)
end

Instance Method Details

#add_automatic_recovery_hookObject



178
179
180
181
182
183
184
185
186
# File 'lib/march_hare/session.rb', line 178

# Registers a shutdown callback that starts automatic recovery whenever the
# shutdown was NOT initiated by the application, and remembers the listener
# returned by #on_shutdown in @automatic_recovery_hook.
#
# @return [Object] the listener returned by #on_shutdown
def add_automatic_recovery_hook
  @automatic_recovery_hook = on_shutdown do |_, signal|
    automatically_recover unless signal.initiated_by_application
  end
end

#automatically_recoverObject

Begins automatic connection recovery (typically only used internally to recover from network failures)



195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'lib/march_hare/session.rb', line 195

# Opens a replacement connection and asks every registered channel to
# recover on top of it.
#
# The calling thread is blocked: the method first sleeps for
# @network_recovery_interval (multiplied by 1000, i.e. treated as seconds
# and converted to milliseconds) and only then attempts to reconnect.
# Failures raised while recovering an individual channel are printed to
# $stderr and do not stop the remaining channels from being recovered.
def automatically_recover
  ms = @network_recovery_interval * 1000
  # recovering immediately makes little sense. Wait a bit first. MK.
  java.lang.Thread.sleep(ms)

  # NOTE(review): both wrappers are defined outside this view; judging by
  # their names they translate Java client exceptions and retry the
  # connection attempt using the same interval — confirm against their source.
  new_connection = converting_rjc_exceptions_to_ruby do
    reconnecting_on_network_failures(ms) do
      self.new_connection_impl
    end
  end
  @thread_pool = ThreadPools.dynamically_growing
  # re-attach every listener stored in @shutdown_hooks to the new connection
  self.recover_shutdown_hooks(new_connection)

  # sorting channels by id means that the cases like the following:
  #
  # ch1 = conn.create_channel
  # ch2 = conn.create_channel
  #
  # x   = ch1.topic("logs", :durable => false)
  # q   = ch2.queue("", :exclusive => true)
  #
  # q.bind(x)
  #
  # will recover correctly because exchanges and queues will be recovered
  # in the order the user expects and before bindings.
  @channels.sort_by {|id, _| id}.each do |id, ch|
    begin
      ch.automatically_recover(self, new_connection)
    rescue Exception, java.io.IOException => e
      # Deliberately best-effort: one channel failing to recover is reported
      # and skipped so that the others still get their turn.
      # TODO: logging
      $stderr.puts e
    end
  end

  # The session switches to the new connection only after all channels have
  # been given a chance to recover.
  @connection = new_connection
end

#clear_blocked_connection_callbacksObject

Clears all callbacks defined with #on_blocked and #on_unblocked.



172
173
174
# File 'lib/march_hare/session.rb', line 172

# Drops every listener previously attached through #on_blocked and
# #on_unblocked by delegating to the connection's clear_blocked_listeners.
#
# @return [Object] whatever the connection's clear_blocked_listeners returns
def clear_blocked_connection_callbacks
  @connection.clear_blocked_listeners
end

#closeObject

Closes connection gracefully.

This includes gracefully shutting down the consumer work pool, waiting up to 5 seconds for all consumer deliveries to be processed.



129
130
131
132
133
134
135
136
# File 'lib/march_hare/session.rb', line 129

# Closes the session: first every channel that is still open, then the
# executor (via maybe_shut_down_executor), and finally the connection itself.
#
# @return [Object] whatever the connection's close returns
def close
  # Take a snapshot of the open channels before closing any of them, so the
  # registry is not being iterated while channels are closed.
  still_open = @channels.select { |_, channel| channel.open? }
  still_open.each { |_, channel| channel.close }

  maybe_shut_down_executor
  @connection.close
end

#closed?Boolean

Returns true if this connection is closed.

Returns:

  • (Boolean)

    true if this connection is closed



145
146
147
# File 'lib/march_hare/session.rb', line 145

# Reports whether the underlying connection is closed, i.e. the negation of
# the connection's own open? check.
#
# @return [Boolean] true when the connection is not open
def closed?
  connection_open = @connection.open?
  !connection_open
end

#create_channel(n = nil) ⇒ MarchHare::Channel

Opens a new channel.

Parameters:

  • n (Integer) (defaults to: nil)

    Channel number. Pass nil to let MarchHare allocate an available number in a safe way.

Returns:

See Also:



111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/march_hare/session.rb', line 111

# Opens a channel on the underlying connection, wraps it in a Channel and
# records it in this session's channel registry.
#
# @param n [Integer, nil] explicit channel number; when nil (or false) the
#   connection's no-argument create_channel is used instead
# @return [Channel] the newly registered channel wrapper
def create_channel(n = nil)
  java_channel = n ? @connection.create_channel(n) : @connection.create_channel

  channel = Channel.new(self, java_channel)
  register_channel(channel)
  channel
end

#disable_automatic_recoveryObject



189
190
191
# File 'lib/march_hare/session.rb', line 189

# Detaches the automatic-recovery shutdown listener from the current
# connection, if one was installed by #add_automatic_recovery_hook.
#
# The original referenced a misspelled instance variable (@connetion), which
# is always nil, so the call raised NoMethodError whenever a hook was set.
#
# @return [Object, nil] the result of remove_shutdown_listener, or nil when
#   no recovery hook is installed
def disable_automatic_recovery
  @connection.remove_shutdown_listener(@automatic_recovery_hook) if @automatic_recovery_hook
end

#flushObject

Flushes the socket used by this connection.



240
241
242
# File 'lib/march_hare/session.rb', line 240

# Delegates to the underlying connection's flush.
#
# @return [Object] whatever the connection's flush returns
def flush
  @connection.flush
end

#heartbeat=(n) ⇒ Object



245
246
247
# File 'lib/march_hare/session.rb', line 245

# Assigns the heartbeat value on the underlying connection.
#
# @param n [Object] value handed to the connection's heartbeat= unchanged
#   (presumably an Integer interval — confirm against the Java client)
def heartbeat=(n)
  @connection.heartbeat = n
end

#on_blocked(&block) ⇒ Object

Defines a connection.blocked handler



162
163
164
# File 'lib/march_hare/session.rb', line 162

# Registers a handler for connection.blocked notifications.
#
# The block is wrapped by BlockBlockedUnblockedListener.for_blocked and the
# resulting listener is handed to add_blocked_listener.
#
# @yield invoked by the listener; arguments are determined by the listener
# @return [Object] whatever add_blocked_listener returns
def on_blocked(&block)
  listener = BlockBlockedUnblockedListener.for_blocked(block)
  add_blocked_listener(listener)
end

#on_shutdown(&block) ⇒ Object

Defines a shutdown event callback. Shutdown events are broadcast when a connection is closed, either explicitly or forcefully, or due to a network/peer failure.



152
153
154
155
156
157
158
159
# File 'lib/march_hare/session.rb', line 152

# Registers a shutdown callback.
#
# The block is wrapped in a ShutdownListener, remembered in @shutdown_hooks
# (so #recover_shutdown_hooks can re-attach it to another connection) and
# attached to the current connection.
#
# @yield invoked by the ShutdownListener
# @return [ShutdownListener] the listener that was registered
def on_shutdown(&block)
  ShutdownListener.new(self, &block).tap do |listener|
    @shutdown_hooks << listener
    @connection.add_shutdown_listener(listener)
  end
end

#on_unblocked(&block) ⇒ Object

Defines a connection.unblocked handler



167
168
169
# File 'lib/march_hare/session.rb', line 167

# Registers a handler for connection.unblocked notifications.
#
# The block is wrapped by BlockBlockedUnblockedListener.for_unblocked and the
# resulting listener is handed to add_blocked_listener.
#
# @yield invoked by the listener; arguments are determined by the listener
# @return [Object] whatever add_blocked_listener returns
def on_unblocked(&block)
  listener = BlockBlockedUnblockedListener.for_unblocked(block)
  add_blocked_listener(listener)
end

#open?Boolean Also known as: connected?

Returns true if connection is open, false otherwise.

Returns:

  • (Boolean)

    true if connection is open, false otherwise



139
140
141
# File 'lib/march_hare/session.rb', line 139

# Reports whether the underlying connection is open, as answered by the
# connection object itself.
#
# @return [Boolean] the connection's own open? result
def open?
  @connection.open?
end

#recover_shutdown_hooks(connection) ⇒ Object



233
234
235
236
237
# File 'lib/march_hare/session.rb', line 233

# Attaches every listener remembered in @shutdown_hooks to the given
# connection, in registration order.
#
# @param connection [Object] connection that responds to add_shutdown_listener
# @return [Array] the @shutdown_hooks collection
def recover_shutdown_hooks(connection)
  @shutdown_hooks.each { |hook| connection.add_shutdown_listener(hook) }
end

#register_channel(ch) ⇒ Object



276
277
278
# File 'lib/march_hare/session.rb', line 276

# Records a channel in this session's registry under its channel number,
# replacing any entry already stored under that number.
#
# @param ch [Object] channel that responds to channel_number
# @return [Object] the channel that was stored
def register_channel(ch)
  number = ch.channel_number
  @channels[number] = ch
end

#startObject

No-op, exists for better API compatibility with Bunny.



250
251
252
253
254
255
256
257
258
259
# File 'lib/march_hare/session.rb', line 250

# Intentionally does nothing.
#
# Bunny 0.9 exposes Bunny::Session#start, and code migrated from Bunny may
# still call it. If this method were missing, #method_missing would forward
# the call to com.rabbitmq.client.AMQConnection, whose own #start is not
# idempotent. Defining an empty #start here keeps such leftover calls
# harmless. MK.
#
# @return [nil]
def start
  # no-op
end

#to_sString

Returns:

  • (String)


266
267
268
# File 'lib/march_hare/session.rb', line 266

def to_s
  "#<#{self.class.name}:#{object_id} #{@cf.username}@#{@cf.host}:#{@cf.port}, vhost=#{@cf.virtual_host}>"
end

#unregister_channel(ch) ⇒ Object



281
282
283
# File 'lib/march_hare/session.rb', line 281

# Removes a channel from this session's registry, looking it up by its
# channel number.
#
# @param ch [Object] channel that responds to channel_number
# @return [Object, nil] whatever the registry's delete returns for that key
def unregister_channel(ch)
  number = ch.channel_number
  @channels.delete(number)
end