Class: HTTPX::Selector

Inherits:
Object
  • Object
show all
Defined in:
lib/httpx/selector.rb

Defined Under Namespace

Classes: Monitor

Instance Method Summary collapse

Constructor Details

#initializeSelector

Returns a new instance of Selector.



49
50
51
52
53
54
55
# File 'lib/httpx/selector.rb', line 49

def initialize
  @readers = {}
  @writers = {}
  @lock = Mutex.new
  @__r__, @__w__ = IO.pipe
  @closed = false
end

Instance Method Details

#closeObject

Closes the selector.



144
145
146
147
148
149
150
151
# File 'lib/httpx/selector.rb', line 144

def close
  return if @closed
  @__r__.close
  @__w__.close
rescue IOError
ensure
  @closed = true
end

#deregister(io) ⇒ Object

deregisters io from selectables.



58
59
60
61
62
63
64
65
# File 'lib/httpx/selector.rb', line 58

def deregister(io)
  @lock.synchronize do
    rmonitor = @readers.delete(io)
    wmonitor = @writers.delete(io)
    monitor = rmonitor || wmonitor
    monitor.close(false) if monitor
  end
end

#register(io, interests) ⇒ Object

register io for interests events.



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/httpx/selector.rb', line 68

def register(io, interests)
  readable = READABLE.include?(interests)
  writable = WRITABLE.include?(interests)
  @lock.synchronize do
    if readable
      monitor = @readers[io]
      if monitor
        monitor.interests = interests
      else
        monitor = Monitor.new(io, interests, self)
      end
      @readers[io] = monitor
      @writers.delete(io) unless writable
    end
    if writable
      monitor = @writers[io]
      if monitor
        monitor.interests = interests
      else
        # reuse object
        monitor = readable ? @readers[io] : Monitor.new(io, interests, self)
      end
      @writers[io] = monitor
      @readers.delete(io) unless readable
    end
    monitor
  end
end

#select(interval) ⇒ Object

waits for read/write events for interval. Yields for monitors of selected IO objects.



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/httpx/selector.rb', line 100

def select(interval)
  begin
    r = nil
    w = nil
    @lock.synchronize do
      r = @readers.keys
      w = @writers.keys
    end
    r.unshift(@__r__)

    readers, writers = IO.select(r, w, nil, interval)

    raise HTTPX::TimeoutError.new(interval, "timed out while waiting on select") if readers.nil? && writers.nil?
  rescue IOError, SystemCallError
    @lock.synchronize do
      @readers.reject! { |io, _| io.closed? }
      @writers.reject! { |io, _| io.closed? }
    end
    retry
  end

  readers.each do |io|
    if io == @__r__
      # clean up wakeups
      @__r__.read(@__r__.stat.size)
    else
      monitor = io.closed? ? @readers.delete(io) : @readers[io]
      next unless monitor
      monitor.readiness = writers.delete(io) ? :rw : :r
      yield monitor
    end
  end if readers

  writers.each do |io|
    monitor = io.closed? ? @writers.delete(io) : @writers[io]
    next unless monitor
    # don't double run this, the last iteration might have run this task already
    monitor.readiness = :w
    yield monitor
  end if writers
end

#wakeupObject

interrupts the select call.



154
155
156
# File 'lib/httpx/selector.rb', line 154

def wakeup
  @__w__.write_nonblock("\0", exception: false)
end