Class: OmfCommon::Comm::Topic

Inherits:
Object
  • Object
show all
Defined in:
lib/omf_common/comm/topic.rb

Direct Known Subclasses

AMQP::Topic, Local::Topic, XMPP::Topic

Constant Summary collapse

@@name2inst =
{}
@@lock =
Monitor.new

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#id ⇒ Object (readonly)

Returns the value of attribute id.



37
38
39
# File 'lib/omf_common/comm/topic.rb', line 37

# Reader for the topic's identifier.
#
# @return [Object] the current value of the @id instance variable
def id
  @id
end

Class Method Details

.[](name) ⇒ Object



33
34
35
# File 'lib/omf_common/comm/topic.rb', line 33

# Looks up a previously created topic by name.
#
# Topics are registered by .create under +name.to_s.to_sym+, so a lookup
# with a String (or any other object whose #to_s matches) is retried with
# the same normalisation when the exact key is not present.
#
# @param name [#to_s] topic name
# @return [Object, nil] the registered topic instance, or nil if none exists
def self.[](name)
  @@name2inst[name] || @@name2inst[name.to_s.to_sym]
end

.create(name, opts = {}, &block) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/omf_common/comm/topic.rb', line 17

# Returns the topic registered under +name+, instantiating it first when no
# such topic exists yet. The whole check-then-create sequence runs while
# holding the class-level lock.
#
# @param name [#to_s] topic name; normalised to a Symbol
# @param opts [Hash] options handed to the constructor of a new topic
# @yield [topic] forwarded to the constructor for a new topic, or called
#   directly with the already registered instance
# @return [Object] the topic instance registered under the normalised name
def self.create(name, opts = {}, &block)
  # Force string conversion as 'name' can be an ExperimentProperty
  key = name.to_s.to_sym
  @@lock.synchronize do
    if @@name2inst[key]
      debug "Existing topic: #{key}"
      block.call(@@name2inst[key]) if block
    else
      debug "New topic: #{key}"
      #opts[:address] ||= address_for(name)
      @@name2inst[key] = new(key, opts, &block)
    end
    @@name2inst[key]
  end
end

Instance Method Details

#address ⇒ Object

Raises:

  • (NotImplementedError)


143
144
145
# File 'lib/omf_common/comm/topic.rb', line 143

# Abstract hook: this base implementation always raises, naming the
# receiver's class so the missing override is easy to locate.
#
# Note that NotImplementedError descends from ScriptError, not
# StandardError, so a bare +rescue+ does not catch it.
#
# @raise [NotImplementedError] always
def address
  raise NotImplementedError, "#{self.class} does not implement #address"
end

#after(delay_sec, &block) ⇒ Object



147
148
149
150
151
152
# File 'lib/omf_common/comm/topic.rb', line 147

# Schedules +block+ on the OmfCommon event loop after +delay_sec+.
# Does nothing (and returns nil) when no block is supplied.
#
# @param delay_sec [Object] delay handed unchanged to the event loop's #after
# @yield [topic] a block declaring exactly one parameter receives this
#   topic; any other block is called without arguments
# @return [Object, nil] whatever the event loop's #after returns, or nil
#   when called without a block
def after(delay_sec, &block)
  return if block.nil?

  # A proc's arity is fixed, so decide once how it has to be invoked.
  wants_topic = block.arity == 1
  OmfCommon.eventloop.after(delay_sec) do
    if wants_topic
      block.call(self)
    else
      block.call
    end
  end
end

#configure(props = {}, core_props = {}, &block) ⇒ Object



48
49
50
51
# File 'lib/omf_common/comm/topic.rb', line 48

# Builds a :configure message from the given properties and publishes it
# via #create_message_and_publish.
#
# @param props [Hash] message properties, forwarded unchanged
# @param core_props [Hash] core message properties, forwarded unchanged
# @yield optional block, forwarded as the publish callback
# @return [self]
def configure(props = {}, core_props = {}, &block)
  tap { create_message_and_publish(:configure, props, core_props, block) }
end

#create(res_type, config_props = {}, core_props = {}, &block) ⇒ Object

Request the creation of a new resource. Returns itself



