Class: Puppet::Util::Queue::Stomp

Inherits:
Object
Defined in:
lib/puppet/util/queue/stomp.rb

Overview

Implements the Ruby Stomp client as a queue type within the Puppet::Indirector::Queue::Client registry, for use with the :queue indirection terminus type.

Reads Puppet[:queue_source] and parses it as a URI whose user, password, host, and port are passed to the underlying Stomp::Client constructor. Consequently, for this client to work, Puppet[:queue_source] must be a Stomp URL identifying the Stomp message broker, of the form stomp://login:pass@host:port
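As a rough illustration of the URL form this expects, the sketch below parses a made-up broker address with Ruby's standard URI library and extracts the four components that the constructor passes on to Stomp::Client.new. The helper name parse_queue_source, the host name, and the credentials are hypothetical and not part of Puppet; the port is only an example value.

```ruby
require 'uri'

# Hypothetical helper (not part of Puppet) mirroring the checks in #initialize:
# the source must parse as a URI and must use the "stomp" scheme.
def parse_queue_source(source)
  uri = URI.parse(source)
  raise ArgumentError, "#{source} is not a Stomp URL" unless uri.scheme == "stomp"

  # These are the pieces the constructor hands to the Stomp client.
  { :user => uri.user, :password => uri.password, :host => uri.host, :port => uri.port }
end

# Example address; every value here is made up for illustration.
parts = parse_queue_source("stomp://login:pass@broker.example.com:61613")
puts parts.inspect
```

A source with any other scheme, such as an http:// address, is rejected with ArgumentError before any connection attempt is made.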

API:

  • public

Constructor Details

#initialize ⇒ Stomp

Returns a new instance of Stomp.

API:

  • public



# File 'lib/puppet/util/queue/stomp.rb', line 16

def initialize
  begin
    uri = URI.parse(Puppet[:queue_source])
  rescue => detail
    raise ArgumentError, "Could not create Stomp client instance - queue source #{Puppet[:queue_source]} is invalid: #{detail}", detail.backtrace
  end
  unless uri.scheme == "stomp"
    raise ArgumentError, "Could not create Stomp client instance - queue source #{Puppet[:queue_source]} is not a Stomp URL"
  end

  begin
    self.stomp_client = Stomp::Client.new(uri.user, uri.password, uri.host, uri.port, true)
  rescue => detail
    raise ArgumentError, "Could not create Stomp client instance with queue source #{Puppet[:queue_source]}: got internal Stomp client error #{detail}", detail.backtrace
  end

  # Identify the supported method for sending messages.
  @method =
    case
    when stomp_client.respond_to?(:publish)
      :publish
    when stomp_client.respond_to?(:send)
      :send
    else
      raise ArgumentError, "STOMP client does not respond to either publish or send"
    end
end

Instance Attribute Details

#stomp_client ⇒ Object

API:

  • public



# File 'lib/puppet/util/queue/stomp.rb', line 14

def stomp_client
  @stomp_client
end

Instance Method Details

#publish_message(target, msg) ⇒ Object

API:

  • public



# File 'lib/puppet/util/queue/stomp.rb', line 44

def publish_message(target, msg)
  stomp_client.__send__(@method, stompify_target(target), msg, :persistent => true)
end
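The dispatch above can be sketched without a broker. The two client classes below are hypothetical stand-ins, not the real Stomp::Client: one exposes publish and the other exposes a sending method named send. The helper names sending_method_for and deliver are likewise invented for this example. The sketch shows the same selection order as the constructor and why __send__ is a safe way to invoke the chosen method even when the client defines its own send.

```ruby
# Hypothetical stand-in for a client that exposes #publish.
class PublishingClient
  def publish(destination, message, headers = {})
    [:publish, destination, message, headers]
  end
end

# Hypothetical stand-in for a client whose sending method is named #send,
# which shadows the Object#send that every Ruby object inherits.
class SendingClient
  def send(destination, message, headers = {})
    [:send, destination, message, headers]
  end
end

# Same selection order as the constructor: prefer :publish, fall back to :send.
def sending_method_for(client)
  return :publish if client.respond_to?(:publish)
  return :send if client.respond_to?(:send)
  raise ArgumentError, "client does not respond to either publish or send"
end

# __send__ always performs dynamic dispatch, so it reaches the client's own
# #send instead of being intercepted by it.
def deliver(client, target, msg)
  client.__send__(sending_method_for(client), '/queue/' + target.to_s, msg, :persistent => true)
end

new_result = deliver(PublishingClient.new, :catalog, "payload")
old_result = deliver(SendingClient.new, :catalog, "payload")
puts new_result.inspect
puts old_result.inspect
```

One consequence worth noting: ordinary Ruby objects inherit a public send, so the respond_to?(:send) check succeeds for them and the final error branch is unlikely to be reached in practice.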

#stompify_target(target) ⇒ Object

API:

  • public



# File 'lib/puppet/util/queue/stomp.rb', line 55

def stompify_target(target)
  '/queue/' + target.to_s
end

#subscribe(target) ⇒ Object

API:

  • public



# File 'lib/puppet/util/queue/stomp.rb', line 48

def subscribe(target)
  stomp_client.subscribe(stompify_target(target), :ack => :client) do |stomp_message|
    yield(stomp_message.body)
    stomp_client.acknowledge(stomp_message)
  end
end
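To make the ordering in subscribe concrete, here is a minimal in-memory sketch. FakeStompClient and FakeMessage are hypothetical test doubles, not part of the stomp gem or Puppet; they only imitate the subscribe and acknowledge calls used above. The sketch shows that each message body reaches the caller's handler first and is acknowledged only after the handler returns.

```ruby
# Hypothetical message double exposing only the #body accessor used above.
FakeMessage = Struct.new(:body)

# Hypothetical in-memory client: hands queued messages to the subscriber
# block and records which ones were acknowledged.
class FakeStompClient
  attr_reader :acknowledged, :subscriptions

  def initialize(bodies)
    @messages = bodies.map { |body| FakeMessage.new(body) }
    @acknowledged = []
    @subscriptions = []
  end

  def subscribe(destination, headers = {})
    @subscriptions << [destination, headers]
    @messages.each { |message| yield message }
  end

  def acknowledge(message)
    @acknowledged << message.body
  end
end

client = FakeStompClient.new(["first", "second"])
received = []

# Same shape as #subscribe: run the handler, then acknowledge the message.
client.subscribe('/queue/catalog', :ack => :client) do |stomp_message|
  received << stomp_message.body
  client.acknowledge(stomp_message)
end

puts received.inspect
puts client.acknowledged.inspect
```

Because the acknowledgement follows the handler, a handler that raises never reaches acknowledge. With client-side acknowledgement a broker would typically treat such a message as undelivered, though the exact redelivery behavior depends on the broker.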