Class: OmfCommon::Comm::AMQP::Communicator

Inherits:
OmfCommon::Comm show all
Defined in:
lib/omf_common/comm/amqp/amqp_communicator.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from OmfCommon::Comm

init, instance, #local_address, #local_topic, #on_interrupted, #options, #publish, #subscribe

Instance Attribute Details

#channel ⇒ Object (readonly)

end



19
20
21
# File 'lib/omf_common/comm/amqp/amqp_communicator.rb', line 19

# Read-only accessor for the communicator's channel.
#
# @return [Object] the current value of the @channel instance variable
def channel
  @channel
end

Instance Method Details

#broadcast_file(file_path, topic_name = nil, opts = {}, &block) ⇒ Object



98
99
100
101
102
103
# File 'lib/omf_common/comm/amqp/amqp_communicator.rb', line 98

# Start broadcasting a file on a topic and return the address under which
# it was announced.
#
# The file-transfer support code is loaded lazily, on first use.
#
# @param file_path [String] path handed to the FileBroadcaster
# @param topic_name [String, nil] topic to broadcast on; a random UUID is
#   generated when none is given
# @param opts [Hash] options passed through to the FileBroadcaster unchanged
# @param block passed through to the FileBroadcaster unchanged
# @return [String] "bdcst:" followed by the address prefix and the topic name
def broadcast_file(file_path, topic_name = nil, opts = {}, &block)
  topic_name = SecureRandom.uuid unless topic_name
  require 'omf_common/comm/amqp/amqp_file_transfer'
  broadcaster_class = OmfCommon::Comm::AMQP::FileBroadcaster
  # The broadcaster instance itself is not kept; only the address is returned.
  broadcaster_class.new(file_path, @channel, topic_name, opts, &block)
  'bdcst:' + (@address_prefix + topic_name)
end

#conn_info ⇒ Object



41
42
43
# File 'lib/omf_common/comm/amqp/amqp_communicator.rb', line 41

# Describe the current connection.
#
# @return [Hash] with keys :proto (always :amqp), :user and :domain, the
#   latter two read from the :user and :host entries of ::AMQP.settings
def conn_info
  settings = ::AMQP.settings
  {
    proto: :amqp,
    user: settings[:user],
    domain: settings[:host]
  }
end

#create_topic(topic, opts = {}) ⇒ Object

Create a new pubsub topic with additional configuration



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/omf_common/comm/amqp/amqp_communicator.rb', line 69

# Create a new pubsub topic with additional configuration.
#
# +topic+ may be a bare topic name or an absolute address starting with
# 'amqp:'. An absolute address must begin with this communicator's address
# prefix; the prefix is stripped to obtain the topic name. A bare name has
# the prefix prepended to form the address.
#
# @param topic [String, Symbol] topic name or absolute topic address
# @param opts [Hash] extra options for the topic; the hash is copied, and the
#   copy receives :communicator (self) and :address before being passed on
# @return [Object] whatever OmfCommon::Comm::AMQP::Topic.create returns
# @raise [RuntimeError] if the topic is nil or empty, if an absolute address
#   belongs to a different domain, or if nothing remains after the prefix
def create_topic(topic, opts = {})
  raise "Topic can't be nil or empty" if topic.nil? || topic.empty?
  opts = opts.dup # never mutate the caller's hash
  opts[:communicator] = self
  topic = topic.to_s
  if topic.start_with? 'amqp:'
    # absolute address
    unless topic.start_with? @address_prefix
      raise "Cannot subscribe to a topic from different domain (#{topic})"
    end
    opts[:address] = topic
    # Slice the prefix off instead of splitting on it: split would cut the
    # name at any later occurrence of the prefix and yields nil when the
    # address is exactly the prefix.
    topic = topic[@address_prefix.length..-1]
    raise "Topic can't be nil or empty" if topic.empty?
  else
    opts[:address] = @address_prefix + topic
  end
  OmfCommon::Comm::AMQP::Topic.create(topic, opts)
end

#delete_topic(topic, &block) ⇒ Object

Delete a pubsub topic



90
91
92
93
94
95
96
# File 'lib/omf_common/comm/amqp/amqp_communicator.rb', line 90

# Delete a pubsub topic.
#
# Looks the topic up and releases it; emits a warning when no such topic is
# known. The lookup uses OmfCommon::Comm::AMQP::Topic, the same class
# create_topic builds topics with.
#
# @param topic [String] topic identifier handed to Topic.find
# @param block accepted for interface compatibility; currently unused
# @return [Object] the result of releasing the topic, or of the warn call
def delete_topic(topic, &block)
  if (t = OmfCommon::Comm::AMQP::Topic.find(topic))
    t.release
  else
    warn "Attempt to delete unknown topic '#{topic}'"
  end
