Class: Isono::NodeModules::DirectChannel
- Inherits:
-
Base
- Object
- Base
- Isono::NodeModules::DirectChannel
show all
- Defined in:
- lib/isono/node_modules/direct_channel.rb
Constant Summary
collapse
- AMQP_EXCHANGE =
'isono.direct'
Instance Attribute Summary
Attributes inherited from Base
#node
Instance Method Summary
collapse
Methods inherited from Base
#config_section, #initialize, #manifest, #value_object
Instance Method Details
#publish(evname, opts = {}) ⇒ Object
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
# File 'lib/isono/node_modules/direct_channel.rb', line 25
def publish(evname, opts={})
opts = {:args=>[], :sender=>manifest.node_id}.merge(opts)
body = {
:event => evname,
:published_at=> Time.now,
:sender => opts[:sender],
:origin_node => manifest.node_id,
:args => opts[:args]
}
EventMachine.schedule {
@amq.event_exchange.publish(Serializer.instance.marshal(body),
{:key=>"#{evname}"}
)
}
end
|
#subscribe(evname, &blk) ⇒ Object
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
# File 'lib/isono/node_modules/direct_channel.rb', line 43
def subscribe(evname, &blk)
@amq.queue("#{evname}",{:auto_delete=>true}).bind(
AMQP_EXCHANGE, :key=>"#{evname}"
).subscribe { |data|
data = Serializer.instance.unmarshal(data)
case blk.arity
when 2
m = data.delete(:args)
blk.call(data, m)
when 1
blk.call(data[:args])
else
blk.call
end
}
end
|
#unsubscribe(evname) ⇒ Object
60
61
62
63
64
65
|
# File 'lib/isono/node_modules/direct_channel.rb', line 60
def unsubscribe(evname)
EventMachine.schedule {
q = @amq.queue("#{evname}",{:auto_delete=>true})
q.unsubscribe
}
end
|