Class: OmfCommon::Comm
- Inherits:
-
Object
- Object
- OmfCommon::Comm
- Defined in:
- lib/omf_common/comm.rb,
lib/omf_common/comm/topic.rb,
lib/omf_common/comm/xmpp/topic.rb,
lib/omf_common/comm/amqp/amqp_mp.rb,
lib/omf_common/comm/amqp/amqp_topic.rb,
lib/omf_common/comm/local/local_topic.rb,
lib/omf_common/comm/xmpp/communicator.rb,
lib/omf_common/comm/amqp/amqp_communicator.rb,
lib/omf_common/comm/local/local_communicator.rb
Overview
PubSub communication class, can be extended with different implementations
Direct Known Subclasses
Defined Under Namespace
Classes: AMQP, Local, Topic, XMPP
Constant Summary collapse
- @@providers =
{ xmpp: { require: 'omf_common/comm/xmpp/communicator', constructor: 'OmfCommon::Comm::XMPP::Communicator', message_provider: { type: :xml } }, amqp: { require: 'omf_common/comm/amqp/amqp_communicator', constructor: 'OmfCommon::Comm::AMQP::Communicator', message_provider: { type: :json } }, local: { require: 'omf_common/comm/local/local_communicator', constructor: 'OmfCommon::Comm::Local::Communicator', message_provider: { type: :json } } }
- @@instance =
nil
Class Method Summary collapse
-
.init(opts) ⇒ Object
opts: :type - pre installed comms provider :provider - custom provider (opts) :require - gem to load first (opts) :constructor - Class implementing provider.
- .instance ⇒ Object
Instance Method Summary collapse
-
#conn_info ⇒ Hash
Returning connection information.
-
#create_topic(topic, opts = {}) ⇒ Object
Create a new pubsub topic with additional configuration.
-
#delete_topic(topic, &block) ⇒ Object
Delete a pubsub topic.
-
#disconnect(opts = {}) ⇒ Object
Shut down comms layer.
-
#init(opts = {}) ⇒ Object
Initialize comms layer.
-
#local_address ⇒ Object
Return the address used for all ‘generic’ messages not specifically being sent from a resource.
- #local_topic ⇒ Object
- #on_connected(&block) ⇒ Object
-
#on_interrupted(*args, &block) ⇒ Object
TODO should expand this to on_signal(:INT).
-
#options ⇒ Object
Return the options used to initiate this communicator.
-
#publish(topic_name, message) ⇒ Object
Publish a message on a topic.
-
#string_to_topic_address(a_string) ⇒ Object
Take a string and use it to generate a valid topic address for this type of communicator Must be implemented by subclasses.
-
#subscribe(topic_name, opts = {}, &block) ⇒ Object
Subscribe to a pubsub topic.
Class Method Details
.init(opts) ⇒ Object
opts:
:type - pre installed comms provider
:provider - custom provider (opts)
:require - gem to load first (opts)
:constructor - Class implementing provider
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/omf_common/comm.rb', line 44 def self.init(opts) if @@instance raise "Comms layer already initialised" end unless provider = opts[:provider] unless type = opts[:type] if url = opts[:url] type = url.split(':')[0].to_sym end end provider = @@providers[type] end unless provider raise ArgumentError, "Missing Comm provider declaration. Either define 'type', 'provider', or 'url'" end require provider[:require] if provider[:require] if class_name = provider[:constructor] provider_class = class_name.split('::').inject(Object) {|c,n| c.const_get(n) } inst = provider_class.new(opts) else raise ArgumentError, "Missing communicator creation info - :constructor" end @@instance = inst mopts = provider[:message_provider] mopts[:authenticate] = opts[:auth] Message.init(mopts) if aopts = opts[:auth] require 'omf_common/auth' OmfCommon::Auth.init(aopts) end inst.init(opts) end |
.instance ⇒ Object
81 82 83 |
# File 'lib/omf_common/comm.rb', line 81 def self.instance @@instance end |
Instance Method Details
#conn_info ⇒ Hash
Returning connection information
131 132 133 |
# File 'lib/omf_common/comm.rb', line 131 def conn_info { proto: nil, user: nil, domain: nil } end |
#create_topic(topic, opts = {}) ⇒ Object
Create a new pubsub topic with additional configuration
117 118 119 |
# File 'lib/omf_common/comm.rb', line 117 def create_topic(topic, opts = {}) raise NotImplementedError end |
#delete_topic(topic, &block) ⇒ Object
Delete a pubsub topic
124 125 126 |
# File 'lib/omf_common/comm.rb', line 124 def delete_topic(topic, &block) raise NotImplementedError end |
#disconnect(opts = {}) ⇒ Object
Shut down comms layer
102 103 104 |
# File 'lib/omf_common/comm.rb', line 102 def disconnect(opts = {}) raise NotImplementedError end |
#init(opts = {}) ⇒ Object
Initialize comms layer
87 88 |
# File 'lib/omf_common/comm.rb', line 87 def init(opts = {}) end |
#local_address ⇒ Object
Return the address used for all ‘generic’ messages not specifically being sent from a resource
93 94 95 |
# File 'lib/omf_common/comm.rb', line 93 def local_address() @local_topic.address end |
#local_topic ⇒ Object
97 98 99 |
# File 'lib/omf_common/comm.rb', line 97 def local_topic() @local_topic end |
#on_connected(&block) ⇒ Object
106 107 108 |
# File 'lib/omf_common/comm.rb', line 106 def on_connected(&block) raise NotImplementedError end |
#on_interrupted(*args, &block) ⇒ Object
TODO should expand this to on_signal(:INT)
111 112 |
# File 'lib/omf_common/comm.rb', line 111 def on_interrupted(*args, &block) end |
#options ⇒ Object
Return the options used to initiate this communicator.
183 184 185 |
# File 'lib/omf_common/comm.rb', line 183 def () @opts end |
#publish(topic_name, message) ⇒ Object
Publish a message on a topic
171 172 173 174 175 176 177 178 |
# File 'lib/omf_common/comm.rb', line 171 def publish(topic_name, ) #puts "PUBLISH>>>>> #{topic_name}::#{message}" tna = (topic_name.is_a? Array) ? topic_name : [topic_name] ta = tna.collect do |tn| t = create_topic(tn) t.publish() end end |
#string_to_topic_address(a_string) ⇒ Object
Take a string and use it to generate a valid topic address for this type of communicator Must be implemented by subclasses
This may be used when we construct an FRCP Configure message, which requests some resources to subscribe to a topic, which has not yet been created at the time of this message’s construction, but which will be created before this message is published. (an example of such case can be found in OMF EC Group handling code)
142 143 144 |
# File 'lib/omf_common/comm.rb', line 142 def string_to_topic_address(a_string) raise NotImplementedError end |
#subscribe(topic_name, opts = {}, &block) ⇒ Object
Subscribe to a pubsub topic
152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/omf_common/comm.rb', line 152 def subscribe(topic_name, opts = {}, &block) tna = (topic_name.is_a? Array) ? topic_name : [topic_name] ta = tna.collect do |tn| t = create_topic(tn.to_s) if block t.on_subscribed do block.call(t) end end t end ta[0] end |