end

#disconnect(opts = {}) ⇒ Object

Shut down comms layer



46
47
# File 'lib/omf_common/comm/amqp/amqp_communicator.rb', line 46

# Shut down the comms layer.
#
# Currently a no-op: nothing is closed or released here.
#
# @param opts [Hash] accepted for interface compatibility; ignored
# @return [nil]
def disconnect(opts = {})
  nil
end

#init(opts = {}) ⇒ Object

Initialize comms layer



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/omf_common/comm/amqp/amqp_communicator.rb', line 23

# Initialize the comms layer.
#
# Merges the caller's options over the built-in defaults, removes the
# mandatory :url entry from the merged options, derives the address prefix
# from it (the URL plus a trailing '/'), connects via _connect, and finally
# delegates to the superclass implementation with the original arguments.
#
# @param opts [Hash] connection options; must contain :url. Entries override
#   the defaults (:heartbeat and :reconnect_delay, both 20)
# @return [Object] whatever the superclass #init returns
# @raise [RuntimeError] if no :url option is supplied
def init(opts = {})
  @opts = {
    #:ssl (Hash) TLS (SSL) parameters to use.
    heartbeat: 20, # (Fixnum) - default: 0 Connection heartbeat, in seconds. 0 means no heartbeat. Can also be configured server-side starting with RabbitMQ 3.0.
    #:on_tcp_connection_failure (#call) - A callable object that will be run if connection to server fails
    #:on_possible_authentication_failure (#call) - A callable object that will be run if authentication fails (see Authentication failure section)
    reconnect_delay: 20 # (Fixnum) - Delay in seconds before attempting reconnect on detected failure
  }.merge(opts)

  # :url is taken out of @opts so that only the remaining options stay there.
  @url = @opts.delete(:url)
  raise "Missing 'url' option for AMQP layer" unless @url

  @address_prefix = @url + '/'
  _connect
  #AMQP::Session#on_skipped_heartbeats callback that can be used to handle skipped heartbeats
  super
end

#on_connected(&block) ⇒ Object

TODO: Should be thread safe and check if already connected



50
51
52
# File 'lib/omf_common/comm/amqp/amqp_communicator.rb', line 50

# Register a callback in the list of on-connected procs.
#
# TODO: Should be thread safe and check if already connected
#
# @param block the callback to store; when no block is given, nil is
#   appended to the list
# @return [Array] the list of registered callbacks
def on_connected(&block)
  @on_connected_procs.push(block)
end

#on_reconnect(key, &block) ⇒ Object

Register a callback, identified by `key`, to be called when the underlying AMQP layer needs to reconnect to the AMQP server. This may require some additional repairs. If `block` is nil, the callback registered under `key` is removed.



58
59
60
61
62
63
64
# File 'lib/omf_common/comm/amqp/amqp_communicator.rb', line 58

# Register or remove a reconnect callback stored under +key+.
#
# @param key [Object] identifier the callback is stored under
# @param block the callback to store; omit it to remove the entry for +key+
# @return [Proc, nil] the stored block, or on removal the callback that was
#   registered under +key+ (nil if there was none)
def on_reconnect(key, &block)
  # No block means "unregister".
  return @on_reconnect.delete(key) unless block

  @on_reconnect[key] = block
end

#receive_file(topic_url, file_path = nil, opts = {}, &block) ⇒ Object



105
106
107
108
109
110
111
112
# File 'lib/omf_common/comm/amqp/amqp_communicator.rb', line 105

# Receive a file that is being broadcast on a topic.
#
# The communicator's address prefix is stripped from +topic_url+ when
# present. The file-transfer support code is loaded lazily, on first use.
#
# @param topic_url [String] topic name, optionally prefixed with this
#   communicator's address prefix
# @param file_path [String, nil] where to store the file; when omitted, a
#   fresh randomly named path inside the system temp directory is used
# @param opts [Hash] options passed through to the FileReceiver unchanged
# @param block passed through to the FileReceiver unchanged
# @return [FileReceiver] the newly created receiver
def receive_file(topic_url, file_path = nil, opts = {}, &block)
  if topic_url.start_with? @address_prefix
    topic_url = topic_url[@address_prefix.length..-1]
  end
  require 'omf_common/comm/amqp/amqp_file_transfer'
  unless file_path
    # Dir::Tmpname.make_tmpname no longer exists in current Ruby (removed in
    # 2.5), so the default name is built from public stdlib calls instead.
    require 'tmpdir'
    require 'securerandom'
    file_path = File.join(Dir.tmpdir, "bdcast-#{SecureRandom.hex(8)}.xxx")
  end
  FileReceiver.new(file_path, @channel, topic_url, opts, &block)
end