Class: OmfCommon::Comm::XMPP::Communicator

Inherits:
OmfCommon::Comm show all
Includes:
Blather::DSL
Defined in:
lib/omf_common/comm/xmpp/communicator.rb

Constant Summary collapse

HOST_PREFIX =
'pubsub'
PUBSUB_CONFIGURE =
Blather::Stanza::X.new({
  :type => :submit,
  :fields => [
    { :var => "FORM_TYPE", :type => 'hidden', :value => "http://jabber.org/protocol/pubsub#node_config" },
    { :var => "pubsub#persist_items", :value => "0" },
    { :var => "pubsub#purge_offline", :value => "1" },
    { :var => "pubsub#send_last_published_item", :value => "never" },
    { :var => "pubsub#notify_retract",  :value => "0" },
    { :var => "pubsub#publish_model", :value => "open" }]
})

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from OmfCommon::Comm

init, instance, #local_address, #local_topic, #options

Instance Attribute Details

#normal_shutdown_modeObject

Returns the value of attribute normal_shutdown_mode.



19
20
21
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 19

def normal_shutdown_mode
  @normal_shutdown_mode
end

#published_messagesObject

Returns the value of attribute published_messages.



19
20
21
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 19

def published_messages
  @published_messages
end

#retry_counterObject

Returns the value of attribute retry_counter.



19
20
21
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 19

def retry_counter
  @retry_counter
end

Instance Method Details

#_create(topic, pubsub_host = default_host, &block) ⇒ Object



143
144
145
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 143

def _create(topic, pubsub_host = default_host, &block)
  pubsub.create(topic, pubsub_host, PUBSUB_CONFIGURE, &callback_logging(__method__, topic, &block))
end

#_subscribe(topic, pubsub_host = default_host, &block) ⇒ Object



139
140
141
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 139

def (topic, pubsub_host = default_host, &block)
  pubsub.subscribe(topic, nil, pubsub_host, &callback_logging(__method__, topic, &block))
end

#affiliations(pubsub_host = default_host, &block) ⇒ Object



157
158
159
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 157

def affiliations(pubsub_host = default_host, &block)
  pubsub.affiliations(pubsub_host, &callback_logging(__method__, &block))
end

#conn_infoObject



34
35
36
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 34

def conn_info
  { proto: :xmpp, user: jid.node, domain: jid.domain }
end

#connect(username, password, server) ⇒ Object

Set up XMPP options and start the Eventmachine, connect to XMPP server



97
98
99
100
101
102
103
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 97

def connect(username, password, server)
  info "Connecting to '#{server}' ..."
  jid = "#{username}@#{server}"
  client.setup(jid, password)
  client.run
  MPConnection.inject(Time.now.to_f, jid, 'connect') if OmfCommon::Measure.enabled?
end

#create_topic(topic, opts = {}) ⇒ Object

Create a new pubsub topic with additional configuration

Parameters:

  • topic (String)

    Pubsub topic name



117
118
119
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 117

def create_topic(topic, opts = {})
  OmfCommon::Comm::XMPP::Topic.create(topic)
end

#delete_topic(topic, pubsub_host = default_host, &block) ⇒ Object

Delete a pubsub topic

Parameters:

  • topic (String)

    Pubsub topic name



124
125
126
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 124

def delete_topic(topic, pubsub_host = default_host, &block)
  pubsub.delete(topic, pubsub_host, &callback_logging(__method__, topic, &block))
end

#disconnect(opts = {}) ⇒ Object

Shut down XMPP connection



106
107
108
109
110
111
112
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 106

def disconnect(opts = {})
  # NOTE Do not clean up
  info "Disconnecting ..."
  @normal_shutdown_mode = true
  shutdown
  OmfCommon::DSL::Xmpp::MPConnection.inject(Time.now.to_f, jid, 'disconnect') if OmfCommon::Measure.enabled?
end

#init(opts = {}) ⇒ Object

Set up XMPP options and start the Eventmachine, connect to XMPP server

Raises:

  • (ArgumentError)


49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 49

