Class: OmfCommon::Comm::AMQP::Communicator

Inherits:
OmfCommon::Comm show all
Defined in:
lib/omf_common/comm/amqp/amqp_communicator.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from OmfCommon::Comm

init, instance, #local_address, #local_topic, #on_interrupted, #options, #publish, #subscribe

Instance Attribute Details

#channelObject (readonly)

end



19
20
21
# File 'lib/omf_common/comm/amqp/amqp_communicator.rb', line 19

def channel
  @channel
end

Instance Method Details

#broadcast_file(file_path, topic_name = nil, opts = {}, &block) ⇒ Object



103
104
105
106
107
108
# File 'lib/omf_common/comm/amqp/amqp_communicator.rb', line 103

def broadcast_file(file_path, topic_name = nil, opts = {}, &block)
  topic_name ||= SecureRandom.uuid
  require 'omf_common/comm/amqp/amqp_file_transfer'
  OmfCommon::Comm::AMQP::FileBroadcaster.new(file_path, @channel, topic_name, opts, &block)
  "bdcst:#{@address_prefix + topic_name}"
end

#conn_infoObject



40
41
42
# File 'lib/omf_common/comm/amqp/amqp_communicator.rb', line 40

def conn_info
  { proto: :amqp, user: ::AMQP.settings[:user], domain: ::AMQP.settings[:host] }
end

#create_topic(topic, opts = {}) ⇒ Object

Create a new pubsub topic with additional configuration

Parameters:

  • topic (String)

    Pubsub topic name



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/omf_common/comm/amqp/amqp_communicator.rb', line 73

def create_topic(topic, opts = {})
  raise "Topic can't be nil or empty" if topic.nil? || topic.to_s.empty?
  opts = opts.dup
  opts[:communicator] = self
  topic = topic.to_s
  if topic.start_with? 'amqp:'
    # absolute address
    unless topic.start_with? @address_prefix
      raise "Cannot subscribe to a topic from different domain (#{topic}) - #{@address_prefix}"
    end
    opts[:address] = topic
    topic = topic.split(@address_prefix).last
  else
    opts[:address] = @address_prefix + topic
  end
  OmfCommon::Comm::AMQP::Topic.create(topic, opts)
end

#delete_topic(topic, &block) ⇒ Object

Delete a pubsub topic

Parameters:

  • topic (String)

    Pubsub topic name



94
95
96
97
98
99
100
101
# File 'lib/omf_common/comm/amqp/amqp_communicator.rb', line 94

def delete_topic(topic, &block)
  # FIXME CommProvider?
  if t = OmfCommon::CommProvider::AMQP::Topic.find(topic)
    t.release
  else
    warn "Attempt to delete unknown topic '#{topic}"
  end
end

#disconnect(opts = {}) ⇒ Object

Shut down comms layer



49
50
51
# File 'lib/omf_common/comm/amqp/amqp_communicator.rb', line 49

def disconnect(opts = {})
  info "Disconnecting..."
end

#init(opts = {}) ⇒ Object

Initialize comms layer



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/omf_common/comm/amqp/amqp_communicator.rb', line 23

def init(opts = {})
  @opts = {
    #:ssl (Hash) TLS (SSL) parameters to use.
    heartbeat: 20, # (Fixnum) - default: 0 Connection heartbeat, in seconds. 0 means no heartbeat. Can also be configured server-side starting with RabbitMQ 3.0.
    #:on_tcp_connection_failure (#call) - A callable object that will be run if connection to server fails
    #:on_possible_authentication_failure (#call) - A callable object that will be run if authentication fails (see Authentication failure section)
    reconnect_delay: 20 # (Fixnum) - Delay in seconds before attempting reconnect on detected failure
  }.merge(opts)

  unless (@url = @opts.delete(:url))
    raise "Missing 'url' option for AQMP layer"
  end
  @address_prefix = @url + '/frcp.'
  _connect()
  super
end

#on_connected(&block) ⇒ Object

TODO: Should be thread safe and check if already connected



54
55
56
# File 'lib/omf_common/comm/amqp/amqp_communicator.rb', line 54

def on_connected(&block)
  @on_connected_procs << block
end

#on_reconnect(key, &block) ⇒ Object

register callbacks to be called when the underlying AMQP layer needs to reconnect to the AMQP server. This may require some additional repairs. If ‘block’ is nil, the callback is removed



62
63
64
65
66
67
68
# File 'lib/omf_common/comm/amqp/amqp_communicator.rb', line 62

def on_reconnect(key, &block)
  if block.nil?
    @on_reconnect.delete(key)
  else
    @on_reconnect[key] = block
  end
end

#receive_file(topic_url, file_path = nil, opts = {}, &block) ⇒ Object



110
111
112
113
114
115
116
117
# File 'lib/omf_common/comm/amqp/amqp_communicator.rb', line 110

def receive_file(topic_url, file_path = nil, opts = {}, &block)
  if topic_url.start_with? @address_prefix
    topic_url = topic_url[@address_prefix.length .. -1]
  end
  require 'omf_common/comm/amqp/amqp_file_transfer'
  file_path ||= File.join(Dir.tmpdir, Dir::Tmpname.make_tmpname('bdcast', '.xxx'))
  FileReceiver.new(file_path, @channel, topic_url, opts, &block)
end

#string_to_topic_address(a_string) ⇒ Object



44
45
46
# File 'lib/omf_common/comm/amqp/amqp_communicator.rb', line 44

def string_to_topic_address(a_string)
  @address_prefix+a_string
end