Class: Carnivore::Source::HttpSource
- Inherits:
-
Carnivore::Source
- Object
- Carnivore::Source
- Carnivore::Source::HttpSource
- Includes:
- Http::Utils::Params
- Defined in:
- lib/carnivore-http/http_source.rb
Overview
Carnivore HTTP source
Constant Summary collapse
- BODY_TO_FILE_SIZE =
Size limit for inline body
1024 * 10
Instance Attribute Summary collapse
-
#args ⇒ Hash
readonly
Source arguments.
-
#auth_allowed_origins ⇒ Array<IPAddr>
readonly
Allowed request origin addresses.
- #auth_htpasswd ⇒ HTAuth::PasswdFile readonly
Instance Method Summary collapse
-
#allowed_credentials?(message) ⇒ TrueClass, FalseClass
Check if message is allowed based on config credentials.
-
#allowed_htpasswd?(message) ⇒ TrueClass, FalseClass
Check if message is allowed based on htpasswd file.
-
#allowed_origin?(message) ⇒ TrueClass, FalseClass
Check if message is allowed based on origin.
-
#authorized?(message) ⇒ TrueClass, FalseClass
Message is authorized for processing.
-
#auto_process? ⇒ Boolean
Always auto start.
-
#build_listener(&block) ⇒ Reel::Server::HTTP, Reel::Server::HTTPS
Initialize http listener correctly based on configuration.
-
#build_message(req) ⇒ Hash
Build message hash from request.
-
#confirm(message, args = {}) ⇒ Object
Confirm processing of message.
-
#default_args(args = {}) ⇒ Hash
Default configuration arguments.
-
#perform_transmission(message_id, payload, method, url, headers = {}) ⇒ NilClass
Transmit message to HTTP endpoint.
- #retry_delivery ⇒ RetryDelivery
-
#retry_directory ⇒ String, NilClass
Directory storing failed messages.
-
#retry_write_directory ⇒ String, NilClass
Cache directory for initial writes.
-
#setup(args = {}) ⇒ Object
Setup the source.
- #terminate ⇒ Object
-
#transmit(message, *extra) ⇒ Object
Tranmit message.
-
#write_for_retry(message_id, payload, method, url, headers) ⇒ TrueClass, FalseClass
Persist message if enabled for send retry.
Methods included from Http::Utils::Params
#dump_query_string, #format_query_args, #format_query_type, included, #parse_query_string
Instance Attribute Details
#args ⇒ Hash (readonly)
Returns source arguments.
15 16 17 |
# File 'lib/carnivore-http/http_source.rb', line 15 def args @args end |
#auth_allowed_origins ⇒ Array<IPAddr> (readonly)
Returns allowed request origin addresses.
17 18 19 |
# File 'lib/carnivore-http/http_source.rb', line 17 def auth_allowed_origins @auth_allowed_origins end |
#auth_htpasswd ⇒ HTAuth::PasswdFile (readonly)
19 20 21 |
# File 'lib/carnivore-http/http_source.rb', line 19 def auth_htpasswd @auth_htpasswd end |
Instance Method Details
#allowed_credentials?(message) ⇒ TrueClass, FalseClass
Check if message is allowed based on config credentials
152 153 154 155 156 157 158 |
# File 'lib/carnivore-http/http_source.rb', line 152 def allowed_credentials?() if(creds = args.get(:authorization, :credentials)) creds[[:message][:authentication][:username]] == [:message][:authentication][:password] else true end end |
#allowed_htpasswd?(message) ⇒ TrueClass, FalseClass
Check if message is allowed based on htpasswd file
135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/carnivore-http/http_source.rb', line 135 def allowed_htpasswd?() if(auth_htpasswd) entry = auth_htpasswd.fetch([:message][:authentication][:username]) if(entry) entry.authenticated?([:message][:authentication][:password]) else false end else true end end |
#allowed_origin?(message) ⇒ TrueClass, FalseClass
Check if message is allowed based on origin
164 165 166 167 168 169 170 171 172 |
# File 'lib/carnivore-http/http_source.rb', line 164 def allowed_origin?() if(auth_allowed_origins) !!auth_allowed_origins.detect do |allowed_check| allowed_check.include?([:message][:origin]) end else true end end |
#authorized?(message) ⇒ TrueClass, FalseClass
Authorization is driven via the source configuration. Valid structure looks like:
{
:type => 'http',
:args => {
:authorization => {
:allowed_origins => ['127.0.0.1', '192.168.0.2', '192.168.6.0/24'],
:htpasswd => '/path/to/htpasswd.file',
:credentials => {
:username1 => 'password1'
},
:valid_on => :all # or :any
}
}
}
When multiple authorization items are provided, the ‘:valid_on` will define behavior. It will default to `:all`.
Message is authorized for processing
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/carnivore-http/http_source.rb', line 109 def () if(args.fetch(:authorization)) valid_on = args.fetch(:authorization, :valid_on, :all).to_sym case valid_on when :all allowed_origin?() && allowed_htpasswd?() && allowed_credentials?() when :any allowed_origin?() || allowed_htpasswd?() || allowed_credentials?() when :none true else raise ArgumentError.new "Unknown authorization `:valid_on` provided! Given: #{valid_on}. Allowed: `any` or `all`" end else true end end |
#auto_process? ⇒ Boolean
Always auto start
84 85 86 |
# File 'lib/carnivore-http/http_source.rb', line 84 def auto_process? args.has_key?(:enable_processing) ? args[:enable_processing] : true end |
#build_listener(&block) ⇒ Reel::Server::HTTP, Reel::Server::HTTPS
Initialize http listener correctly based on configuration
282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 |
# File 'lib/carnivore-http/http_source.rb', line 282 def build_listener(&block) app = Carnivore::Http::App.new(&block) = {:bind => []} if(args[:ssl]) ssl_config = Smash.new(args[:ssl]) [:bind] << "ssl://#{args[:bind]}:#{args[:port]}?cert=#{ssl_config[:cert]}&key=#{ssl_config[:key]}" else [:bind] << "tcp://#{args[:bind]}:#{args[:port]}" end srv = Puma::Server.new(app, Puma::Events.stdio, ) @listeners.push(srv) srv.binder.parse([:bind], Puma::Events.stdio) srv.run srv end |
#build_message(req) ⇒ Hash
if body size is greater than BODY_TO_FILE_SIZE the body will be a temp file instead of a string
Build message hash from request
318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 |
# File 'lib/carnivore-http/http_source.rb', line 318 def (req) msg = Smash.new( :request => req, :headers => Smash[ req.headers.map{ |k,v| [k.downcase.tr('-', '_'), v]} ], :query => parse_query_string(req.query_string), :origin => req.remote_addr, :authentication => {} ) if(msg[:headers][:content_type] == 'application/json') msg[:body] = MultiJson.load( req.body.read ) elsif(msg[:headers][:content_type] == 'application/x-www-form-urlencoded') msg[:body] = parse_query_string( req.body.read ) if(msg[:body].size == 1 && msg[:body].values.first.is_a?(Array) && msg[:body].values.first.empty?) msg[:body] = msg[:body].keys.first end elsif(msg[:headers][:content_length].to_i > BODY_TO_FILE_SIZE) msg[:body] = Tempfile.new('carnivore-http') while((chunk = req.body.readpartial(2048))) msg[:body] << chunk end msg[:body].rewind else msg[:body] = req.body.read end if(msg[:headers][:authorization]) user, pass = Base64.urlsafe_decode64( msg[:headers][:authorization].split(' ').last ).split(':', 2) msg[:authentication] = { :username => user, :password => pass } end if(msg[:body].is_a?(Hash) && msg[:body][:id]) Smash.new( :raw => msg, :content => msg[:body].to_smash ) else msg end end |
#confirm(message, args = {}) ⇒ Object
Confirm processing of message
265 266 267 268 269 270 271 272 273 274 275 276 |
# File 'lib/carnivore-http/http_source.rb', line 265 def confirm(, args={}) unless([:message][:confirmed]) code = args.delete(:code) || :ok args[:response_body] = 'Thanks' if code == :ok && args.empty? body = args.delete(:response_body) debug "Confirming #{} with: Code: #{code.inspect} Args: #{args.inspect} Body: #{body}" [:message][:request].respond(code, *(args.empty? ? [body] : [args.merge(:body => body)])) [:message][:confirmed] = true else warn "Message was already confimed. Confirmation not sent! (#{})" end end |
#default_args(args = {}) ⇒ Hash
Default configuration arguments. If hash is provided, it will be merged into the default arguments.
74 75 76 77 78 79 80 81 |
# File 'lib/carnivore-http/http_source.rb', line 74 def default_args(args={}) Smash.new( :bind => '0.0.0.0', :port => '3000', :auto_respond => true, :retry_directory => '/tmp/.carnivore-resend' ).merge(args) end |
#perform_transmission(message_id, payload, method, url, headers = {}) ⇒ NilClass
Transmit message to HTTP endpoint
224 225 226 227 228 229 230 |
# File 'lib/carnivore-http/http_source.rb', line 224 def perform_transmission(, payload, method, url, headers={}) unless(retry_delivery.redeliver(, payload, method, url, headers)) write_for_retry(, payload, method, url, headers) retry_delivery.async.attempt_redelivery() end nil end |
#retry_delivery ⇒ RetryDelivery
50 51 52 |
# File 'lib/carnivore-http/http_source.rb', line 50 def retry_delivery Carnivore::Supervisor.supervisor[:http_retry_delivery] end |
#retry_directory ⇒ String, NilClass
Returns directory storing failed messages.
55 56 57 58 59 |
# File 'lib/carnivore-http/http_source.rb', line 55 def retry_directory if(args[:retry_directory]) FileUtils.mkdir_p(File.join(args[:retry_directory], name.to_s)).first end end |
#retry_write_directory ⇒ String, NilClass
Returns cache directory for initial writes.
62 63 64 65 66 67 |
# File 'lib/carnivore-http/http_source.rb', line 62 def retry_write_directory base = retry_directory if(base) FileUtils.mkdir_p(File.join(base, '.write')).first end end |
#setup(args = {}) ⇒ Object
Setup the source
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/carnivore-http/http_source.rb', line 24 def setup(args={}) require 'fileutils' @args = default_args(args) unless(retry_delivery) Carnivore::Supervisor.supervisor.supervise_as( :http_retry_delivery, Carnivore::Http::RetryDelivery, retry_directory ) end if(args.get(:authorization, :allowed_origins)) require 'ipaddr' @allowed_origins = [args.get(:authorization, :allowed_origins)].flatten.compact.map do |origin_check| IPAddr.new(origin_check) end end if(args.get(:authorization, :htpasswd)) require 'htauth' @auth_htpasswd = HTAuth::PasswdFile.open( args.get(:authorization, :htpasswd) ) end @listeners = [] end |
#terminate ⇒ Object
298 299 300 301 302 303 304 305 306 |
# File 'lib/carnivore-http/http_source.rb', line 298 def terminate super if(@listeners) @listeners.each do |l| l.stop(:sync) end @listeners.clear end end |
#transmit(message, *extra) ⇒ Object
Tranmit message. The transmission can be a response back to an open connection, or a request to a remote source (remote carnivore-http source generally)
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 |
# File 'lib/carnivore-http/http_source.rb', line 180 def transmit(, *extra) = extra.detect{|x| x.is_a?(Hash)} || {} orig = extra.detect{|x| x.is_a?(Carnivore::Message)} con = [:connection] if(orig && con.nil?) con = orig[:message][:connection] end if(con) # response payload = .is_a?(String) ? : MultiJson.dump() # TODO: add `options` options for marshaling: json/xml/etc code = .fetch(:code, :ok) info "Transmit response type with code: #{code}" con.respond(code, payload) else # request if(args[:endpoint]) url = args[:endpoint] else url = "http#{'s' if args[:ssl]}://#{args[:bind]}" if(args[:port]) url << ":#{args[:port]}" end url = URI.join(url, args.fetch(:path, '/')).to_s end if([:path]) url = URI.join(url, [:path].to_s) end method = .fetch(:method, args.fetch(:method, :post) ).to_s.downcase.to_sym = .is_a?(Hash) ? .fetch(:id, Carnivore.uuid) : Carnivore.uuid payload = .is_a?(String) ? : MultiJson.dump() info "Transmit request type for Message ID: #{}" async.perform_transmission(.to_s, payload, method, url, .fetch(:headers, {})) end end |
#write_for_retry(message_id, payload, method, url, headers) ⇒ TrueClass, FalseClass
Persist message if enabled for send retry
240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 |
# File 'lib/carnivore-http/http_source.rb', line 240 def write_for_retry(, payload, method, url, headers) data = { :message_id => , :payload => payload, :method => method, :url => url, :headers => headers } if(retry_directory) stage_path = File.join(retry_write_directory, "#{}.json") final_path = File.join(retry_directory, File.basename(stage_path)) File.open(stage_path, 'w+') do |file| file.write MultiJson.dump(data) end FileUtils.move(stage_path, final_path) info "Failed message (ID: #{}) persisted for resend" true end end |