Class: OmfCommon::Comm::Topic
- Inherits:
-
Object
- Object
- OmfCommon::Comm::Topic
show all
- Defined in:
- lib/omf_common/comm/topic.rb
Constant Summary
collapse
- @@name2inst =
{}
- @@lock =
Monitor.new
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
-
#address ⇒ Object
-
#after(delay_sec, &block) ⇒ Object
-
#configure(props = {}, core_props = {}, &block) ⇒ Object
-
#create(res_type, config_props = {}, core_props = {}, &block) ⇒ Object
Request the creation of a new resource.
-
#create_message_and_publish(type, props = {}, core_props = {}, block = nil) ⇒ Object
Only used for create, configure and request.
-
#error? ⇒ Boolean
Used to detect message publishing errors: if the callback was indeed yielded a Topic object, no publishing error occurred, so this always returns false.
-
#inform(type, props = {}, core_props = {}, &block) ⇒ Object
-
#on_inform(key = nil, &message_block) ⇒ Object
-
#on_message(key = nil, &message_block) ⇒ Object
-
#on_subscribed(&block) ⇒ Object
-
#publish(msg, opts = {}, &block) ⇒ Object
-
#release(resource, core_props = {}, &block) ⇒ Object
-
#request(select = [], core_props = {}, &block) ⇒ Object
-
#unsubscribe(key) ⇒ Object
Remove all registered callbacks for ‘key’.
Instance Attribute Details
#id ⇒ Object
Returns the value of attribute id.
37
38
39
|
# File 'lib/omf_common/comm/topic.rb', line 37
# Reader for this topic's identifier.
#
# @return [Object] the current value of @id
def id
  @id
end
|
#routing_key ⇒ Object
Returns the value of attribute routing_key.
37
38
39
|
# File 'lib/omf_common/comm/topic.rb', line 37
# Reader for this topic's routing key.
#
# @return [Object] the current value of @routing_key
def routing_key
  @routing_key
end
|
Class Method Details
.[](name) ⇒ Object
33
34
35
|
# File 'lib/omf_common/comm/topic.rb', line 33
# Look up a topic in the class-wide registry.
#
# The lookup uses +name+ exactly as given. Topic.create registers topics
# under `name.to_s.to_sym`, so a Symbol is the key form that will match
# entries created that way.
#
# @param name [Object] registry key
# @return [Object, nil] the entry stored under +name+, if any
def self.[](name)
  @@name2inst[name]
end
|
.create(name, opts = {}, &block) ⇒ Object
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
# File 'lib/omf_common/comm/topic.rb', line 17
# Return the topic registered under +name+, building it on first use.
#
# The name is converted to a Symbol before the lookup, and the whole
# check-then-create sequence runs while holding the class-wide lock.
#
# @param name [#to_s] topic name
# @param opts [Hash] forwarded to the constructor when a new topic is built
#   (only :routing_key is read here, for the log line)
# @param block [Proc] forwarded to the constructor for a new topic, or
#   called with the already-registered topic
# @return [Object, nil] the registry entry for the name after the above ran
def self.create(name, opts = {}, &block)
  name = name.to_s.to_sym
  @@lock.synchronize do
    existing = @@name2inst[name]
    if existing
      debug "Existing topic: #{name} | #{existing.routing_key}"
      block.call(existing) if block
    else
      debug "New topic: #{name} | #{opts[:routing_key]}"
      @@name2inst[name] = self.new(name, opts, &block)
    end
    # Read the registry again rather than reusing a local, so the result
    # reflects whatever the constructor or callback left in place.
    @@name2inst[name]
  end
end
|
Instance Method Details
#address ⇒ Object
137
138
139
|
# File 'lib/omf_common/comm/topic.rb', line 137
# Address of this topic. This implementation provides none and always
# raises.
#
# @raise [NotImplementedError] always
def address
  raise NotImplementedError
end
|
#after(delay_sec, &block) ⇒ Object
141
142
143
144
145
146
|
# File 'lib/omf_common/comm/topic.rb', line 141
# Hand a block to OmfCommon.eventloop.after with the given delay.
#
# When the event loop runs it, a block declaring exactly one parameter
# receives this topic; any other block is called without arguments.
#
# @param delay_sec [Object] delay passed through to the event loop
# @param block [Proc] code to run later
# @return [Object, nil] nil when no block is given, otherwise whatever
#   OmfCommon.eventloop.after returns
def after(delay_sec, &block)
  return if block.nil?

  OmfCommon.eventloop.after(delay_sec) do
    if block.arity == 1
      block.call(self)
    else
      block.call
    end
  end
