Class: OFlow::Actors::HttpServer
- Inherits:
-
Trigger
- Object
- OFlow::Actor
- Trigger
- OFlow::Actors::HttpServer
- Defined in:
- lib/oflow/actors/httpserver.rb
Overview
Provides a simple HTTP server that accepts requests, each of which then triggers a flow. The execution flow should end back at the server so it can send a response to the requester.
Constant Summary collapse
- STATUS_MESSAGES =
{ 100 => 'Continue', 101 => 'Switching Protocols', 102 => 'Processing', 200 => 'OK', 201 => 'Created', 202 => 'Accepted', 203 => 'Non-Authoritative Information', 204 => 'No Content', 205 => 'Reset Content', 206 => 'Partial Content', 207 => 'Multi-Status', 208 => 'Already Reported', 226 => 'IM Used', 300 => 'Multiple Choices', 301 => 'Moved Permanently', 302 => 'Found', 303 => 'See Other', 304 => 'Not Modified', 305 => 'Use Proxy', 306 => 'Switch Proxy', 307 => 'Temporary Redirect', 308 => 'Permanent Redirect', 400 => 'Bad Request', 401 => 'Unauthorized', 402 => 'Payment Required', 403 => 'Forbidden', 404 => 'Not Found', 405 => 'Method Not Allowed', 406 => 'Not Acceptable', 407 => 'Proxy Authentication Required', 408 => 'Request Timeout', 409 => 'Conflict', 410 => 'Gone', 411 => 'Length Required', 412 => 'Precondition Failed', 413 => 'Request Entity Too Large', 414 => 'Request-URI Too Long', 415 => 'Unsupported Media Type', 416 => 'Requested Range Not Satisfiable', 417 => 'Expectation Failed', 418 => "I'm a teapot", 419 => 'Authentication Timeout', 420 => 'Method Failure', 422 => 'Unprocessed Entity', 423 => 'Locked', 424 => 'Failed Dependency', 425 => 'Unordered Collection', 426 => 'Upgrade Required', 428 => 'Precondition Required', 429 => 'Too Many Requests', 431 => 'Request Header Fields Too Long', 440 => 'Login Timeout', 444 => 'No Response', 449 => 'Retry With', 450 => 'Blocked by Windows Parental Controls', 451 => 'Unavailable For Legal Reasons', 494 => 'Request Header Too Large', 495 => 'Cert Error', 496 => 'No Cert', 497 => 'HTTP tp HTTP', 499 => 'Client Closed Request', 500 => 'Internal Server Error', 501 => 'Not Implemented', 502 => 'Bad Gateway', 503 => 'Service Unavailable', 504 => 'Gateway Timeout', 505 => 'HTTP Version Not Supported', 506 => 'Variant Also Negotiates', 507 => 'Insufficient Storage', 508 => 'Loop Detected', 509 => 'Bandwidth Limit Exceeded', 510 => 'Not Extended', 511 => 'Network Authentication Required', 520 => 'Original Error', 522 => 
'Connection timed out', 523 => 'Proxy Declined Request', 524 => 'A timeout occurred', 598 => 'Network read timeout error', 599 => 'Network connect timeout error', }
Instance Attribute Summary
Attributes inherited from Trigger
Attributes inherited from OFlow::Actor
Instance Method Summary collapse
-
#initialize(task, options) ⇒ HttpServer
constructor
A new instance of HttpServer.
- #perform(op, box) ⇒ Object
- #read_req(session, id) ⇒ Object
- #read_timeout(session, len, timeout) ⇒ Object
- #set_options(options) ⇒ Object
Methods inherited from Trigger
#new_event, #set_label, #set_with_tracker
Methods inherited from OFlow::Actor
#busy?, #inputs, #options, #outputs, #set_option, #with_own_thread
Constructor Details
#initialize(task, options) ⇒ HttpServer
Returns a new instance of HttpServer.
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/oflow/actors/httpserver.rb', line 14

