Class: OmfCommon::Comm::Topic

Inherits:
Object
  • Object
show all
Defined in:
lib/omf_common/comm/topic.rb

Direct Known Subclasses

AMQP::Topic, Local::Topic, XMPP::Topic

Constant Summary collapse

@@name2inst =
{}
@@lock =
Monitor.new

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#idObject (readonly)

Returns the value of attribute id.



37
38
39
# File 'lib/omf_common/comm/topic.rb', line 37

def id
  @id
end

#routing_keyObject (readonly)

Returns the value of attribute routing_key.



37
38
39
# File 'lib/omf_common/comm/topic.rb', line 37

def routing_key
  @routing_key
end

Class Method Details

.[](name) ⇒ Object



33
34
35
# File 'lib/omf_common/comm/topic.rb', line 33

def self.[](name)
  @@name2inst[name]
end

.create(name, opts = {}, &block) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/omf_common/comm/topic.rb', line 17

def self.create(name, opts = {}, &block)
  # Force string conversion as 'name' can be an ExperimentProperty
  name = name.to_s.to_sym
  @@lock.synchronize do
    unless @@name2inst[name]
      debug "New topic: #{name} | #{opts[:routing_key]}"
      #opts[:address] ||= address_for(name)
      @@name2inst[name] = self.new(name, opts, &block)
    else
      debug "Existing topic: #{name} | #{@@name2inst[name].routing_key}"
      block.call(@@name2inst[name]) if block
    end
    @@name2inst[name]
  end
end

Instance Method Details

#addressObject

Raises:

  • (NotImplementedError)


137
138
139
# File 'lib/omf_common/comm/topic.rb', line 137

def address
  raise NotImplementedError
end

#after(delay_sec, &block) ⇒ Object



141
142
143
144
145
146
# File 'lib/omf_common/comm/topic.rb', line 141

def after(delay_sec, &block)
  return unless block
  OmfCommon.eventloop.after(delay_sec) do
    block.arity == 1 ? block.call(self) : block.call
  end
end

#configure(props = {}, core_props = {}, &block) ⇒ Object



48
49
50
51
# File 'lib/omf_common/comm/topic.rb', line 48

def configure(props = {}, core_props = {}, &block)
  create_message_and_publish(:configure, props, core_props, block)
  self
end

#create(res_type, config_props = {}, core_props = {}, &block) ⇒ Object

Request the creation of a new resource. Returns itself



41
42
43
44
45
46
# File 'lib/omf_common/comm/topic.rb', line 41

def create(res_type, config_props = {}, core_props = {}, &block)
  config_props[:type] ||= res_type
  debug "Create resource of type '#{res_type}'"
  create_message_and_publish(:create, config_props, core_props, block)
  self
end

#create_message_and_publish(type, props = {}, core_props = {}, block = nil) ⇒ Object

Only used for create, configure and request



77
78
79
80
81
82
# File 'lib/omf_common/comm/topic.rb', line 77

def create_message_and_publish(type, props = {}, core_props = {}, block = nil)
  debug "(#{id}) create_message_and_publish '#{type}': #{props.inspect}: #{core_props.inspect}"
  core_props[:src] ||= OmfCommon.comm.local_address
  msg = OmfCommon::Message.create(type, props, core_props)
  publish(msg, { routing_key: "o.op" }, &block)
end

#error?Boolean

For detecting message publishing error, means if callback indeed yield a Topic object, there is no publishing error, thus always false

Returns:

  • (Boolean)


133
134
135
# File 'lib/omf_common/comm/topic.rb', line 133

def error?
  false
end

#inform(type, props = {}, core_props = {}, &block) ⇒ Object



59
60
61
62
63
64
# File 'lib/omf_common/comm/topic.rb', line 59

def inform(type, props = {}, core_props = {}, &block)
  core_props[:src] ||= OmfCommon.comm.local_address
  msg = OmfCommon::Message.create(:inform, props, core_props.merge(itype: type))
  publish(msg, { routing_key: "o.info" }, &block)
  self
end

#on_inform(key = nil, &message_block) ⇒ Object



114
115
116
# File 'lib/omf_common/comm/topic.rb', line 114

def on_inform(key = nil, &message_block)
  add_message_handler(:inform, key, &message_block)
end

#on_message(key = nil, &message_block) ⇒ Object



110
111
112
# File 'lib/omf_common/comm/topic.rb', line 110

def on_message(key = nil, &message_block)
  add_message_handler(:message, key, &message_block)
end

#on_subscribed(&block) ⇒ Object

Raises:

  • (NotImplementedError)


128
129
130
# File 'lib/omf_common/comm/topic.rb', line 128

def on_subscribed(&block)
  raise NotImplementedError
end

#publish(msg, opts = {}, &block) ⇒ Object



84
85
86
87
88
89
# File 'lib/omf_common/comm/topic.rb', line 84

def publish(msg, opts = {}, &block)
  error "!!!" if opts[:routing_key].nil?

  raise "Expected message but got '#{msg.class}" unless msg.is_a?(OmfCommon::Message)
  _send_message(msg, opts, block)
end

#release(resource, core_props = {}, &block) ⇒ Object



66
67
68
69
70
71
72
73
74
# File 'lib/omf_common/comm/topic.rb', line 66

def release(resource, core_props = {}, &block)
  unless resource.is_a? self.class
    raise ArgumentError, "Expected '#{self.class}', but got '#{resource.class}'"
  end
  core_props[:src] ||= OmfCommon.comm.local_address
  msg = OmfCommon::Message.create(:release, {}, core_props.merge(res_id: resource.id))
  publish(msg, { routing_key: "o.op" }, &block)
  self
end

#request(select = [], core_props = {}, &block) ⇒ Object



53
54
55
56
57
# File 'lib/omf_common/comm/topic.rb', line 53

def request(select = [], core_props = {}, &block)
  # TODO: What are the parameters to the request method really?
  create_message_and_publish(:request, select, core_props, block)
  self
end

#unsubscribe(key) ⇒ Object

Remove all registered callbacks for ‘key’. Will also unsubscribe from the underlying comms layer if no callbacks remain.



121
122
123
124
125
126
# File 'lib/omf_common/comm/topic.rb', line 121

def unsubscribe(key)
  @lock.synchronize do
    @handlers.clear
    @@name2inst.delete_if { |k, v| k == id.to_sym || k == address.to_sym}
  end
end