Class: Firehose::Client::Producer::Http

Inherits:
Object
  • Object
show all
Defined in:
lib/firehose/client/producer.rb

Overview

Publish messages to Firehose via an HTTP interface.

Defined Under Namespace

Classes: Builder

Constant Summary collapse

PublishError =

Raised when the server does not respond with a 202 after a message is published.

Class.new(RuntimeError)
TimeoutError =
Class.new(Faraday::Error::TimeoutError)
DEFAULT_TIMEOUT =

How many seconds should we wait for a publish to take?

1
DEFAULT_ERROR_HANDLER =
->(e) { raise e }

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(uri = Firehose::URI, timeout = DEFAULT_TIMEOUT) ⇒ Http

Returns a new instance of Http.



32
33
34
35
36
# File 'lib/firehose/client/producer.rb', line 32

# Builds a producer that publishes to the Firehose server at +uri+.
#
# @param uri [#to_s] server address; it is stringified and parsed with
#   ::URI.parse, and given the 'http' scheme when it has none.
# @param timeout the value stored as this producer's publish timeout.
def initialize(uri = Firehose::URI, timeout = DEFAULT_TIMEOUT)
  # Fall back to plain HTTP when the parsed address carries no scheme.
  @uri = ::URI.parse(uri.to_s).tap { |parsed| parsed.scheme ||= 'http' }
  @timeout = timeout
end

Instance Attribute Details

#timeout ⇒ Object (readonly)

Default number of seconds to wait for a publish to complete. A :timeout option passed to #put takes precedence over this value.



30
31
32
# File 'lib/firehose/client/producer.rb', line 30

# Reader for the publish timeout that the constructor stored in @timeout.
def timeout; @timeout; end

#uri ⇒ Object (readonly)

URI for the Firehose server. This URI does not include the path of the channel.



30
31
32
# File 'lib/firehose/client/producer.rb', line 30

# Reader for the parsed server URI that the constructor stored in @uri.
def uri; @uri; end

Class Method Details

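.adapter ⇒ Object

Returns the Faraday adapter used to PUT messages. Unless one has been set with .adapter=, this falls back to Faraday.default_adapter (typically :net_http).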



100
101
102
# File 'lib/firehose/client/producer.rb', line 100

# Returns the adapter previously assigned to @adapter; when none is set,
# remembers and returns Faraday.default_adapter.
def self.adapter
  return @adapter if @adapter

  @adapter = Faraday.default_adapter
end

.adapter=(adapter) ⇒ Object

What adapter should Firehose use to PUT the message? List of adapters is available at github.com/technoweenie/faraday.



95
96
97
# File 'lib/firehose/client/producer.rb', line 95

# Stores the adapter that .adapter will return from then on.
#
# @param adapter the adapter identifier to remember in @adapter.
def self.adapter=(adapter); @adapter = adapter; end

Instance Method Details

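#error_handler ⇒ Object

Returns the handler invoked when publishing fails: the block registered with #on_error, or DEFAULT_ERROR_HANDLER (which re-raises the exception) when none has been set.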



89
90
91
# File 'lib/firehose/client/producer.rb', line 89

# Returns the callable that receives publishing errors: the handler stored
# in @error_handler when there is one, otherwise DEFAULT_ERROR_HANDLER.
def error_handler
  return @error_handler if @error_handler

  DEFAULT_ERROR_HANDLER
end

#on_error(&block) ⇒ Object

Handle errors that could happen while publishing a message.



84
85
86
# File 'lib/firehose/client/producer.rb', line 84

# Registers the given block as the handler stored in @error_handler.
# Calling it without a block stores nil.
def on_error(&block); @error_handler = block; end

#publish(message) ⇒ Object

A DSL for publishing messages.



39
40
41
# File 'lib/firehose/client/producer.rb', line 39

# Entry point of the publishing DSL: wraps this producer and the message in
# a new Builder and returns it.
#
# @param message the message handed to the Builder.
def publish(message); Builder.new(self, message); end

#put(message, channel, opts, &block) ⇒ Object

Publish the message via HTTP.



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/firehose/client/producer.rb', line 44

# Publish the message via HTTP by PUTting it to the channel's path.
#
# @param message the request body to publish.
# @param channel the channel path; expected to start with a "/".
# @param opts [Hash] per-publish options:
#   :ttl         - when present, sent as the Cache-Control max-age header.
#   :timeout     - request timeout; falls back to @timeout, then to
#                  DEFAULT_TIMEOUT.
#   :buffer_size - when present, sent as the X-Firehose-Buffer-Size header.
# @yield [response] invoked with the response when the server answers 202.
#
# Any other status is passed to the error handler as a PublishError, and a
# Faraday timeout is passed to the error handler as a TimeoutError.
def put(message, channel, opts, &block)
  ttl         = opts[:ttl]
  timeout     = opts[:timeout] || @timeout || DEFAULT_TIMEOUT
  buffer_size = opts[:buffer_size]

  response = conn.put do |req|
    req.options[:timeout] = timeout

    prefix = conn.path_prefix
    req.path =
      if prefix.nil? || prefix == '/'
        # No meaningful prefix, so the channel is the whole path. This also
        # avoids a double / when the channel starts with a / (the expected form).
        channel
      else
        # Join prefix and channel with exactly one /, whether or not the prefix
        # ends with one or the channel starts with one. Previously a prefix
        # ending in / combined with a channel starting with / produced "//".
        "#{prefix.chomp('/')}/#{channel.to_s.sub(%r{\A/}, '')}"
      end

    req.body = message
    req.headers['Cache-Control'] = "max-age=#{ttl.to_i}" if ttl
    req.headers["X-Firehose-Buffer-Size"] = buffer_size.to_s if buffer_size
  end

  response.on_complete do
    case response.status
    when 202 # Fire off the callback if everything worked out OK.
      block.call(response) if block
    else
      # Don't pass along the basic auth header, if present.
      response_data = response.inspect.gsub(/"Authorization"=>"Basic \S+"/, '"Authorization" => "Basic [HIDDEN]"')
      # Strip any user:password@ credentials from the reported endpoint.
      endpoint = "#{uri}/#{channel}".gsub(/:\/\/\S+@/, "://")
      error_handler.call PublishError.new("Could not publish #{message.inspect} to '#{endpoint}': #{response_data}")
    end
  end
# Hide Faraday behind our own TimeoutError, routed through the error handler.
rescue Faraday::Error::TimeoutError => e
  error_handler.call TimeoutError.new(e)
end