Class: Async::Limiter::Window

Inherits:
Object
  • Object
show all
Defined in:
lib/async/limiter/window.rb,
lib/async/limiter/window/fixed.rb,
lib/async/limiter/window/sliding.rb,
lib/async/limiter/window/continuous.rb

Direct Known Subclasses

Continuous, Fixed, Sliding

Defined Under Namespace

Classes: Continuous, Fixed, Sliding

Constant Summary collapse

TYPES =
%i[fixed sliding].freeze
NULL_TIME =
-1

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(limit = 1, type: :fixed, window: 1, parent: nil, burstable: true, lock: true, queue: []) ⇒ Window

Returns a new instance of Window.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/async/limiter/window.rb', line 17

# Stores the configuration, resets the runtime counters, then runs
# +update_concurrency+ followed by +validate!+.
#
# @param limit stored in both @limit and @input_limit (defaults to 1)
# @param type [Symbol] window strategy; #acquire handles :fixed and :sliding
# @param window stored in both @window and @input_window (defaults to 1)
# @param parent [Object, nil] used by #async and #sync in place of
#   Task.current when present
# @param burstable stored as-is; its consumers are not visible here
# @param lock when truthy, #release resumes waiting fibers
# @param queue stored as @waiting; presumably holds waiting callers —
#   confirm against +wait+
def initialize(limit = 1, type: :fixed, window: 1, parent: nil,
  burstable: true, lock: true, queue: [])
  # Caller-supplied settings. The @input_* copies are what the public
  # #limit and #window readers report.
  @limit = limit
  @input_limit = limit
  @window = window
  @input_window = window
  @type = type
  @parent = parent
  @burstable = burstable
  @lock = lock

  # Runtime bookkeeping starts out empty.
  @count = 0
  @window_count = 0
  @waiting = queue
  @scheduler_task = nil

  # Both timestamps start at the NULL_TIME sentinel.
  @window_start_time = NULL_TIME
  @window_frame_start_time = NULL_TIME

  update_concurrency
  validate!
end

Instance Attribute Details

#count ⇒ Object (readonly)

Returns the value of attribute count.



11
12
13
# File 'lib/async/limiter/window.rb', line 11

# Current value of the slot counter: #acquire adds one to it and #release
# subtracts one.
def count
  @count
end

#lock ⇒ Object (readonly)

Returns the value of attribute lock.



15
16
17
# File 'lib/async/limiter/window.rb', line 15

# The +lock+ option given to the constructor. When it is truthy, #release
# resumes waiting fibers.
def lock
  @lock
end

#type ⇒ Object (readonly)

Returns the value of attribute type.



13
14
15
# File 'lib/async/limiter/window.rb', line 13

# The window strategy given to the constructor as +type+. #acquire handles
# :sliding and :fixed and raises for any other value.
def type
  @type
end

Instance Method Details

#acquire(*queue_args) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/async/limiter/window.rb', line 65

# Takes one slot: calls +wait+, bumps the slot counter and records the
# acquisition in the window bookkeeping.
#
# Without a block the slot stays held until the caller invokes #release.
# With a block the slot is released as soon as the block exits, whether it
# returns or raises.
#
# @param queue_args passed straight through to +wait+
# @return [Object, nil] the block's value, or nil when no block is given
# @raise [RuntimeError] when a new window starts and @type is neither
#   :sliding nor :fixed
def acquire(*queue_args)
  wait(*queue_args)
  @count += 1

  now = Clock.now

  if window_changed?(now)
    # A new window begins: anchor its start and restart the per-window tally.
    @window_start_time =
      case @type
      when :sliding
        now
      when :fixed
        # Snap down to a whole multiple of the window length.
        (now / @window).to_i * @window
      else
        raise "invalid type #{@type}"
      end
    @window_count = 1
  else
    @window_count += 1
  end

  @window_frame_start_time = now

  if block_given?
    begin
      yield
    ensure
      release
    end
  end
end

