Class: Muxer::Multiplexer

Inherits:
Object
  • Object
show all
Defined in:
lib/muxer/multiplexer.rb

Overview

Multiplexer is the core class of Muxer that actually multiplexes web requests. Multiplexer has a list of Muxer::Requests that will be executed and added to the succeeded or failed responses as they complete or when the timeouts have been reached.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Multiplexer

multiplexer = Multiplexer.new



15
16
17
18
# File 'lib/muxer/multiplexer.rb', line 15

# Starts the Multiplexer with nothing queued and no overall timeout.
#
# multiplexer = Multiplexer.new
def initialize
  # nil means no overall deadline has been configured yet
  @timeout = nil
  # filled later through add_request / add_url
  @requests = []
end

Instance Attribute Details

#requests ⇒ Array

Returns Muxer::Requests that are set up in this Multiplexer.

Returns:

  • (Array)

    Muxer::Requests that are set up in this Multiplexer



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/muxer/multiplexer.rb', line 11

class Multiplexer
  # Requests queued on this Multiplexer, in the order they were added.
  attr_reader :requests
  # Overall timeout in seconds, counted from the start of #execute.
  # When nil (the default) only the per-request timeouts are checked.
  attr_writer :timeout

  # multiplexer = Multiplexer.new
  def initialize
    @requests = []
    @timeout = nil
  end

  # add_url builds a Request object and passes it to add_request
  #
  # m.add_url('https://www.google.com', {timeout: 3}) # gives a 3 second
  # timeout to a request to https://www.google.com
  #
  # url is merely the target URL
  #
  # `options` is a hash describing the web request
  # {
  #   timeout: nil,
  #   method: :get,
  #   params: {},
  #   redirects: nil,
  #   id: nil
  # }
  #
  # Keys may be strings or symbols. The hash passed in is not modified.
  # An option is only applied when its value is truthy and the Request
  # has a matching writer method; anything else is skipped.
  #
  # @param url [string] The URL for the web request
  # @param options [{symbol => Object}] The parameters for the web request
  # @return true
  def add_url(url, options = {})
    defaults = {timeout: nil, method: :get, params: {}, redirects: nil, id: nil}
    # Symbolize keys into a fresh hash so the caller's hash is left untouched.
    settings = options.each_with_object(defaults) do |(key, val), merged|
      merged[key.to_sym] = val
    end

    request = Request.new
    request.url = url
    settings.each do |key, val|
      setter = "#{key}="
      next unless val && request.respond_to?(setter)
      request.public_send(setter, val)
    end
    add_request request
    true
  end

  # add_request adds a request to Multiplexer
  #
  # request = Muxer::Request.new
  # request.url = 'https://www.google.com'
  # request.timeout = 3
  # m.add_request request
  #
  # gives a 3 second timeout to a request to https://www.google.com
  #
  # @param request [Muxer::Request] the Request to add to the multiplexer
  # @return true
  def add_request(request)
    requests << request
    true
  end

  # executes the actual event loop that manages creating, sending,
  # and processing the finished / timed out web requests
  #
  # Every queued request is started with process! and whatever that call
  # returns is tracked as pending; a 1 ms periodic timer then polls the
  # pending list until it is empty.
  #
  # @return [Hash] Keys are :succeeded, :failed, :pending and
  #   :succeeded_by_id
  def execute
    @responses = {succeeded: [], failed: [], pending: [], succeeded_by_id: {}}
    @start = Time.now
    EventMachine.run do
      requests.each do |request|
        @responses[:pending] << request.process!
      end

      EM::PeriodicTimer.new(0.001) do
        process_requests
      end
    end
    @responses
  end

  private

  # One tick of the polling timer: sort out completed requests, apply the
  # timeouts, and stop the reactor once nothing is pending. This is the
  # only place EM.stop is called, so it is signalled once per tick.
  def process_requests
    process_pending
    process_timeouts

    EM.stop if @responses[:pending].empty?
  end

  # Moves every completed request out of :pending and into :succeeded
  # (also indexed under :succeeded_by_id when it has an id) or :failed,
  # depending on whether it reports an error.
  def process_pending
    # Split first and reassign afterwards: deleting from the array while
    # iterating it skips the element that follows each completed request.
    finished, unfinished = @responses[:pending].partition(&:completed?)
    @responses[:pending] = unfinished

    finished.each do |response|
      if response.error.nil?
        @responses[:succeeded] << response
        @responses[:succeeded_by_id][response.id] = response if response.id
      else
        @responses[:failed] << response
      end
    end
  end

  # Fails everything still pending once the overall timeout has elapsed,
  # or once the largest per-request timeout among the pending requests
  # has elapsed.
  def process_timeouts
    now = Time.now
    if @timeout && (@start + @timeout <= now)
      finish_timeouts
      return
    end
    # NOTE(review): max raises if pending timeouts mix nil and numbers;
    # presumably Request always supplies a numeric timeout - confirm.
    highest_remaining_timeout = @responses[:pending].map(&:timeout).max
    if highest_remaining_timeout && (@start + highest_remaining_timeout <= now)
      finish_timeouts
    end
  end

  # Marks every pending request as failed. The caller, process_requests,
  # stops the reactor afterwards because the pending list is now empty.
  def finish_timeouts
    @responses[:failed].concat(@responses[:pending])
    @responses[:pending] = []
  end
