Class: MarchHare::Session

Inherits:
Object
  • Object
show all
Defined in:
lib/march_hare/session.rb

Overview

Connection to a RabbitMQ node.

Used to open and close connections and open (create) new channels.

Defined Under Namespace

Classes: BlockBlockedUnblockedListener

Constant Summary collapse

DEFAULT_NETWORK_RECOVERY_INTERVAL =

Default reconnection interval for TCP connection failures, in seconds

5.0

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection_factory, opts = {}) ⇒ Session

Returns a new instance of Session.



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/march_hare/session.rb', line 77

# Builds a session around the given connection factory and obtains the
# underlying connection via new_connection_impl.
#
# @param connection_factory [Object] factory kept in @cf
# @param opts [Hash] session options
# @option opts [Object] :executor_factory used as-is when truthy; otherwise a
#   factory is built from opts with build_executor_factory_from
# @option opts [Boolean] :automatically_recover whether to recover automatically
# @option opts [Boolean] :automatic_recovery alternative key for the same setting
# @option opts [Numeric] :network_recovery_interval defaults to
#   DEFAULT_NETWORK_RECOVERY_INTERVAL
def initialize(connection_factory, opts = {})
  @cf = connection_factory
  # An executor cannot be restarted once it has been shut down, which is
  # why a factory is kept here rather than a single executor.
  @executor_factory = opts[:executor_factory] || build_executor_factory_from(opts)
  @connection = new_connection_impl
  @channels = JavaConcurrent::ConcurrentHashMap.new
  @shutdown_hooks = []
  @network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL)

  # Automatic recovery is on unless one of the two keys says otherwise;
  # when neither key is given it defaults to true.
  recover_opt, recovery_opt = opts.values_at(:automatically_recover, :automatic_recovery)
  @automatically_recover = (recover_opt.nil? && recovery_opt.nil?) ? true : (recover_opt || recovery_opt)

  add_automatic_recovery_hook if @automatically_recover
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(selector, *args) ⇒ Object



255
256
257
# File 'lib/march_hare/session.rb', line 255

# Proxies any method this class does not define to the underlying connection.
#
# @param selector [Symbol] name of the method that was called
# @param args [Array] positional arguments, passed through unchanged
# @param block [Proc] optional block; forwarded so that it is not silently
#   dropped on the way to the underlying connection
# @return [Object] whatever the underlying connection returns
def method_missing(selector, *args, &block)
  @connection.__send__(selector, *args, &block)
end

Instance Attribute Details

#channels ⇒ Array<MarchHare::Channel> (readonly)

Returns Channels opened on this connection.

Returns:



73
74
75
# File 'lib/march_hare/session.rb', line 73

# Returns the channel registry of this session. It is created in #initialize
# as a JavaConcurrent::ConcurrentHashMap, and #register_channel stores
# channels in it keyed by channel number.
#
# @return [Object] the value of @channels
def channels
  @channels
end

Class Method Details

.connect(options = {}) ⇒ Object

Connects to a RabbitMQ node.

Parameters:

  • options (Hash) (defaults to: {})

    Connection options

Options Hash (options):

  • :host (String) — default: "127.0.0.1"

    Hostname or IP address to connect to

  • :port (Integer) — default: 5672

    Port RabbitMQ listens on

  • :username (String) — default: "guest"

    Username

  • :password (String) — default: "guest"

    Password

  • :vhost (String) — default: "/"

    Virtual host to use

  • :heartbeat (Integer) — default: 600

    Heartbeat interval. 0 means no heartbeat.

  • :tls (Boolean) — default: false

    Set to true to use TLS/SSL connection. This will switch port to 5671 by default.

See Also:



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/march_hare/session.rb', line 42

# Configures a new ConnectionFactory from the given options and wraps it in
# a new session.
#
# Each factory setting is assigned only when its guard passes (a truthy
# option value, or the matching include_*? helper returning true).
#
# @param options [Hash] connection options; also passed on to .new
# @return [Object] the result of new(factory, options)
def self.connect(options={})
  factory = ConnectionFactory.new

  factory.uri                = options[:uri]          if options[:uri]
  factory.host               = hostname_from(options) if include_host?(options)
  factory.port               = options[:port].to_i    if options[:port]
  factory.virtual_host       = vhost_from(options)    if include_vhost?(options)
  factory.connection_timeout = timeout_from(options)  if include_timeout?(options)
  factory.username           = username_from(options) if include_username?(options)
  factory.password           = password_from(options) if include_password?(options)

  factory.requested_heartbeat = heartbeat_from(options)          if include_heartbeat?(options)
  # Assigned after the timeout_from value above, so this one wins when both guards pass.
  factory.connection_timeout  = connection_timeout_from(options) if include_connection_timeout?(options)

  # :ssl takes precedence over :tls. `true` uses the factory's no-argument
  # TLS setup; a String is handed to use_ssl_protocol as its first argument
  # (presumably the protocol name -- confirm), followed by :trust_manager
  # when one is supplied. Any other value leaves TLS untouched.
  tls = options[:ssl] || options[:tls]
  case tls
  when true
    factory.use_ssl_protocol
  when String
    trust_manager = options[:trust_manager]
    if trust_manager
      factory.use_ssl_protocol(tls, trust_manager)
    else
      factory.use_ssl_protocol(tls)
    end
  end

  new(factory, options)
