Class: Concurrent::Throttle

Inherits:
Synchronization::Object
  • Object
Includes:
PromisesIntegration
Defined in:
lib/concurrent/edge/throttle.rb

Overview

A tool for managing the concurrency level of future tasks.

Examples:

data     = (1..5).to_a
db       = data.reduce({}) { |h, v| h.update v => v.to_s }
max_two  = Throttle.new 2

futures = data.map do |data|
  Promises.future(data) do |data|
    # un-throttled, concurrency level equals data.size
    data + 1
  end.then_throttled_by(max_two, db) do |v, db|
    # throttled, only 2 tasks executed at the same time
    # e.g. limiting access to db
    db[v]
  end
end

futures.map(&:value!) # => ["2", "3", "4", "5", nil]

throttle.throttled_future(1) do |arg|
  arg.succ
end

throttle.throttled_future_chain do |trigger|
  trigger.
      # 2 throttled promises
      chain { 1 }.
      then(&:succ)
end

max_two = Throttle.new 2
10.times.map do
  Thread.new do
    max_two.throttled_block do
      # Only 2 at the same time
      do_stuff
    end
  end
end

Defined Under Namespace

Modules: PromisesIntegration

Instance Method Summary

Methods included from PromisesIntegration

#throttled_future, #throttled_future_chain

Constructor Details

#initialize(limit) ⇒ Throttle

Creates a new throttle that lets at most limit throttled tasks run at the same time.

Parameters:

  • limit (Integer)


# File 'lib/concurrent/edge/throttle.rb', line 59

def initialize(limit)
  super()
  @Limit       = limit
  self.can_run = limit
  @Queue       = LockFreeQueue.new
end

Instance Method Details

#limit ⇒ Integer

Returns the limit.

Returns:

  • (Integer)

    The limit.



# File 'lib/concurrent/edge/throttle.rb', line 67

def limit
  @Limit
end

#release ⇒ self

Must be called exactly once for each trigger, as soon as it is safe to let another throttled task execute.

Returns:

  • (self)

See Also:



# File 'lib/concurrent/edge/throttle.rb', line 93

def release
  while true
    current_can_run = can_run
    if compare_and_set_can_run current_can_run, current_can_run + 1
      if current_can_run < 0
        Thread.pass until (trigger = @Queue.pop)
        trigger.resolve
      end
      return self
    end
  end
end
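
The retry loop in #release is an optimistic compare-and-set update: read the counter, try to swap in the new value, and start over if another thread changed it in between. The following is a minimal stdlib-only sketch of that pattern. AtomicCell and increment are hypothetical names used for illustration only; the cell is backed by a Mutex here, which is an assumption made for portability and not how the library implements its atomic field.

```ruby
# Hypothetical stand-in for an atomic integer field (not part of concurrent-ruby).
class AtomicCell
  def initialize(value)
    @value = value
    @lock  = Mutex.new
  end

  def get
    @lock.synchronize { @value }
  end

  # Stores new_value only if the cell still holds expected.
  # Returns true when the swap happened, false otherwise.
  def compare_and_set(expected, new_value)
    @lock.synchronize do
      if @value == expected
        @value = new_value
        true
      else
        false
      end
    end
  end
end

# Optimistic increment: retry until no other thread interfered
# between reading the value and swapping in the new one.
def increment(cell)
  loop do
    current = cell.get
    return current if cell.compare_and_set(current, current + 1)
  end
end

cell    = AtomicCell.new(0)
threads = 8.times.map { Thread.new { 1000.times { increment(cell) } } }
threads.each(&:join)
final_count = cell.get # => 8000
```

No increment is lost because a failed swap never writes; the thread simply re-reads and tries again.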

#throttled_block { ... } ⇒ Object

Blocks the current thread until the block can be executed.

Examples:

max_two = Throttle.new 2
10.times.map do
  Thread.new do
    max_two.throttled_block do
      # Only 2 at the same time
      do_stuff
    end
  end
end

Yields:

  • to throttled block

Yield Returns:

  • (Object)

    is used as a result of the method

Returns:

  • (Object)

    the result of the block



# File 'lib/concurrent/edge/throttle.rb', line 111

def throttled_block(&block)
  trigger.wait
  block.call
ensure
  release
end
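
To make the observable behaviour of a throttled block concrete, the sketch below limits ten threads to two concurrent block executions and records the peak concurrency. It deliberately does not use Concurrent::Throttle: SimpleLimiter and limited_block are hypothetical names, and the limiter is a plain counting semaphore built on the standard library's SizedQueue, chosen so the example runs without the gem.

```ruby
# Hypothetical stdlib-only limiter: a counting semaphore, not Concurrent::Throttle.
class SimpleLimiter
  def initialize(limit)
    @slots = SizedQueue.new(limit)
  end

  # Blocks the calling thread until a slot is free, then runs the block
  # and returns its result.
  def limited_block
    @slots.push(:token) # blocks while `limit` tokens are already held
    begin
      yield
    ensure
      @slots.pop        # free the slot even if the block raised
    end
  end
end

limiter = SimpleLimiter.new(2)
lock    = Mutex.new
running = 0
peak    = 0

threads = 10.times.map do |i|
  Thread.new do
    limiter.limited_block do
      lock.synchronize do
        running += 1
        peak = [peak, running].max
      end
      sleep 0.01 # simulated work
      lock.synchronize { running -= 1 }
      i * 2      # becomes the result of limited_block
    end
  end
end

results = threads.map(&:value)
```

After all threads finish, peak never exceeds 2 and results holds each block's return value in thread order.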

#to_s ⇒ String Also known as: inspect

Returns a short string representation.

Returns:

  • (String)

    Short string representation.



# File 'lib/concurrent/edge/throttle.rb', line 119

def to_s
  format '<#%s:0x%x limit:%s can_run:%d>', self.class, object_id << 1, @Limit, can_run
end

#trigger ⇒ Promises::Event

Returns a new event that is resolved once the dependent tasks may execute. The event has to be used, and #release must be called exactly once after the critical work is done.

Returns:

  • (Promises::Event)

See Also:



# File 'lib/concurrent/edge/throttle.rb', line 75

def trigger
  while true
    current_can_run = can_run
    if compare_and_set_can_run current_can_run, current_can_run - 1
      if current_can_run > 0
        return Promises.resolved_event
      else
        event = Promises.resolvable_event
        @Queue.push event
        return event
      end
    end
  end
end
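
The bookkeeping shared by #trigger and #release can be summarised as follows: the counter starts at the limit, each trigger decrements it, each release increments it, and a negative value equals the number of queued waiters. The sketch below is a simplified single-threaded model of that accounting. ThrottleModel and its hash-based tickets are hypothetical and stand in for the events; the model omits the compare-and-set loop and the lock-free queue, so it illustrates the arithmetic only and is not thread-safe.

```ruby
# Hypothetical single-threaded model of the counter and waiter queue.
# It mirrors the arithmetic of #trigger and #release, nothing more.
class ThrottleModel
  attr_reader :can_run

  def initialize(limit)
    @can_run = limit
    @waiting = [] # FIFO of tickets that are not yet resolved
  end

  # Takes a slot if one is free, otherwise queues a pending ticket.
  def trigger
    previous = @can_run
    @can_run -= 1
    ticket = { resolved: previous > 0 }
    @waiting.push(ticket) unless ticket[:resolved]
    ticket
  end

  # Gives a slot back and resolves the oldest waiter, if there is one.
  def release
    previous = @can_run
    @can_run += 1
    @waiting.shift[:resolved] = true if previous < 0
    self
  end
end

model   = ThrottleModel.new(2)
tickets = 3.times.map { model.trigger }

states_before  = tickets.map { |t| t[:resolved] } # => [true, true, false]
counter_before = model.can_run                    # => -1 (one waiter)

model.release

states_after  = tickets.map { |t| t[:resolved] }  # => [true, true, true]
counter_after = model.can_run                     # => 0
```

The third ticket stays pending until one of the first two holders releases, which is why every trigger must be paired with exactly one release.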