Module: JMS::MessageConsumer

Defined in:
lib/jms/message_consumer.rb

Overview

Interface javax.jms.MessageConsumer

Instance Method Summary collapse

Instance Method Details

#each(params = {}, &block) ⇒ Object

For each message available to be consumed call the supplied block Returns the statistics gathered when statistics: true, otherwise nil

Parameters:

:timeout How to timeout waiting for messages on the Queue or Topic
  -1 : Wait forever
   0 : Return immediately if no message is available
   x : Wait for x milli-seconds for a message to be received from the broker
        Note: Messages may still be on the queue, but the broker has not supplied any messages
                  in the time interval specified
   Default: 0

:statistics Capture statistics on how many messages have been read
   true  : This method will capture statistics on the number of messages received
           and the time it took to process them.
           The statistics can be reset by calling MessageConsumer::each again
           with statistics: true

           The statistics gathered are returned when statistics: true and async: false

Raises:

  • (ArgumentError)


57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/jms/message_consumer.rb', line 57

def each(params={}, &block)
  raise(ArgumentError, 'Destination::each requires a code block to be executed for each message received') unless block

  message_count = nil
  start_time    = nil

  if params[:statistics]
    message_count = 0
    start_time    = Time.now
  end

  # Receive messages according to timeout
  while message = self.get(params) do
    block.call(message)
    message_count += 1 if message_count
  end

  unless message_count.nil?
    duration = Time.now - start_time
    {
      messages:            message_count,
      duration:            duration,
      messages_per_second: duration > 0 ? (message_count/duration).to_i : 0,
      ms_per_msg:          message_count > 0 ? (duration*1000.0)/message_count : 0
    }
  end
end

#get(params = {}) ⇒ Object

Obtain a message from the Destination or Topic In JMS terms, the message is received from the Destination :timeout follows the rules for MQSeries:

-1 : Wait forever
 0 : Return immediately if no message is available
 x : Wait for x milli-seconds for a message to be received from the broker
      Note: Messages may still be on the queue, but the broker has not supplied any messages
                in the time interval specified
 Default: 0

:buffered_message - consume Oracle AQ buffered message

Default: false


14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/jms/message_consumer.rb', line 14

def get(params={})
  timeout = params[:timeout] || 0
  buffered_message = params[:buffered_message] || false
  if timeout == -1
     if buffered_message
       self.bufferReceive
     else
       self.receive
     end
  elsif timeout == 0
     if buffered_message
       self.bufferReceiveNoWait
     else
       self.receiveNoWait
     end
  else
     if buffered_message
       self.bufferReceive(timeout)
     else
       self.receive(timeout)
     end
  end
end

#on_message(params = {}, &proc) ⇒ Object

Receive messages in a separate thread when they arrive Allows messages to be recieved in a separate thread. I.e. Asynchronously This method will return to the caller before messages are processed. It is then the callers responsibility to keep the program active so that messages can then be processed.

Parameters:

:statistics Capture statistics on how many messages have been read
   true  : This method will capture statistics on the number of messages received
           and the time it took to process them.
           The timer starts when each() is called and finishes when either the last message was received,
           or when Destination::statistics is called. In this case MessageConsumer::statistics
           can be called several times during processing without affecting the end time.
           Also, the start time and message count is not reset until MessageConsumer::each
           is called again with statistics: true

           The statistics gathered are returned when statistics: true and async: false

Raises:

  • (ArgumentError)


103
104
105
106
107
108
109
110
111
# File 'lib/jms/message_consumer.rb', line 103

def on_message(params={}, &proc)
  raise(ArgumentError, 'MessageConsumer::on_message requires a code block to be executed for each message received') unless proc

  # Turn on Java class persistence: https://github.com/jruby/jruby/wiki/Persistence
  self.class.__persistent__ = true

  @listener = JMS::MessageListenerImpl.new(params, &proc)
  self.setMessageListener(@listener)
end

#on_message_statisticsObject

Return the current statistics for a running MessageConsumer::on_message

Raises:

  • (ArgumentError)


114
115
116
117
118
# File 'lib/jms/message_consumer.rb', line 114

def on_message_statistics
  stats = @listener.statistics if @listener
  raise(ArgumentError, 'First call MessageConsumer::on_message with statistics: true before calling MessageConsumer::statistics()') unless stats
  stats
end