Class: Procrastinate::Scheduler

Inherits:
Object
Defined in:
lib/procrastinate/scheduler.rb

Overview

Note:

Each scheduler owns its own thread that does all the processing. The interface between your main thread and the procrastinate thread is defined in this class.

API frontend for the procrastinate library. Allows scheduling of tasks and workers in separate processes and provides minimal locking primitives.

Synopsis

scheduler = Procrastinate::Scheduler.start

Schedule a block to run in its own process:

result = scheduler.schedule { Process.pid }
result.value  # => child process pid

Or schedule a message call to an object to be run in another process:

proxy = scheduler.proxy(1)
result = proxy + 2
result.value  # => 3

You can ask the result whether its value is ready yet:

result.ready? # true/false

Stop the scheduler, waiting for all scheduled work to finish:

scheduler.shutdown

Or shut down hard, without waiting for work to finish:

scheduler.shutdown(true)


Constructor Details

#initialize(strategy) ⇒ Scheduler

Returns a new instance of Scheduler.

# File 'lib/procrastinate/scheduler.rb', line 41

def initialize(strategy)
  @strategy   = strategy || Procrastinate::SpawnStrategy::Default.new
  @manager = Procrastinate::ProcessManager.new

  # State takes three values: :running, :soft_shutdown, :real_shutdown
  # :soft_shutdown will not accept any new tasks and wait for completion
  # :real_shutdown will stop as soon as possible (still closing down nicely)
  @state          = :running
  
  # If we're used in server mode, this will be replaced with a task producer
  # that produces new worker processes.
  @task_producer  = Queue.new
end

Instance Attribute Details

#manager ⇒ Object (readonly)

Process manager associated with this scheduler



# File 'lib/procrastinate/scheduler.rb', line 34

def manager
  @manager
end

#strategy ⇒ Object (readonly)

Schedule strategy associated with this scheduler



# File 'lib/procrastinate/scheduler.rb', line 36

def strategy
  @strategy
end

#task_producer ⇒ Object (readonly)

Task queue



# File 'lib/procrastinate/scheduler.rb', line 38

def task_producer
  @task_producer
end

Class Method Details

.start(strategy = nil) ⇒ Scheduler

Starts a new scheduler.

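Parameters:

  • strategy (defaults to: nil)

    spawn strategy to use; when nil, Procrastinate::SpawnStrategy::Default is used

Returns:

  • (Scheduler)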



# File 'lib/procrastinate/scheduler.rb', line 61

def self.start(strategy=nil)
  new(strategy).
    tap { |obj| obj.start }
end

Instance Method Details

#join ⇒ Object

Waits for the currently queued work to complete. This can be used at the end of short scripts to ensure that all work is done.



# File 'lib/procrastinate/scheduler.rb', line 127

def join
  @state = :soft_shutdown
  
  # NOTE: Currently, this method busy-loops until all children terminate.
  # This is not as elegant as I wish it to be, but it's a start.
  
  # Wait until all tasks are done.
  loop do
    manager.wakeup
    break if task_producer.empty? && manager.process_count==0
    sleep 0.01
  end
  
ensure
  @state = :running
end
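
The polling approach used by #join can be tried out without the library. The following is a minimal, stdlib-only sketch and not procrastinate's implementation: TinyWorkerPool, #push and the in-flight counter are hypothetical names, and a single thread stands in for the child processes.

```ruby
require "thread"

# Hypothetical stand-in: one worker thread drains a queue of jobs.
class TinyWorkerPool
  def initialize
    @queue     = Queue.new
    @in_flight = 0          # jobs queued or currently running
    @lock      = Mutex.new
    @worker    = Thread.new { work_loop }
  end

  # Queue a block; it is counted as in flight until it has finished.
  def push(&job)
    @lock.synchronize { @in_flight += 1 }
    @queue << job
    self
  end

  # Poll until no work is left, sleeping briefly between checks.
  def join(poll_interval = 0.01)
    loop do
      break if @lock.synchronize { @in_flight.zero? }
      sleep poll_interval
    end
  end

  private

  # Jobs are assumed not to raise; a failing job would stop this loop.
  def work_loop
    loop do
      job = @queue.pop
      job.call
      @lock.synchronize { @in_flight -= 1 }
    end
  end
end

pool = TinyWorkerPool.new
done = Queue.new
3.times { |i| pool.push { done << i } }
pool.join
puts done.size  # => 3
```

Counting a job as in flight from the moment it is queued avoids a window in which the queue is already empty but the job has not started yet. A condition variable would remove the polling, at the cost of a little more code.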

#proxy(worker) ⇒ Proxy

Returns a proxy for the worker instance that will allow executing its methods in a new process.

Examples:

proxy = scheduler.proxy(worker)
status = proxy.do_some_work    # will execute later and in its own process

Parameters:

  • worker (Object)

    Ruby object that executes the work

Returns:

  • (Proxy)



# File 'lib/procrastinate/scheduler.rb', line 79

def proxy(worker)
  return Procrastinate::Proxy.new(worker, self)
end
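
How a proxy can turn a method call into scheduled work is sketched below. This is an illustration under stated assumptions, not the source of Procrastinate::Proxy: SketchProxy, MethodCallTask and InlineScheduler are hypothetical names, and the stand-in scheduler runs the task immediately in the current process instead of spawning a worker.

```ruby
# Hypothetical task that captures one method call on a worker object.
class MethodCallTask
  attr_reader :result

  def initialize(worker, name, args, block)
    @worker = worker
    @name   = name
    @args   = args
    @block  = block
  end

  def run
    @result = @worker.public_send(@name, *@args, &@block)
  end
end

# Hypothetical stand-in scheduler: runs the task right away, in-process.
class InlineScheduler
  def schedule(task)
    task.run
    task.result
  end
end

# Forwards any method the worker understands to the scheduler as a task.
class SketchProxy
  def initialize(worker, scheduler)
    @worker    = worker
    @scheduler = scheduler
  end

  def method_missing(name, *args, &block)
    return super unless @worker.respond_to?(name)
    @scheduler.schedule(MethodCallTask.new(@worker, name, args, block))
  end

  def respond_to_missing?(name, include_private = false)
    @worker.respond_to?(name, include_private) || super
  end
end

proxy = SketchProxy.new(1, InlineScheduler.new)
puts proxy + 2  # => 3
```

Unlike this sketch, the synopsis above shows the library handing back a result object that is queried with #value and #ready?, because the work finishes later in another process.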

#runtime ⇒ Runtime

Returns a runtime linked to this scheduler. This method should only be used inside task execution processes; if you call it from your main process, the result is undefined.

Returns:

  • (Runtime)



# File 'lib/procrastinate/scheduler.rb', line 89

def runtime
  Procrastinate::Runtime.new
end

#schedule(task = nil) ⇒ Task::Result #schedule(&block) ⇒ Task::Result

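Called by the proxy to schedule work. You can implement your own Task classes; the documented interface consists of only a #run method, although the source of #schedule also calls task.result, so a custom task needs to respond to #result as well.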

Overloads:

  • #schedule(task = nil) ⇒ Task::Result

    Runs task in its own worker process.

    Parameters:

    • task (#run) (defaults to: nil)

      task to be run in its own worker process

    Returns:

    • (Task::Result)

  • #schedule(&block) ⇒ Task::Result

    Executes the Ruby block in its own worker process.

    Parameters:

    • block (Proc)

      block to be executed in worker process

    Returns:

    • (Task::Result)



# File 'lib/procrastinate/scheduler.rb', line 106

def schedule(task=nil, &block)
  fail "Shutting down..." if @state != :running
  
  fail ArgumentError, "Either task or block must be given." \
    if !task && !block
  
  if block
    task = Procrastinate::Task::Callable.new(block)
  end
  
  task_producer << task
  
  # Create an occasion for spawning
  manager.wakeup
  
  task.result
end
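
To make the idea of a block running in its own process and reporting a value back more concrete, here is a small stdlib-only sketch. It is not how procrastinate transports results (this page does not show that); run_in_child is a hypothetical helper, and it assumes a platform where fork is available and a return value that Marshal can serialize.

```ruby
# Runs the block in a forked child process and returns its value.
# Hypothetical helper; blocks the caller until the child has finished.
def run_in_child(&block)
  reader, writer = IO.pipe
  reader.binmode
  writer.binmode

  pid = fork do
    reader.close
    # Serialize the block's return value and send it to the parent.
    writer.write(Marshal.dump(block.call))
    writer.close
    exit!(0) # leave the child without running at_exit handlers
  end

  writer.close
  payload = reader.read # returns once the child has closed its end
  reader.close
  Process.wait(pid)     # reap the child so it does not become a zombie
  Marshal.load(payload)
end

child_pid = run_in_child { Process.pid }
puts child_pid != Process.pid  # => true
```

The sketch waits synchronously, whereas #schedule returns immediately with a result object and leaves the waiting to the caller.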

#shutdown(hard = false) ⇒ Object

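Shuts down the procrastinate thread and frees resources. By default (hard = false), it first waits for queued work to finish by calling #join. With hard = true, it shuts down immediately, and any tasks left in the queue will NOT be executed.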



# File 'lib/procrastinate/scheduler.rb', line 147

def shutdown(hard=false)
  join unless hard

  # Set the flag that will provoke shutdown
  @state = :real_shutdown
  # Wake the manager up, making it check the flag
  manager.wakeup
  # Wait for the manager to finish its work. This waits for child processes
  # and then reaps their result, avoiding zombies. 
  @thread.join if @thread
end
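
The difference between a soft and a hard shutdown, and the role of the three state values, can be modelled in a few lines. The class below is a simplified, single-threaded sketch and not the library's code: LifecycleSketch, its pending list and its executed list are hypothetical, and "running" a task just means moving its name from one list to the other.

```ruby
# Hypothetical model of the :running / :soft_shutdown / :real_shutdown states.
class LifecycleSketch
  attr_reader :state, :executed

  def initialize
    @state    = :running
    @pending  = []
    @executed = []
  end

  # New work is only accepted while running.
  def schedule(name)
    raise "Shutting down..." if @state != :running
    @pending << name
  end

  # Soft phase: finish everything that is queued, then accept work again.
  def join
    @state = :soft_shutdown
    @executed << @pending.shift until @pending.empty?
  ensure
    @state = :running
  end

  # hard = false drains the queue first; hard = true drops what is left.
  def shutdown(hard = false)
    join unless hard
    @state = :real_shutdown
    @pending.clear
  end
end

soft = LifecycleSketch.new
soft.schedule(:a)
soft.schedule(:b)
soft.shutdown
p soft.executed  # => [:a, :b]

hard = LifecycleSketch.new
hard.schedule(:a)
hard.shutdown(true)
p hard.executed  # => []
```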

#start ⇒ Object



# File 'lib/procrastinate/scheduler.rb', line 65

def start
  start_thread
end