Class: PubControl
- Inherits: Object
- Defined in: lib/pubcontrol.rb
Overview
The PubControl class allows a consumer to manage a set of publishing endpoints and to publish to all of those endpoints via a single publish or publish_async method call. A PubControl instance can be configured either using a hash or array of hashes containing configuration information or by manually adding PubControlClient instances.
Constant Summary
- @@pubcontrols = Array.new
  The global list of PubControl instances used to ensure that each instance is properly closed on exit.
- @@lock = Mutex.new
Class Method Summary
- .close_pubcontrols ⇒ Object
  An internal method used for closing all existing PubControl instances.
Instance Method Summary
- #add_client(client) ⇒ Object
  Add the specified PubControlClient instance.
- #apply_config(config) ⇒ Object
  Apply the specified configuration to this PubControl instance.
- #close ⇒ Object
  A blocking call that closes all ZMQ sockets and waits for all PubControlClient async publishing to complete before returning.
- #close_locked ⇒ Object
  Internal close method, used during shutdown while the class lock is held.
- #finish ⇒ Object
  DEPRECATED: Use the more descriptive wait_all_sent method instead.
- #initialize(config = nil) ⇒ PubControl (constructor)
  Initialize with or without a configuration.
- #publish(channels, item) ⇒ Object
  Synchronously publish the specified item to the specified channels for all of the configured PubControlClient instances.
- #publish_async(channels, item, callback = nil) ⇒ Object
  Asynchronously publish the specified item to the specified channels for all of the configured PubControlClient instances.
- #remove_all_clients ⇒ Object
  Remove all of the configured PubControlClient instances.
- #wait_all_sent ⇒ Object
  A blocking call that waits for all asynchronous publishing to complete for all of the configured client instances before returning.
Constructor Details
#initialize(config = nil) ⇒ PubControl
Initialize with or without a configuration. A configuration can be applied after initialization via the apply_config method.
    # File 'lib/pubcontrol.rb', line 32

    def initialize(config=nil)
      @clients = Array.new
      @closed = false
      if !config.nil?
        apply_config(config)
      end
      @@lock.synchronize do
        @@pubcontrols.push(self)
      end
    end
Class Method Details
.close_pubcontrols ⇒ Object
An internal method used for closing all existing PubControl instances.
    # File 'lib/pubcontrol.rb', line 131

    def self.close_pubcontrols
      @@lock.synchronize do
        pubcontrols = Array.new(@@pubcontrols)
        pubcontrols.each do |pub|
          pub.close_locked
        end
      end
    end
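The constructor and close_pubcontrols together form a small registry pattern: each instance registers itself under a class-level lock, and a class method later closes every registered instance. The sketch below illustrates that pattern in isolation. TrackedResource, close_all, and count are hypothetical names chosen for the example and are not part of the library; the library's unregister step is not shown on this page, so the removal inside close_locked here is an assumption.

```ruby
# Hypothetical stand-in for PubControl's instance tracking; not the library's code.
class TrackedResource
  @@instances = []     # global list of live instances
  @@lock = Mutex.new   # guards @@instances

  attr_reader :closed

  def initialize
    @closed = false
    @@lock.synchronize { @@instances.push(self) }
  end

  # Close every live instance. Iterate over a copy, because close_locked
  # removes entries from @@instances while the loop is running.
  def self.close_all
    @@lock.synchronize do
      @@instances.dup.each(&:close_locked)
    end
  end

  def self.count
    @@lock.synchronize { @@instances.length }
  end

  # Intended to be called only while @@lock is already held.
  def close_locked
    @closed = true
    @@instances.delete(self)
  end
end

a = TrackedResource.new
b = TrackedResource.new
TrackedResource.close_all
```

Copying the list before iterating (Array.new(@@pubcontrols) in the library, dup in the sketch) avoids mutating an array while it is being traversed.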
Instance Method Details
#add_client(client) ⇒ Object
Add the specified PubControlClient instance.
    # File 'lib/pubcontrol.rb', line 49

    def add_client(client)
      @clients.push(client)
    end
#apply_config(config) ⇒ Object
Apply the specified configuration to this PubControl instance. The configuration object can either be a hash or an array of hashes where each hash corresponds to a single PubControlClient instance. Each hash will be parsed and a PubControlClient will be created either using just a URI or a URI and JWT authentication information.
    # File 'lib/pubcontrol.rb', line 58

    def apply_config(config)
      if !config.is_a?(Array)
        config = [config]
      end
      config.each do |entry|
        pub = PubControlClient.new(entry['uri'])
        if entry.key?('iss')
          pub.set_auth_jwt({'iss' => entry['iss']}, entry['key'])
        end
        @clients.push(pub)
      end
    end
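To make the two accepted configuration shapes concrete, here is a minimal sketch that mirrors the normalization performed by apply_config. StubClient and build_clients are hypothetical stand-ins so the example runs without the gem installed, and the URIs, issuer, and key values are placeholders.

```ruby
# Hypothetical stand-in for PubControlClient; it only records what it is given.
StubClient = Struct.new(:uri, :claim, :key)

# Mirrors apply_config: a single hash is wrapped in an array, then one
# client is built per entry. JWT details are attached only when 'iss' is present.
def build_clients(config)
  entries = config.is_a?(Array) ? config : [config]
  entries.map do |entry|
    client = StubClient.new(entry['uri'])
    if entry.key?('iss')
      client.claim = { 'iss' => entry['iss'] }
      client.key = entry['key']
    end
    client
  end
end

# A single hash yields one client.
single = build_clients({ 'uri' => 'https://api.example.com/publish' })

# An array of hashes yields one client per entry.
multi = build_clients([
  { 'uri' => 'https://a.example.com', 'iss' => 'realm-id', 'key' => 'secret' },
  { 'uri' => 'https://b.example.com' }
])
```

Note that apply_config looks up the string keys 'uri', 'iss', and 'key', so a hash written with symbol keys would not be read as intended.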
#close ⇒ Object
The close method is a blocking call that closes all ZMQ sockets and ensures that all PubControlClient async publishing is completed prior to returning and allowing the consumer to proceed. Note that the PubControl instance cannot be used after calling this method.
    # File 'lib/pubcontrol.rb', line 75

    def close
      close_clients
      unregister_pubcontrol
    end
#close_locked ⇒ Object
Internal close method, used during shutdown while class lock is held.
    # File 'lib/pubcontrol.rb', line 81

    def close_locked
      close_clients
      unregister_pubcontrol_locked
    end
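A separate close_locked variant is needed because Ruby's Mutex is not reentrant: a thread that already holds the lock cannot acquire it again. The bodies of unregister_pubcontrol and unregister_pubcontrol_locked are not shown on this page, so it is an assumption that the former acquires @@lock itself. The short sketch below only demonstrates the underlying Mutex behavior that would make calling close from inside close_pubcontrols unsafe in that case.

```ruby
lock = Mutex.new

error = nil
lock.synchronize do
  begin
    # Second acquisition by the thread that already holds the lock.
    lock.synchronize { }
  rescue ThreadError => e
    error = e
  end
end
```

Here the inner synchronize raises ThreadError instead of blocking, which is why code that runs while the class lock is held has to use the _locked variants.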
#finish ⇒ Object
DEPRECATED: The finish method is now deprecated in favor of the more descriptive wait_all_sent method.
    # File 'lib/pubcontrol.rb', line 100

    def finish
      verify_not_closed
      wait_all_sent
    end
#publish(channels, item) ⇒ Object
The synchronous publish method for publishing the specified item to the specified channels for all of the configured PubControlClient instances.
    # File 'lib/pubcontrol.rb', line 107

    def publish(channels, item)
      @clients.each do |pub|
        pub.publish(channels, item)
      end
    end
#publish_async(channels, item, callback = nil) ⇒ Object
The asynchronous publish method for publishing the specified item to the specified channels for all of the configured PubControlClient instances. The callback is optional and is passed the publishing result once publishing is complete. If publishing fails in any of the configured PubControlClient instances, the callback receives a failure result along with the first error message encountered.
    # File 'lib/pubcontrol.rb', line 119

    def publish_async(channels, item, callback=nil)
      cb = nil
      if !callback.nil?
        cb = PubControlClientCallbackHandler.new(@clients.length, callback).
            handler_method_symbol
      end
      @clients.each do |pub|
        pub.publish_async(channels, item, cb)
      end
    end
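The aggregation described above (one user callback, fired once, reporting failure with the first error if any client fails) can be sketched as follows. AggregatingCallback is a hypothetical class written for illustration; the internals of PubControlClientCallbackHandler are not shown on this page and may differ, including the exact arguments passed to the callback.

```ruby
# Hypothetical aggregator; the real PubControlClientCallbackHandler may differ.
class AggregatingCallback
  def initialize(expected_calls, callback)
    @remaining = expected_calls   # number of clients still to report
    @callback = callback
    @success = true
    @first_error = nil
    @lock = Mutex.new
  end

  # Each client reports its own result here. The user callback fires once,
  # after the last client has reported.
  def handle(success, message)
    fire = false
    @lock.synchronize do
      unless success
        @first_error ||= message  # keep only the first error seen
        @success = false
      end
      @remaining -= 1
      fire = @remaining.zero?
    end
    @callback.call(@success, @first_error) if fire
  end
end

results = []
handler = AggregatingCallback.new(3, ->(ok, msg) { results << [ok, msg] })
handler.handle(true, nil)
handler.handle(false, 'timeout')
handler.handle(false, 'refused')
```

After the third report, results holds a single entry marking the overall publish as failed with the message 'timeout', the first error encountered.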
#remove_all_clients ⇒ Object
Remove all of the configured PubControlClient instances.
    # File 'lib/pubcontrol.rb', line 44

    def remove_all_clients
      @clients = Array.new
    end
#wait_all_sent ⇒ Object
This method is a blocking method that ensures that all asynchronous publishing is complete for all of the configured client instances prior to returning and allowing the consumer to proceed. NOTE: This only applies to PubControlClient and not ZmqPubControlClient since all ZMQ socket operations are non-blocking.
    # File 'lib/pubcontrol.rb', line 91

    def wait_all_sent
      verify_not_closed
      @clients.each do |pub|
        pub.wait_all_sent
      end
    end
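What "blocking until all asynchronous publishing is complete" means can be illustrated with a small worker-thread sketch. AsyncSender, send_async, and the :stop sentinel are hypothetical and assume a queue-plus-worker design; how PubControlClient actually implements wait_all_sent is not shown on this page.

```ruby
require 'thread'

# Hypothetical async sender: items are queued and handled by a worker thread.
class AsyncSender
  attr_reader :sent

  def initialize
    @queue = Queue.new
    @sent = []
    @worker = nil
  end

  # Queue an item and return immediately; the worker is started lazily.
  def send_async(item)
    @worker ||= Thread.new do
      while (job = @queue.pop) != :stop
        @sent << job   # stand-in for the real network send
      end
    end
    @queue << item
  end

  # Block until every queued item has been handled by the worker.
  def wait_all_sent
    return if @worker.nil?
    @queue << :stop    # sentinel queued behind all pending items
    @worker.join
    @worker = nil
  end
end

sender = AsyncSender.new
sender.send_async('first')
sender.send_async('second')
sender.wait_all_sent
```

Because the sentinel is queued behind all pending items, join returns only after both items have been processed, which is the guarantee a caller relies on before exiting.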