#async(*queue_args, parent: (@parent || Task.current), **options) ⇒ Object



50
51
52
53
54
55
56
57
# File 'lib/async/limiter/window.rb', line 50

# Acquires a slot, then hands the block to +parent.async+. The slot is
# released when the block finishes, whether it returns or raises.
#
# @param queue_args forwarded to #acquire
# @param parent object responding to +async+; defaults to @parent, falling
#   back to Task.current
# @param options forwarded to +parent.async+
# @yieldparam task whatever +parent.async+ yields
# @return whatever +parent.async+ returns
def async(*queue_args, parent: (@parent || Task.current), **options)
  # The slot is taken before the block is handed to the parent.
  acquire(*queue_args)

  parent.async(**options) do |task|
    begin
      yield task
    ensure
      release
    end
  end
end

#blocking? ⇒ Boolean

Returns:

  • (Boolean)


46
47
48
# File 'lib/async/limiter/window.rb', line 46

# Combines the three blocking predicates, checked in order; evaluation
# stops at the first truthy one.
#
# @return the first truthy predicate result, otherwise the result of the
#   last predicate
def blocking?
  limit_blocking? ||
    window_blocking? ||
    window_frame_blocking?
end

#limit ⇒ Object



38
39
40
# File 'lib/async/limiter/window.rb', line 38

# The limit as supplied by the caller (constructor argument or #limit=),
# kept in @input_limit separately from the working copy in @limit.
# NOTE(review): @limit is presumably adjusted by update_concurrency, which
# is not visible here — confirm.
def limit
  @input_limit
end

#limit=(new_limit) ⇒ Object



104
105
106
107
108
109
110
111
112
113
# File 'lib/async/limiter/window.rb', line 104

# Replaces the limit at runtime.
#
# @param new_limit checked by +validate_limit!+ before any state changes
# @return the value reported by #limit
def limit=(new_limit)
  validate_limit!(new_limit)

  @limit = new_limit
  @input_limit = new_limit

  # Same follow-up sequence as #window=: refresh derived state, resume
  # waiting callers, and reschedule when +reschedule?+ says so.
  update_concurrency
  resume_waiting
  reschedule if reschedule?

  limit
end

#release ⇒ Object



97
98
99
100
101
102
# File 'lib/async/limiter/window.rb', line 97

# Gives back one slot by decrementing the slot counter.
#
# @return the result of +resume_waiting+ when @lock is truthy, otherwise nil
def release
  @count -= 1
  return unless @lock

  # With the lock option on, releasing a slot resumes waiting fibers.
  resume_waiting
end

#sync(*queue_args) ⇒ Object



59
60
61
62
63
# File 'lib/async/limiter/window.rb', line 59

# Runs the block through #acquire's block form, so the slot is released
# when the block exits.
#
# @param queue_args forwarded to #acquire
# @yieldparam task @parent when set, otherwise Task.current (looked up
#   inside the block passed to #acquire)
# @return whatever #acquire returns for the block
def sync(*queue_args)
  acquire(*queue_args) { yield(@parent || Task.current) }
end

#window ⇒ Object



42
43
44
# File 'lib/async/limiter/window.rb', line 42

# The window length as supplied by the caller (constructor argument or
# #window=), kept in @input_window separately from the working copy in
# @window.
# NOTE(review): @window is presumably adjusted by update_concurrency, which
# is not visible here — confirm.
def window
  @input_window
end

#window=(new_window) ⇒ Object



115
116
117
118
119
120
121
122
123
124
# File 'lib/async/limiter/window.rb', line 115

# Replaces the window length at runtime.
#
# @param new_window checked by +validate_window!+ before any state changes
# @return the value reported by #window
def window=(new_window)
  validate_window!(new_window)

  @window = new_window
  @input_window = new_window

  # Same follow-up sequence as #limit=: refresh derived state, resume
  # waiting callers, and reschedule when +reschedule?+ says so.
  update_concurrency
  resume_waiting
  reschedule if reschedule?

  window
end