Module: IOPromise::Dalli::AsyncServer

Defined in:
lib/iopromise/dalli/patch_dalli.rb

Instance Method Summary

Instance Method Details

#async? ⇒ Boolean

Returns:

  • (Boolean)


42
43
44
# File 'lib/iopromise/dalli/patch_dalli.rb', line 42

# Reports whether this server is running in IOPromise async mode.
#
# The value is whatever is stored in +@async+; #initialize assigns it
# +true+ only when the +:iopromise_async+ option is exactly +true+, and
# +false+ otherwise.
#
# @return [Boolean] the stored async flag
def async?
  @async
end

#async_reset ⇒ Object



54
55
56
57
58
59
60
# File 'lib/iopromise/dalli/patch_dalli.rb', line 54

# Discards any buffered async I/O state.
#
# Both the outgoing and incoming buffers are replaced with fresh, distinct,
# mutable empty strings, and both offsets are rewound to zero.
#
# @return [Integer] 0 (the value of the final assignment)
def async_reset
  # Unary plus yields an unfrozen string even under frozen string literals.
  @write_buffer, @read_buffer = +"", +""
  @write_offset = @read_offset = 0
end

#close ⇒ Object



46
47
48
49
50
51
52
# File 'lib/iopromise/dalli/patch_dalli.rb', line 46

# Closes the server, first dropping buffered async state when in async mode.
#
# When #async? is truthy the read/write buffers and offsets are cleared via
# #async_reset; the call is then delegated to the parent implementation.
#
# @return [Object] whatever the parent +close+ returns
def close
  async_reset if async?
  super
end

#execute_continue(ready_readers, ready_writers, ready_exceptions) ⇒ Object

Called by ExecutorPool to continue processing for this server.



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/iopromise/dalli/patch_dalli.rb', line 63

# Called by ExecutorPool to continue processing for this server.
#
# Performs whatever non-blocking I/O the readiness sets allow, rejects
# pending operations that have timed out, and reports what the caller
# should wait on next.
#
# @param ready_readers [Array, nil] readable IOs; nil or empty means none
# @param ready_writers [Array, nil] writable IOs; nil or empty means none
# @param ready_exceptions [Array, nil] IOs with exceptional conditions;
#   nil or empty means none
# @return [Array] a four-element array +[readers, writers, exceptions, timeout]+;
#   +exceptions+ always contains the socket, +readers+/+writers+ are empty and
#   +timeout+ is nil when no operations remain pending
def execute_continue(ready_readers, ready_writers, ready_exceptions)
  # The socket is writable, so push out as much buffered data as we can.
  sock_write_nonblock unless ready_writers.nil? || ready_writers.empty?

  no_readers = ready_readers.nil? || ready_readers.empty?
  no_exceptions = ready_exceptions.nil? || ready_exceptions.empty?
  # Either readable data or an exceptional condition triggers a read attempt.
  sock_read_nonblock unless no_readers && no_exceptions

  watch_exceptions = [@sock]

  # Collect expired operations first (select returns a new hash), so the
  # pending table is not modified while it is being iterated.
  expired = @pending_ops.select { |_, op| op.timeout? }
  expired.each do |op_key, op|
    @pending_ops.delete(op_key)
    op.reject(Timeout::Error.new)
    op.execute_pool.complete(op)
  end

  # Nothing outstanding: only keep watching the socket for exceptions.
  return [[], [], watch_exceptions, nil] if @pending_ops.empty?

  # Ask for writability only while unsent bytes remain in the buffer;
  # always ask to be called back when data is available to read.
  watch_writers = @write_buffer.bytesize > @write_offset ? [@sock] : []
  watch_readers = [@sock]

  # Tell every pending operation it has reached the select loop. This is
  # what starts the operation's timer, which is more accurate than
  # starting it when the write was buffered.
  @pending_ops.each_value(&:in_select_loop)

  # Wake up no later than the operation closest to timing out.
  nearest_timeout = @pending_ops.each_value.map(&:timeout_remaining).min

  [watch_readers, watch_writers, watch_exceptions, nearest_timeout]
end

#initialize(attribs, options = {}) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/iopromise/dalli/patch_dalli.rb', line 29

# Sets the async flag and, in async mode, prepares the I/O buffers and the
# pending-operation bookkeeping before delegating to the parent initializer.
#
# The +:iopromise_async+ option is consumed here and is not forwarded to the
# parent. It is removed from a private copy of +options+, so the hash the
# caller passed in is left untouched (and may be frozen or reused for
# several servers).
#
# @param attribs [Object] forwarded unchanged to the parent initializer
# @param options [Hash] server options; async mode is enabled only when
#   +:iopromise_async+ is exactly +true+
def initialize(attribs, options = {})
  # Work on a copy: deleting from the caller's hash would silently strip the
  # flag for any later server built from the same options, and would raise
  # on a frozen hash.
  options = options.dup
  @async = options.delete(:iopromise_async) == true

  if @async
    async_reset

    @next_opaque_id = 0
    @pending_ops = {}
  end

  # Bare super forwards the current values of attribs and options, so the
  # parent receives the copy without :iopromise_async.
  super
end