Class: RabbitMQClient::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/rabbitmq_client.rb

Instance Method Summary collapse

Constructor Details

#initialize(name, channel, durable = false) ⇒ Queue

Returns a new instance of Queue.



39
40
41
42
43
44
45
# File 'lib/rabbitmq_client.rb', line 39

def initialize(name, channel, durable=false)
  @name = name
  @durable = durable
  @channel = channel
  @channel.queue_declare(name, durable)
  self
end

Instance Method Details

#bind(exchange, routing_key = '') ⇒ Object



47
48
49
50
51
52
53
# File 'lib/rabbitmq_client.rb', line 47

def bind(exchange, routing_key='')
  @routing_key = routing_key
  @exchange = exchange
  raise RabbitMQClientError, "queue and exchange has different durable property" unless @durable == exchange.durable
  @channel.queue_bind(@name, @exchange.name, @routing_key)
  self
end

#loop_subscribe(&block) ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/rabbitmq_client.rb', line 93

def loop_subscribe(&block)
  no_ack = false
  consumer = QueueingConsumer.new(@channel)
  @channel.basic_consume(@name, no_ack, consumer)
  loop do
    begin
      delivery = consumer.next_delivery
      message_body = Marshal.load(String.from_java_bytes(delivery.get_body))
      block.call message_body
      @channel.basic_ack(delivery.get_envelope.get_delivery_tag, false)
    rescue InterruptedException => ie
      next
    end
  end
end

#persistent_publish(message_body, props = MessageProperties::PERSISTENT_TEXT_PLAIN) ⇒ Object



69
70
71
72
# File 'lib/rabbitmq_client.rb', line 69

def persistent_publish(message_body, props=MessageProperties::PERSISTENT_TEXT_PLAIN)
  raise RabbitMQClientError, "can only publish persistent message to durable queue" unless @durable
  publish(message_body, props)
end

#publish(message_body, props = nil) ⇒ Object

Set props for different type of message. Currently they are: RabbitMQClient::MessageProperties::MINIMAL_BASIC RabbitMQClient::MessageProperties::MINIMAL_PERSISTENT_BASIC RabbitMQClient::MessageProperties::BASIC RabbitMQClient::MessageProperties::PERSISTENT_BASIC RabbitMQClient::MessageProperties::TEXT_PLAIN RabbitMQClient::MessageProperties::PERSISTENT_TEXT_PLAIN



62
63
64
65
66
67
# File 'lib/rabbitmq_client.rb', line 62

def publish(message_body, props=nil)
  auto_bind
  message_body_byte = Marshal.dump(message_body).to_java_bytes
  @channel.basic_publish(@exchange.name, @routing_key, props, message_body_byte)
  message_body
end

#retrieveObject



74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/rabbitmq_client.rb', line 74

def retrieve
  auto_bind
  message_body = nil
  no_ack = false
  response = @channel.basic_get(@name, no_ack)
  if response
    props = response.get_props
    message_body = Marshal.load(String.from_java_bytes(response.get_body))
    delivery_tag = response.get_envelope.get_delivery_tag
    @channel.basic_ack(delivery_tag, false)
  end
  message_body
end

#subscribe(&block) ⇒ Object



88
89
90
91
# File 'lib/rabbitmq_client.rb', line 88

def subscribe(&block)
  no_ack = false
  @channel.basic_consume(@name, no_ack, QueueConsumer.new(@channel, block))
end