Class: OmfCommon::Comm::Topic
- Inherits:
-
Object
- Object
- OmfCommon::Comm::Topic
show all
- Defined in:
- lib/omf_common/comm/topic.rb
Constant Summary
collapse
- @@name2inst =
{}
- @@lock =
Monitor.new
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
-
#address ⇒ Object
-
#after(delay_sec, &block) ⇒ Object
-
#configure(props = {}, core_props = {}, &block) ⇒ Object
-
#create(res_type, config_props = {}, core_props = {}, &block) ⇒ Object
Request the creation of a new resource.
-
#create_message_and_publish(type, props = {}, core_props = {}, block = nil) ⇒ Object
-
#error? ⇒ Boolean
For detecting message publishing error, means if callback indeed yield a Topic object, there is no publishing error, thus always false.
-
#inform(type, props = {}, core_props = {}, &block) ⇒ Object
-
#on_inform(key = nil, &message_block) ⇒ Object
-
#on_message(key = nil, &message_block) ⇒ Object
-
#on_subscribed(&block) ⇒ Object
-
#publish(msg, &block) ⇒ Object
-
#release(resource, core_props = {}, &block) ⇒ Object
-
#request(select = [], core_props = {}, &block) ⇒ Object
-
#unsubscribe(key) ⇒ Object
Remove all registered callbacks for ‘key’.
Instance Attribute Details
#id ⇒ Object
Returns the value of attribute id.
37
38
39
|
# File 'lib/omf_common/comm/topic.rb', line 37
def id
@id
end
|
Class Method Details
.[](name) ⇒ Object
33
34
35
|
# File 'lib/omf_common/comm/topic.rb', line 33
def self.[](name)
@@name2inst[name]
end
|
.create(name, opts = {}, &block) ⇒ Object
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
# File 'lib/omf_common/comm/topic.rb', line 17
def self.create(name, opts = {}, &block)
name = name.to_s.to_sym
@@lock.synchronize do
unless @@name2inst[name]
debug "New topic: #{name}"
@@name2inst[name] = self.new(name, opts, &block)
else
debug "Existing topic: #{name}"
block.call(@@name2inst[name]) if block
end
@@name2inst[name]
end
end
|
Instance Method Details
#address ⇒ Object
134
135
136
|
# File 'lib/omf_common/comm/topic.rb', line 134
def address
raise NotImplementedError
end
|
#after(delay_sec, &block) ⇒ Object
138
139
140
141
142
143
|
# File 'lib/omf_common/comm/topic.rb', line 138
def after(delay_sec, &block)
return unless block
OmfCommon.eventloop.after(delay_sec) do
block.arity == 1 ? block.call(self) : block.call
end
end
|
48
49
50
51
|
# File 'lib/omf_common/comm/topic.rb', line 48
def configure(props = {}, core_props = {}, &block)
create_message_and_publish(:configure, props, core_props, block)
self
end
|
#create(res_type, config_props = {}, core_props = {}, &block) ⇒ Object
Request the creation of a new resource. Returns itself
41
42
43
44
45
46
|
# File 'lib/omf_common/comm/topic.rb', line 41
def create(res_type, config_props = {}, core_props = {}, &block)
config_props[:type] ||= res_type
debug "Create resource of type '#{res_type}'"
create_message_and_publish(:create, config_props, core_props, block)
self
end
|
#create_message_and_publish(type, props = {}, core_props = {}, block = nil) ⇒ Object
76
77
78
79
80
81
|
# File 'lib/omf_common/comm/topic.rb', line 76
def create_message_and_publish(type, props = {}, core_props = {}, block = nil)
debug "(#{id}) create_message_and_publish '#{type}': #{props.inspect}"
core_props[:src] ||= OmfCommon.comm.local_address
msg = OmfCommon::Message.create(type, props, core_props)
publish(msg, &block)
end
|
#error? ⇒ Boolean
For detecting message publishing error, means if callback indeed yield a Topic object, there is no publishing error, thus always false
130
131
132
|
# File 'lib/omf_common/comm/topic.rb', line 130
def error?
false
end
|
59
60
61
62
63
64
|
# File 'lib/omf_common/comm/topic.rb', line 59
def inform(type, props = {}, core_props = {}, &block)
core_props[:src] ||= OmfCommon.comm.local_address
msg = OmfCommon::Message.create(:inform, props, core_props.merge(itype: type))
publish(msg, &block)
self
end
|
111
112
113
|
# File 'lib/omf_common/comm/topic.rb', line 111
def on_inform(key = nil, &message_block)
add_message_handler(:inform, key, &message_block)
end
|
#on_message(key = nil, &message_block) ⇒ Object
107
108
109
|
# File 'lib/omf_common/comm/topic.rb', line 107
def on_message(key = nil, &message_block)
add_message_handler(:message, key, &message_block)
end
|
#on_subscribed(&block) ⇒ Object
125
126
127
|
# File 'lib/omf_common/comm/topic.rb', line 125
def on_subscribed(&block)
raise NotImplementedError
end
|
#publish(msg, &block) ⇒ Object
83
84
85
86
|
# File 'lib/omf_common/comm/topic.rb', line 83
def publish(msg, &block)
raise "Expected message but got '#{msg.class}" unless msg.is_a?(OmfCommon::Message)
_send_message(msg, block)
end
|
#release(resource, core_props = {}, &block) ⇒ Object
66
67
68
69
70
71
72
73
74
|
# File 'lib/omf_common/comm/topic.rb', line 66
def release(resource, core_props = {}, &block)
unless resource.is_a? self.class
raise ArgumentError, "Expected '#{self.class}', but got '#{resource.class}'"
end
core_props[:src] ||= OmfCommon.comm.local_address
msg = OmfCommon::Message.create(:release, {}, core_props.merge(res_id: resource.id))
publish(msg, &block)
self
end
|
#request(select = [], core_props = {}, &block) ⇒ Object
53
54
55
56
57
|
# File 'lib/omf_common/comm/topic.rb', line 53
def request(select = [], core_props = {}, &block)
create_message_and_publish(:request, select, core_props, block)
self
end
|
#unsubscribe(key) ⇒ Object
Remove all registered callbacks for ‘key’. Will also unsubscribe from the underlying comms layer if no callbacks remain.
118
119
120
121
122
123
|
# File 'lib/omf_common/comm/topic.rb', line 118
def unsubscribe(key)
@lock.synchronize do
@handlers.clear
@@name2inst.delete_if { |k, v| k == id.to_sym || k == address.to_sym}
end
end
|