Module: StreamModule

Included in:
Pf_Lab_Interface
Defined in:
lib/publisher/stream_module.rb

Constant Summary collapse

# Firebase secret handed to RestFirebase as :secret; taken from the
# FIREBASE_SECRET environment variable (nil when the variable is unset).
SECRET = ENV.fetch("FIREBASE_SECRET", nil)
# Firebase site URL handed to RestFirebase as :site; taken from the
# FIREBASE_SITE environment variable (nil when the variable is unset).
SITE_URL = ENV.fetch("FIREBASE_SITE", nil)

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#connection ⇒ Object

Returns the value of attribute connection.



7
8
9
# File 'lib/publisher/stream_module.rb', line 7

# Reader for @connection. setup_connection assigns a RestFirebase client
# through the matching writer, and watch / watch_limited call
# event_source on whatever this returns.
def connection
  @connection
end

#es ⇒ Object

Returns the event stream object.



11
12
13
# File 'lib/publisher/stream_module.rb', line 11

# Reader for @es, the event stream object. watch and watch_limited store
# the result of connection.event_source(event_source) here before
# registering callbacks on it and starting it.
def es
  @es
end

#event_source ⇒ Object

Returns the value of attribute event_source.



9
10
11
# File 'lib/publisher/stream_module.rb', line 9

# Reader for @event_source, the endpoint handed to
# connection.event_source by watch / watch_limited. setup_connection
# raises when this is blank.
def event_source
  @event_source
end

#on_message_handler_function ⇒ Object

Returns the value of attribute on_message_handler_function.



6
7
8
# File 'lib/publisher/stream_module.rb', line 6

# Reader for @on_message_handler_function, the name of the method that
# watch / watch_limited invoke via send for every incoming message.
# setup_connection defaults it to "on_message_handler" when unset.
def on_message_handler_function
  @on_message_handler_function
end

#private_key_hash ⇒ Object

Returns the value of attribute private_key_hash.



8
9
10
# File 'lib/publisher/stream_module.rb', line 8

# Reader for @private_key_hash, which setup_connection forwards to
# RestFirebase.new as :private_key_hash. setup_connection raises when
# this is blank.
def private_key_hash
  @private_key_hash
end

Instance Method Details

#on_message_handler(data) ⇒ Object



63
64
65
66
# File 'lib/publisher/stream_module.rb', line 63

# Default message callback. setup_connection selects this method by name
# when no other on_message_handler_function has been configured, and
# watch / watch_limited then invoke it with each message's data.
#
# @param data [Object] payload of the received message (ignored here)
# @return [nil]
def on_message_handler(data)
  # Deliberately a no-op. Re-enable the lines below to inspect payloads
  # while debugging:
  #   puts "got some data"
  #   puts data
  nil
end

#setup_connection ⇒ Object



13
14
15
16
17
18
19
20
# File 'lib/publisher/stream_module.rb', line 13

# Validates the configuration and builds the RestFirebase client.
#
# The client is created from SITE_URL, SECRET and private_key_hash with an
# :auth_ttl of 1800 (presumably seconds -- confirm against rest-firebase),
# then stored through connection=. The message handler name falls back to
# "on_message_handler" unless one has already been set.
#
# @raise [RuntimeError] when private_key_hash or event_source is blank
# @return [Object] the handler method name in effect after the call
def setup_connection
  if private_key_hash.blank?
    raise "please provide the private key hash, from firebase service account -> create private key "
  end
  raise "no event source endpoint provided" if event_source.blank?

  self.connection = RestFirebase.new(
    site: SITE_URL,
    secret: SECRET,
    private_key_hash: private_key_hash,
    auth_ttl: 1800
  )

  # Must stay the last expression: its value is the method's return value.
  self.on_message_handler_function ||= "on_message_handler"
end

#watch ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/publisher/stream_module.rb', line 22

# Opens the event stream for event_source and blocks the calling thread
# until the process receives INT or TERM, then shuts the stream down.
#
# Every message's data is forwarded to the method named by
# on_message_handler_function. Open and error events are printed with p.
# The onreconnect callback returns @reconnect, which is true while
# watching and false once shutdown begins.
#
# Cleanup runs in an ensure block, so it happens on every exit path:
# the INT/TERM handlers that were installed before the call are restored,
# the internal pipe is closed, and the stream is closed and waited on.
#
# @return [Object] whatever es.wait returns
def watch
  @reconnect = true
  stream = connection.event_source(event_source)
  self.es = stream

  stream.onopen { |sock| p sock }
  stream.onmessage do |_event, data, _sock|
    send(on_message_handler_function, data)
  end
  stream.onerror { |error, _sock| p error }
  stream.onreconnect do |error, _sock|
    p error
    @reconnect
  end
  stream.start

  previous_handlers = {}
  begin
    rd, wr = IO.pipe
    %w[INT TERM].each do |sig|
      # The handler only writes to the pipe, which unblocks rd.gets below.
      previous_handlers[sig] = Signal.trap(sig) { wr.puts }
    end
    rd.gets # block until INT or TERM is received
  ensure
    # Restore handlers before closing the pipe so that a late signal can
    # never reach a handler that writes to a closed IO. Signal.trap returns
    # nil for handlers not set from Ruby; fall back to the default then.
    previous_handlers.each do |sig, handler|
      Signal.trap(sig, handler || "DEFAULT")
    end
    [rd, wr].compact.each(&:close)

    @reconnect = false
    stream.close
    outcome = stream.wait # shut down cleanly
  end
  outcome
end

#watch_limited(seconds) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/publisher/stream_module.rb', line 45

# Opens the event stream for event_source, keeps it running for the given
# number of seconds, then shuts it down.
#
# Every message's data is forwarded to the method named by
# on_message_handler_function. Open and error events are printed with p.
# The onreconnect callback returns @reconnect, which is true while
# watching and false once shutdown begins.
#
# Shutdown runs in an ensure block, so the stream is closed even when
# sleep raises (for example on a negative interval or an interrupt).
#
# @param seconds [Numeric] how long to keep the stream open; passed to sleep
# @raise [ArgumentError, TypeError] whatever sleep raises for a bad interval
# @return [Object] whatever es.wait returns
def watch_limited(seconds)
  @reconnect = true
  stream = connection.event_source(event_source)
  self.es = stream

  stream.onopen { |sock| p sock }
  stream.onmessage do |_event, data, _sock|
    send(on_message_handler_function, data)
  end
  stream.onerror { |error, _sock| p error }
  stream.onreconnect do |error, _sock|
    p error
    @reconnect
  end
  stream.start

  begin
    sleep(seconds)
  ensure
    # Disable reconnects first so the close below is final.
    @reconnect = false
    stream.close
    outcome = stream.wait # shut down cleanly
  end
  outcome
end