def init(opts = {})
  @pubsub_host = opts[:pubsub_domain]
  if opts[:url]
    url = URI(opts[:url])
    username, password, server = url.user, url.password, url.host
  else
    username, password, server = opts[:username], opts[:password], opts[:server]
  end

  random_name = "#{Socket.gethostname}-#{Process.pid}"
  username ||= random_name
  password ||= random_name

  raise ArgumentError, "Username cannot be nil when connect to XMPP" if username.nil?
  raise ArgumentError, "Password cannot be nil when connect to XMPP" if password.nil?
  raise ArgumentError, "Server cannot be nil when connect to XMPP" if server.nil?

  @retry_counter = 0
  @normal_shutdown_mode = false

  connect(username, password, server)

  when_ready do
    @cbks[:connected].each { |cbk| cbk.call(self) }
  end

  disconnected do
    unless normal_shutdown_mode
      unless retry_counter > 0
        @retry_counter += 1
        client.connect
      else
        error "Authentication failed."
        OmfCommon.eventloop.stop
      end
    else
      shutdown
    end
  end

  trap(:INT) { @cbks[:interpreted].empty? ? disconnect : @cbks[:interpreted].each { |cbk| cbk.call(self) } }
  trap(:TERM) { @cbks[:interpreted].empty? ? disconnect : @cbks[:interpreted].each { |cbk| cbk.call(self) } }

  super
end

#on_connected(&block) ⇒ Object



43
44
45
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 43

def on_connected(&block)
  @cbks[:connected] << block
end

#on_interrupted(&block) ⇒ Object

Capture system :INT & :TERM signal



39
40
41
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 39

def on_interrupted(&block)
  @cbks[:interpreted] << block
end

#publish(topic, message, pubsub_host = default_host, &block) ⇒ Object

Publish to a pubsub topic

Parameters:

Raises:

  • (StandardError)


165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 165

def publish(topic, message, pubsub_host = default_host, &block)
  raise StandardError, "Invalid message" unless message.valid?

  message = message.marshall[1] unless message.kind_of? String
  if message.nil?
    debug "Cannot publish empty message, using authentication and not providing a proper cert?"
    return nil
  end

  new_block = proc do |stanza|
    published_messages << OpenSSL::Digest::SHA1.new(message.to_s)
    block.call(stanza) if block
  end

  pubsub.publish(topic, message, pubsub_host, &callback_logging(__method__, topic, &new_block))
  MPPublished.inject(Time.now.to_f, jid, topic, message.to_s.gsub("\n",'')) if OmfCommon::Measure.enabled?
end

#subscribe(topic, opts = {}, &block) ⇒ Object

Subscribe to a pubsub topic

Parameters:

  • topic (String)

    Pubsub topic name

  • opts (Hash) (defaults to: {})

Options Hash (opts):

  • :create_if_non_existent (Boolean)

    create the topic if non-existent, use this option with caution



133
134
135
136
137
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 133

def subscribe(topic, opts = {}, &block)
  topic = topic.first if topic.is_a? Array
  OmfCommon::Comm::XMPP::Topic.create(topic, &block)
  MPSubscription.inject(Time.now.to_f, jid, 'join', topic) if OmfCommon::Measure.enabled?
end

#topic_event(additional_guard = nil, &block) ⇒ Object

Event callback for pubsub topic event(item published)



185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 185

def topic_event(additional_guard = nil, &block)
  guard_block = proc do |event|
    passed = !event.delayed? && event.items? && !event.items.first.payload.nil? #&&
      #!published_messages.include?(OpenSSL::Digest::SHA1.new(event.items.first.payload))

    MPReceived.inject(Time.now.to_f, jid, event.node, event.items.first.payload.to_s.gsub("\n",'')) if OmfCommon::Measure.enabled? && passed

    if additional_guard
      passed && additional_guard.call(event)
    else
      passed
    end
  end
  pubsub_event(guard_block, &callback_logging(__method__, &block))
end

#unsubscribe(pubsub_host = default_host) ⇒ Object

Un-subscribe all existing subscriptions from all pubsub topics.



148
149
150
151
152
153
154
155
# File 'lib/omf_common/comm/xmpp/communicator.rb', line 148

def unsubscribe(pubsub_host = default_host)
  pubsub.subscriptions(pubsub_host) do |m|
    m[:subscribed] && m[:subscribed].each do |s|
      pubsub.unsubscribe(s[:node], nil, s[:subid], pubsub_host, &callback_logging(__method__, s[:node], s[:subid]))
      MPSubscription.inject(Time.now.to_f, jid, 'leave', s[:node]) if OmfCommon::Measure.enabled?
    end
  end
end