end
|
#configure(props = {}, core_props = {}, &block) ⇒ Object
48
49
50
51
|
# File 'lib/omf_common/comm/topic.rb', line 48
# Publish a :configure message built from the given properties.
#
# @param props [Hash] message properties
# @param core_props [Hash] core message properties
# @param block [Proc] passed on to create_message_and_publish
# @return [self]
def configure(props = {}, core_props = {}, &block)
  tap { create_message_and_publish(:configure, props, core_props, block) }
end
|
#create(res_type, config_props = {}, core_props = {}, &block) ⇒ Object
Request the creation of a new resource. Returns itself
41
42
43
44
45
46
|
# File 'lib/omf_common/comm/topic.rb', line 41
# Request the creation of a new resource.
#
# The resource type is placed under :type in the properties unless the
# caller already supplied a truthy :type. The caller's hash is not modified,
# so the same properties can be reused for several calls and may be frozen.
#
# @param res_type [Object] type of the resource to create
# @param config_props [Hash] properties for the new resource
# @param core_props [Hash] core message properties
# @param block [Proc] passed on to create_message_and_publish
# @return [self]
def create(res_type, config_props = {}, core_props = {}, &block)
  # Merge into a copy: writing :type into the caller's hash would make the
  # first type stick if that hash were reused for another resource type.
  config_props = config_props.merge(type: res_type) unless config_props[:type]
  debug "Create resource of type '#{res_type}'"
  create_message_and_publish(:create, config_props, core_props, block)
  self
end
|
#create_message_and_publish(type, props = {}, core_props = {}, block = nil) ⇒ Object
Only used for create, configure and request
77
78
79
80
81
82
|
# File 'lib/omf_common/comm/topic.rb', line 77
# Build a message of the given type and publish it with routing key "o.op".
# Only used for create, configure and request.
#
# :src in the core properties defaults to OmfCommon.comm.local_address when
# it is missing or falsy. The default is applied to a copy, so the caller's
# hash is left untouched (and may be frozen).
#
# @param type [Symbol] message type handed to OmfCommon::Message.create
# @param props [Object] message properties (request passes its select list)
# @param core_props [Hash] core message properties
# @param block [Proc, nil] forwarded to publish as its block
# @return [Object] whatever publish returns
def create_message_and_publish(type, props = {}, core_props = {}, block = nil)
  debug "(#{id}) create_message_and_publish '#{type}': #{props.inspect}: #{core_props.inspect}"
  # The local address is only looked up when the caller gave no usable :src.
  core_props = core_props.merge(src: core_props[:src] || OmfCommon.comm.local_address)
  msg = OmfCommon::Message.create(type, props, core_props)
  publish(msg, { routing_key: "o.op" }, &block)
end
|
#error? ⇒ Boolean
Used to detect message publishing errors: if the callback was indeed yielded a Topic object, no publishing error occurred, so this always returns false.
133
134
135
|
# File 'lib/omf_common/comm/topic.rb', line 133
# Whether this object represents a publishing error. It never does.
#
# @return [Boolean] always false
def error?
  false
end
|
#inform(type, props = {}, core_props = {}, &block) ⇒ Object
59
60
61
62
63
64
|
# File 'lib/omf_common/comm/topic.rb', line 59
# Publish an :inform message with routing key "o.info".
#
# The core properties sent are the caller's, plus :itype set to +type+ and
# :src defaulting to OmfCommon.comm.local_address when missing or falsy.
# The caller's hash is not modified (and may be frozen).
#
# @param type [Object] inform type, sent as :itype
# @param props [Hash] message properties
# @param core_props [Hash] core message properties
# @param block [Proc] forwarded to publish
# @return [self]
def inform(type, props = {}, core_props = {}, &block)
  # The local address is only looked up when the caller gave no usable :src.
  src = core_props[:src] || OmfCommon.comm.local_address
  msg = OmfCommon::Message.create(:inform, props, core_props.merge(src: src, itype: type))
  publish(msg, { routing_key: "o.info" }, &block)
  self
end
|
#on_inform(key = nil, &message_block) ⇒ Object
114
115
116
|
# File 'lib/omf_common/comm/topic.rb', line 114
# Register a handler under the :inform handler type.
#
# @param key [Object, nil] passed through to add_message_handler
# @param message_block [Proc] the handler
# @return [Object] whatever add_message_handler returns
def on_inform(key = nil, &message_block)
  add_message_handler(:inform, key, &message_block)