end

Instance Method Details

#add_automatic_recovery_hook ⇒ Object



174
175
176
177
178
179
180
181
182
# File 'lib/march_hare/session.rb', line 174

# Registers a shutdown callback that starts automatic recovery whenever the
# shutdown signal was not initiated by the application, and remembers the
# resulting listener in @automatic_recovery_hook.
#
# @return [Object] the listener returned by #on_shutdown
def add_automatic_recovery_hook
  @automatic_recovery_hook = on_shutdown do |_, signal|
    automatically_recover unless signal.initiated_by_application
  end
end

#automatically_recover ⇒ Object

Begins automatic connection recovery (typically only used internally to recover from network failures)



191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/march_hare/session.rb', line 191

# Re-establishes the underlying connection and then recovers the shutdown
# hooks and every registered channel.
#
# Steps, in order: wait for the recovery interval, replace @connection with a
# freshly created one (retrying via reconnecting_on_network_failures),
# re-register the shutdown hooks on it, then recover channels one by one.
def automatically_recover
  # @network_recovery_interval is multiplied by 1000 to get the millisecond
  # value that java.lang.Thread.sleep expects.
  ms = @network_recovery_interval * 1000
  # recovering immediately makes little sense. Wait a bit first. MK.
  java.lang.Thread.sleep(ms)

  # The same delay is handed to reconnecting_on_network_failures.
  @connection = converting_rjc_exceptions_to_ruby do
    reconnecting_on_network_failures(ms) do
      self.new_connection_impl
    end
  end
  @thread_pool = ThreadPools.dynamically_growing
  # Listeners were attached to the previous connection object, so they are
  # added again to the new one.
  self.recover_shutdown_hooks

  # sorting channels by id means that the cases like the following:
  #
  # ch1 = conn.create_channel
  # ch2 = conn.create_channel
  #
  # x   = ch1.topic("logs", :durable => false)
  # q   = ch2.queue("", :exclusive => true)
  #
  # q.bind(x)
  #
  # will recover correctly because exchanges and queues will be recovered
  # in the order the user expects and before bindings.
  @channels.sort_by {|id, _| id}.each do |id, ch|
    begin
      ch.automatically_recover(self, @connection)
    # A failure on one channel is only printed to stderr, so the remaining
    # channels are still attempted.
    # NOTE(review): `rescue Exception` is broader than StandardError and will
    # also catch e.g. interrupts -- confirm this is intended.
    rescue Exception, java.io.IOException => e
      # TODO: logging
      $stderr.puts e
    end
  end
end

#clear_blocked_connection_callbacks ⇒ Object

Clears all callbacks defined with #on_blocked and #on_unblocked.



168
169
170
# File 'lib/march_hare/session.rb', line 168

# Removes the blocked/unblocked listeners by calling clear_blocked_listeners
# on the underlying connection.
#
# @return [Object] whatever the underlying connection returns
def clear_blocked_connection_callbacks
  @connection.clear_blocked_listeners
end

#close ⇒ Object

Closes connection gracefully.

This includes shutting down consumer work pool gracefully, waiting up to 5 seconds for all consumer deliveries to be processed.



125
126
127
128
129
130
131
132
# File 'lib/march_hare/session.rb', line 125

# Closes every channel that is still open, shuts down the executor if
# needed, and finally closes the underlying connection.
#
# @return [Object] whatever closing the underlying connection returns
def close
  # Collect the open channels first, then close them.
  open_channels = @channels.select { |_, channel| channel.open? }
  open_channels.each { |_, channel| channel.close }

  maybe_shut_down_executor
  @connection.close
end

#closed? ⇒ Boolean

Returns true if this connection is closed.

Returns:

  • (Boolean)

    true if this connection is closed



141
142
143
# File 'lib/march_hare/session.rb', line 141

def closed?
  !@connection.open?
end

#create_channel(n = nil) ⇒ MarchHare::Channel

Opens a new channel.

Parameters:

  • n (Integer) (defaults to: nil)

    Channel number. Pass nil to let MarchHare allocate an available number in a safe way.

Returns:

See Also:



107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/march_hare/session.rb', line 107

# Opens a channel on the underlying connection, wraps it in a Channel and
# registers the wrapper with this session.
#
# @param n [Integer, nil] channel number; when nil (or false) the underlying
#   connection is asked for a channel without specifying a number
# @return [Channel] the newly registered channel wrapper
def create_channel(n = nil)
  delegate = n ? @connection.create_channel(n) : @connection.create_channel

  Channel.new(self, delegate).tap do |channel|
    register_channel(channel)
  end
end

#disable_automatic_recovery ⇒ Object



185
186
187
# File 'lib/march_hare/session.rb', line 185

# Detaches the automatic recovery shutdown listener from the underlying
# connection. Does nothing when no such listener has been installed.
#
# @return [Object, nil] the result of removing the listener, or nil when
#   there was no listener to remove
def disable_automatic_recovery
  # The receiver must be @connection; a misspelled instance variable would
  # evaluate to nil and raise NoMethodError.
  @connection.remove_shutdown_listener(@automatic_recovery_hook) if @automatic_recovery_hook
