Class: Concurrent::ScheduledTask

Inherits:
IVar
  • Object
show all
Includes:
Comparable
Defined in:
lib/concurrent/scheduled_task.rb

Overview

Note:

Time calculations one all platforms and languages are sensitive to changes to the system clock. To alleviate the potential problems associated with changing the system clock while an application is running, most modern operating systems provide a monotonic clock that operates independently of the system clock. A monotonic clock cannot be used to determine human-friendly clock times. A monotonic clock is used exclusively for calculating time intervals. Not all Ruby platforms provide access to an operating system monotonic clock. On these platforms a pure-Ruby monotonic clock will be used as a fallback. An operating system monotonic clock is both faster and more reliable than the pure-Ruby implementation. The pure-Ruby implementation should be fast and reliable enough for most non-realtime operations. At this time the common Ruby platforms that provide access to an operating system monotonic clock are MRI 2.1 and above and JRuby (all versions).

‘ScheduledTask` is a close relative of `Concurrent::Future` but with one important difference: A `Future` is set to execute as soon as possible whereas a `ScheduledTask` is set to execute after a specified delay. This implementation is loosely based on Java’s [ScheduledExecutorService](docs.oracle.com/javase/7/docs/api/java/util/concurrent/ScheduledExecutorService.html). It is a more feature-rich variant of timer.

The intended schedule time of task execution is set on object construction with the ‘delay` argument. The delay is a numeric (floating point or integer) representing a number of seconds in the future. Any other value or a numeric equal to or less than zero will result in an exception. The actual schedule time of task execution is set when the `execute` method is called.

The constructor can also be given zero or more processing options. Currently the only supported options are those recognized by the [Dereferenceable](Dereferenceable) module.

The final constructor argument is a block representing the task to be performed. If no block is given an ‘ArgumentError` will be raised.

States

‘ScheduledTask` mixes in the [Obligation](Obligation) module thus giving it “future” behavior. This includes the expected lifecycle states. `ScheduledTask` has one additional state, however. While the task (block) is being executed the state of the object will be `:processing`. This additional state is necessary because it has implications for task cancellation.

Cancellation

A ‘:pending` task can be cancelled using the `#cancel` method. A task in any other state, including `:processing`, cannot be cancelled. The `#cancel` method returns a boolean indicating the success of the cancellation attempt. A cancelled `ScheduledTask` cannot be restarted. It is immutable.

**Obligation and Observation**

The result of a ‘ScheduledTask` can be obtained either synchronously or asynchronously. `ScheduledTask` mixes in both the [Obligation](Obligation) module and the [Observable](ruby-doc.org/stdlib-2.0/libdoc/observer/rdoc/Observable.html) module from the Ruby standard library. With one exception `ScheduledTask` behaves identically to [Future](Observable) with regard to these modules.

## Copy Options

Object references in Ruby are mutable. This can lead to serious problems when the Concern::Obligation#value of an object is a mutable reference. Which is always the case unless the value is a ‘Fixnum`, `Symbol`, or similar “primative” data type. Each instance can be configured with a few options that can help protect the program from potentially dangerous operations. Each of these options can be optionally set when the oject instance is created:

  • ‘:dup_on_deref` When true the object will call the `#dup` method on the `value` object every time the `#value` methid is called (default: false)

  • ‘:freeze_on_deref` When true the object will call the `#freeze` method on the `value` object every time the `#value` method is called (default: false)

  • ‘:copy_on_deref` When given a `Proc` object the `Proc` will be run every time the `#value` method is called. The `Proc` will be given the current `value` as its only argument and the result returned by the block will be the return value of the `#value` call. When `nil` this option will be ignored (default: nil)

When multiple deref options are set the order of operations is strictly defined. The order of deref operations is:

  • ‘:copy_on_deref`

  • ‘:dup_on_deref`

  • ‘:freeze_on_deref`

Because of this ordering there is no need to ‘#freeze` an object created by a provided `:copy_on_deref` block. Simply set `:freeze_on_deref` to `true`. Setting both `:dup_on_deref` to `true` and `:freeze_on_deref` to `true` is as close to the behavior of a “pure” functional language (like Erlang, Clojure, or Haskell) as we are likely to get in Ruby.

Examples:

Basic usage


require 'concurrent'
require 'thread'   # for Queue
require 'open-uri' # for open(uri)

class Ticker
  def get_year_end_closing(symbol, year)
    uri = "http://ichart.finance.yahoo.com/table.csv?s=#{symbol}&a=11&b=01&c=#{year}&d=11&e=31&f=#{year}&g=m"
    data = open(uri) {|f| f.collect{|line| line.strip } }
    data[1].split(',')[4].to_f
  end
