Class: BackPressure::GatedExecutor

Inherits:
Executor
  • Object
show all
Defined in:
lib/back_pressure/gated_executor.rb

Overview

A GatedExecutor is an implementation of Executor that allows external control of back-pressure state, and is useful when non-blocking APIs provide hooks for identifying when they should block.

Examples:

Using a GatedExecutor with a non-blocking API

gated_executor = BackPressure::GatedExecutor.new

non_blocking_api_client.on_connection_blocked   { gated_executor.engage_back_pressure }
non_blocking_api_client.on_connection_unblocked { gated_executor.remove_back_pressure }

16.times do
  Thread.new do
    loop do
      message = queue.pop
      gated_executor.execute { non_blocking_api_client.push(message) }
    end
  end
end

Author:

Since:

  • 1.0.0

Instance Method Summary collapse

Constructor Details

#initialize(logger: nil, description: nil, log_threshold: 1) {|gated_executor| ... } ⇒ GatedExecutor

Returns a new instance of GatedExecutor.

Parameters:

  • logger (Logger) (defaults to: nil)

    : logger on which to emit (optional)

  • description (String) (defaults to: nil)

    : description for logs (optional)

  • log_threshold (Number) (defaults to: 1)

    : silences blockage warnings for durations less than the provided value (default ‘0`).

Yields:

  • (gated_executor)

    if a block is provided, the newly-created instance is yielded to the given block after being initialised.

Yield Parameters:

  • (self)

Yield Returns:

  • (void)

Since:

  • 1.0.0



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/back_pressure/gated_executor.rb', line 59

def initialize(logger:        nil,
               description:   nil,
               log_threshold: 1)

  @logger = logger
  @desc = (description ? description.dup : "#{self.class.name}<#{__id__}>").freeze
  @log_threshold = log_threshold

  @control_mutex = Mutex.new
  @control_condv = ConditionVariable.new

  @blocked_threads = Set.new
  @blocked_threads_mutex = Mutex.new

  yield(self) if block_given?
end

Instance Method Details

#back_pressure_engaged?Boolean

Helper method for determining if back-pressure is currently engaged.

Returns:

  • (Boolean)

Since:

  • 1.0.0



119
120
121
# File 'lib/back_pressure/gated_executor.rb', line 119

def back_pressure_engaged?
  @control_mutex.synchronize { @back_pressure_engaged }
end

#blocked?Boolean

Note:

This method should be used only for observation-based tooling.

Helper method for determining if any threads are currently blocked by back-pressure.

Returns:

  • (Boolean)

Since:

  • 1.0.0



162
163
164
# File 'lib/back_pressure/gated_executor.rb', line 162

def blocked?
  blocked_threads.any?
end

#blocked_threadsSet{Thread}

Note:

This method should be used only for observation-based tooling.

Helper method for observing which threads, if any, are blocked at the instant the method is invoked. The returned value is a frozen snapshot, and the included threads are not guaranteed to be still blocking by the time they are accessed.

Returns:

  • (Set{Thread})

Since:

  • 1.0.0



156
157
158
# File 'lib/back_pressure/gated_executor.rb', line 156

def blocked_threads
  @blocked_threads_mutex.synchronize { @blocked_threads.dup.freeze }
end

#engage_back_pressure(reason = DEFAULT_REASON) ⇒ void

This method returns an undefined value.

Engages back-pressure and immediately returns; threads that send this instance ‘GatedExecutor#execute` will be blocked until back-pressure is removed.

Parameters:

  • reason (String) (defaults to: DEFAULT_REASON)

    : the reason back-pressure is being applied, to be included in the log message (optional).

Since:

  • 1.0.0



84
85
86
87
88
89
90
91
92
93
# File 'lib/back_pressure/gated_executor.rb', line 84

def engage_back_pressure(reason=DEFAULT_REASON)
  @control_mutex.synchronize do
    if !@back_pressure_engaged
      @back_pressure_engaged = true
      @logger && @logger.info("#{@desc} back-pressure engaged (#{reason})")
    else
      @logger && @logger.debug("#{@desc} attempted to engage back-pressure when it is already engaged (#{reason})")
    end
  end
end

#execute(blocking_time_limit = nil) ⇒ Boolean

Note:

Care must be taken to ensure that back-pressure control is executed outside of this block, as the block provided is not executed while back-pressure is engaged.

Executes the provided block, after waiting out any back-pressure, returning ‘true` IFF the block was executed.

Parameters:

  • blocking_time_limit (Number) (defaults to: nil)

    : the maximum time to wait, in seconds, when back-pressure is being applied, before aborting (optional).

Yield Returns:

  • (void)

    : the value returned by the block is ignored by this method.

Returns:

  • (Boolean)

    : returns ‘true` if block was successfully executed, and `false` if tht `blocking_time_limit` was reached before it could be executed.

Since:

  • 1.0.0



129
130
131
132
133
134
135
136
137
138
# File 'lib/back_pressure/gated_executor.rb', line 129

def execute(blocking_time_limit=nil)
  fail(ArgumentError, 'block required!') unless block_given?

  if !@back_pressure_engaged || block_until_back_pressure_removed(blocking_time_limit)
    yield
    return true
  else
    return false
  end
end

#execute!(blocking_time_limit = nil) ⇒ Object

Note:

Care must be taken to ensure that back-pressure control is executed outside of this block, as the block provided is not executed while back-pressure is engaged.

Executes the provided block, after waiting out any back-pressure, returning the result of the block or raising an ‘ExecutionExpired` exception if the provided limit was reached before execution could begin.

Parameters:

  • blocking_time_limit (Number) (defaults to: nil)

    : the maximum time to wait, in seconds, when back-pressure is being applied, before aborting (optional).

Yield Returns:

  • (Object)

    : the value returned from the block is returned by this method

Returns:

  • (Object)

    : returns the unmodified value of the result of executing the provided block.

Raises:

Since:

  • 1.0.0



146
147
148
149
150
151
152
# File 'lib/back_pressure/gated_executor.rb', line 146

def execute!(blocking_time_limit=nil)
  execute(blocking_time_limit) do
    return yield
  end

  fail(ExecutionExpired)
end

#remove_back_pressure(reason = DEFAULT_REASON) ⇒ void

Note:

No guarantee of ordering are made with regard to threads that are blocked at the instant back-pressure is removed.

This method returns an undefined value.

Removes back-pressure, waking any threads that are currently blocked by back-pressure, and immediately returns.

Since:

  • 1.0.0



103
104
105
106
107
108
109
110
111
112
113
# File 'lib/back_pressure/gated_executor.rb', line 103

def remove_back_pressure(reason=DEFAULT_REASON)
  @control_mutex.synchronize do
    if @back_pressure_engaged
      @back_pressure_engaged = false
      @logger && @logger.info("#{@desc} back-pressure removed (#{reason})")
      @control_condv.broadcast # wakeup _all_ waiting threads
    else
      @logger && @logger.debug("#{@desc} attempted to remove back-pressure when it not engaged (#{reason})")
    end
  end
end