Class: Puppet::Indirector::Queue
Overview
Implements the :queue
abstract indirector terminus type, for storing model instances to a message queue, presumably for the purpose of out-of-process handling of changes related to the model.
Relies upon Puppet::Util::Queue for registry and client object management, and specifies a default queue type of :stomp
, appropriate for use with a variety of message brokers.
It’s up to the queue client type to instantiate itself correctly based on Puppet configuration information.
A single queue client is maintained for the abstract terminus, meaning that you can only use one type of queue client, one message broker solution, etc., with the indirection mechanism.
Per-indirection queues are assumed, based on the indirection name. If the :catalog
indirection makes use of this :queue
terminus, queue operations work against the “catalog” queue. It is up to the queue client library to handle queue creation as necessary (for a number of popular queuing solutions, queue creation is automatic and not a concern).
Direct Known Subclasses
Constant Summary
Constants included from Util
Util::AbsolutePathPosix, Util::AbsolutePathWindows
Constants included from Util::Docs
Instance Attribute Summary
Attributes included from Util::Docs
Class Method Summary collapse
-
.intern(message) ⇒ Object
converts the message from deserialized format to an actual model instance.
- .queue ⇒ Object
-
.subscribe ⇒ Object
Provides queue subscription functionality; for a given indirection, use this method on the terminus to subscribe to the indirection-specific queue.
Instance Method Summary collapse
-
#client ⇒ Object
Returns the singleton queue client object.
-
#find(request) ⇒ Object
Queue has no idiomatic “find”.
-
#initialize(*args) ⇒ Queue
constructor
A new instance of Queue.
- #queue ⇒ Object
-
#save(request) ⇒ Object
Place the request on the queue.
Methods included from Util::Queue
client_class, queue_type_from_class, queue_type_to_class, register_queue_type
Methods included from Util::InstanceLoader
#instance_docs, #instance_hash, #instance_load, #instance_loader, #instance_loading?, #loaded_instance, #loaded_instances
Methods included from Util
absolute_path?, activerecord_version, benchmark, binread, chuser, classproxy, #execfail, #execpipe, execute, execute_posix, execute_windows, logmethods, memory, path_to_uri, proxy, replace_file, safe_posix_fork, symbolize, symbolizehash, symbolizehash!, synchronize_on, thinmark, #threadlock, uri_to_path, wait_for_output, which, withumask
Methods included from Util::POSIX
#get_posix_field, #gid, #idfield, #methodbyid, #methodbyname, #search_posix_field, #uid
Methods inherited from Terminus
abstract_terminus?, const2name, #indirection, indirection_name, inherited, mark_as_abstract_terminus, #model, model, #name, name2const, register_terminus_class, terminus_class, terminus_classes, #terminus_type
Methods included from Util::Docs
#desc, #dochook, #doctable, #markdown_definitionlist, #markdown_header, #nodoc?, #pad, scrub
Constructor Details
Class Method Details
.intern(message) ⇒ Object
converts the message from deserialized format to an actual model instance.
60 61 62 63 64 65 66 |
# File 'lib/vendor/puppet/indirector/queue.rb', line 60 def self.intern() result = nil benchmark :info, "Loaded queued #{indirection.name}" do result = model.convert_from(:pson, ) end result end |
.queue ⇒ Object
46 47 48 |
# File 'lib/vendor/puppet/indirector/queue.rb', line 46 def self.queue indirection_name end |
.subscribe ⇒ Object
Provides queue subscription functionality; for a given indirection, use this method on the terminus to subscribe to the indirection-specific queue. Your block will be executed per new indirection model received from the queue, with obj being the model instance.
71 72 73 74 75 76 77 78 79 80 |
# File 'lib/vendor/puppet/indirector/queue.rb', line 71 def self.subscribe client.subscribe(queue) do |msg| begin yield(self.intern(msg)) rescue => detail puts detail.backtrace if Puppet[:trace] Puppet.err "Error occured with subscription to queue #{queue} for indirection #{indirection_name}: #{detail}" end end end |
Instance Method Details
#client ⇒ Object
Returns the singleton queue client object.
55 56 57 |
# File 'lib/vendor/puppet/indirector/queue.rb', line 55 def client self.class.client end |
#find(request) ⇒ Object
Queue has no idiomatic “find”
31 32 33 |
# File 'lib/vendor/puppet/indirector/queue.rb', line 31 def find(request) nil end |
#queue ⇒ Object
50 51 52 |
# File 'lib/vendor/puppet/indirector/queue.rb', line 50 def queue self.class.queue end |
#save(request) ⇒ Object
Place the request on the queue
36 37 38 39 40 41 42 43 44 |
# File 'lib/vendor/puppet/indirector/queue.rb', line 36 def save(request) result = nil benchmark :info, "Queued #{indirection.name} for #{request.key}" do result = client.(queue, request.instance.render(:pson)) end result rescue => detail raise Puppet::Error, "Could not write #{request.key} to queue: #{detail}\nInstance::#{request.instance}\n client : #{client}" end |