Class: OmfCommon::Comm
- Inherits:
-
Object
- Object
- OmfCommon::Comm
- Defined in:
- lib/omf_common/comm.rb,
lib/omf_common/comm/topic.rb,
lib/omf_common/comm/xmpp/topic.rb,
lib/omf_common/comm/amqp/amqp_mp.rb,
lib/omf_common/comm/amqp/amqp_topic.rb,
lib/omf_common/comm/local/local_topic.rb,
lib/omf_common/comm/xmpp/communicator.rb,
lib/omf_common/comm/amqp/amqp_communicator.rb,
lib/omf_common/comm/local/local_communicator.rb
Overview
PubSub communication class, can be extended with different implementations
Direct Known Subclasses
Defined Under Namespace
Classes: AMQP, Local, Topic, XMPP
Constant Summary collapse
- @@providers =
{ xmpp: { require: 'omf_common/comm/xmpp/communicator', constructor: 'OmfCommon::Comm::XMPP::Communicator', message_provider: { type: :xml } }, amqp: { require: 'omf_common/comm/amqp/amqp_communicator', constructor: 'OmfCommon::Comm::AMQP::Communicator', message_provider: { type: :json } }, local: { require: 'omf_common/comm/local/local_communicator', constructor: 'OmfCommon::Comm::Local::Communicator', message_provider: { type: :json } } }
- @@instance =
nil
Class Method Summary collapse
-
.init(opts) ⇒ Object
Initialise the comms layer from opts: :type - pre-installed comms provider; :provider - custom provider, a Hash with :require (gem to load first) and :constructor (name of the class implementing the provider).
- .instance ⇒ Object
Instance Method Summary collapse
-
#conn_info ⇒ Hash
Returning connection information.
-
#create_topic(topic, opts = {}) ⇒ Object
Create a new pubsub topic with additional configuration.
-
#delete_topic(topic, &block) ⇒ Object
Delete a pubsub topic.
-
#disconnect(opts = {}) ⇒ Object
Shut down comms layer.
-
#init(opts = {}) ⇒ Object
Initialize comms layer.
-
#local_address ⇒ Object
Return the address used for all ‘generic’ messages not specifically being sent from a resource.
- #local_topic ⇒ Object
- #on_connected(&block) ⇒ Object
-
#on_interrupted(*args, &block) ⇒ Object
TODO should expand this to on_signal(:INT).
-
#options ⇒ Object
Return the options used to initiate this communicator.
-
#publish(topic_name, message) ⇒ Object
Publish a message on a topic.
-
#string_to_topic_address(a_string) ⇒ Object
Take a string and use it to generate a valid topic address for this type of communicator. Must be implemented by subclasses.
-
#subscribe(topic_name, opts = {}, &block) ⇒ Object
Subscribe to a pubsub topic.
Class Method Details
.init(opts) ⇒ Object
opts:
:type - pre installed comms provider
:provider - custom provider (opts)
:require - gem to load first (opts)
:constructor - Class implementing provider
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/omf_common/comm.rb', line 44

# Create the process-wide communicator and initialise the message and
# (optionally) auth layers. Does nothing when an instance already exists.
#
# The provider is resolved in this order: opts[:provider], then the
# pre-installed provider registered under opts[:type], then the one
# registered under the scheme of opts[:url] (the text before the first ':').
#
# @param opts [Hash]
#   :type     - pre installed comms provider
#   :provider - custom provider (Hash with :require, :constructor and,
#               optionally, :message_provider)
#   :url      - used to derive the type when neither of the above is given
#   :auth     - options handed to OmfCommon::Auth.init when present
# @return [Object, nil] whatever the communicator's #init returns on the
#   first call; nil when a communicator had already been created
# @raise [ArgumentError] if no provider can be resolved, or the provider
#   lacks a :constructor entry
def self.init(opts)
  return if @@instance

  provider = opts[:provider]
  unless provider
    type = opts[:type]
    if !type && (url = opts[:url])
      # An empty url yields no scheme; fall through to the ArgumentError
      # below instead of crashing on nil.to_sym.
      scheme = url.to_s.split(':').first
      type = scheme.to_sym if scheme
    end
    provider = @@providers[type]
  end
  unless provider
    raise ArgumentError, "Missing Comm provider declaration. Either define 'type', 'provider', or 'url'"
  end

  require provider[:require] if provider[:require]

  class_name = provider[:constructor]
  unless class_name
    raise ArgumentError, "Missing communicator creation info - :constructor"
  end
  # Resolve a namespaced constant name ('A::B::C') one segment at a time.
  provider_class = class_name.split('::').inject(Object) { |scope, name| scope.const_get(name) }
  inst = provider_class.new(opts)
  @@instance = inst

  # Build a fresh options hash: the provider table is shared state and must
  # not be written to, and a custom provider may omit :message_provider.
  mopts = (provider[:message_provider] || {}).merge(authenticate: opts[:auth])
  Message.init(mopts)

  if (aopts = opts[:auth])
    require 'omf_common/auth' # loaded lazily, only when auth is requested
    OmfCommon::Auth.init(aopts)
  end

  inst.init(opts)
end
.instance ⇒ Object
80 81 82 |
# File 'lib/omf_common/comm.rb', line 80

# Reader for the class-level communicator stored in @@instance.
#
# @return [Object, nil] the value currently held in @@instance
def self.instance
  @@instance
end
Instance Method Details
#conn_info ⇒ Hash
Returning connection information
130 131 132 |
# File 'lib/omf_common/comm.rb', line 130

