Class: RabbitMQClient

Inherits:
Object
  • Object
show all
Includes:
ObjectSpace
Defined in:
lib/jessica/rabbitmq_client.rb

Defined Under Namespace

Classes: ConfirmedMessageListener, Exchange, Queue, QueueConsumer, RabbitMQClientError, ReactiveMessage, ReturnedMessageListener

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ RabbitMQClient

Instance Methods



355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
# File 'lib/jessica/rabbitmq_client.rb', line 355

def initialize(options={})
  if options[:factory]
    @factory = options[:factory]
  else
    @factory = self.class.factory(options)
  end

  # queues and exchanges
  @queues = {}
  @exchanges = {}

  connect unless options[:no_auto_connect]
  # Disconnect before the object is destroyed
  define_finalizer(self, lambda {|id| self.disconnect if self.connected? })
  self
end

Instance Attribute Details

#channelObject (readonly)

Returns the value of attribute channel.



350
351
352
# File 'lib/jessica/rabbitmq_client.rb', line 350

def channel
  @channel
end

#connectionObject (readonly)

Returns the value of attribute connection.



351
352
353
# File 'lib/jessica/rabbitmq_client.rb', line 351

def connection
  @connection
end

#factoryObject

Returns the value of attribute factory.



352
353
354
# File 'lib/jessica/rabbitmq_client.rb', line 352

def factory
  @factory
end

Class Method Details

.factory(options) ⇒ Object



338
339
340
341
342
343
344
345
346
347
# File 'lib/jessica/rabbitmq_client.rb', line 338

def factory(options)
  conn_factory = ConnectionFactory.new
  conn_factory.username = options[:user] || 'guest'
  conn_factory.password = options[:pass] || 'guest'
  conn_factory.virtual_host = options[:vhost] || '/'
  conn_factory.requested_heartbeat = options[:requested_heartbeat] || 0
  conn_factory.host = options[:host] || '127.0.0.1'
  conn_factory.port = options[:port] || 5672
  conn_factory
end

Instance Method Details

#connectObject



372
373
374
375
# File 'lib/jessica/rabbitmq_client.rb', line 372

def connect()
  @connection = @factory.new_connection
  @channel = @connection.create_channel
end

#connected?Boolean

Returns:

  • (Boolean)


383
384
385
# File 'lib/jessica/rabbitmq_client.rb', line 383

def connected?
  @connection != nil
end

#delete_exchange(name) ⇒ Object



408
409
410
411
# File 'lib/jessica/rabbitmq_client.rb', line 408

def delete_exchange name
  @channel.exchange_delete name
  @exchanges.delete name
end

#delete_queue(name) ⇒ Object



403
404
405
406
# File 'lib/jessica/rabbitmq_client.rb', line 403

def delete_queue name
  @channel.queue_delete name
  @queues.delete name
end

#disconnectObject



377
378
379
380
381
# File 'lib/jessica/rabbitmq_client.rb', line 377

def disconnect
  @channel.close
  @connection.close
  @connection = nil
end

#exchange(name, kind = 'fanout', opts = {}) ⇒ Object



399
400
401
# File 'lib/jessica/rabbitmq_client.rb', line 399

def exchange name, kind='fanout', opts={}
  @exchanges[name] = Exchange.new(name, kind, @channel, opts)
end

#find_exchange(name) ⇒ Object



395
396
397
# File 'lib/jessica/rabbitmq_client.rb', line 395

def find_exchange name
  @exchanges[name]
end

#find_queue(name) ⇒ Object



387
388
389
# File 'lib/jessica/rabbitmq_client.rb', line 387

def find_queue name
  @queues[name]
end

#qos(prefetch_size = 0, prefetch_count = 32, global = false) ⇒ Object



413
414
415
# File 'lib/jessica/rabbitmq_client.rb', line 413

def qos(prefetch_size = 0, prefetch_count = 32, global = false)
  @channel.basic_qos(prefetch_size, prefetch_count, global)
end

#queue(name, opts) ⇒ Object



391
392
393
# File 'lib/jessica/rabbitmq_client.rb', line 391

def queue name, opts
  @queues[name] = Queue.new(name, @channel, opts)
end