end

# Future
price = Concurrent::Future.execute{ Ticker.new.get_year_end_closing('TWTR', 2013) }
price.state #=> :pending
sleep(1)    # do other stuff
price.value #=> 63.65
price.state #=> :fulfilled

# ScheduledTask
task = Concurrent::ScheduledTask.execute(2){ Ticker.new.get_year_end_closing('INTC', 2013) }
task.state #=> :pending
sleep(3)   # do other stuff
task.value #=> 25.96

Successful task execution


task = Concurrent::ScheduledTask.new(2){ 'What does the fox say?' }
task.state         #=> :unscheduled
task.execute
task.state         #=> pending

# wait for it...
sleep(3)

task.unscheduled? #=> false
task.pending?     #=> false
task.fulfilled?   #=> true
task.rejected?    #=> false
task.value        #=> 'What does the fox say?'

One line creation and execution


task = Concurrent::ScheduledTask.new(2){ 'What does the fox say?' }.execute
task.state         #=> pending

task = Concurrent::ScheduledTask.execute(2){ 'What do you get when you multiply 6 by 9?' }
task.state         #=> pending

Failed task execution


task = Concurrent::ScheduledTask.execute(2){ raise StandardError.new('Call me maybe?') }
task.pending?      #=> true

# wait for it...
sleep(3)

task.unscheduled? #=> false
task.pending?     #=> false
task.fulfilled?   #=> false
task.rejected?    #=> true
task.value        #=> nil
task.reason       #=> #<StandardError: Call me maybe?>

Task execution with observation


observer = Class.new{
  def update(time, value, reason)
    puts "The task completed at #{time} with value '#{value}'"
  end
}.new

task = Concurrent::ScheduledTask.new(2){ 'What does the fox say?' }
task.add_observer(observer)
task.execute
task.pending?      #=> true

# wait for it...
sleep(3)

#>> The task completed at 2013-11-07 12:26:09 -0500 with value 'What does the fox say?'

See Also:

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from IVar

#add_observer, #fail, #set, #try_set

Methods included from Concern::Observable

#add_observer, #count_observers, #delete_observer, #delete_observers, #with_observer

Methods included from Concern::Obligation

#complete?, #exception, #fulfilled?, #incomplete?, #pending?, #reason, #rejected?, #state, #unscheduled?, #value, #value!, #wait, #wait!

Methods included from Concern::Dereferenceable

#value

Constructor Details

#initialize(delay, opts = {}) { ... } ⇒ ScheduledTask

Schedule a task for execution at a specified future time.

Parameters:

  • delay (Float)

    the number of seconds to wait for before executing the task

  • opts (Hash) (defaults to: {})

    the options used to define the behavior at update and deref and to specify the executor on which to perform actions

Options Hash (opts):

  • :executor (Executor)

    when set use the given ‘Executor` instance. Three special values are also supported: `:task` returns the global task pool, `:operation` returns the global operation pool, and `:immediate` returns a new `ImmediateExecutor` object.

  • :dup_on_deref (Boolean) — default: false

    Call ‘#dup` before returning the data from Concern::Obligation#value

  • :freeze_on_deref (Boolean) — default: false

    Call ‘#freeze` before returning the data from Concern::Obligation#value

  • :copy_on_deref (Proc) — default: nil

    When calling the Concern::Obligation#value method, call the given proc passing the internal value as the sole argument then return the new value returned from the proc.

  • :args (object, Array)

    zero or more arguments to be passed the task block on execution

Yields:

  • the task to be performed

Raises:

  • (ArgumentError)

    When no block is given

  • (ArgumentError)

    When given a time that is in the past



165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/concurrent/scheduled_task.rb', line 165

def initialize(delay, opts = {}, &task)
  raise ArgumentError.new('no block given') unless block_given?
  raise ArgumentError.new('seconds must be greater than zero') if delay.to_f < 0.0

  super(NULL, opts, &nil)

  synchronize do
    ns_set_state(:unscheduled)
    @parent = opts.fetch(:timer_set, Concurrent.global_timer_set)
    @args = get_arguments_from(opts)
    @delay = delay.to_f
    @task = task
    @time = nil
    @executor = Options.executor_from_options(opts) || Concurrent.global_io_executor
    self.observers = Collection::CopyOnNotifyObserverSet.new
  end
end

Class Method Details

