Class: FaaStRuby::EventHub

Inherits:
Object
  • Object
show all
Extended by:
Logger::System
Defined in:
lib/faastruby/server/event_hub.rb

Constant Summary collapse

@@queue =
Queue.new

Class Method Summary collapse

Methods included from Logger::System

puts, puts, tag

Class Method Details

.listen_for_events! ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/faastruby/server/event_hub.rb', line 33

# Loads channel subscriptions and starts the background thread that
# dispatches queued events to their subscribers.
#
# Each queue item is expected to be a String of the form
# "<encoded_channel>,<encoded_data>". The first field is URL-safe Base64
# decoded to obtain the channel name; the second field is passed to every
# subscriber of that channel unchanged.
#
# Failures are logged and contained: a malformed payload or a subscriber
# that raises must not terminate the dispatcher thread, otherwise every
# later event would silently be dropped.
def self.listen_for_events!
  load_subscribers
  @@thread = Thread.new do
    loop do
      begin
        # Blocks until an event is pushed onto the queue.
        encoded_channel, encoded_data = @@queue.pop.split(',')
        channel = EventChannel.new(Base64.urlsafe_decode64(encoded_channel))
        puts "#{tag} Event channel=#{channel.name.inspect}"
        channel.subscribers.each do |s|
          begin
            subscriber = Subscriber.new(s)
            puts "#{tag} Trigger function=#{subscriber.path.inspect} base64_payload=#{encoded_data.inspect}"
            response = subscriber.call(encoded_data)
            puts "[#{subscriber.path}] #=> status=#{response.status} body=#{response.body.inspect} headers=#{Oj.dump response.headers}".light_blue
          rescue StandardError => e
            # One failing subscriber must not prevent the remaining ones from running.
            puts "#{tag} Subscriber #{s.inspect} failed: #{e.class}: #{e.message}"
          end
        end
      rescue StandardError => e
        # Covers malformed payloads (non-String item, missing fields, invalid Base64).
        puts "#{tag} Failed to process event: #{e.class}: #{e.message}"
      end
    end
  end
  puts "#{tag} Events thread started."
end

.load_subscribers ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/faastruby/server/event_hub.rb', line 17

# Registers channel subscriptions declared by the functions found under the
# current working directory.
#
# Every file matching '<workspace>/<function>/faastruby.yml' is parsed; when
# its 'channels' key holds an Array, the function path
# "<workspace>/<function>" is subscribed to each non-nil entry. Files that
# are empty, do not contain a mapping, or have no 'channels' Array are
# skipped. A summary of the resulting subscriptions is logged at the end.
def self.load_subscribers
  Dir.glob('*/*/faastruby.yml').each do |file|
    workspace_name, function_name = file.split('/')
    path = "#{workspace_name}/#{function_name}"
    # NOTE(review): on older Psych versions YAML.load can instantiate
    # arbitrary objects; consider YAML.safe_load if these files may ever
    # come from an untrusted source.
    config = YAML.load(File.read(file))
    # An empty or scalar-only YAML document does not parse to a Hash;
    # indexing it would raise (or misbehave), so skip such files.
    next unless config.is_a?(Hash) && config['channels'].is_a?(Array)

    # Blank list items in the YAML parse to nil and are ignored.
    config['channels'].compact.each do |c|
      EventChannel.new(c).subscribe(path)
    end
  end
  puts "#{tag} Channel subscriptions: #{EventChannel.channels}"
  puts "#{tag} Please restart the server if you modify channel subscriptions in 'faastruby.yml' for any function."
end

.push(payload) ⇒ Object



9
10
11
# File 'lib/faastruby/server/event_hub.rb', line 9

# Enqueues an event payload for the dispatcher thread.
#
# @param payload [Object] the item appended to the shared event queue
# @return [Queue] the shared event queue
def self.push(payload)
  @@queue.push(payload)
end

.queue ⇒ Object



5
6
7
# File 'lib/faastruby/server/event_hub.rb', line 5

# Exposes the shared event queue held in the @@queue class variable.
#
# @return [Queue] the queue that .push appends to
def self.queue
  class_variable_get(:@@queue)
end

.thread ⇒ Object



13
14
15
# File 'lib/faastruby/server/event_hub.rb', line 13

# Returns the dispatcher thread created by .listen_for_events!.
#
# @return [Thread, nil] the events thread, or nil when .listen_for_events!
#   has not been called yet (reading the unset class variable would
#   otherwise raise NameError)
def self.thread
  defined?(@@thread) ? @@thread : nil
end