# Returning connection information.
#
# @return [Hash] keys :proto, :user and :domain; all nil in this
#   implementation
def conn_info
  {
    proto: nil,
    user: nil,
    domain: nil
  }
end
#create_topic(topic, opts = {}) ⇒ Object
Create a new pubsub topic with additional configuration
116 117 118 |
# File 'lib/omf_common/comm.rb', line 116

# Create a new pubsub topic with additional configuration.
# This implementation is a placeholder and always raises.
#
# @param topic name of the topic to create
# @param opts [Hash] additional configuration (unused here)
# @raise [NotImplementedError] always
def create_topic(topic, opts = {})
  raise NotImplementedError
end
#delete_topic(topic, &block) ⇒ Object
Delete a pubsub topic
123 124 125 |
# File 'lib/omf_common/comm.rb', line 123

# Delete a pubsub topic.
# This implementation is a placeholder and always raises.
#
# @param topic the topic to delete
# @param block optional block (unused here)
# @raise [NotImplementedError] always
def delete_topic(topic, &block)
  raise NotImplementedError
end
#disconnect(opts = {}) ⇒ Object
Shut down comms layer
101 102 103 |
# File 'lib/omf_common/comm.rb', line 101

# Shut down comms layer.
# This implementation is a placeholder and always raises.
#
# @param opts [Hash] shutdown options (unused here)
# @raise [NotImplementedError] always
def disconnect(opts = {})
  raise NotImplementedError
end
#init(opts = {}) ⇒ Object
Initialize comms layer
86 87 |
# File 'lib/omf_common/comm.rb', line 86

# Initialize comms layer.
# Intentionally empty here; it accepts the options and returns nil.
#
# @param opts [Hash] initialisation options (unused here)
# @return [nil]
def init(opts = {})
end
#local_address ⇒ Object
Return the address used for all ‘generic’ messages not specifically being sent from a resource
92 93 94 |
# File 'lib/omf_common/comm.rb', line 92

# Return the address used for all 'generic' messages not specifically
# being sent from a resource.
#
# @return [Object] the result of calling #address on @local_topic
def local_address
  @local_topic.address
end
#local_topic ⇒ Object
96 97 98 |
# File 'lib/omf_common/comm.rb', line 96

# Reader for the @local_topic instance variable.
#
# @return [Object, nil] the value currently held in @local_topic
def local_topic
  @local_topic
end
#on_connected(&block) ⇒ Object
105 106 107 |
# File 'lib/omf_common/comm.rb', line 105

# Register a block for the 'connected' event.
# This implementation is a placeholder and always raises.
#
# @param block the callback (unused here)
# @raise [NotImplementedError] always
def on_connected(&block)
  raise NotImplementedError
end
#on_interrupted(*args, &block) ⇒ Object
TODO should expand this to on_signal(:INT)
110 111 |
# File 'lib/omf_common/comm.rb', line 110

# TODO should expand this to on_signal(:INT)
# Intentionally empty here; it accepts any arguments and an optional block,
# ignores them, and returns nil.
#
# @return [nil]
def on_interrupted(*args, &block)
end
#options ⇒ Object
Return the options used to initiate this communicator.
182 183 184 |
# File 'lib/omf_common/comm.rb', line 182

# Return the options used to initiate this communicator.
#
# @return [Object, nil] the value currently held in @opts
def options
  @opts
end
#publish(topic_name, message) ⇒ Object
Publish a message on a topic
170 171 172 173 174 175 176 177 |
# File 'lib/omf_common/comm.rb', line 170

# Publish a message on a topic.
#
# Each name is turned into a topic via #create_topic, and the message is
# published on every resulting topic.
#
# @param topic_name [Object, Array] a single topic name or an Array of names
# @param message the message handed to each topic's #publish
# @return [Array] the per-topic results of #publish, in input order
def publish(topic_name, message)
  #puts "PUBLISH>>>>> #{topic_name}::#{message}"
  names = topic_name.is_a?(Array) ? topic_name : [topic_name]
  names.map { |name| create_topic(name).publish(message) }
end
#string_to_topic_address(a_string) ⇒ Object
Take a string and use it to generate a valid topic address for this type of communicator. Must be implemented by subclasses.
This may be used when we construct an FRCP Configure message that requests some resources to subscribe to a topic which has not yet been created at the time of the message's construction, but which will be created before the message is published. (An example of such a case can be found in the OMF EC Group handling code.)
141 142 143 |
# File 'lib/omf_common/comm.rb', line 141

# Take a string and use it to generate a valid topic address for this type
# of communicator. Must be implemented by subclasses; this implementation
# always raises.
#
# @param a_string the string to convert (unused here)
# @raise [NotImplementedError] always
def string_to_topic_address(a_string)
  raise NotImplementedError
end
#subscribe(topic_name, opts = {}, &block) ⇒ Object
Subscribe to a pubsub topic
151 152 153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/omf_common/comm.rb', line 151

# Subscribe to a pubsub topic.
#
# Each name is converted with #to_s and turned into a topic via
# #create_topic. When a block is given, it is registered through the topic's
# #on_subscribed hook and is called with that topic.
#
# @param topic_name [Object, Array] a single topic name or an Array of names
# @param opts [Hash] accepted but not used by this implementation
# @return [Object, nil] the topic created for the first name; nil for an
#   empty Array
def subscribe(topic_name, opts = {}, &block)
  names = topic_name.is_a?(Array) ? topic_name : [topic_name]
  topics = names.map do |name|
    topic = create_topic(name.to_s)
    topic.on_subscribed { block.call(topic) } if block
    topic
  end
  topics.first
end