Class: Concurrent::Throttle

Inherits:
Synchronization::Object
  • Object
Includes:
PromisesIntegration
Defined in:
lib/concurrent/edge/throttle.rb

Overview

A tool for managing the concurrency level of future tasks.

Examples:

data     = (1..5).to_a
db       = data.reduce({}) { |h, v| h.update v => v.to_s }
max_two  = Throttle.new 2

futures = data.map do |value|
  Promises.future(value) do |v|
    # un-throttled, concurrency level equals data.size
    v + 1
  end.then_throttled_by(max_two, db) do |v, db|
    # throttled, only 2 tasks executed at the same time
    # e.g. limiting access to db
    db[v]
  end
end

futures.map(&:value!) # => ["2", "3", "4", "5", nil]

throttle.throttled_future_chain do |trigger|
  trigger.
      # 2 throttled promises
      chain { 1 }.
      then(&:succ)
end

max_two = Throttle.new 2
10.times.map do
  Thread.new do
    max_two.throttled_block do
      # Only 2 at the same time
      do_stuff
    end
  end
end

Defined Under Namespace

Modules: PromisesIntegration

Instance Method Summary

Methods included from PromisesIntegration

#throttled_future, #throttled_future_chain

Constructor Details

#initialize(limit) ⇒ Throttle

New throttle.

Parameters:

  • limit (Integer)


# File 'lib/concurrent/edge/throttle.rb', line 53

def initialize(limit)
  super()
  @Limit       = limit
  self.can_run = limit
  @Queue       = LockFreeQueue.new
end

Instance Method Details

#limit ⇒ Integer

Returns the limit.

Returns:

  • (Integer)

    The limit.



# File 'lib/concurrent/edge/throttle.rb', line 61

def limit
  @Limit
end

#release ⇒ self

Must be called exactly once for each trigger, as soon as it is safe to execute another throttled task.

Returns:

  • (self)

See Also:

  • #trigger



# File 'lib/concurrent/edge/throttle.rb', line 87

def release
  while true
    current_can_run = can_run
    if compare_and_set_can_run current_can_run, current_can_run + 1
      if current_can_run < 0
        Thread.pass until (trigger = @Queue.pop)
        trigger.resolve
      end
      return self
    end
  end
end
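
The retry loop in #release follows the usual compare-and-set pattern: read the counter, try to swap in the updated value, and start over if another thread changed it in between. The sketch below illustrates that pattern with the standard library only. `SketchCell` and `add_with_retry` are hypothetical names, and the compare-and-set is emulated with a Mutex, so this is an illustration of the idea rather than the library's implementation.

```ruby
# Hypothetical cell that emulates compare-and-set with a Mutex.
# It is not part of concurrent-ruby; it only models the retry pattern.
class SketchCell
  def initialize(value)
    @value = value
    @lock  = Mutex.new
  end

  def value
    @lock.synchronize { @value }
  end

  # Stores new_value only if the cell still holds expected.
  def compare_and_set(expected, new_value)
    @lock.synchronize do
      return false unless @value == expected
      @value = new_value
      true
    end
  end
end

# Retries until the update is applied to an unchanged value.
# Returns the value seen just before the successful update.
def add_with_retry(cell, delta)
  loop do
    current = cell.value
    return current if cell.compare_and_set(current, current + delta)
  end
end

counter  = SketchCell.new(-1)
previous = add_with_retry(counter, 1)
puts "previous=#{previous} now=#{counter.value}" # prints "previous=-1 now=0"
```

Returning the previous value matters here: #release inspects the value it replaced (`current_can_run < 0`) to decide whether a queued trigger has to be resolved.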

#throttled_block { ... } ⇒ Object

Blocks the current thread until the block can be executed.

Examples:

max_two = Throttle.new 2
10.times.map do
  Thread.new do
    max_two.throttled_block do
      # Only 2 at the same time
      do_stuff
    end
  end
end

Yields:

  • to the throttled block

Yield Returns:

  • (Object)

    used as the result of the method

Returns:

  • (Object)

    the result of the block



# File 'lib/concurrent/edge/throttle.rb', line 105

def throttled_block(&block)
  trigger.wait
  block.call
ensure
  release
end
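
To see the blocking behaviour without the edge gem, the following sketch measures how many blocks run at once. It swaps in a different technique: `BlockingThrottle` is a hypothetical stand-in built on the standard library's `SizedQueue`, not the event-based mechanism shown in the listing above, and `peak_concurrency` is a helper invented for the illustration.

```ruby
# Hypothetical stand-in for Throttle#throttled_block, built on SizedQueue.
# A full queue means `limit` blocks are currently running.
class BlockingThrottle
  def initialize(limit)
    @slots = SizedQueue.new(limit)
  end

  def throttled_block
    @slots.push(true) # blocks the caller while all slots are taken
    begin
      yield
    ensure
      @slots.pop      # free the slot even if the block raised
    end
  end
end

# Runs `workers` threads through the throttle and reports the highest
# number of blocks that were observed running at the same time.
def peak_concurrency(limit, workers)
  throttle = BlockingThrottle.new(limit)
  lock     = Mutex.new
  running  = 0
  peak     = 0

  workers.times.map do
    Thread.new do
      throttle.throttled_block do
        lock.synchronize do
          running += 1
          peak = [peak, running].max
        end
        sleep 0.01 # simulated work
        lock.synchronize { running -= 1 }
      end
    end
  end.each(&:join)

  peak
end

puts peak_concurrency(2, 10) # never more than 2
```

As in the listing above, the slot is given back in an `ensure` clause, so an exception inside the block does not permanently reduce the available concurrency.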

#to_s ⇒ String

Also known as: inspect

Returns a short string representation.

Returns:

  • (String)

    Short string representation.



# File 'lib/concurrent/edge/throttle.rb', line 113

def to_s
  format '<#%s:0x%x limit:%s can_run:%d>', self.class, object_id << 1, @Limit, can_run
end
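
For a feel of what the resulting string looks like, the same format string can be applied to made-up values. `sketch_throttle_string` is a hypothetical helper, and the class name, object id, limit, and counter below are invented for the illustration.

```ruby
# Hypothetical helper that reuses the format string from the listing above.
def sketch_throttle_string(class_name, object_id, limit, can_run)
  format('<#%s:0x%x limit:%s can_run:%d>', class_name, object_id << 1, limit, can_run)
end

puts sketch_throttle_string('Concurrent::Throttle', 42, 2, -3)
# prints "<#Concurrent::Throttle:0x54 limit:2 can_run:-3>"
```

A negative `can_run`, as in this made-up output, would correspond to triggers that are still waiting in the queue.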

#trigger ⇒ Promises::Event

Returns a new event that is resolved when the dependent tasks can execute. The event must be used, and #release must be called exactly once after the critical work is done.

Returns:

  • (Promises::Event)

See Also:

  • #release



# File 'lib/concurrent/edge/throttle.rb', line 69

def trigger
  while true
    current_can_run = can_run
    if compare_and_set_can_run current_can_run, current_can_run - 1
      if current_can_run > 0
        return Promises.resolved_event
      else
        event = Promises.resolvable_event
        @Queue.push event
        return event
      end
    end
  end
end
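
The bookkeeping shared by #trigger and #release can be followed in a small model: the counter starts at the limit, every trigger decrements it, and a value below zero counts the triggers still waiting. The sketch below is a simplified, standard-library-only model under stated assumptions. `SketchThrottle` and `SketchEvent` are hypothetical names, a Mutex replaces the lock-free compare-and-set loop, and a plain Array replaces the lock-free queue.

```ruby
# Hypothetical minimal event: either already resolved or resolved later.
class SketchEvent
  def initialize(resolved = false)
    @resolved = resolved
    @lock     = Mutex.new
  end

  def resolve
    @lock.synchronize { @resolved = true }
    self
  end

  def resolved?
    @lock.synchronize { @resolved }
  end
end

# Hypothetical model of the counter logic; a Mutex stands in for the
# compare-and-set loop and an Array for the lock-free queue.
class SketchThrottle
  attr_reader :can_run

  def initialize(limit)
    @can_run = limit
    @waiting = []
    @lock    = Mutex.new
  end

  # Takes a slot. The event is resolved at once if a slot was free,
  # otherwise it is queued until a matching release.
  def trigger
    @lock.synchronize do
      previous  = @can_run
      @can_run -= 1
      next SketchEvent.new(true) if previous > 0

      event = SketchEvent.new
      @waiting.push(event)
      event
    end
  end

  # Gives a slot back and wakes the oldest waiter, if there is one.
  def release
    @lock.synchronize do
      previous  = @can_run
      @can_run += 1
      @waiting.shift.resolve if previous < 0
    end
    self
  end
end

throttle = SketchThrottle.new(2)
events   = 3.times.map { throttle.trigger }
p events.map(&:resolved?) # prints [true, true, false]
p throttle.can_run        # prints -1
throttle.release
p events.last.resolved?   # prints true
p throttle.can_run        # prints 0
```

The third trigger finds no free slot, so its event stays unresolved until #release is called once, which is why each trigger has to be paired with exactly one release.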