.execute(delay, opts = {}, &task) ⇒ ScheduledTask

Create a new ‘ScheduledTask` object with the given block, execute it, and return the `:pending` object.

Parameters:

  • delay (Float)

    the number of seconds to wait for before executing the task

  • opts (Hash) (defaults to: {})

    the options used to define the behavior at update and deref and to specify the executor on which to perform actions

Options Hash (opts):

  • :executor (Executor)

    when set use the given ‘Executor` instance. Three special values are also supported: `:task` returns the global task pool, `:operation` returns the global operation pool, and `:immediate` returns a new `ImmediateExecutor` object.

  • :dup_on_deref (Boolean) — default: false

    Call ‘#dup` before returning the data from Concern::Obligation#value

  • :freeze_on_deref (Boolean) — default: false

    Call ‘#freeze` before returning the data from Concern::Obligation#value

  • :copy_on_deref (Proc) — default: nil

    When calling the Concern::Obligation#value method, call the given proc passing the internal value as the sole argument then return the new value returned from the proc.

Returns:

  • (ScheduledTask)

    the newly created ‘ScheduledTask` in the `:pending` state

Raises:

  • (ArgumentError)

    if no block is given



277
278
279
# File 'lib/concurrent/scheduled_task.rb', line 277

def self.execute(delay, opts = {}, &task)
  new(delay, opts, &task).execute
end

Instance Method Details

#cancelBoolean

Cancel this task and prevent it from executing. A task can only be cancelled if it is pending or unscheduled.

Returns:

  • (Boolean)

    true if successfully cancelled else false



222
223
224
225
226
227
228
229
230
231
# File 'lib/concurrent/scheduled_task.rb', line 222

def cancel
  if compare_and_set_state(:cancelled, :pending, :unscheduled)
    complete(false, nil, CancelledOperationError.new)
    # To avoid deadlocks this call must occur outside of #synchronize
    # Changing the state above should prevent redundant calls
    @parent.send(:remove_task, self)
  else
    false
  end
end

#cancelled?Boolean

Has the task been cancelled?

Returns:

  • (Boolean)

    true if the task is in the given state else false



207
208
209
# File 'lib/concurrent/scheduled_task.rb', line 207

def cancelled?
  synchronize { ns_check_state?(:cancelled) }
end

#executeScheduledTask

Execute an ‘:unscheduled` `ScheduledTask`. Immediately sets the state to `:pending` and starts counting down toward execution. Does nothing if the `ScheduledTask` is in any state other than `:unscheduled`.

Returns:



260
261
262
263
264
265
# File 'lib/concurrent/scheduled_task.rb', line 260

def execute
  if compare_and_set_state(:pending, :unscheduled)
    synchronize{ ns_schedule(@delay) }
  end
  self
end

#initial_delayFloat

The ‘delay` value given at instanciation.

Returns:

  • (Float)

    the initial delay.



186
187
188
# File 'lib/concurrent/scheduled_task.rb', line 186

def initial_delay
  synchronize { @delay }
end

#processing?Boolean

In the task execution in progress?

Returns:

  • (Boolean)

    true if the task is in the given state else false



214
215
216
# File 'lib/concurrent/scheduled_task.rb', line 214

def processing?
  synchronize { ns_check_state?(:processing) }
end

#reschedule(delay) ⇒ Boolean

Reschedule the task using the given delay and the current time. A task can only be reset while it is ‘:pending`.

Parameters:

  • delay (Float)

    the number of seconds to wait for before executing the task

Returns:

  • (Boolean)

    true if successfully rescheduled else false

Raises:

  • (ArgumentError)

    When given a time that is in the past



249
250
251
252
253
# File 'lib/concurrent/scheduled_task.rb', line 249

def reschedule(delay)
  delay = delay.to_f
  raise ArgumentError.new('seconds must be greater than zero') if delay < 0.0
  synchronize{ ns_reschedule(delay) }
end

#resetBoolean

Reschedule the task using the original delay and the current time. A task can only be reset while it is ‘:pending`.

Returns:

  • (Boolean)

    true if successfully rescheduled else false



237
238
239
# File 'lib/concurrent/scheduled_task.rb', line 237

def reset
  synchronize{ ns_reschedule(@delay) }
end

#schedule_timeFloat

The monotonic time at which the the task is scheduled to be executed.

Returns:

  • (Float)

    the schedule time or nil if ‘unscheduled`



193
194
195
# File 'lib/concurrent/scheduled_task.rb', line 193

def schedule_time
  synchronize { @time }
end