Class: Isono::NodeModules::DirectChannel

Inherits:
Base
  • Object
show all
Defined in:
lib/isono/node_modules/direct_channel.rb

Constant Summary collapse

AMQP_EXCHANGE =
'isono.direct'

Instance Attribute Summary

Attributes inherited from Base

#node

Instance Method Summary collapse

Methods inherited from Base

#config_section, #initialize, #manifest, #value_object

Constructor Details

This class inherits a constructor from Isono::NodeModules::Base

Instance Method Details

#publish(evname, opts = {}) ⇒ Object

Examples:

publish('ev/event_name', :args=>[1, 2, 3])


25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/isono/node_modules/direct_channel.rb', line 25

# Publishes an event message through the channel held in @amq.
#
# The message is a Hash carrying the event name, the publication time,
# the sender, this node's ID and the event arguments. It is serialized
# with Serializer.instance.marshal and published with the event name as
# the :key option. Serialization and publishing both happen inside the
# block handed to EventMachine.schedule.
#
# @example
#   publish('ev/event_name', :args=>[1, 2, 3])
#
# @param evname [#to_s] event name; stored in the message and used as the key
# @param opts [Hash] optional overrides
# @option opts [Array] :args ([]) arguments delivered with the event
# @option opts [Object] :sender (manifest.node_id) sender identifier
# @return [Object] whatever EventMachine.schedule returns
def publish(evname, opts={})
  defaults = {:args=>[], :sender=>manifest.node_id}
  settings = defaults.merge(opts)

  message = {
    :event => evname,
    :published_at => Time.now,
    :sender => settings[:sender],
    :origin_node => manifest.node_id,
    :args => settings[:args]
  }

  EventMachine.schedule do
    # NOTE(review): this publishes via @amq.event_exchange while #subscribe
    # binds its queue to AMQP_EXCHANGE -- confirm both refer to the same
    # exchange.
    exchange = @amq.event_exchange
    payload = Serializer.instance.marshal(message)
    exchange.publish(payload, {:key=>"#{evname}"})
  end
end

#subscribe(evname, &blk) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/isono/node_modules/direct_channel.rb', line 43

# Subscribes a block to messages for the given event name.
#
# A queue named after the event is requested from @amq with
# :auto_delete=>true, bound to AMQP_EXCHANGE with the event name as the
# :key option, and subscribed to. Each delivered payload is decoded with
# Serializer.instance.unmarshal and passed to the block according to the
# block's arity:
#
# * arity 2 -- blk.call(message_without_args, args)
# * arity 1 -- blk.call(args)
# * anything else -- blk.call with no arguments
#
# @param evname [#to_s] event name; used as both queue name and binding key
# @yield handler invoked for every delivered message (see arity rules above)
# @raise [ArgumentError] if no block is given
# @return [Object] whatever the queue's subscribe call returns
def subscribe(evname, &blk)
  # Fail fast: without a block, every delivered message would raise
  # NoMethodError on blk.arity inside the subscription callback.
  raise ArgumentError, 'subscribe requires a block' unless blk

  queue = @amq.queue("#{evname}", {:auto_delete=>true})
  queue.bind(AMQP_EXCHANGE, :key=>"#{evname}").subscribe do |data|
    data = Serializer.instance.unmarshal(data)
    case blk.arity
    when 2
      # Split the arguments off so the first parameter carries only metadata.
      m = data.delete(:args)
      blk.call(data, m)
    when 1
      blk.call(data[:args])
    else
      blk.call
    end
  end
end

#unsubscribe(evname) ⇒ Object



60
61
62
63
64
65
# File 'lib/isono/node_modules/direct_channel.rb', line 60

# Cancels the subscription on the queue named after the given event.
#
# Inside a block handed to EventMachine.schedule, the queue is requested
# from @amq with the same name and :auto_delete=>true option that
# #subscribe uses, and unsubscribe is called on it.
#
# @param evname [#to_s] event name whose queue should be unsubscribed
# @return [Object] whatever EventMachine.schedule returns
def unsubscribe(evname)
  EventMachine.schedule do
    @amq.queue("#{evname}", {:auto_delete=>true}).unsubscribe
  end
end