end
|
#on_message(key = nil, &message_block) ⇒ Object
110
111
112
|
# File 'lib/omf_common/comm/topic.rb', line 110
# Register a handler under the :message handler type.
#
# @param key [Object, nil] passed through to add_message_handler
# @param message_block [Proc] the handler
# @return [Object] whatever add_message_handler returns
def on_message(key = nil, &message_block)
  add_message_handler(:message, key, &message_block)
end
|
#on_subscribed(&block) ⇒ Object
128
129
130
|
# File 'lib/omf_common/comm/topic.rb', line 128
# Hook for a subscription callback. This implementation provides none and
# always raises.
#
# @param block [Proc] unused here
# @raise [NotImplementedError] always
def on_subscribed(&block)
  raise NotImplementedError
end
|
#publish(msg, opts = {}, &block) ⇒ Object
84
85
86
87
88
89
|
# File 'lib/omf_common/comm/topic.rb', line 84
# Validate a message and hand it to _send_message.
#
# A missing :routing_key is only logged; the message is still sent.
#
# @param msg [OmfCommon::Message] message to publish
# @param opts [Hash] publishing options; :routing_key is expected
# @param block [Proc] passed to _send_message as a plain argument
# @return [Object] whatever _send_message returns
# @raise [RuntimeError] when +msg+ is not an OmfCommon::Message
def publish(msg, opts = {}, &block)
  error "publish called without a :routing_key (opts: #{opts.inspect})" if opts[:routing_key].nil?
  raise "Expected message but got '#{msg.class}'" unless msg.is_a?(OmfCommon::Message)
  _send_message(msg, opts, block)
end
|
#release(resource, core_props = {}, &block) ⇒ Object
66
67
68
69
70
71
72
73
74
|
# File 'lib/omf_common/comm/topic.rb', line 66
# Publish a :release message for the given resource with routing key "o.op".
#
# The core properties sent are the caller's, plus :res_id set to the
# resource's id and :src defaulting to OmfCommon.comm.local_address when
# missing or falsy. The caller's hash is not modified (and may be frozen).
#
# @param resource [Object] must be an instance of this object's class
# @param core_props [Hash] core message properties
# @param block [Proc] forwarded to publish
# @return [self]
# @raise [ArgumentError] when +resource+ is not an instance of this class
def release(resource, core_props = {}, &block)
  unless resource.is_a?(self.class)
    raise ArgumentError, "Expected '#{self.class}', but got '#{resource.class}'"
  end

  # The local address is only looked up when the caller gave no usable :src.
  src = core_props[:src] || OmfCommon.comm.local_address
  msg = OmfCommon::Message.create(:release, {}, core_props.merge(src: src, res_id: resource.id))
  publish(msg, { routing_key: "o.op" }, &block)
  self
end
|
#request(select = [], core_props = {}, &block) ⇒ Object
53
54
55
56
57
|
# File 'lib/omf_common/comm/topic.rb', line 53
# Publish a :request message carrying the given selection.
#
# @param select [Array] passed as the message properties
# @param core_props [Hash] core message properties
# @param block [Proc] passed on to create_message_and_publish
# @return [self]
def request(select = [], core_props = {}, &block)
  tap { create_message_and_publish(:request, select, core_props, block) }
end
|
#unsubscribe(key) ⇒ Object
Remove all registered callbacks for ‘key’. Will also unsubscribe from the underlying comms layer if no callbacks remain.
121
122
123
124
125
126
|
# File 'lib/omf_common/comm/topic.rb', line 121
# Clear this topic's handlers and remove it from the class-wide registry.
#
# Registry entries whose key equals this topic's id or address (both as
# Symbols) are removed.
#
# NOTE(review): +key+ is not used here; every handler is cleared whatever
# key is passed, which differs from the "callbacks for 'key'" wording of the
# method description. Confirm which behaviour is intended.
#
# @param key [Object] currently ignored
# @return [Hash] the registry after removal
def unsubscribe(key)
  @lock.synchronize { @handlers.clear }
  # The registry is shared by all topics and Topic.create guards it with the
  # class-wide lock, so take that lock, not the per-instance one, to change
  # it. The two locks are taken one after the other and never nested.
  @@lock.synchronize do
    @@name2inst.delete_if { |name, _topic| name == id.to_sym || name == address.to_sym }
  end
end
|