Class: Puppet::Indirector::Queue
Overview
Implements the :queue
abstract indirector terminus type, for storing model instances to a message queue, presumably for the purpose of out-of-process handling of changes related to the model.
Relies upon Puppet::Util::Queue for registry and client object management, and specifies a default queue type of :stomp
, appropriate for use with a variety of message brokers.
It’s up to the queue client type to instantiate itself correctly based on Puppet configuration information.
A single queue client is maintained for the abstract terminus, meaning that you can only use one type of queue client, one message broker solution, etc., with the indirection mechanism.
Per-indirection queues are assumed, based on the indirection name. If the :catalog
indirection makes use of this :queue
terminus, queue operations work against the “catalog” queue. It is up to the queue client library to handle queue creation as necessary (for a number of popular queuing solutions, queue creation is automatic and not a concern).
Direct Known Subclasses
Constant Summary
Constants included from Util
Util::AbsolutePathPosix, Util::AbsolutePathWindows, Util::DEFAULT_POSIX_MODE, Util::DEFAULT_WINDOWS_MODE
Constants included from Util::POSIX
Util::POSIX::LOCALE_ENV_VARS, Util::POSIX::USER_ENV_VARS
Constants included from Util::SymbolicFileMode
Util::SymbolicFileMode::SetGIDBit, Util::SymbolicFileMode::SetUIDBit, Util::SymbolicFileMode::StickyBit, Util::SymbolicFileMode::SymbolicMode, Util::SymbolicFileMode::SymbolicSpecialToBit
Constants included from Util::Docs
Instance Attribute Summary
Attributes included from Util::Docs
Class Method Summary collapse
-
.intern(message) ⇒ Object
converts the message from deserialized format to an actual model instance.
- .queue ⇒ Object
-
.subscribe ⇒ Object
Provides queue subscription functionality; for a given indirection, use this method on the terminus to subscribe to the indirection-specific queue.
Instance Method Summary collapse
-
#client ⇒ Object
Returns the singleton queue client object.
-
#find(request) ⇒ Object
Queue has no idiomatic “find”.
-
#initialize(*args) ⇒ Queue
constructor
A new instance of Queue.
- #queue ⇒ Object
-
#save(request) ⇒ Object
Place the request on the queue.
Methods included from Util::Queue
client_class, queue_type_from_class, queue_type_to_class, register_queue_type
Methods included from Util::InstanceLoader
#instance_docs, #instance_hash, #instance_load, #instance_loader, #instance_loading?, #loaded_instance, #loaded_instances
Methods included from Util
absolute_path?, activerecord_version, benchmark, binread, chuser, classproxy, deterministic_rand, execfail, execpipe, execute, exit_on_fail, logmethods, memory, path_to_uri, pretty_backtrace, proxy, replace_file, safe_posix_fork, symbolizehash, thinmark, uri_to_path, which, withenv, withumask
Methods included from Util::POSIX
#get_posix_field, #gid, #idfield, #methodbyid, #methodbyname, #search_posix_field, #uid
Methods included from Util::SymbolicFileMode
#normalize_symbolic_mode, #symbolic_mode_to_int, #valid_symbolic_mode?
Methods inherited from Terminus
abstract_terminus?, #allow_remote_requests?, const2name, #indirection, indirection_name, inherited, mark_as_abstract_terminus, #model, model, #name, name2const, register_terminus_class, terminus_class, terminus_classes, #terminus_type, #validate, #validate_key, #validate_model
Methods included from Util::Docs
#desc, #dochook, #doctable, #markdown_definitionlist, #markdown_header, #nodoc?, #pad, scrub
Constructor Details
#initialize(*args) ⇒ Queue
Returns a new instance of Queue.
25 26 27 |
# File 'lib/puppet/indirector/queue.rb', line 25 def initialize(*args) super end |
Class Method Details
.intern(message) ⇒ Object
converts the message from deserialized format to an actual model instance.
60 61 62 63 64 65 66 |
# File 'lib/puppet/indirector/queue.rb', line 60 def self.intern() result = nil benchmark :info, "Loaded queued #{indirection.name}" do result = model.convert_from(:pson, ) end result end |
.queue ⇒ Object
46 47 48 |
# File 'lib/puppet/indirector/queue.rb', line 46 def self.queue indirection_name end |
.subscribe ⇒ Object
Provides queue subscription functionality; for a given indirection, use this method on the terminus to subscribe to the indirection-specific queue. Your block will be executed per new indirection model received from the queue, with obj being the model instance.
71 72 73 74 75 76 77 78 79 |
# File 'lib/puppet/indirector/queue.rb', line 71 def self.subscribe client.subscribe(queue) do |msg| begin yield(self.intern(msg)) rescue => detail Puppet.log_exception(detail, "Error occurred with subscription to queue #{queue} for indirection #{indirection_name}: #{detail}") end end end |
Instance Method Details
#client ⇒ Object
Returns the singleton queue client object.
55 56 57 |
# File 'lib/puppet/indirector/queue.rb', line 55 def client self.class.client end |
#find(request) ⇒ Object
Queue has no idiomatic “find”
30 31 32 |
# File 'lib/puppet/indirector/queue.rb', line 30 def find(request) nil end |
#queue ⇒ Object
50 51 52 |
# File 'lib/puppet/indirector/queue.rb', line 50 def queue self.class.queue end |
#save(request) ⇒ Object
Place the request on the queue
35 36 37 38 39 40 41 42 43 44 |
# File 'lib/puppet/indirector/queue.rb', line 35 def save(request) result = nil benchmark :info, "Queued #{indirection.name} for #{request.key}" do result = client.(queue, request.instance.render(:pson)) end result rescue => detail msg = "Could not write #{request.key} to queue: #{detail}\nInstance::#{request.instance}\n client : #{client}" raise Puppet::Error, msg, detail.backtrace end |