Class: OmfCommon::Comm::XMPP::Communicator
- Inherits:
-
OmfCommon::Comm
- Object
- OmfCommon::Comm
- OmfCommon::Comm::XMPP::Communicator
- Includes:
- Blather::DSL
- Defined in:
- lib/omf_common/comm/xmpp/communicator.rb
Constant Summary collapse
- HOST_PREFIX =
'pubsub'- PUBSUB_CONFIGURE =
Blather::Stanza::X.new({ :type => :submit, :fields => [ { :var => "FORM_TYPE", :type => 'hidden', :value => "http://jabber.org/protocol/pubsub#node_config" }, { :var => "pubsub#persist_items", :value => "0" }, { :var => "pubsub#purge_offline", :value => "1" }, { :var => "pubsub#send_last_published_item", :value => "never" }, { :var => "pubsub#notify_retract", :value => "0" }, { :var => "pubsub#publish_model", :value => "open" }] })
Instance Attribute Summary collapse
-
#normal_shutdown_mode ⇒ Object
Returns the value of attribute normal_shutdown_mode.
-
#published_messages ⇒ Object
Returns the value of attribute published_messages.
-
#retry_counter ⇒ Object
Returns the value of attribute retry_counter.
Instance Method Summary collapse
- #_create(topic, pubsub_host = default_host, &block) ⇒ Object
- #_subscribe(topic, pubsub_host = default_host, &block) ⇒ Object
- #affiliations(pubsub_host = default_host, &block) ⇒ Object
- #conn_info ⇒ Object
-
#connect(username, password, server) ⇒ Object
Set up XMPP options and start the Eventmachine, connect to XMPP server.
-
#create_topic(topic, opts = {}) ⇒ Object
Create a new pubsub topic with additional configuration.
-
#delete_topic(topic, pubsub_host = default_host, &block) ⇒ Object
Delete a pubsub topic.
-
#disconnect(opts = {}) ⇒ Object
Shut down XMPP connection.
-
#init(opts = {}) ⇒ Object
Set up XMPP options and start the Eventmachine, connect to XMPP server.
- #on_connected(&block) ⇒ Object
-
#on_interrupted(&block) ⇒ Object
Capture system :INT & :TERM signal.
-
#publish(topic, message, pubsub_host = default_host, &block) ⇒ Object
Publish to a pubsub topic.
-
#subscribe(topic, opts = {}, &block) ⇒ Object
Subscribe to a pubsub topic.
-
#topic_event(additional_guard = nil, &block) ⇒ Object
Event callback for pubsub topic event(item published).
-
#unsubscribe(pubsub_host = default_host) ⇒ Object
Un-subscribe all existing subscriptions from all pubsub topics.
Methods inherited from OmfCommon::Comm
init, instance, #local_address, #local_topic, #options
Instance Attribute Details
#normal_shutdown_mode ⇒ Object
Returns the value of attribute normal_shutdown_mode.
19 20 21 |
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 19 def normal_shutdown_mode @normal_shutdown_mode end |
#published_messages ⇒ Object
Returns the value of attribute published_messages.
19 20 21 |
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 19 def @published_messages end |
#retry_counter ⇒ Object
Returns the value of attribute retry_counter.
19 20 21 |
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 19 def retry_counter @retry_counter end |
Instance Method Details
#_create(topic, pubsub_host = default_host, &block) ⇒ Object
143 144 145 |
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 143 def _create(topic, pubsub_host = default_host, &block) pubsub.create(topic, pubsub_host, PUBSUB_CONFIGURE, &callback_logging(__method__, topic, &block)) end |
#_subscribe(topic, pubsub_host = default_host, &block) ⇒ Object
139 140 141 |
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 139 def _subscribe(topic, pubsub_host = default_host, &block) pubsub.subscribe(topic, nil, pubsub_host, &callback_logging(__method__, topic, &block)) end |
#affiliations(pubsub_host = default_host, &block) ⇒ Object
157 158 159 |
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 157 def affiliations(pubsub_host = default_host, &block) pubsub.affiliations(pubsub_host, &callback_logging(__method__, &block)) end |
#conn_info ⇒ Object
34 35 36 |
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 34 def conn_info { proto: :xmpp, user: jid.node, domain: jid.domain } end |
#connect(username, password, server) ⇒ Object
Set up XMPP options and start the Eventmachine, connect to XMPP server
97 98 99 100 101 102 103 |
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 97 def connect(username, password, server) info "Connecting to '#{server}' ..." jid = "#{username}@#{server}" client.setup(jid, password) client.run MPConnection.inject(Time.now.to_f, jid, 'connect') if OmfCommon::Measure.enabled? end |
#create_topic(topic, opts = {}) ⇒ Object
Create a new pubsub topic with additional configuration
117 118 119 |
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 117 def create_topic(topic, opts = {}) OmfCommon::Comm::XMPP::Topic.create(topic) end |
#delete_topic(topic, pubsub_host = default_host, &block) ⇒ Object
Delete a pubsub topic
124 125 126 |
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 124 def delete_topic(topic, pubsub_host = default_host, &block) pubsub.delete(topic, pubsub_host, &callback_logging(__method__, topic, &block)) end |
#disconnect(opts = {}) ⇒ Object
Shut down XMPP connection
106 107 108 109 110 111 112 |
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 106 def disconnect(opts = {}) # NOTE Do not clean up info "Disconnecting ..." @normal_shutdown_mode = true shutdown OmfCommon::DSL::Xmpp::MPConnection.inject(Time.now.to_f, jid, 'disconnect') if OmfCommon::Measure.enabled? end |
#init(opts = {}) ⇒ Object
Set up XMPP options and start the Eventmachine, connect to XMPP server
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 49 def init(opts = {}) @pubsub_host = opts[:pubsub_domain] if opts[:url] url = URI(opts[:url]) username, password, server = url.user, url.password, url.host else username, password, server = opts[:username], opts[:password], opts[:server] end random_name = "#{Socket.gethostname}-#{Process.pid}" username ||= random_name password ||= random_name raise ArgumentError, "Username cannot be nil when connect to XMPP" if username.nil? raise ArgumentError, "Password cannot be nil when connect to XMPP" if password.nil? raise ArgumentError, "Server cannot be nil when connect to XMPP" if server.nil? @retry_counter = 0 @normal_shutdown_mode = false connect(username, password, server) when_ready do @cbks[:connected].each { |cbk| cbk.call(self) } end disconnected do unless normal_shutdown_mode unless retry_counter > 0 @retry_counter += 1 client.connect else error "Authentication failed." OmfCommon.eventloop.stop end else shutdown end end trap(:INT) { @cbks[:interpreted].empty? ? disconnect : @cbks[:interpreted].each { |cbk| cbk.call(self) } } trap(:TERM) { @cbks[:interpreted].empty? ? disconnect : @cbks[:interpreted].each { |cbk| cbk.call(self) } } super end |
#on_connected(&block) ⇒ Object
43 44 45 |
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 43 def on_connected(&block) @cbks[:connected] << block end |
#on_interrupted(&block) ⇒ Object
Capture system :INT & :TERM signal
39 40 41 |
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 39 def on_interrupted(&block) @cbks[:interpreted] << block end |
#publish(topic, message, pubsub_host = default_host, &block) ⇒ Object
Publish to a pubsub topic
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 165 def publish(topic, , pubsub_host = default_host, &block) raise StandardError, "Invalid message" unless .valid? = .marshall[1] unless .kind_of? String if .nil? debug "Cannot publish empty message, using authentication and not providing a proper cert?" return nil end new_block = proc do |stanza| << OpenSSL::Digest::SHA1.new(.to_s) block.call(stanza) if block end pubsub.publish(topic, , pubsub_host, &callback_logging(__method__, topic, &new_block)) MPPublished.inject(Time.now.to_f, jid, topic, .to_s.gsub("\n",'')) if OmfCommon::Measure.enabled? end |
#subscribe(topic, opts = {}, &block) ⇒ Object
Subscribe to a pubsub topic
133 134 135 136 137 |
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 133 def subscribe(topic, opts = {}, &block) topic = topic.first if topic.is_a? Array OmfCommon::Comm::XMPP::Topic.create(topic, &block) MPSubscription.inject(Time.now.to_f, jid, 'join', topic) if OmfCommon::Measure.enabled? end |
#topic_event(additional_guard = nil, &block) ⇒ Object
Event callback for pubsub topic event(item published)
185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 |
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 185 def topic_event(additional_guard = nil, &block) guard_block = proc do |event| passed = !event.delayed? && event.items? && !event.items.first.payload.nil? #&& #!published_messages.include?(OpenSSL::Digest::SHA1.new(event.items.first.payload)) MPReceived.inject(Time.now.to_f, jid, event.node, event.items.first.payload.to_s.gsub("\n",'')) if OmfCommon::Measure.enabled? && passed if additional_guard passed && additional_guard.call(event) else passed end end pubsub_event(guard_block, &callback_logging(__method__, &block)) end |
#unsubscribe(pubsub_host = default_host) ⇒ Object
Un-subscribe all existing subscriptions from all pubsub topics.
148 149 150 151 152 153 154 155 |
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 148 def unsubscribe(pubsub_host = default_host) pubsub.subscriptions(pubsub_host) do |m| m[:subscribed] && m[:subscribed].each do |s| pubsub.unsubscribe(s[:node], nil, s[:subid], pubsub_host, &callback_logging(__method__, s[:node], s[:subid])) MPSubscription.inject(Time.now.to_f, jid, 'leave', s[:node]) if OmfCommon::Measure.enabled? end end end |