Class: Muxer::Multiplexer
- Inherits:
-
Object
- Object
- Muxer::Multiplexer
- Defined in:
- lib/muxer/multiplexer.rb
Overview
Multiplexer is the core class of Muxer that actually multiplexes web requests. Multiplexer holds a list of Muxer::Requests that will be executed; each one is added to the succeeded or failed responses once it completes or the timeouts have been reached.
Instance Attribute Summary collapse
-
#requests ⇒ Array
Muxer::Requests that are set up in this Multiplexer.
-
#timeout ⇒ Number
Seconds for the timeout.
Instance Method Summary collapse
-
#add_request(request) ⇒ Object
add_request adds a request to Multiplexer.
-
#add_url(url, options = {}) ⇒ Object
add_url builds a Request object and passes it to add_request.
-
#execute ⇒ Hash
executes the actual event loop that manages creating, sending, and processing the finished / timed out web requests.
-
#initialize ⇒ Multiplexer
constructor
multiplexer = Multiplexer.new.
Constructor Details
#initialize ⇒ Multiplexer
multiplexer = Multiplexer.new
15 16 17 18 |
# File 'lib/muxer/multiplexer.rb', line 15

# Builds an empty multiplexer: no requests queued and no overall timeout.
#
#   multiplexer = Multiplexer.new
def initialize
  @requests = []
  @timeout = nil
end
Instance Attribute Details
#requests ⇒ Array
Returns Muxer::Requests that are set up in this Multiplexer.
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/muxer/multiplexer.rb', line 11

# Multiplexer keeps a list of requests, starts them all inside an
# EventMachine loop and sorts the resulting responses into succeeded and
# failed buckets as they complete or time out.
class Multiplexer
  attr_reader :requests
  attr_writer :timeout

  # multiplexer = Multiplexer.new
  def initialize
    @requests = []
    @timeout = nil
  end

  # add_url builds a Request object and passes it to add_request
  #
  #   m.add_url('https://www.google.com', {timeout: 3})
  #   # gives a 3 second timeout to a request to https://www.google.com
  #
  # url is merely the target URL
  #
  # `options` is a hash describing the web request
  #   {
  #     timeout: nil,
  #     method: :get,
  #     params: {},
  #     redirects: nil
  #   }
  #
  # String keys are accepted and treated like their symbol equivalents.
  # The caller's hash is left untouched. Keys the Request has no setter
  # for, and nil/false values, are ignored.
  #
  # @param url [String] The URL for the web request
  # @param options [{Symbol, String => Object}] The parameters for the web request
  # @return true
  def add_url(url, options = {})
    settings = {timeout: nil, method: :get, params: {}, redirects: nil, id: nil}
    # Copy into a fresh hash instead of rewriting the caller's hash in place.
    (options || {}).each { |key, val| settings[key.to_sym] = val }

    request = Request.new
    request.url = url
    settings.each do |key, val|
      setter = "#{key}="
      next unless request.respond_to?(setter)

      request.public_send(setter, val) if val
    end

    add_request(request)
    true
  end

  # add_request adds a request to Multiplexer
  #
  #   request = Muxer::Request.new
  #   request.url = 'https://www.google.com'
  #   request.timeout = 3
  #   m.add_request request
  #   # gives a 3 second timeout to a request to https://www.google.com
  #
  # @param request [Muxer::Request] the Request to add to the multiplexer
  # @return true
  def add_request(request)
    requests << request
    true
  end

  # executes the actual event loop that manages creating, sending,
  # and processing the finished / timed out web requests
  #
  # @return [Hash] Keys are :succeeded, :failed, :pending and :succeeded_by_id
  def execute
    @responses = {succeeded: [], failed: [], pending: [], succeeded_by_id: {}}
    @start = Time.now
    EventMachine.run do
      requests.each do |request|
        @responses[:pending] << request.process!
      end

      # Poll once per millisecond until nothing is pending any more.
      EM::PeriodicTimer.new(0.001) do
        process_requests
      end
    end
    @responses
  end

  private

  # One polling tick: collect finished responses, apply timeouts, and stop
  # the event loop once nothing is pending.
  def process_requests
    process_pending
    process_timeouts

    EM.stop if @responses[:pending].empty?
  end

  # Moves every completed response out of :pending into :succeeded (also
  # indexed by id when it has one) or :failed, depending on its error.
  def process_pending
    # Split first, then file the finished ones: removing entries from the
    # array while iterating it would skip the entry after each removal.
    finished, waiting = @responses[:pending].partition(&:completed?)
    @responses[:pending] = waiting

    finished.each do |response|
      if response.error.nil?
        @responses[:succeeded] << response
        @responses[:succeeded_by_id][response.id] = response if response.id
      else
        @responses[:failed] << response
      end
    end
  end

  # Fails everything still pending when either the multiplexer-wide timeout
  # or the longest per-request timeout has elapsed since execute started.
  def process_timeouts
    now = Time.now
    return finish_timeouts if @timeout && (@start + @timeout <= now)

    timeouts = @responses[:pending].map(&:timeout)
    # A nil timeout is treated as "no per-request deadline", so the longest
    # remaining timeout is unbounded (and nil cannot be compared by max).
    return if timeouts.empty? || timeouts.any?(&:nil?)

    finish_timeouts if @start + timeouts.max <= now
  end

  # Marks every pending response as failed and stops the event loop.
  def finish_timeouts
    @responses[:pending].each do |pending|
      @responses[:failed] << pending
    end
    @responses[:pending] = []
    EM.stop
  end