end

#timeout ⇒ Number

Returns Seconds for the timeout.

Returns:

  • (Number)

    Seconds for the timeout



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/muxer/multiplexer.rb', line 11

class Multiplexer
  # Requests queued on this Multiplexer, in the order they were added.
  attr_reader :requests
  # Overall timeout in seconds, counted from the start of #execute.
  # When nil (the default) only the per-request timeouts are checked.
  attr_writer :timeout

  # multiplexer = Multiplexer.new
  def initialize
    @requests = []
    @timeout = nil
  end

  # add_url builds a Request object and passes it to add_request
  #
  # m.add_url('https://www.google.com', {timeout: 3}) # gives a 3 second
  # timeout to a request to https://www.google.com
  #
  # url is merely the target URL
  #
  # `options` is a hash describing the web request
  # {
  #   timeout: nil,
  #   method: :get,
  #   params: {},
  #   redirects: nil,
  #   id: nil
  # }
  #
  # Keys may be strings or symbols. The hash passed in is not modified.
  # An option is only applied when its value is truthy and the Request
  # has a matching writer method; anything else is skipped.
  #
  # @param url [string] The URL for the web request
  # @param options [{symbol => Object}] The parameters for the web request
  # @return true
  def add_url(url, options = {})
    defaults = {timeout: nil, method: :get, params: {}, redirects: nil, id: nil}
    # Symbolize keys into a fresh hash so the caller's hash is left untouched.
    settings = options.each_with_object(defaults) do |(key, val), merged|
      merged[key.to_sym] = val
    end

    request = Request.new
    request.url = url
    settings.each do |key, val|
      setter = "#{key}="
      next unless val && request.respond_to?(setter)
      request.public_send(setter, val)
    end
    add_request request
    true
  end

  # add_request adds a request to Multiplexer
  #
  # request = Muxer::Request.new
  # request.url = 'https://www.google.com'
  # request.timeout = 3
  # m.add_request request
  #
  # gives a 3 second timeout to a request to https://www.google.com
  #
  # @param request [Muxer::Request] the Request to add to the multiplexer
  # @return true
  def add_request(request)
    requests << request
    true
  end

  # executes the actual event loop that manages creating, sending,
  # and processing the finished / timed out web requests
  #
  # Every queued request is started with process! and whatever that call
  # returns is tracked as pending; a 1 ms periodic timer then polls the
  # pending list until it is empty.
  #
  # @return [Hash] Keys are :succeeded, :failed, :pending and
  #   :succeeded_by_id
  def execute
    @responses = {succeeded: [], failed: [], pending: [], succeeded_by_id: {}}
    @start = Time.now
    EventMachine.run do
      requests.each do |request|
        @responses[:pending] << request.process!
      end

      EM::PeriodicTimer.new(0.001) do
        process_requests
      end
    end
    @responses
  end

  private

  # One tick of the polling timer: sort out completed requests, apply the
  # timeouts, and stop the reactor once nothing is pending. This is the
  # only place EM.stop is called, so it is signalled once per tick.
  def process_requests
    process_pending
    process_timeouts

    EM.stop if @responses[:pending].empty?
  end

  # Moves every completed request out of :pending and into :succeeded
  # (also indexed under :succeeded_by_id when it has an id) or :failed,
  # depending on whether it reports an error.
  def process_pending
    # Split first and reassign afterwards: deleting from the array while
    # iterating it skips the element that follows each completed request.
    finished, unfinished = @responses[:pending].partition(&:completed?)
    @responses[:pending] = unfinished

    finished.each do |response|
      if response.error.nil?
        @responses[:succeeded] << response
        @responses[:succeeded_by_id][response.id] = response if response.id
      else
        @responses[:failed] << response
      end
    end
  end

  # Fails everything still pending once the overall timeout has elapsed,
  # or once the largest per-request timeout among the pending requests
  # has elapsed.
  def process_timeouts
    now = Time.now
    if @timeout && (@start + @timeout <= now)
      finish_timeouts
      return
    end
    # NOTE(review): max raises if pending timeouts mix nil and numbers;
    # presumably Request always supplies a numeric timeout - confirm.
    highest_remaining_timeout = @responses[:pending].map(&:timeout).max
    if highest_remaining_timeout && (@start + highest_remaining_timeout <= now)
      finish_timeouts
    end
  end

  # Marks every pending request as failed. The caller, process_requests,
  # stops the reactor afterwards because the pending list is now empty.
  def finish_timeouts
    @responses[:failed].concat(@responses[:pending])
    @responses[:pending] = []
  end
