Class: Procrastinate::Scheduler
- Inherits: Object
  - Object
  - Procrastinate::Scheduler
- Defined in: lib/procrastinate/scheduler.rb
Overview
Each scheduler owns its own thread that does all the processing. The interface between your main thread and the procrastinate thread is defined in this class.
API frontend for the procrastinate library. Allows scheduling of tasks and workers in separate processes and provides minimal locking primitives.
Synopsis
scheduler = Procrastinate::Scheduler.start
Schedule a block to run in its own process:
result = scheduler.schedule { Process.pid }
result.value # => child process pid
Or schedule a message call to an object to be run in another process:
proxy = scheduler.proxy(1)
result = proxy + 2
result.value # => 3
You can ask the result whether its value is ready yet:
result.ready? # true/false
Stop the scheduler, waiting for all scheduled work to finish:
scheduler.shutdown
Or shut down hard, without waiting for work to finish:
scheduler.shutdown(true)
Instance Attribute Summary
-
#manager ⇒ Object
readonly
Process manager associated with this scheduler.
-
#strategy ⇒ Object
readonly
Schedule strategy associated with this scheduler.
-
#task_producer ⇒ Object
readonly
Task queue.
Class Method Summary
-
.start(strategy = nil) ⇒ Scheduler
Starts a new scheduler.
Instance Method Summary
-
#initialize(strategy) ⇒ Scheduler
constructor
A new instance of Scheduler.
-
#join ⇒ Object
Waits for the currently queued work to complete.
-
#proxy(worker) ⇒ Proxy
Returns a proxy for the worker instance that will allow executing its methods in a new process.
-
#runtime ⇒ Runtime
Returns a runtime linked to this scheduler.
-
#schedule(task = nil, &block) ⇒ Object
Called by the proxy to schedule work.
-
#shutdown(hard = false) ⇒ Object
Immediately shuts down the procrastinate thread and frees resources.
-
#start ⇒ Object
Constructor Details
#initialize(strategy) ⇒ Scheduler
Returns a new instance of Scheduler.
# File 'lib/procrastinate/scheduler.rb', line 41
def initialize(strategy)
  @strategy = strategy || Procrastinate::SpawnStrategy::Default.new
  @manager = Procrastinate::ProcessManager.new

  # State takes three values: :running, :soft_shutdown, :real_shutdown
  # :soft_shutdown will not accept any new tasks and wait for completion
  # :real_shutdown will stop as soon as possible (still closing down nicely)
  @state = :running

  # If we're used in server mode, this will be replaced with a task producer
  # that produces new worker processes.
  @task_producer = Queue.new
end
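The three states named in the constructor comments can be pictured as a gate in front of the task queue. The following is a minimal sketch, assuming a hypothetical `StateGate` class that is not part of procrastinate; it only mirrors the rule that new work is accepted in `:running` and rejected otherwise.

```ruby
require 'thread'

# Hypothetical stand-in, not the library's code: models the three
# scheduler states and the rule that only :running accepts new work.
class StateGate
  STATES = [:running, :soft_shutdown, :real_shutdown].freeze

  attr_reader :state, :queue

  def initialize
    @state = :running
    @queue = Queue.new # same container type the constructor uses by default
  end

  def transition(new_state)
    unless STATES.include?(new_state)
      raise ArgumentError, "unknown state #{new_state.inspect}"
    end
    @state = new_state
  end

  # Mirrors the guard at the top of Scheduler#schedule.
  def accept(task)
    raise "Shutting down..." unless @state == :running
    @queue << task
    self
  end
end

gate = StateGate.new
gate.accept(:first_task)
gate.transition(:soft_shutdown)

# Once the gate has left :running, further work is refused.
rejected =
  begin
    gate.accept(:second_task)
    false
  rescue RuntimeError
    true
  end
```

Keeping the state in a single symbol makes the guard a one-line comparison, which is presumably why the scheduler uses the same shape.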
Instance Attribute Details
#manager ⇒ Object (readonly)
Process manager associated with this scheduler
# File 'lib/procrastinate/scheduler.rb', line 34
def manager
  @manager
end
#strategy ⇒ Object (readonly)
Schedule strategy associated with this scheduler
# File 'lib/procrastinate/scheduler.rb', line 36
def strategy
  @strategy
end
#task_producer ⇒ Object (readonly)
Task queue
# File 'lib/procrastinate/scheduler.rb', line 38
def task_producer
  @task_producer
end
Class Method Details
.start(strategy = nil) ⇒ Scheduler
Starts a new scheduler.
# File 'lib/procrastinate/scheduler.rb', line 61
def self.start(strategy=nil)
  new(strategy).
    tap { |obj| obj.start }
end
Instance Method Details
#join ⇒ Object
Waits for the currently queued work to complete. This can be used at the end of short scripts to ensure that all work is done.
# File 'lib/procrastinate/scheduler.rb', line 127
def join
  @state = :soft_shutdown

  # NOTE: Currently, this method busy-loops until all childs terminate.
  # This is not as elegant as I whish it to be, but its a start.

  # Wait until all tasks are done.
  loop do
    manager.wakeup
    break if task_producer.empty? && manager.process_count==0
    sleep 0.01
  end
ensure
  @state = :running
end
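The loop in #join polls two conditions at a fixed interval: the queue is empty and no worker is still running. A self-contained sketch of the same polling pattern follows, using plain threads in place of child processes. The helper name `wait_until_idle` and the worker setup are hypothetical and only illustrate the idea.

```ruby
require 'thread'

# Hypothetical illustration of the polling pattern in #join, using threads
# in place of child processes. Not the library's implementation.
def wait_until_idle(queue, workers, interval = 0.01)
  loop do
    break if queue.empty? && workers.none?(&:alive?)
    sleep interval
  end
end

queue   = Queue.new
results = Queue.new
3.times { |i| queue << i }

workers = 2.times.map do
  Thread.new do
    loop do
      item =
        begin
          queue.pop(true) # non-blocking pop raises ThreadError when empty
        rescue ThreadError
          break
        end
      results << item * 2
    end
  end
end

wait_until_idle(queue, workers)
doubled = Array.new(results.size) { results.pop }.sort
```

Checking both conditions matters: a worker may already have taken the last item off the queue while still processing it, so an empty queue alone does not mean the work is done.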
#proxy(worker) ⇒ Proxy
Returns a proxy for the worker instance that will allow executing its methods in a new process.
# File 'lib/procrastinate/scheduler.rb', line 79
def proxy(worker)
  return Procrastinate::Proxy.new(worker, self)
end
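A proxy of this kind is commonly built on `method_missing`: each message sent to the proxy is captured and handed to the scheduler as a unit of work. The sketch below assumes two hypothetical classes, `SketchProxy` and `InlineScheduler`; the latter runs the work immediately in the current process, so it shows only the forwarding, not the process isolation. The real Procrastinate::Proxy may be implemented differently.

```ruby
# Hypothetical scheduler stand-in: runs the block right away in the
# current process. The real scheduler would use a child process.
class InlineScheduler
  Result = Struct.new(:value)

  def schedule(&block)
    Result.new(block.call)
  end
end

# Hypothetical forwarding proxy. Inheriting from BasicObject keeps the
# proxy nearly method-free, so operators such as + reach method_missing.
class SketchProxy < BasicObject
  def initialize(worker, scheduler)
    @worker = worker
    @scheduler = scheduler
  end

  # Every message is wrapped in a block and passed to the scheduler.
  def method_missing(name, *args, &block)
    worker = @worker
    @scheduler.schedule { worker.public_send(name, *args, &block) }
  end
end

proxy  = SketchProxy.new(1, InlineScheduler.new)
result = proxy + 2
```

As in the synopsis, `proxy + 2` returns a result object rather than the sum itself; the sum is read through `result.value`.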
#runtime ⇒ Runtime
Returns a runtime linked to this scheduler. This method should only be used inside task execution processes; if you call it from your main process, the result is undefined.
# File 'lib/procrastinate/scheduler.rb', line 89
def runtime
  Procrastinate::Runtime.new
end
#schedule(task = nil) ⇒ Task::Result
#schedule(&block) ⇒ Task::Result
Called by the proxy to schedule work. You can implement your own Task classes; the relevant interface consists of only a #run method.
# File 'lib/procrastinate/scheduler.rb', line 106
def schedule(task=nil, &block)
  fail "Shutting down..." if @state != :running

  fail ArgumentError, "Either task or block must be given." \
    if !task && !block

  if block
    task = Procrastinate::Task::Callable.new(block)
  end

  task_producer << task

  # Create an occasion for spawning
  manager.wakeup

  task.result
end
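A custom task might look like the sketch below. Note that the listing above also calls `task.result`, so a custom task probably needs to expose a result object in addition to #run; that is an inference from the code shown here, not a documented contract. The names `SquareTask`, `BlockTask`, `SimpleResult`, and `schedule_inline`, as well as the optional argument to #run, are assumptions made for illustration.

```ruby
# Hypothetical result holder; the library's Task::Result may differ.
SimpleResult = Struct.new(:value)

# Hypothetical custom task exposing #run and #result.
class SquareTask
  attr_reader :result

  def initialize(number)
    @number = number
    @result = SimpleResult.new(nil)
  end

  # The work itself. The optional argument is an assumption, in case the
  # caller passes some context object.
  def run(_endpoint = nil)
    @result.value = @number * @number
  end
end

# Hypothetical wrapper that turns a block into a task with the same shape.
class BlockTask
  attr_reader :result

  def initialize(block)
    @block = block
    @result = SimpleResult.new(nil)
  end

  def run(_endpoint = nil)
    @result.value = @block.call
  end
end

# Stand-in for Scheduler#schedule: same argument check as the listing
# above, but the task runs inline instead of in a child process.
def schedule_inline(task = nil, &block)
  raise ArgumentError, "Either task or block must be given." if !task && !block
  task ||= BlockTask.new(block)
  task.run
  task.result
end

squared    = schedule_inline(SquareTask.new(7)).value
from_block = schedule_inline { 6 * 7 }.value
```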
#shutdown(hard = false) ⇒ Object
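Shuts down the procrastinate thread and frees resources. By default (hard = false), it first calls #join and waits for queued work to finish. With hard = true, it shuts down immediately, and any tasks left in the queue will NOT be executed.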
# File 'lib/procrastinate/scheduler.rb', line 147
def shutdown(hard=false)
  join unless hard

  # Set the flag that will provoke shutdown
  @state = :real_shutdown
  # Wake the manager up, making it check the flag
  manager.wakeup
  # Wait for the manager to finish its work. This waits for child processes
  # and then reaps their result, avoiding zombies.
  @thread.join if @thread
end
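The shutdown sequence above follows a common pattern: optionally drain the queue, set a flag, then join the background thread. The sketch below reproduces that sequence in miniature with a hypothetical `MiniScheduler` class in which a single thread stands in for the manager and its child processes. It is an illustration of the pattern, not the library's implementation.

```ruby
require 'thread'

# Hypothetical miniature of the shutdown sequence.
class MiniScheduler
  attr_reader :completed

  def initialize
    @state     = :running
    @tasks     = Queue.new
    @completed = []
    @thread    = Thread.new { run_loop }
  end

  def schedule(&block)
    raise "Shutting down..." unless @state == :running
    @tasks << block
  end

  def shutdown(hard = false)
    drain unless hard       # soft: let queued work finish first
    @state = :real_shutdown # flag checked by the background loop
    @thread.join            # an in-flight task finishes before the loop exits
  end

  private

  def drain
    @state = :soft_shutdown
    sleep 0.01 until @tasks.empty?
  end

  def run_loop
    until @state == :real_shutdown
      begin
        task = @tasks.pop(true) # non-blocking; raises ThreadError when empty
      rescue ThreadError
        sleep 0.01
        next
      end
      @completed << task.call
    end
  end
end

soft = MiniScheduler.new
3.times { |i| soft.schedule { i } }
soft.shutdown # waits for all three tasks

hard = MiniScheduler.new
hard.shutdown(true) # returns as soon as the loop sees the flag
```

After either kind of shutdown the state is no longer `:running`, so further calls to `schedule` raise.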
#start ⇒ Object
# File 'lib/procrastinate/scheduler.rb', line 65
def start
  start_thread
end