41
42
43
44
45
46
# File 'lib/omf_common/comm/topic.rb', line 41

# Request the creation of a new resource by publishing a :create message.
#
# The resource type is added under :type unless the caller already supplied
# a truthy :type. The caller's hash is copied first, so it is never
# modified and may be frozen.
#
# @param res_type [Object] resource type used as the default for :type
# @param config_props [Hash] configuration properties for the new resource
# @param core_props [Hash] core message properties, forwarded unchanged
# @yield optional block, forwarded as the publish callback
# @return [self]
def create(res_type, config_props = {}, core_props = {}, &block)
  # merge returns a new hash; an existing :type key keeps its position.
  config_props = config_props.merge(type: config_props[:type] || res_type)
  debug "Create resource of type '#{res_type}'"
  create_message_and_publish(:create, config_props, core_props, block)
  self
end

#create_message_and_publish(type, props = {}, core_props = {}, block = nil) ⇒ Object



76
77
78
79
80
81
# File 'lib/omf_common/comm/topic.rb', line 76

# Creates an OmfCommon::Message of the given type and publishes it.
#
# :src defaults to the local address of the communicator when the caller
# did not provide a truthy value. The default is applied to a copy, so the
# caller's +core_props+ is never modified and may be frozen.
#
# @param type [Symbol] message type handed to OmfCommon::Message.create
# @param props [Object] message properties, forwarded unchanged
# @param core_props [Hash] core message properties
# @param block [Proc, nil] optional callback handed to #publish
# @return [Object] whatever #publish returns
def create_message_and_publish(type, props = {}, core_props = {}, block = nil)
  debug "(#{id}) create_message_and_publish '#{type}': #{props.inspect}"
  # local_address is only consulted when no :src was supplied.
  core_props = core_props.merge(src: core_props[:src] || OmfCommon.comm.local_address)
  msg = OmfCommon::Message.create(type, props, core_props)
  publish(msg, &block)
end

#error? ⇒ Boolean

Used to detect a message publishing error. If a callback is yielded a Topic object, publishing did not fail, so this method always returns false.

Returns:

  • (Boolean)


139
140
141
# File 'lib/omf_common/comm/topic.rb', line 139

# Publishing-error predicate. This implementation answers false
# unconditionally.
#
# @return [Boolean] always false
def error?
  false
end

#inform(type, props = {}, core_props = {}, &block) ⇒ Object



59
60
61
62
63
64
# File 'lib/omf_common/comm/topic.rb', line 59

# Publishes an :inform message whose core property :itype is +type+.
#
# :src defaults to the local address of the communicator when the caller
# did not provide a truthy value. Both additions are applied to a copy, so
# the caller's +core_props+ is never modified and may be frozen.
#
# @param type [Object] inform type, stored under :itype
# @param props [Hash] message properties, forwarded unchanged
# @param core_props [Hash] core message properties
# @yield optional block, forwarded as the publish callback
# @return [self]
def inform(type, props = {}, core_props = {}, &block)
  core_props = core_props.merge(
    # local_address is only consulted when no :src was supplied.
    src: core_props[:src] || OmfCommon.comm.local_address,
    itype: type
  )
  msg = OmfCommon::Message.create(:inform, props, core_props)
  publish(msg, &block)
  self
end

#on_inform(key = nil, &message_block) ⇒ Object



111
112
113
# File 'lib/omf_common/comm/topic.rb', line 111

# Registers +message_block+ for the :inform handler category by delegating
# to add_message_handler (defined outside this excerpt).
#
# @param key [Object, nil] identifier forwarded with the handler; presumably
#   the key later accepted by #unsubscribe — TODO confirm
# @yield handler block, forwarded unchanged
# @return [Object] whatever add_message_handler returns
def on_inform(key = nil, &message_block)
  add_message_handler(:inform, key, &message_block)
end

#on_message(key = nil, &message_block) ⇒ Object



107
108
109
# File 'lib/omf_common/comm/topic.rb', line 107

