Class: BackPressure::GatedExecutor

Inherits:
Executor
  • Object
show all
Defined in:
lib/back_pressure/gated_executor.rb

Overview

A GatedExecutor is an implementation of Executor that allows external control of back-pressure state, and is useful when non-blocking APIs provide hooks for identifying when they should block.

Examples:

Using a GatedExecutor with a non-blocking API

gated_executor = BackPressure::GatedExecutor.new

non_blocking_api_client.on_connection_blocked   { gated_executor.engage_back_pressure }
non_blocking_api_client.on_connection_unblocked { gated_executor.remove_back_pressure }

16.times do
  Thread.new do
    loop do
      message = queue.pop
      gated_executor.execute { non_blocking_api_client.push(message) }
    end
  end
end

Author:

Since:

  • 1.0.0

Instance Method Summary collapse

Constructor Details

#initialize(logger: nil, description: nil, log_threshold: 1) {|gated_executor| ... } ⇒ GatedExecutor

Returns a new instance of GatedExecutor

Yields:

  • (gated_executor)

    if a block is provided, the newly-created instance is yielded to the given block after being initialised.

Yield Parameters:

  • (self)

Yield Returns:

  • (void)

Since:

  • 1.0.0


59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/back_pressure/gated_executor.rb', line 59

def initialize(logger:        nil,
               description:   nil,
               log_threshold: 1)

  @logger = logger
  @desc = (description ? description.dup : "#{self.class.name}<#{__id__}>").freeze
  @log_threshold = log_threshold

  @control_mutex = Mutex.new
  @control_condv = ConditionVariable.new

  @blocked_threads = Set.new
  @blocked_threads_mutex = Mutex.new

  yield(self) if block_given?
end

Instance Method Details

#back_pressure_engaged?Boolean

Helper method for determining if back-pressure is currently engaged.

Since:

  • 1.0.0


119
120
121
# File 'lib/back_pressure/gated_executor.rb', line 119

def back_pressure_engaged?
  @control_mutex.synchronize { @back_pressure_engaged }
end

#blocked?Boolean

Note:

This method should be used only for observation-based tooling.

Helper method for determining if any threads are currently blocked by back-pressure.

Since:

  • 1.0.0


162
163
164
# File 'lib/back_pressure/gated_executor.rb', line 162

def blocked?
  blocked_threads.any?
end

#blocked_threadsSet{Thread}

Note:

This method should be used only for observation-based tooling.

Helper method for observing which threads, if any, are blocked at the instant the method is invoked. The returned value is a frozen snapshot, and the included threads are not guaranteed to be still blocking by the time they are accessed.

Since:

  • 1.0.0


156
157
158
# File 'lib/back_pressure/gated_executor.rb', line 156

def blocked_threads
  @blocked_threads_mutex.synchronize { @blocked_threads.dup.freeze }
end

#engage_back_pressure(reason = DEFAULT_REASON) ⇒ void

This method returns an undefined value.

Engages back-pressure and immediately returns; threads that send this instance `GatedExecutor#execute` will be blocked until back-pressure is removed.

Since:

  • 1.0.0


84
85
86
87
88
89
90
91
92
93
# File 'lib/back_pressure/gated_executor.rb', line 84

def engage_back_pressure(reason=DEFAULT_REASON)
  @control_mutex.synchronize do
    if !@back_pressure_engaged
      @back_pressure_engaged = true
      @logger && @logger.info("#{@desc} back-pressure engaged (#{reason})")
    else
      @logger && @logger.debug("#{@desc} attempted to engage back-pressure when it is already engaged (#{reason})")
    end
  end
end

#execute(blocking_time_limit = nil) ⇒ Boolean

Note:

Care must be taken to ensure that back-pressure control is executed outside of this block, as the block provided is not executed while back-pressure is engaged.

Executes the provided block, after waiting out any back-pressure, returning `true` IFF the block was executed.

Yield Returns:

  • (void)

    : the value returned by the block is ignored by this method.

Since:

  • 1.0.0


129
130
131
132
133
134
135
136
137
138
# File 'lib/back_pressure/gated_executor.rb', line 129

def execute(blocking_time_limit=nil)
  fail(ArgumentError, 'block required!') unless block_given?

  if !@back_pressure_engaged || block_until_back_pressure_removed(blocking_time_limit)
    yield
    return true
  else
    return false
  end
end

#execute!(blocking_time_limit = nil) ⇒ Object

Note:

Care must be taken to ensure that back-pressure control is executed outside of this block, as the block provided is not executed while back-pressure is engaged.

Executes the provided block, after waiting out any back-pressure, returning the result of the block or raising an `ExecutionExpired` exception if the provided limit was reached before execution could begin.

Yield Returns:

  • (Object)

    : the value returned from the block is returned by this method

Raises:

Since:

  • 1.0.0


146
147
148
149
150
151
152
# File 'lib/back_pressure/gated_executor.rb', line 146

def execute!(blocking_time_limit=nil)
  execute(blocking_time_limit) do
    return yield
  end

  fail(ExecutionExpired)
end

#remove_back_pressure(reason = DEFAULT_REASON) ⇒ void

Note:

No guarantee of ordering are made with regard to threads that are blocked at the instant back-pressure is removed.

This method returns an undefined value.

Removes back-pressure, waking any threads that are currently blocked by back-pressure, and immediately returns.

Since:

  • 1.0.0


103
104
105
106
107
108
109
110
111
112
113
# File 'lib/back_pressure/gated_executor.rb', line 103

def remove_back_pressure(reason=DEFAULT_REASON)
  @control_mutex.synchronize do
    if @back_pressure_engaged
      @back_pressure_engaged = false
      @logger && @logger.info("#{@desc} back-pressure removed (#{reason})")
      @control_condv.broadcast # wakeup _all_ waiting threads
    else
      @logger && @logger.debug("#{@desc} attempted to remove back-pressure when it not engaged (#{reason})")
    end
  end
end