# Opens a non-blocking TCP listener on @port and starts a background thread
# that accepts connections, parses each request with read_req, and ships a
# new event carrying the request and a default response to every link of
# the task except :success.
#
# NOTE(review): @port and @count are read here but never assigned in this
# method; presumably `super` sets them (via set_options and the Trigger
# superclass) -- confirm.
#
# @param task [Object] the owning task; its state, links, ship, warn and
#   handle_error are used by the accept loop
# @param options [Hash] actor options, forwarded to the superclass by `super`
def initialize(task, options)
  super
  @sessions = {}
  @server = TCPServer.new(@port)
  @server.fcntl(Fcntl::F_SETFL, @server.fcntl(Fcntl::F_GETFL, 0) | Fcntl::O_NONBLOCK)
  @server_loop = Thread.start(self) do |me|
    Thread.current[:name] = me.task.full_name + '-server'
    while Task::CLOSING != task.state
      begin
        # Do not accept connections while the task is blocked or stopped.
        if Task::BLOCKED == task.state || Task::STOPPED == task.state
          sleep(0.1)
          next
        end
        session = @server.accept_nonblock
        session.fcntl(Fcntl::F_SETFL, session.fcntl(Fcntl::F_GETFL, 0) | Fcntl::O_NONBLOCK)
        @count += 1
        # If nil is returned the request was empty. Nothing will ever reply to
        # it, so close the socket instead of leaking it.
        if (req = read_req(session, @count)).nil?
          session.close
          next
        end
        # Remember the socket so perform(:reply, ...) can find it by request id.
        @sessions[@count] = session
        resp = {
          status: 200,
          body: nil,
          headers: {
            'Content-Type' => 'text/html',
          }
        }
        box = new_event
        box.contents[:request] = req
        box.contents[:response] = resp
        task.links.each_key do |key|
          # The :success link is shipped to by perform, not by incoming requests.
          next if :success == key || 'success' == key

          begin
            task.ship(key, box)
          rescue BlockedError
            task.warn("Failed to ship request #{box.contents} to #{key}. Task blocked.")
          rescue BusyError
            task.warn("Failed to ship request #{box.contents} to #{key}. Task busy.")
          end
        end
      rescue IO::WaitReadable, Errno::EINTR
        # No pending connection yet; wait up to half a second for one.
        IO.select([@server], nil, nil, 0.5)
      rescue Exception => e
        # Deliberately broad: every failure is handed to the task's error
        # handler and the accept loop keeps running.
        task.handle_error(e)
      end
    end
  end
end
Instance Method Details
#perform(op, box) ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/oflow/actors/httpserver.rb', line 63

# Handles an operation sent back to the server at the end of a flow.
#
# :reply writes the response found in the box to the session recorded for
# the request id, then closes and forgets that session. :skip only forgets
# the session. Afterwards an empty box carrying the same tracker is shipped
# on the :success link when that link exists.
#
# @param op [Symbol] :reply or :skip
# @param box [Object] box responding to get(path) and tracker; the request
#   id is read at @req_id_path and the response at @response_path
# @raise [NotFoundError] when no session is recorded for the request id, or
#   (for :reply) when the box holds no response
# @raise [OpError] for any other op
def perform(op, box)
  case op
  when :reply
    req_id = box.get(@req_id_path)
    if (session = @sessions[req_id]).nil?
      raise NotFoundError.new(task.full_name, 'session', req_id)
    end
    if (resp = box.get(@response_path)).nil?
      raise NotFoundError.new(task.full_name, 'response', @response_path)
    end

    body = resp[:body].to_s
    status = resp[:status]
    head = ["HTTP/1.1 #{status} #{STATUS_MESSAGES[status]}"]
    (resp[:headers] || {}).each { |k, v| head << "#{k}: #{v}" }
    # Content-Length counts bytes, not characters.
    head << "Content-Length: #{body.bytesize}"
    begin
      # write (not puts) so that no newline beyond the declared length is sent.
      session.write(head.join("\r\n") + "\r\n\r\n")
      session.write(body)
    ensure
      # Forget and close the session even if the client has gone away mid-write.
      @sessions.delete(req_id)
      session.close
    end
  when :skip
    req_id = box.get(@req_id_path)
    if @sessions[req_id].nil?
      raise NotFoundError.new(task.full_name, 'session', req_id)
    end

    # NOTE(review): the session is forgotten but not closed here -- confirm
    # whether skipped requests are expected to be closed elsewhere.
    @sessions.delete(req_id)
  else
    raise OpError.new(task.full_name, op)
  end
  task.ship(:success, Box.new(nil, box.tracker)) unless task.links[:success].nil?
