Class: Stapfen::Client::JMS

Inherits:
Object
  • Object
show all
Defined in:
lib/stapfen/client/jms.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(configuration) ⇒ JMS

Returns a new instance of JMS.



9
10
11
12
13
# File 'lib/stapfen/client/jms.rb', line 9

def initialize(configuration)
  super()
  @config = configuration
  @connection = nil
end

Instance Attribute Details

#connectionObject (readonly)

Returns the value of attribute connection.



7
8
9
# File 'lib/stapfen/client/jms.rb', line 7

def connection
  @connection
end

Instance Method Details

#can_unreceive?Boolean

Returns:

  • (Boolean)


86
87
88
# File 'lib/stapfen/client/jms.rb', line 86

def can_unreceive?
  true
end

#closeBoolean

Close the JMS::Connection and the JMS::Session if it’s been created for this client

Returns:

  • (Boolean)

    True/false depending on whether we actually closed the connection



64
65
66
67
68
69
70
# File 'lib/stapfen/client/jms.rb', line 64

def close
  return false unless @connection
  @session.close if @session
  @connection.close
  @connection = nil
  return true
end

#closed?Boolean

API compatibilty method, doesn’t actually indicate that the connection is closed. Will only return true if no connection currently exists

Returns:

  • (Boolean)


76
77
78
# File 'lib/stapfen/client/jms.rb', line 76

def closed?
  return connection.nil?
end

#connect(*args) ⇒ JMS::Connection

Connect to the broker via JMS and start the JMS session

Returns:

  • (JMS::Connection)


18
19
20
21
22
# File 'lib/stapfen/client/jms.rb', line 18

def connect(*args)
  @connection = ::JMS::Connection.new(@config)
  @connection.start
  return @connection
end

#publish(destination, body, headers = {}) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/stapfen/client/jms.rb', line 33

def publish(destination, body, headers={})
  destination = Stapfen::Destination.from_string(destination)

  session.producer(destination.jms_opts) do |p|
    # Create the JMS typed Message
    message = session.message(body)

    message.delivery_mode = ::JMS::DeliveryMode::PERSISTENT if headers.delete(:persistent)

    # Take the remainder of the headers and push them into the message
    # properties.
    headers.each_pair do |key, value|
      message.setStringProperty(key.to_s, value.to_s)
    end

    p.send(message)
  end
end

#runloopObject



80
81
82
83
84
# File 'lib/stapfen/client/jms.rb', line 80

def runloop
  loop do
    sleep 1
  end
end

#sessionJMS::Session

Accessor method which will cache the session if we’ve already created it once

Returns:

  • (JMS::Session)

    Instantiated JMS::Session for our connection



29
30
31
# File 'lib/stapfen/client/jms.rb', line 29

def session
  @session ||= connection.create_session
end

#subscribe(destination, headers = {}, &block) ⇒ Object



52
53
54
55
56
57
# File 'lib/stapfen/client/jms.rb', line 52

def subscribe(destination, headers={}, &block)
  destination = Stapfen::Destination.from_string(destination)
  connection.on_message(destination.jms_opts) do |m|
    block.call(m)
  end
end

#unreceive(message, unreceive_headers) ⇒ Object

JMS doesn’t implement unreceive in quite the way Stomp does, so we’ll implement it here.

Given a message and a set of unreceive headers, this will deliver the message back to its originating queue a limited number of times, then failover to the dead letter queue.

Parameters:

  • message (Stapfen::Message)

    The message to unreceive.

  • unreceive_headers (Hash)

Options Hash (unreceive_headers):

  • :max_redeliveries (Integer)

    The number of times to attempt redelivery.

  • :dead_letter_queue (String)

    After giving up on redelivering, send the message here.



103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/stapfen/client/jms.rb', line 103

def unreceive(message, unreceive_headers)
  return if unreceive_headers[:max_redeliveries].nil? && unreceive_headers[:max_redeliveries].nil?

  destination = message.destination.to_s.sub('queue://','/queue/').sub('topic://','/topic')
  retry_count = message.getStringProperty('retry_count').to_i || 0
  retry_count +=1

  if unreceive_headers[:max_redeliveries] && (retry_count <= unreceive_headers[:max_redeliveries])
    self.publish(destination, message.data, {'retry_count' => retry_count.to_s})
  elsif unreceive_headers[:dead_letter_queue] # Done retrying, send to DLQ
    self.publish(unreceive_headers[:dead_letter_queue], message.data, {:original_destination => destination})
  end
end