end

#flush ⇒ Object

Flushes the socket used by this connection.



234
235
236
# File 'lib/march_hare/session.rb', line 234

# Delegates to the underlying connection's flush.
#
# @return [Object] whatever the underlying connection returns
def flush
  @connection.flush
end

#heartbeat=(n) ⇒ Object



239
240
241
# File 'lib/march_hare/session.rb', line 239

# Assigns the heartbeat value on the underlying connection.
#
# @param n [Object] value handed to the underlying connection's heartbeat=
#   (presumably an interval as an Integer -- confirm)
def heartbeat=(n)
  @connection.heartbeat = n
end

#on_blocked(&block) ⇒ Object

Defines a connection.blocked handler



158
159
160
# File 'lib/march_hare/session.rb', line 158

# Registers a handler for connection.blocked notifications.
#
# The block is wrapped with BlockBlockedUnblockedListener.for_blocked and the
# resulting listener is passed to add_blocked_listener.
#
# @return [Object] whatever add_blocked_listener returns
def on_blocked(&block)
  listener = BlockBlockedUnblockedListener.for_blocked(block)
  add_blocked_listener(listener)
end

#on_shutdown(&block) ⇒ Object

Defines a shutdown event callback. Shutdown events are broadcast when a connection is closed, either explicitly or forcefully, or due to a network/peer failure.



148
149
150
151
152
153
154
155
# File 'lib/march_hare/session.rb', line 148

# Wraps the block in a ShutdownListener, remembers it in @shutdown_hooks and
# attaches it to the underlying connection.
#
# @return [ShutdownListener] the listener that was created
def on_shutdown(&block)
  ShutdownListener.new(self, &block).tap do |listener|
    # Remembered first so that #recover_shutdown_hooks can re-attach it later.
    @shutdown_hooks << listener
    @connection.add_shutdown_listener(listener)
  end
end

#on_unblocked(&block) ⇒ Object

Defines a connection.unblocked handler



163
164
165
# File 'lib/march_hare/session.rb', line 163

# Registers a handler for connection.unblocked notifications.
#
# The block is wrapped with BlockBlockedUnblockedListener.for_unblocked and
# the resulting listener is passed to add_blocked_listener.
#
# @return [Object] whatever add_blocked_listener returns
def on_unblocked(&block)
  listener = BlockBlockedUnblockedListener.for_unblocked(block)
  add_blocked_listener(listener)
end

#open? ⇒ Boolean Also known as: connected?

Returns true if connection is open, false otherwise.

Returns:

  • (Boolean)

    true if connection is open, false otherwise



135
136
137
# File 'lib/march_hare/session.rb', line 135

# Reports whether the underlying connection is open.
#
# @return [Boolean] whatever the underlying connection's open? returns
def open?
  @connection.open?
end

#recover_shutdown_hooks ⇒ Object



227
228
229
230
231
# File 'lib/march_hare/session.rb', line 227

# Attaches every listener remembered in @shutdown_hooks to the current
# underlying connection, in the order they were registered.
#
# @return [Array] the @shutdown_hooks collection
def recover_shutdown_hooks
  @shutdown_hooks.each { |listener| @connection.add_shutdown_listener(listener) }
end

#register_channel(ch) ⇒ Object



270
271
272
# File 'lib/march_hare/session.rb', line 270

# Stores the channel in @channels under its channel number, replacing any
# entry already stored under that number.
#
# @param ch [Object] channel responding to channel_number
def register_channel(ch)
  @channels[ch.channel_number] = ch
end

#start ⇒ Object

No-op, exists for better API compatibility with Bunny.



244
245
246
247
248
249
250
251
252
253
# File 'lib/march_hare/session.rb', line 244

# Intentionally does nothing; exists for API compatibility with Bunny.
#
# @return [nil]
def start
  # no-op
  #
  # This method mimics Bunny::Session#start in Bunny 0.9.
  # Without it, #method_missing will proxy the call to com.rabbitmq.client.AMQConnection,
  # which happens to have a #start method which is not idempotent.
  #
  # So we stub out #start in case someone migrating from Bunny forgets to remove
  # the call to #start. MK.
end

#to_s ⇒ String

Returns:

  • (String)


260
261
262
# File 'lib/march_hare/session.rb', line 260

# Builds a short description of this session from its class name, object id
# and the username, host, port and virtual host of the connection factory.
#
# @return [String]
def to_s
  format(
    "#<%s:%s %s@%s:%s, vhost=%s>",
    self.class.name, object_id, @cf.username, @cf.host, @cf.port, @cf.virtual_host
  )
end

#unregister_channel(ch) ⇒ Object



275
276
277
# File 'lib/march_hare/session.rb', line 275

# Removes the entry stored in @channels under the channel's number.
#
# @param ch [Object] channel responding to channel_number
# @return [Object] whatever @channels.delete returns
def unregister_channel(ch)
  @channels.delete(ch.channel_number)
end