Class: HTTPX::Selector

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/httpx/selector.rb

Overview

Implements the selector loop, where it registers and monitors “Selectable” objects.

A Selectable object is an object which can calculate the interests (:r, :w or :rw, respectively “read”, “write” or “read-write”) it wants to monitor, and which returns (via its to_io method) an IO object that can be passed to functions such as IO.select. Specifically, a Selectable must implement the following methods:

state

returns the state as a Symbol; must return :closed once its resources have been disposed of.

to_io

returns the IO object.

call

gets called when the IO is ready.

interests

returns the current interests to monitor for, as described above.

timeout

returns nil or an integer, representing how long to wait for interests.

handle_socket_timeout(Numeric)

called when waiting for interest times out.

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Selector

Returns a new instance of Selector.



34
35
36
37
38
# File 'lib/httpx/selector.rb', line 34

# Builds a selector with nothing registered yet: an empty list of
# selectables, a fresh Timers instance, and the timer-interval flag cleared.
def initialize
  @is_timer_interval = false
  @selectables = []
  @timers = Timers.new
end

Instance Method Details

#deregister(io) ⇒ Object

deregisters io from selectables.



127
128
129
# File 'lib/httpx/selector.rb', line 127

# Stops monitoring +io+ by removing it from the registered selectables.
#
# @param io [Object] the selectable to remove.
# @return [Object, nil] the removed object, or nil when +io+ was not registered.
def deregister(io)
  registered = @selectables
  registered.delete(io)
end

#each(&blk) ⇒ Object



40
41
42
# File 'lib/httpx/selector.rb', line 40

# Iterates over the registered selectables in the order they are stored.
#
# @yieldparam selectable [Object] each registered selectable.
# @return [Array, Enumerator] the selectables list when a block is given,
#   otherwise an enumerator over it.
def each(&block)
  registered = @selectables
  registered.each(&block)
end

#each_connection(&block) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/httpx/selector.rb', line 101

# Walks every connection reachable from the registered selectables.
#
# Resolvers do not count as connections themselves; instead the block is
# forwarded to the resolver's own +each_connection+. Selectables that are
# neither a resolver nor a connection are skipped.
#
# @yieldparam connection [Connection]
# @return [Enumerator] when called without a block.
def each_connection(&block)
  return to_enum(:each_connection) unless block

  @selectables.each do |selectable|
    case selectable
    when Resolver::Resolver
      selectable.each_connection(&block)
    when Connection
      block.call(selectable)
    end
  end
end

#find_connection(request_uri, options) ⇒ Object



114
115
116
117
118
# File 'lib/httpx/selector.rb', line 114

# Looks up the first connection that reports a match for the given
# request URI and options.
#
# @param request_uri [Object] passed through to +Connection#match?+.
# @param options [Object] passed through to +Connection#match?+.
# @return [Object, nil] the first matching connection, or nil if none matches.
def find_connection(request_uri, options)
  connections = each_connection
  connections.find { |candidate| candidate.match?(request_uri, options) }
end

#find_mergeable_connection(connection) ⇒ Object



120
121
122
123
124
# File 'lib/httpx/selector.rb', line 120

# Looks up the first connection, other than +connection+ itself, which
# reports that +connection+ can be merged into it.
#
# @param connection [Object] the connection looking for a merge target.
# @return [Object, nil] the first mergeable connection, or nil if there is none.
def find_mergeable_connection(connection)
  connections = each_connection
  connections.find { |other| other != connection && other.mergeable?(connection) }
end

#find_resolver(options) ⇒ Object



93
94
95
96
97
98
99
# File 'lib/httpx/selector.rb', line 93

# Looks up a registered resolver whose options equal +options+ and returns
# whatever that resolver's +multi+ method returns.
#
# @param options [Object] options compared (via ==) against each resolver's options.
# @return [Object, nil] the resolver's +multi+ value, or nil when no resolver matches.
def find_resolver(options)
  resolver = @selectables.find do |selectable|
    selectable.is_a?(Resolver::Resolver) && options == selectable.options
  end

  resolver&.multi
end

#next_tick ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/httpx/selector.rb', line 44

# Runs a single iteration of the selector loop.
#
# Obtains the wait time from +next_timeout+, waits on the selectables via
# +select+, calls every selectable that is yielded, and then fires the timers.
#
# Error handling:
# * a TimeoutError raised while selecting is handed to +@timers.fire+;
# * any other StandardError is emitted as an +:error+ event on every connection;
# * any remaining Exception force-resets and disconnects every connection,
#   and is then re-raised.
def next_tick
  # :jump_tick is the non-local exit used to cut the current tick short.
  catch(:jump_tick) do
    timeout = next_timeout
    # A negative timeout skips the select call entirely: only the timers are
    # fired (presumably because a timer is already overdue -- confirm against
    # next_timeout).
    if timeout && timeout.negative?
      @timers.fire
      throw(:jump_tick)
    end

    begin
      select(timeout) do |c|
        c.log(level: 2) { "[#{c.state}] selected#{" after #{timeout} secs" unless timeout.nil?}..." }

        c.call
      end

      @timers.fire
    rescue TimeoutError => e
      # Pass the timeout error on to the timers instead of propagating it.
      @timers.fire(e)
    end
  end
rescue StandardError => e
  # Report the failure to every connection rather than raising to the caller.
  each_connection do |c|
    c.emit(:error, e)
  end
rescue Exception # rubocop:disable Lint/RescueException
  # Non-StandardError exceptions: tear every connection down, then re-raise.
  each_connection do |conn|
    conn.force_reset
    conn.disconnect
  end

  raise
end

#register(io) ⇒ Object

register io.



132
133
134
135
136
# File 'lib/httpx/selector.rb', line 132

# Adds +io+ to the monitored selectables, unless it is already registered.
#
# @param io [Object] the selectable to monitor.
# @return [Array, nil] the selectables list after appending, or nil when
#   +io+ was already registered.
def register(io)
  @selectables << io unless @selectables.include?(io)
end

#terminate ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/httpx/selector.rb', line 77

# Terminates every registered selectable that is not in flight, then keeps
# ticking the selector until all of them have either closed or been removed
# from the registry.
#
# @return [nil]
def terminate
  # Work on a copy: @selectables may be modified while we iterate.
  # Selectables that report :closed right after #terminate need no more ticks.
  pending = @selectables.reject(&:inflight?).reject do |selectable|
    selectable.terminate
    selectable.state == :closed
  end

  until pending.empty?
    next_tick

    # Only keep waiting for the ones which are still registered.
    pending = pending & @selectables
  end
end