Class: Connfu::ConnfuStream

Inherits:
Object
  • Object
show all
Includes:
ConnfuLogger
Defined in:
lib/connfu/connfu_stream.rb

Overview

Open an HTTP connection to connFu and start listening to any incoming event This is the entry point to execute any proc in a connFu application based on external events.

Instance Method Summary collapse

Methods included from ConnfuLogger

included, #logger

Constructor Details

#initialize(app_stream, api_key, uri) ⇒ ConnfuStream

ConnfuStream initializer.

Parameters

  • app_stream valid HTTP stream url to connect and listen events

  • api_key valid Token to get access to a Connfu Stream

  • uri HTTP endpoint to open the connection



26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/connfu/connfu_stream.rb', line 26

def initialize(app_stream, api_key, uri)

  @app_stream = app_stream
  @api_key = api_key

  _uri = URI.parse(uri)
  @port = _uri.port
  @host = _uri.host
  @path = _uri.path.concat(app_stream)
  @scheme = _uri.scheme

  @formatter = Connfu::ConnfuMessageFormatter
end

Instance Method Details

#getObject

This method should be called by the class that instantiates the ConnfuStream to fetch inbound events. It stops the execution waiting for an inbound event.

Return

Connfu::Message instance



94
95
96
# File 'lib/connfu/connfu_stream.rb', line 94

def get
  queue.pop
end

#start_listeningObject

Open a HTTP connection to connFu and start listening new events



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/connfu/connfu_stream.rb', line 42

def start_listening

  begin
    # http client instantiation and configuration
    http_client = Net::HTTP.new(@host, @port)
    logger.debug("#{self.class} opening connection to  #{@host}:#{@port}")
    http_client.use_ssl = @scheme.eql?("https")
    if @scheme.eql?("https")
      http_client.verify_mode = OpenSSL::SSL::VERIFY_NONE
    end

    logger.debug("#{self.class} start listening to stream #{@path}")

    http_client.read_timeout = 60*6 # double the timeout connFu is configured

    # open connection
    http_client.start do |http|
      req = Net::HTTP::Get.new(
          @path,
          headers)

      # send GET request
      http.request(req) do |res|
        logger.debug "#{self.class} Request to the endpoint...."
        # read chunk data
        logger.debug "#{self.class} Waiting for a new event...."
        res.read_body do |chunk|
          unless chunk.chomp.strip.empty?
            # format data retrieved
            events = handle_data(chunk)
            # Insert message(s) in the queue
            events.nil? or message(events)
          else
            logger.debug "#{self.class} got an empty data"
          end
          logger.debug "#{self.class} Waiting for a new event...."
        end
      end
    end
  rescue Exception => ex
    logger.error "[#{Time.now} | #{ex.class}] #{ex.message}\n#{ex.backtrace.join("\n")}"
    # loop again
    start_listening
  end
end