Class: Isono::NodeModules::EventChannel

Inherits:
Base
  • Object
Defined in:
lib/isono/node_modules/event_channel.rb

Overview

Sets up MQ-based event dispatch channels. It prepares a single topic exchange that receives all events from publishers. Each event consumer sets up its own queue and binds it to the exchange. Most queues are bound with a key string that filters the events they receive. The event publisher attaches a key to each event it sends.

The key consists of two dot-separated parts: the event type and the event sender (e.g. “event1.sender1”). The event publisher can put data of any format into the message body.

These rules make the message queue broker work as the event dispatcher.
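
As a rough illustration of the key filtering described above, the following sketch builds an “event.sender” key and checks it against a binding pattern using the usual AMQP topic rules ('*' matches exactly one word, '#' matches zero or more words). The helpers routing_key and topic_match? are hypothetical and not part of Isono; in practice the broker performs this matching.

```ruby
# Hypothetical helper: joins the event type and the sender with a dot.
def routing_key(evname, sender)
  "#{evname}.#{sender}"
end

# Simplified topic matcher (assumption: standard AMQP topic semantics).
# '*' matches exactly one word, '#' matches zero or more words.
def topic_match?(pattern, key)
  match_words(pattern.split('.'), key.split('.'))
end

def match_words(pat, words)
  return words.empty? if pat.empty?
  head, *rest = pat
  case head
  when '#'
    # Try letting '#' consume 0, 1, 2, ... words.
    (0..words.length).any? { |n| match_words(rest, words.drop(n)) }
  when '*'
    !words.empty? && match_words(rest, words.drop(1))
  else
    words.first == head && match_words(rest, words.drop(1))
  end
end

key = routing_key('event1', 'sender1')   # "event1.sender1"
topic_match?('event1.#', key)            # any sender of event1
topic_match?('event1.sender1', key)      # one specific sender
```

A binding of 'event1.#' corresponds to the default sender of '#' used by #subscribe: it accepts event1 from every sender.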

Constant Summary

AMQP_EXCHANGE =
'isono.event'

Instance Attribute Summary

Attributes inherited from Base

#node

Instance Method Summary

Methods inherited from Base

#config_section, #initialize, #manifest, #value_object

Constructor Details

This class inherits a constructor from Isono::NodeModules::Base

Instance Method Details

#publish(evname, opts = {}) ⇒ Object

Examples:

publish('ev/event_name', :args=>[1, 2, 3])


# File 'lib/isono/node_modules/event_channel.rb', line 39

def publish(evname, opts={})
  opts = {:args=>[], :sender=>manifest.node_id}.merge(opts)
  
  body = {
    :event => evname,
    :published_at=> Time.now,
    :sender  => opts[:sender],
    :origin_node  => manifest.node_id,
    :args => opts[:args]
  }
  
  EventMachine.schedule {
    @amq.event_exchange.publish(Serializer.instance.marshal(body),
                                      {:key=>"#{evname}.#{opts[:sender]}"}
                                      )
  }
end
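
To make the defaults in the listing above easier to see, here is a minimal sketch of the body and key assembly with the broker call left out. The function build_event and the node id 'node1' are hypothetical stand-ins; the real method reads the node id from manifest.node_id and serializes the body before publishing.

```ruby
# Hypothetical stand-in for the body/key assembly in #publish.
# No EventMachine or AMQP involved; node_id replaces manifest.node_id.
def build_event(evname, node_id, opts = {})
  # :sender defaults to the local node, :args to an empty list.
  opts = { :args => [], :sender => node_id }.merge(opts)
  body = {
    :event        => evname,
    :published_at => Time.now,
    :sender       => opts[:sender],
    :origin_node  => node_id,      # always the publishing node
    :args         => opts[:args]
  }
  ["#{evname}.#{opts[:sender]}", body]
end

key, body = build_event('ev/event_name', 'node1', :args => [1, 2, 3])
# key => "ev/event_name.node1"

# Overriding :sender changes the routing key but not :origin_node.
key2, body2 = build_event('ev/event_name', 'node1', :sender => 'other')
# key2 => "ev/event_name.other"
```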

#subscribe(evname, sender = '#', receiver_id = node.node_id, &blk) ⇒ Object



# File 'lib/isono/node_modules/event_channel.rb', line 57

def subscribe(evname, sender='#', receiver_id=node.node_id, &blk)
  @amq.queue("#{evname}-#{receiver_id}",{:auto_delete=>true}).bind(
               AMQP_EXCHANGE, :key=>"#{evname}.#{sender}"
               ).subscribe { |data|
    data = Serializer.instance.unmarshal(data)
    case blk.arity
    when 2
      m = data.delete(:args)
      blk.call(data, m)
    when 1
      blk.call(data[:args])
    else
      blk.call
    end
  }
end
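
The listing above chooses what to pass to the block based on the block's arity. The sketch below reproduces only that dispatch step on an already unmarshalled hash; dispatch is a hypothetical helper, and it copies the hash first so the caller's data is left untouched (the original mutates the unmarshalled message in place).

```ruby
# Hypothetical helper isolating the arity-based dispatch in #subscribe.
def dispatch(data, &blk)
  data = data.dup   # avoid mutating the caller's hash
  case blk.arity
  when 2
    # Two parameters: event metadata (without :args), then the args.
    args = data.delete(:args)
    blk.call(data, args)
  when 1
    # One parameter: only the args.
    blk.call(data[:args])
  else
    # Any other arity: just a notification, no payload.
    blk.call
  end
end

event = { :event => 'ev/event_name', :sender => 'node1', :args => [1, 2, 3] }

dispatch(event) { |meta, args| puts "#{meta[:event]} from #{meta[:sender]}: #{args.inspect}" }
dispatch(event) { |args| puts args.inspect }
dispatch(event) { puts 'event received' }
```

Note that a block taking a splat, such as { |*a| ... }, has a negative arity and therefore falls into the last branch and receives no arguments.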

#unsubscribe(evname, receiver_id = node.node_id) ⇒ Object



# File 'lib/isono/node_modules/event_channel.rb', line 74

def unsubscribe(evname, receiver_id=node.node_id)
  EventMachine.schedule {
    q = @amq.queue("#{evname}-#{receiver_id}",{:auto_delete=>true})
    q.unsubscribe
  }
end
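
Both #subscribe and #unsubscribe derive the queue name from the event name and the receiver id only; the sender pattern is not part of it. The sketch below shows that naming rule in isolation. The helper event_queue_name is hypothetical; the listings above simply interpolate the string inline.

```ruby
# Hypothetical helper mirroring the "#{evname}-#{receiver_id}" interpolation
# used by both #subscribe and #unsubscribe.
def event_queue_name(evname, receiver_id)
  "#{evname}-#{receiver_id}"
end

event_queue_name('ev/event_name', 'node1')
# => "ev/event_name-node1"
```

Under this rule, a call to #unsubscribe appears to need the same evname and receiver_id that were passed to #subscribe in order to address the same queue.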