Class: PubControl

Inherits:
Object
Defined in:
lib/pubcontrol.rb

Overview

The PubControl class allows a consumer to manage a set of publishing endpoints and to publish to all of those endpoints via a single publish or publish_async method call. A PubControl instance can be configured either with a hash (or an array of hashes) containing configuration information, or by manually adding PubControlClient instances.

Constant Summary

@@pubcontrols = Array.new

The global list of PubControl instances used to ensure that each instance is properly closed on exit.

@@lock = Mutex.new


Constructor Details

#initialize(config = nil) ⇒ PubControl

Initialize with or without a configuration. A configuration can be applied after initialization via the apply_config method.



# File 'lib/pubcontrol.rb', line 32

def initialize(config=nil)
  @clients = Array.new
  @closed = false
  if !config.nil?
    apply_config(config)
  end
  @@lock.synchronize do
    @@pubcontrols.push(self)
  end
end

Class Method Details

.close_pubcontrols ⇒ Object

An internal method used for closing all existing PubControl instances.



# File 'lib/pubcontrol.rb', line 131

def self.close_pubcontrols
  @@lock.synchronize do
    pubcontrols = Array.new(@@pubcontrols)
    pubcontrols.each do |pub|
      pub.close_locked
    end
  end
end
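
The listing above iterates over a copy of @@pubcontrols rather than the list itself. A plausible reason, assuming unregister_pubcontrol_locked removes the instance from @@pubcontrols (this page does not show that method), is that deleting from an array while iterating over it with each skips elements. The following sketch uses plain strings in place of PubControl instances to show the difference; all variable names are illustrative only.

```ruby
# Deleting from the array being iterated: each walks by index, so every
# removal shifts the next element into a slot that was already visited.
in_place = %w[a b c d]
visited_in_place = []
in_place.each do |name|
  visited_in_place << name
  in_place.delete(name)
end

# Iterating over a copy (as the listing does with Array.new) visits every
# element even though the original list shrinks during the loop.
registry = %w[a b c d]
visited_copy = []
Array.new(registry).each do |name|
  visited_copy << name
  registry.delete(name)
end
```

In the first loop only some of the elements are visited and the rest remain in the array; in the second loop every element is visited and the original list ends up empty.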

Instance Method Details

#add_client(client) ⇒ Object

Add the specified PubControlClient instance.



# File 'lib/pubcontrol.rb', line 49

def add_client(client)
  @clients.push(client)
end

#apply_config(config) ⇒ Object

Apply the specified configuration to this PubControl instance. The configuration object can either be a hash or an array of hashes where each hash corresponds to a single PubControlClient instance. Each hash will be parsed and a PubControlClient will be created either using just a URI or a URI and JWT authentication information.



# File 'lib/pubcontrol.rb', line 58

def apply_config(config)
  if !config.is_a?(Array)
    config = [config]
  end
  config.each do |entry|
    pub = PubControlClient.new(entry['uri'])
    if entry.key?('iss')
      pub.set_auth_jwt({'iss' => entry['iss']}, entry['key'])
    end
    @clients.push(pub)
  end
end
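
The configuration shape accepted by apply_config can be tried out without the gem installed. In the sketch below, StubClient and build_clients are hypothetical stand-ins (neither is part of the library) that mirror the normalization in the listing above; the URIs, issuer, and key are placeholder values.

```ruby
# Hypothetical stand-in for PubControlClient; it only records what it is given.
class StubClient
  attr_reader :uri, :claim, :key

  def initialize(uri)
    @uri = uri
  end

  def set_auth_jwt(claim, key)
    @claim = claim
    @key = key
  end
end

# Mirrors the listing above: wrap a lone hash in an array, then build one
# client per entry, adding JWT auth only when an 'iss' key is present.
def build_clients(config)
  config = [config] unless config.is_a?(Array)
  config.map do |entry|
    client = StubClient.new(entry['uri'])
    client.set_auth_jwt({ 'iss' => entry['iss'] }, entry['key']) if entry.key?('iss')
    client
  end
end

# A single hash and an array of hashes are both accepted.
single = build_clients({ 'uri' => 'https://example.invalid/publish' })
several = build_clients([
  { 'uri' => 'https://a.example.invalid/publish' },
  { 'uri' => 'https://b.example.invalid/publish', 'iss' => 'realm', 'key' => 'secret' }
])
```

Note that the listing looks entries up by string key ('uri', 'iss', 'key'), so a hash written with symbol keys would not be recognized.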

#close ⇒ Object

A blocking call that closes all ZMQ sockets and ensures that all PubControlClient async publishing is complete before returning. Note that the PubControl instance cannot be used after calling this method.



# File 'lib/pubcontrol.rb', line 75

def close
  close_clients
  unregister_pubcontrol
end

#close_locked ⇒ Object

Internal close method, used during shutdown while the class lock is held.



# File 'lib/pubcontrol.rb', line 81

def close_locked
  close_clients
  unregister_pubcontrol_locked
end
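
A separate close_locked variant is useful because Ruby's Mutex is not reentrant: a thread that already holds the lock cannot acquire it again. Assuming the public close path takes @@lock when it unregisters the instance (this page does not show unregister_pubcontrol), calling close from inside close_pubcontrols would fail. The sketch below shows that behavior with a standalone Mutex; it does not use any PubControl code.

```ruby
lock = Mutex.new
error = nil

lock.synchronize do
  begin
    # Second acquisition by the thread that already holds the lock.
    lock.synchronize { }
  rescue ThreadError => e
    # Ruby reports the recursive locking attempt instead of hanging.
    error = e
  end
end
```

After this runs, error holds the ThreadError raised by the nested synchronize call, and the outer lock has been released normally.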

#finish ⇒ Object

DEPRECATED: Use the more descriptive wait_all_sent method instead.



# File 'lib/pubcontrol.rb', line 100

def finish
  verify_not_closed
  wait_all_sent
end

#publish(channels, item) ⇒ Object

The synchronous publish method for publishing the specified item to the specified channels for all of the configured PubControlClient instances.



# File 'lib/pubcontrol.rb', line 107

def publish(channels, item)
  @clients.each do |pub|
    pub.publish(channels, item)
  end
end

#publish_async(channels, item, callback = nil) ⇒ Object

The asynchronous publish method for publishing the specified item to the specified channels for all of the configured PubControlClient instances. The optional callback is passed the publishing results once publishing is complete. If publishing fails in any of the configured PubControlClient instances, the callback receives a failure result along with the first error message encountered.



# File 'lib/pubcontrol.rb', line 119

def publish_async(channels, item, callback=nil)
  cb = nil
  if !callback.nil?
    cb = PubControlClientCallbackHandler.new(@clients.length, callback).
        handler_method_symbol
  end
  @clients.each do |pub|
    pub.publish_async(channels, item, cb)
  end
end
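
The aggregation described above (one user callback for many clients, reporting failure with the first error seen) can be sketched as follows. CallbackAggregator is a hypothetical class written for illustration, not the library's PubControlClientCallbackHandler, and the (success, message) callback signature is an assumption.

```ruby
# Hypothetical aggregator; not the library's PubControlClientCallbackHandler.
class CallbackAggregator
  def initialize(expected_calls, callback)
    @remaining = expected_calls
    @callback = callback
    @success = true
    @first_error = nil
    @lock = Mutex.new
  end

  # Each client reports its own result here. The user callback fires once,
  # after the last expected client has reported.
  def handle(success, message)
    done = false
    @lock.synchronize do
      unless success
        @success = false
        @first_error ||= message # keep only the first error message
      end
      @remaining -= 1
      done = @remaining.zero?
    end
    @callback.call(@success, @first_error) if done
  end
end

# Three simulated client results: one success followed by two failures.
results = []
aggregator = CallbackAggregator.new(3, ->(ok, msg) { results << [ok, msg] })
aggregator.handle(true, nil)
aggregator.handle(false, 'timeout')
aggregator.handle(false, 'refused')
```

The user callback runs exactly once here, with a failure result and the message from the first failing client.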

#remove_all_clients ⇒ Object

Remove all of the configured PubControlClient instances.



# File 'lib/pubcontrol.rb', line 44

def remove_all_clients
  @clients = Array.new
end

#wait_all_sent ⇒ Object

A blocking method that ensures that all asynchronous publishing is complete for all of the configured client instances before returning. NOTE: This only applies to PubControlClient and not ZmqPubControlClient, since all ZMQ socket operations are non-blocking.



# File 'lib/pubcontrol.rb', line 91

def wait_all_sent
  verify_not_closed
  @clients.each do |pub|
    pub.wait_all_sent
  end
end