Class: Firehose::Client::Producer::Http

Inherits:
Object
Defined in:
lib/firehose/client/producer.rb

Overview

Publish messages to Firehose via an HTTP interface.

Defined Under Namespace

Classes: Builder

Constant Summary

PublishError = Class.new(RuntimeError)

Raised when the server does not respond with a 202 after a message is published.

TimeoutError = Class.new(Faraday::Error::TimeoutError)

Wraps Faraday's timeout error. Passed to the error handler when a publish times out.

Timeout = 1

Default number of seconds to wait for a publish to complete.


Constructor Details

#initialize(uri = Firehose::URI) ⇒ Http

Returns a new instance of Http.



# File 'lib/firehose/client/producer.rb', line 31

def initialize(uri = Firehose::URI)
  @uri = ::URI.parse(uri.to_s)
  @uri.scheme ||= 'http'
end
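
As a rough illustration of what the constructor does with a scheme-less address, the sketch below repeats its two lines using only Ruby's standard `uri` library. The helper name `normalize` and the address `//localhost:7474` are assumptions chosen for the example, not values taken from the source.

```ruby
require 'uri'

# Mirror the constructor: parse the address, then default the scheme to http.
# `normalize` is a hypothetical helper; it is not part of Firehose.
def normalize(address)
  uri = URI.parse(address.to_s)
  uri.scheme ||= 'http'
  uri
end

puts normalize('//localhost:7474')     # scheme-less input gets http
puts normalize('https://example.com')  # an explicit scheme is kept
```

Because the scheme is only filled in when it is missing, an address that already names https is left untouched.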

Instance Attribute Details

#uri ⇒ Object (readonly)

URI for the Firehose server. This URI does not include the path of the channel.



# File 'lib/firehose/client/producer.rb', line 29

def uri
  @uri
end

Class Method Details

.adapter ⇒ Object

Returns the Faraday adapter used for publishing. Defaults to Faraday.default_adapter, which is :net_http unless it has been changed.



# File 'lib/firehose/client/producer.rb', line 93

def self.adapter
  @adapter ||= Faraday.default_adapter
end

.adapter=(adapter) ⇒ Object

Sets the Faraday adapter that Firehose uses to PUT messages. A list of available adapters is at github.com/technoweenie/faraday.



# File 'lib/firehose/client/producer.rb', line 88

def self.adapter=(adapter)
  @adapter = adapter
end

Instance Method Details

#error_handler ⇒ Object

Returns the current error handler. If none has been registered with #on_error, the default handler re-raises the exception it is given.



# File 'lib/firehose/client/producer.rb', line 82

def error_handler
  @error_handler || Proc.new{ |e| raise e }
end

#on_error(&block) ⇒ Object

Registers a block to handle errors that occur while publishing a message. The block receives the exception.



# File 'lib/firehose/client/producer.rb', line 77

def on_error(&block)
  @error_handler = block
end
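
The interplay between #on_error and #error_handler can be sketched in isolation. The class `ErrorHandling` below is a hypothetical stand-in that copies only these two methods, so the example runs without Firehose or Faraday.

```ruby
# Hypothetical stand-in class: only the two error-handling methods are copied.
class ErrorHandling
  def on_error(&block)
    @error_handler = block
  end

  def error_handler
    @error_handler || Proc.new { |e| raise e }
  end
end

producer = ErrorHandling.new

# Default handler: the exception is re-raised.
begin
  producer.error_handler.call(RuntimeError.new('boom'))
rescue RuntimeError => e
  puts "raised: #{e.message}"
end

# Custom handler: the exception is collected instead of raised.
errors = []
producer.on_error { |e| errors << e.message }
producer.error_handler.call(RuntimeError.new('boom'))
puts "collected: #{errors.inspect}"
```

Registering a handler is a way to log or count failed publishes without letting the exception propagate to the caller.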

#publish(message) ⇒ Object

Returns a Builder for the message. This is the entry point of the publishing DSL.



# File 'lib/firehose/client/producer.rb', line 37

def publish(message)
  Builder.new(self, message)
end
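
The Builder class itself is not shown in this section, so the following is only a guess at how such a DSL might hang together. `SketchProducer`, `SketchBuilder`, and in particular the `to` method and its arguments are assumptions made for illustration. The stand-in #put records the call rather than performing an HTTP request.

```ruby
# Hypothetical stand-ins; none of these names come from the source.
class SketchProducer
  attr_reader :sent

  def initialize
    @sent = []
  end

  # Same shape as Http#publish: wrap the message in a builder.
  def publish(message)
    SketchBuilder.new(self, message)
  end

  # Records the call instead of performing an HTTP PUT.
  def put(message, channel, opts, &block)
    @sent << [message, channel, opts]
    block.call(:accepted) if block
  end
end

class SketchBuilder
  def initialize(producer, message)
    @producer = producer
    @message = message
  end

  # Assumed DSL method: hands the stored message to the producer's #put.
  def to(channel, opts = {}, &callback)
    @producer.put(@message, channel, opts, &callback)
  end
end

producer = SketchProducer.new
producer.publish('hello').to('/greetings', ttl: 30) do |res|
  puts "callback got #{res.inspect}"
end
p producer.sent
```

The point of the builder is that the message and its destination read as one chained expression while #put keeps a plain positional signature.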

#put(message, channel, opts, &block) ⇒ Object

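Publishes the message to the channel with an HTTP PUT. The opts hash may carry :ttl (sent as a Cache-Control max-age header) and :timeout (defaults to Timeout). The block is called with the response when the server replies with a 202. Any other status is passed to the error handler as a PublishError.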



# File 'lib/firehose/client/producer.rb', line 42

def put(message, channel, opts, &block)
  ttl = opts[:ttl]
  timeout = opts[:timeout] || Timeout

  response = conn.put do |req|
    req.options[:timeout] = timeout
    if conn.path_prefix.nil? || conn.path_prefix == '/'
      # This avoids a double / if the channel starts with a / too (which is expected).
      req.path = channel
    else
      if conn.path_prefix =~ /\/\Z/ || channel =~ /\A\//
        req.path = [conn.path_prefix, channel].compact.join
      else
        # Add a / so the prefix and channel aren't just rammed together.
        req.path = [conn.path_prefix, channel].compact.join('/')
      end
    end
    req.body = message
    req.headers['Cache-Control'] = "max-age=#{ttl.to_i}" if ttl
  end
  response.on_complete do
    case response.status
    when 202 # Fire off the callback if everything worked out OK.
      block.call(response) if block
    else
      error_handler.call PublishError.new("Could not publish #{message.inspect} to '#{uri.to_s}/#{channel}': #{response.inspect}")
    end
  end

rescue Faraday::Error::TimeoutError => e
  # Wrap Faraday's timeout in this class's TimeoutError and pass it to the error handler.
  error_handler.call TimeoutError.new(e)
end
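
The path-joining branches inside #put are easier to follow when pulled out into a pure function. The sketch below does that, with `prefix` standing in for conn.path_prefix. The function names `request_path` and `cache_control` are hypothetical and the sample prefixes and channels are assumed values.

```ruby
# Pure-function sketch of the path logic in #put.
# `prefix` stands in for conn.path_prefix; the function name is hypothetical.
def request_path(prefix, channel)
  return channel if prefix.nil? || prefix == '/'

  if prefix =~ /\/\Z/ || channel =~ /\A\//
    # One side already supplies the separator, so join as-is.
    [prefix, channel].compact.join
  else
    # Neither side has a slash, so insert one.
    [prefix, channel].compact.join('/')
  end
end

# Header value built the same way as in #put; nil when no TTL is given.
def cache_control(ttl)
  "max-age=#{ttl.to_i}" if ttl
end

puts request_path(nil, '/chat')     # /chat
puts request_path('/', '/chat')     # /chat
puts request_path('/api', '/chat')  # /api/chat
puts request_path('/api/', 'chat')  # /api/chat
puts request_path('/api', 'chat')   # /api/chat
puts cache_control('30')            # max-age=30
```

In this sketch, a prefix ending in / combined with a channel starting with / still produces a double slash (`/api//chat`). Whether conn.path_prefix can actually end in / depends on Faraday's handling of the prefix, which is not shown here.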