end
#timeout ⇒ Number
Returns Seconds for the timeout.
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/muxer/multiplexer.rb', line 11

# Multiplexer keeps a list of requests, starts them all inside an
# EventMachine loop and sorts the resulting responses into succeeded and
# failed buckets as they complete or time out.
class Multiplexer
  attr_reader :requests
  attr_writer :timeout

  # multiplexer = Multiplexer.new
  def initialize
    @requests = []
    @timeout = nil
  end

  # add_url builds a Request object and passes it to add_request
  #
  #   m.add_url('https://www.google.com', {timeout: 3})
  #   # gives a 3 second timeout to a request to https://www.google.com
  #
  # url is merely the target URL
  #
  # `options` is a hash describing the web request
  #   {
  #     timeout: nil,
  #     method: :get,
  #     params: {},
  #     redirects: nil
  #   }
  #
  # String keys are accepted and treated like their symbol equivalents.
  # The caller's hash is left untouched. Keys the Request has no setter
  # for, and nil/false values, are ignored.
  #
  # @param url [String] The URL for the web request
  # @param options [{Symbol, String => Object}] The parameters for the web request
  # @return true
  def add_url(url, options = {})
    settings = {timeout: nil, method: :get, params: {}, redirects: nil, id: nil}
    # Copy into a fresh hash instead of rewriting the caller's hash in place.
    (options || {}).each { |key, val| settings[key.to_sym] = val }

    request = Request.new
    request.url = url
    settings.each do |key, val|
      setter = "#{key}="
      next unless request.respond_to?(setter)

      request.public_send(setter, val) if val
    end

    add_request(request)
    true
  end

  # add_request adds a request to Multiplexer
  #
  #   request = Muxer::Request.new
  #   request.url = 'https://www.google.com'
  #   request.timeout = 3
  #   m.add_request request
  #   # gives a 3 second timeout to a request to https://www.google.com
  #
  # @param request [Muxer::Request] the Request to add to the multiplexer
  # @return true
  def add_request(request)
    requests << request
    true
  end

  # executes the actual event loop that manages creating, sending,
  # and processing the finished / timed out web requests
  #
  # @return [Hash] Keys are :succeeded, :failed, :pending and :succeeded_by_id
  def execute
    @responses = {succeeded: [], failed: [], pending: [], succeeded_by_id: {}}
    @start = Time.now
    EventMachine.run do
      requests.each do |request|
        @responses[:pending] << request.process!
      end

      # Poll once per millisecond until nothing is pending any more.
      EM::PeriodicTimer.new(0.001) do
        process_requests
      end
    end
    @responses
  end

  private

  # One polling tick: collect finished responses, apply timeouts, and stop
  # the event loop once nothing is pending.
  def process_requests
    process_pending
    process_timeouts

    EM.stop if @responses[:pending].empty?
  end

  # Moves every completed response out of :pending into :succeeded (also
  # indexed by id when it has one) or :failed, depending on its error.
  def process_pending
    # Split first, then file the finished ones: removing entries from the
    # array while iterating it would skip the entry after each removal.
    finished, waiting = @responses[:pending].partition(&:completed?)
    @responses[:pending] = waiting

    finished.each do |response|
      if response.error.nil?
        @responses[:succeeded] << response
        @responses[:succeeded_by_id][response.id] = response if response.id
      else
        @responses[:failed] << response
      end
    end
  end

  # Fails everything still pending when either the multiplexer-wide timeout
  # or the longest per-request timeout has elapsed since execute started.
  def process_timeouts
    now = Time.now
    return finish_timeouts if @timeout && (@start + @timeout <= now)

    timeouts = @responses[:pending].map(&:timeout)
    # A nil timeout is treated as "no per-request deadline", so the longest
    # remaining timeout is unbounded (and nil cannot be compared by max).
    return if timeouts.empty? || timeouts.any?(&:nil?)

    finish_timeouts if @start + timeouts.max <= now
  end

  # Marks every pending response as failed and stops the event loop.
  def finish_timeouts
    @responses[:pending].each do |pending|
      @responses[:failed] << pending
    end
    @responses[:pending] = []
    EM.stop
  end