end
#read_req(session, id) ⇒ Object
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/oflow/actors/httpserver.rb', line 105

# Reads and parses one HTTP request from the session.
#
# The result is a Hash holding :id, :method, :protocol, :path and :args (an
# Array of [name, value] pairs, or nil when there is no query string), one
# String-keyed entry per header line, and :body when a positive
# Content-Length was announced.
#
# @param session [#gets] the connection to read from
# @param id [Object] identifier stored under :id in the request
# @return [Hash, nil] the parsed request, or nil when nothing could be read
#   or the request line has no target
def read_req(session, id)
  req = { id: id }
  line = session.gets
  return if line.nil?

  parts = line.split(' ')
  # A request line needs at least a method and a target; treat anything
  # shorter (for example a stray blank line) as an empty request.
  return if parts.size < 2

  req[:method] = parts[0]
  req[:protocol] = parts[2]
  path, arg_str = parts[1].split('?', 2)
  req[:path] = path
  # Split each pair once only, so values containing '=' stay intact.
  req[:args] = arg_str.nil? ? nil : arg_str.split('&').map { |pair| pair.split('=', 2) }

  # Read the header lines up to the blank line that ends them.
  len = 0
  while (line = session.gets)
    line.strip!
    break if line.empty?

    key, value = line.split(':', 2)
    next if value.nil? # not a "name: value" line

    value = value.strip
    # Header names are case-insensitive.
    if key.downcase == 'content-length'
      value = value.to_i
      len = value
    end
    req[key] = value
  end
  req[:body] = read_timeout(session, len, @read_timeout) if 0 < len
  req
end
#read_timeout(session, len, timeout) ⇒ Object
141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
# File 'lib/oflow/actors/httpserver.rb', line 141

# Reads exactly len bytes from the session, waiting no longer than timeout
# seconds in total for the data to arrive.
#
# Chunks are appended to the result, so a body that arrives in several
# pieces is returned whole.
#
# @param session [#read_nonblock] the connection to read from
# @param len [Integer] number of bytes to read
# @param timeout [Numeric] overall time limit in seconds
# @return [String] the len bytes read (empty when len is not positive)
# @raise [Errno::EAGAIN] when the data does not arrive before the timeout
# @raise [EOFError] when the peer closes the connection early
def read_timeout(session, len, timeout)
  data = String.new
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  while data.bytesize < len
    begin
      # Ask only for what is still missing so nothing past the body is consumed.
      data << session.read_nonblock(len - data.bytesize)
    rescue IO::WaitReadable
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      # Never hand IO.select a negative timeout; an expired deadline is a timeout.
      if remaining <= 0 || IO.select([session], nil, nil, remaining).nil?
        raise Errno::EAGAIN, "timed out waiting for #{len - data.bytesize} more bytes"
      end
    end
  end
  data
end
#set_options(options) ⇒ Object
97 98 99 100 101 102 103 |
# File 'lib/oflow/actors/httpserver.rb', line 97

# Applies the actor options, using a default for each one that is absent.
#
# @param options [Hash] recognised keys:
#   :port           port to listen on (default 6060)
#   :req_id_path    box path of the request id (default 'request:id')
#   :response_path  box path of the response (default 'response')
#   :read_timeout   seconds allowed for reading a request body (default 1.0)
def set_options(options)
  super
  @port = options.fetch(:port, 6060)
  @req_id_path = options.fetch(:req_id_path, 'request:id')
  @response_path = options.fetch(:response_path, 'response')
  @read_timeout = options.fetch(:read_timeout, 1.0)
end