end

Instance Method Details

#add_request(request) ⇒ Object

add_request adds a request to Multiplexer

    request = Muxer::Request.new
    request.url = 'https://www.google.com'
    request.timeout = 3
    m.add_request request

gives a 3 second timeout to a request to https://www.google.com

Parameters:

Returns:

  • true



65
66
67
68
# File 'lib/muxer/multiplexer.rb', line 65

# Queues an already-built request on the Multiplexer.
#
# @param request [Muxer::Request] the Request to add to the multiplexer
# @return true
def add_request(request)
  requests.push(request)
  true
end

#add_url(url, options = {}) ⇒ Object

add_url builds a Request object and passes it to add_request

m.add_url('https://www.google.com', {timeout: 3}) # gives a 3 second timeout to a request to https://www.google.com

url is merely the target URL

`options` is a hash describing the web request:

{
  timeout: nil,
  method: :get,
  params: {},
  redirects: nil
}

Parameters:

  • url (string)

    The URL for the web request

  • options ({symbol => Object}) (defaults to: {})

    The parameters for the web request

Returns:

  • true



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/muxer/multiplexer.rb', line 38

# add_url builds a Request object and passes it to add_request
#
# m.add_url('https://www.google.com', {timeout: 3}) # gives a 3 second
# timeout to a request to https://www.google.com
#
# `options` may use string or symbol keys and is merged over these
# defaults: {timeout: nil, method: :get, params: {}, redirects: nil, id: nil}.
# The hash passed in is not modified. An option is only applied when its
# value is truthy and the Request has a matching writer method.
#
# @param url [string] The URL for the web request
# @param options [{symbol => Object}] The parameters for the web request
# @return true
def add_url(url, options = {})
  defaults = {timeout: nil, method: :get, params: {}, redirects: nil, id: nil}
  # Symbolize keys into a fresh hash so the caller's hash is left untouched.
  settings = options.each_with_object(defaults) do |(key, val), merged|
    merged[key.to_sym] = val
  end

  request = Request.new
  request.url = url
  settings.each do |key, val|
    setter = "#{key}="
    next unless val && request.respond_to?(setter)
    request.public_send(setter, val)
  end
  add_request request
  true
end

#execute ⇒ Hash

executes the actual event loop that manages creating, sending, and processing the finished / timed out web requests

Returns:

  • (Hash)

    Keys are :succeeded, :failed, :pending and :succeeded_by_id



74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/muxer/multiplexer.rb', line 74

# Starts every queued request inside an EventMachine.run block and polls
# them with a 1 ms periodic timer via process_requests.
#
# @return [Hash] the response buckets, keyed by :succeeded, :failed,
#   :pending and :succeeded_by_id
def execute
  @responses = {succeeded: [], failed: [], pending: [], succeeded_by_id: {}}
  @start = Time.now
  EventMachine.run do
    # process! kicks the request off; its return value is what gets polled
    requests.each { |queued| @responses[:pending].push(queued.process!) }

    EM::PeriodicTimer.new(0.001) { process_requests }
  end
  @responses
end