end
Instance Method Details
#add_request(request) ⇒ Object
add_request adds a request to Multiplexer
request = Muxer::Request.new
request.url = 'https://www.google.com'
request.timeout = 3
m.add_request request
gives a 3 second timeout to a request to www.google.com
65 66 67 68 |
# File 'lib/muxer/multiplexer.rb', line 65

# Appends an already-built request to the multiplexer's request list.
#
# @param request [Muxer::Request] the Request to add to the multiplexer
# @return true
def add_request(request)
  requests << request
  true
end
#add_url(url, options = {}) ⇒ Object
add_url builds a Request object and passes it to add_request
m.add_url('https://www.google.com', {timeout: 3}) # gives a 3 second timeout to a request to https://www.google.com
url is merely the target URL
`options` is a hash describing the web request:
{
timeout: nil,
method: :get,
params: {},
redirects: nil
}
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/muxer/multiplexer.rb', line 38

# Builds a Request for +url+, applies +options+ to it through the
# request's setters and hands it to add_request.
#
# String keys are accepted and treated like their symbol equivalents.
# The caller's hash is left untouched. Keys the Request has no setter
# for, and nil/false values, are ignored.
#
# @param url [String] The URL for the web request
# @param options [{Symbol, String => Object}] The parameters for the web request
# @return true
def add_url(url, options = {})
  settings = {timeout: nil, method: :get, params: {}, redirects: nil, id: nil}
  # Copy into a fresh hash instead of rewriting the caller's hash in place.
  (options || {}).each { |key, val| settings[key.to_sym] = val }

  request = Request.new
  request.url = url
  settings.each do |key, val|
    setter = "#{key}="
    next unless request.respond_to?(setter)

    request.public_send(setter, val) if val
  end

  add_request(request)
  true
end
#execute ⇒ Hash
executes the actual event loop that manages creating, sending, and processing the finished / timed out web requests
74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/muxer/multiplexer.rb', line 74

# Runs the event loop: starts every queued request, then polls once per
# millisecond via process_requests until the loop is stopped.
#
# @return [Hash] Keys are :succeeded, :failed, :pending and :succeeded_by_id
def execute
  @responses = {succeeded: [], failed: [], pending: [], succeeded_by_id: {}}
  @start = Time.now

  EventMachine.run do
    # process! returns the object that is tracked as pending.
    requests.each { |request| @responses[:pending] << request.process! }

    EM::PeriodicTimer.new(0.001) { process_requests }
  end

  @responses
end