Module: Goat::NotificationCenter

Defined in:
lib/goat/notifications.rb

Defined Under Namespace

Classes: Receiver

Class Method Summary collapse

Class Method Details

.configure(opts = {}) ⇒ Object



33
34
35
36
37
38
39
40
41
# File 'lib/goat/notifications.rb', line 33

def self.configure(opts={})
  opts = {:host => '127.0.0.1', :recv_port => 8000, :send_port => 8001}.merge(opts)
  @configured = true
  @host = opts[:host]
  @recv_port = opts[:recv_port]
  @send_port = opts[:send_port]

  EM.next_tick { self.start_receiver }
end

.enabled?Boolean

Returns:

  • (Boolean)


31
# File 'lib/goat/notifications.rb', line 31

def self.enabled?; @configured; end

.load_notif(notif) ⇒ Object



53
54
55
# File 'lib/goat/notifications.rb', line 53

def self.load_notif(notif)
  JSON.load(notif)
end

.notif_to_json(notif) ⇒ Object



57
58
59
# File 'lib/goat/notifications.rb', line 57

def self.notif_to_json(notif)
  notif.to_json
end

.notify(notif) ⇒ Object



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/goat/notifications.rb', line 94

def self.notify(notif)
  if Dynamic.variable?(:txn)
    notif['txn'] = Dynamic[:txn]
    notif['txn_pgid'] = Dynamic[:txn_pgid]
  end

  nid = String.random
  notif['_nid'] = nid

  process_notification(notif) # ensure local delivery happens first
  received_notifications << nid

  if EM.reactor_running?
    EM.connect(@host, @send_port) do |c|
      # TODO: alert if this fails
      c.send_data(notif_to_json(notif) + "\n")
      c.close_connection(true)
    end
  else
    s = TCPSocket.open(@host, @send_port)
    s.write(notif_to_json(notif) + "\n")
    s.close
  end
rescue SocketError => e
  $stderr.puts "Couldn't notify host #{@host.inspect}:#{@send_port.inspect}"
  raise e
end

.process_notification(notif) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/goat/notifications.rb', line 76

def self.process_notification(notif)
  if(notif['txn'])
    bind = {:txn => notif['txn'], :txn_pgid => notif['txn_pgid']}
  else
    bind = {}
  end

  Dynamic.let(bind) do
    subscribers.each do |sub|
      if wants_notification(sub, notif)
        $stderr.puts "Dispatching to #{sub[:delegate]}"
        meth = sub[:meth]
        sub[:delegate].send(meth, notif)
      end
    end
  end
end

.receive(line) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/goat/notifications.rb', line 63

def self.receive(line)
  notif = load_notif(line)
  nid = notif['_nid']
  Goat.logd "received notif #{notif.inspect}" if $verbose

  raise "No _nid" unless nid

  unless received_notifications.include?(nid)
    received_notifications << nid
    process_notification(notif)
  end
end

.received_notificationsObject



61
# File 'lib/goat/notifications.rb', line 61

def self.received_notifications; @recv_notif ||= Set.new; end

.start_receiverObject



27
28
29
# File 'lib/goat/notifications.rb', line 27

def self.start_receiver
  Receiver.start(@host, @recv_port)
end

.subscribe(obj, meth, sig) ⇒ Object



122
123
124
# File 'lib/goat/notifications.rb', line 122

def self.subscribe(obj, meth, sig)
  subscribers << {:sig => sig, :meth => meth, :delegate => obj}
end

.subscribersObject



43
# File 'lib/goat/notifications.rb', line 43

def self.subscribers; @subscribers ||= Set.new; end

.wants_notification(sub, notif) ⇒ Object



45
46
47
48
49
50
51
# File 'lib/goat/notifications.rb', line 45

def self.wants_notification(sub, notif)
  wants = true
  sub[:sig].each do |k, v|
    wants = false unless notif[k] == v
  end
  wants
end