Module: Goat::NotificationCenter

Defined in:
lib/goat/notifications.rb

Defined Under Namespace

Classes: Receiver

Class Method Summary collapse

Class Method Details

.configure(opts = {}) ⇒ Object



31
32
33
34
35
# File 'lib/goat/notifications.rb', line 31

def self.configure(opts={})
  @host = opts[:host]
  @recv_port = opts[:recv_port]
  @send_port = opts[:send_port]
end

.load_notif(notif) ⇒ Object



57
58
59
# File 'lib/goat/notifications.rb', line 57

def self.load_notif(notif)
  JSON.load(notif)
end

.notif_to_json(notif) ⇒ Object



61
62
63
# File 'lib/goat/notifications.rb', line 61

def self.notif_to_json(notif)
  notif.to_json
end

.notify(notif) ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/goat/notifications.rb', line 98

def self.notify(notif)
  set_defaults # maybe #schedule or #configure never got called

  if Dynamic.variable?(:txn)
    notif['txn'] = Dynamic[:txn]
    notif['txn_pgid'] = Dynamic[:txn_pgid]
  end

  nid = String.random
  notif['_nid'] = nid

  process_notification(notif) # ensure local delivery happens first
  received_notifications << nid

  if EM.reactor_running?
    EM.connect(@host, @send_port) do |c|
      # TODO: alert if this fails
      c.send_data(notif_to_json(notif) + "\n")
      c.close_connection(true)
    end
  else
    s = TCPSocket.open(@host, @send_port)
    s.write(notif_to_json(notif) + "\n")
    s.close
  end
rescue SocketError => e
  $stderr.puts "Couldn't notify host #{@host.inspect}:#{@send_port.inspect}"
  raise e
end

.process_notification(notif) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/goat/notifications.rb', line 80

def self.process_notification(notif)
  if(notif['txn'])
    bind = {:txn => notif['txn'], :txn_pgid => notif['txn_pgid']}
  else
    bind = {}
  end

  Dynamic.let(bind) do
    subscribers.each do |sub|
      if wants_notification(sub, notif)
        $stderr.puts "Dispatching to #{sub[:delegate]}"
        meth = sub[:meth]
        sub[:delegate].send(meth, notif)
      end
    end
  end
end

.receive(line) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/goat/notifications.rb', line 67

def self.receive(line)
  notif = load_notif(line)
  nid = notif['_nid']
  Goat.logd "received notif #{notif.inspect}" if $verbose

  raise "No _nid" unless nid

  unless received_notifications.include?(nid)
    received_notifications << nid
    process_notification(notif)
  end
end

.received_notificationsObject



65
# File 'lib/goat/notifications.rb', line 65

def self.received_notifications; @recv_notif ||= Set.new; end

.scheduleObject



43
44
45
# File 'lib/goat/notifications.rb', line 43

def self.schedule
  EM.next_tick { set_defaults; self.start_receiver }
end

.set_defaultsObject



37
38
39
40
41
# File 'lib/goat/notifications.rb', line 37

def self.set_defaults
  @host ||= '127.0.0.1'
  @recv_port ||= 8000
  @send_port ||= 8001
end

.start_receiverObject



27
28
29
# File 'lib/goat/notifications.rb', line 27

def self.start_receiver
  Receiver.start(@host, @recv_port)
end

.subscribe(obj, meth, sig) ⇒ Object



128
129
130
# File 'lib/goat/notifications.rb', line 128

def self.subscribe(obj, meth, sig)
  subscribers << {:sig => sig, :meth => meth, :delegate => obj}
end

.subscribersObject



47
# File 'lib/goat/notifications.rb', line 47

def self.subscribers; @subscribers ||= Set.new; end

.wants_notification(sub, notif) ⇒ Object



49
50
51
52
53
54
55
# File 'lib/goat/notifications.rb', line 49

def self.wants_notification(sub, notif)
  wants = true
  sub[:sig].each do |k, v|
    wants = false unless notif[k] == v
  end
  wants
end