# Registers +message_block+ for the :message handler category by delegating
# to add_message_handler (defined outside this excerpt).
#
# @param key [Object, nil] identifier forwarded with the handler; presumably
#   the key later accepted by #unsubscribe — TODO confirm
# @yield handler block, forwarded unchanged
# @return [Object] whatever add_message_handler returns
def on_message(key = nil, &message_block)
  add_message_handler(:message, key, &message_block)
end

#on_subscribed(&block) ⇒ Object

Raises:

  • (NotImplementedError)


134
135
136
# File 'lib/omf_common/comm/topic.rb', line 134

# Abstract hook: this base implementation always raises, naming the
# receiver's class so the missing override is easy to locate. The block is
# accepted but not used here.
#
# Note that NotImplementedError descends from ScriptError, not
# StandardError, so a bare +rescue+ does not catch it.
#
# @raise [NotImplementedError] always
def on_subscribed(&block)
  raise NotImplementedError, "#{self.class} does not implement #on_subscribed"
end

#publish(msg, &block) ⇒ Object



83
84
85
86
# File 'lib/omf_common/comm/topic.rb', line 83

# Publishes +msg+ by handing it, together with the optional callback, to
# _send_message.
#
# @param msg [OmfCommon::Message] message to publish
# @yield optional block, passed on as a Proc (or nil)
# @return [Object] whatever _send_message returns
# @raise [RuntimeError] if +msg+ is not an OmfCommon::Message
def publish(msg, &block)
  unless msg.is_a?(OmfCommon::Message)
    raise "Expected message but got '#{msg.class}'"
  end
  _send_message(msg, block)
end

#release(resource, core_props = {}, &block) ⇒ Object



66
67
68
69
70
71
72
73
74
# File 'lib/omf_common/comm/topic.rb', line 66

# Publishes a :release message for +resource+, identified by its id under
# the core property :res_id.
#
# :src defaults to the local address of the communicator when the caller
# did not provide a truthy value. Both additions are applied to a copy, so
# the caller's +core_props+ is never modified and may be frozen.
#
# @param resource [Object] must be an instance of the receiver's class
# @param core_props [Hash] core message properties
# @yield optional block, forwarded as the publish callback
# @return [self]
# @raise [ArgumentError] if +resource+ is not an instance of the receiver's class
def release(resource, core_props = {}, &block)
  unless resource.is_a? self.class
    raise ArgumentError, "Expected '#{self.class}', but got '#{resource.class}'"
  end
  core_props = core_props.merge(
    # local_address is only consulted when no :src was supplied.
    src: core_props[:src] || OmfCommon.comm.local_address,
    res_id: resource.id
  )
  msg = OmfCommon::Message.create(:release, {}, core_props)
  publish(msg, &block)
  self
end

#request(select = [], core_props = {}, &block) ⇒ Object



53
54
55
56
57
# File 'lib/omf_common/comm/topic.rb', line 53

# Builds a :request message and publishes it via
# #create_message_and_publish; +select+ is passed in the properties slot.
#
# @param select [Array] forwarded unchanged as the message properties
# @param core_props [Hash] core message properties, forwarded unchanged
# @yield optional block, forwarded as the publish callback
# @return [self]
def request(select = [], core_props = {}, &block)
  # TODO: What are the parameters to the request method really?
  tap { create_message_and_publish(:request, select, core_props, block) }
end

#unsubscribe(key) ⇒ Object

Remove all registered callbacks for ‘key’. Will also unsubscribe from the underlying comms layer if no callbacks remain.



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/omf_common/comm/topic.rb', line 118

# Removes every callback registered under +key+ from all handler
# categories, dropping any category left without callbacks. Runs while
# holding the instance lock.
#
# When no handler category remains, a warning is logged; nothing else is
# done about the underlying subscription here.
#
# @param key [Object] key the callbacks were registered under
# @return [Object, nil] the result of the warn call when no handlers
#   remain, otherwise nil
def unsubscribe(key)
  @lock.synchronize do
    # Drop a category only when the key was actually present in it and its
    # removal emptied the callback collection.
    @handlers.delete_if do |_name, callbacks|
      callbacks.delete(key) && callbacks.empty?
    end
    warn "Should unsubscribe '#{id}'" if @handlers.empty?
  end
end