Class: Droonga::Client::Connection::DroongaProtocol::Thread::Receiver

Inherits:
Object
  • Object
show all
Defined in:
lib/droonga/client/connection/droonga-protocol/thread.rb

Constant Summary collapse

BUFFER_SIZE =
8192

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Receiver

Returns a new instance of Receiver.



184
185
186
187
188
189
190
# File 'lib/droonga/client/connection/droonga-protocol/thread.rb', line 184

def initialize(options={})
  host = options[:host] || Socket.gethostname
  port = options[:port] || 0
  @socket = TCPServer.new(host, port)
  @read_ios = [@socket]
  @client_handlers = {}
end

Instance Attribute Details

#on_error=(value) ⇒ Object

Sets the attribute on_error

Parameters:

  • value

    the value to set the attribute on_error to.



182
183
184
# File 'lib/droonga/client/connection/droonga-protocol/thread.rb', line 182

def on_error=(value)
  @on_error = value
end

Instance Method Details

#closeObject



192
193
194
195
196
197
# File 'lib/droonga/client/connection/droonga-protocol/thread.rb', line 192

def close
  @socket.close
  @client_handlers.each_key do |client|
    client.close
  end
end

#hostObject



199
200
201
# File 'lib/droonga/client/connection/droonga-protocol/thread.rb', line 199

def host
  @socket.addr[3]
end

#portObject



203
204
205
# File 'lib/droonga/client/connection/droonga-protocol/thread.rb', line 203

def port
  @socket.addr[1]
end

#receive(options = {}, &block) ⇒ Object



208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'lib/droonga/client/connection/droonga-protocol/thread.rb', line 208

def receive(options={}, &block)
  timeout = options[:timeout]
  catch do |tag|
    loop do
      start = Time.now
      readable_ios, = IO.select(@read_ios, nil, nil, timeout)
      break if readable_ios.nil?
      if timeout
        timeout -= (Time.now - start)
        timeout = 0 if timeout < 0
      end
      readable_ios.each do |readable_io|
        on_readable(readable_io) do |object|
          begin
            yield(object)
          rescue LocalJumpError
            throw(tag)
          end
        end
      end
    end
  end
end