Class: Procrastinator::Queue
- Inherits:
-
Object
- Object
- Procrastinator::Queue
- Extended by:
- Forwardable
- Includes:
- QueueValidation
- Defined in:
- lib/procrastinator/queue.rb
Overview
A Queue defines how a certain type of task will be processed.
Defined Under Namespace
Modules: QueueValidation
Constant Summary collapse
- DEFAULT_TIMEOUT =
Default number of seconds to wait for a task to complete
3600
- DEFAULT_MAX_ATTEMPTS =
Default number of times to retry a task
20
- DEFAULT_UPDATE_PERIOD =
Default amount of time between checks for new Tasks
10
Instance Attribute Summary collapse
-
#:max_attempts(: max_attempts) ⇒ Integer
readonly
Maximum number of attempts for tasks in this queue.
-
#:name(: name) ⇒ Symbol
readonly
The queue’s identifier symbol.
-
#:task_class(: task_class) ⇒ Class
readonly
Class that defines the work to be done for jobs in this queue.
-
#:timeout(: timeout) ⇒ Numeric
readonly
Duration (seconds) after which tasks in this queue should fail for taking too long.
-
#:update_period(: update_period) ⇒ Numeric
readonly
Delay (seconds) between reloads of tasks from the task store.
-
#max_attempts ⇒ Object
readonly
Maximum number of attempts for tasks in this queue.
-
#name ⇒ Object
readonly
The queue's identifier symbol.
-
#task_class ⇒ Object
readonly
Class that defines the work to be done for jobs in this queue.
-
#task_store ⇒ Object
(also: #store, #storage)
readonly
Task store used to create this queue's tasks; #read, #update, and #delete are delegated to it.
-
#timeout ⇒ Object
readonly
Duration (seconds) after which tasks in this queue should fail for taking too long.
-
#update_period ⇒ Object
readonly
Delay (seconds) between reloads of tasks from the task store.
Instance Method Summary collapse
-
#create(run_at:, expire_at:, data:) ⇒ Object
Creates a task on the queue, saved using the Task Store strategy.
-
#expects_data? ⇒ Boolean
Whether the task handler will accept data to be assigned via its :data attribute.
-
#fetch_task(identifier) ⇒ Object
Fetch a task matching the given identifier.
-
#initialize(name:, task_class:, max_attempts: DEFAULT_MAX_ATTEMPTS, timeout: DEFAULT_TIMEOUT, update_period: DEFAULT_UPDATE_PERIOD, store: TaskStore::SimpleCommaStore.new) ⇒ Queue
constructor
Timeout is in seconds.
-
#next_task(logger: Logger.new(StringIO.new), container: nil, scheduler: nil) ⇒ LoggedTask?
Constructs the next available task on the queue.
Constructor Details
#initialize(name:, task_class:, max_attempts: DEFAULT_MAX_ATTEMPTS, timeout: DEFAULT_TIMEOUT, update_period: DEFAULT_UPDATE_PERIOD, store: TaskStore::SimpleCommaStore.new) ⇒ Queue
Timeout is in seconds
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/procrastinator/queue.rb', line 38 def initialize(name:, task_class:, max_attempts: DEFAULT_MAX_ATTEMPTS, timeout: DEFAULT_TIMEOUT, update_period: DEFAULT_UPDATE_PERIOD, store: TaskStore::SimpleCommaStore.new) raise ArgumentError, ':name cannot be nil' unless name raise ArgumentError, ':task_class cannot be nil' unless task_class raise ArgumentError, 'Task class must be initializable' unless task_class.respond_to? :new raise ArgumentError, ':timeout cannot be negative' if timeout&.negative? @name = name.to_s.strip.gsub(/[^A-Za-z0-9]+/, '_').to_sym @task_class = task_class @task_store = store @max_attempts = max_attempts @timeout = timeout @update_period = update_period validate! freeze end |
Instance Attribute Details
#:max_attempts(: max_attempts) ⇒ Integer (readonly)
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 |
# File 'lib/procrastinator/queue.rb', line 18 class Queue extend Forwardable # Default number of seconds to wait for a task to complete DEFAULT_TIMEOUT = 3600 # in seconds; one hour total # Default number of times to retry a task DEFAULT_MAX_ATTEMPTS = 20 # Default amount of time between checks for new Tasks DEFAULT_UPDATE_PERIOD = 10 # seconds attr_reader :name, :max_attempts, :timeout, :update_period, :task_store, :task_class alias store task_store alias storage task_store def_delegators :@task_store, :read, :update, :delete # Timeout is in seconds def initialize(name:, task_class:, max_attempts: DEFAULT_MAX_ATTEMPTS, timeout: DEFAULT_TIMEOUT, update_period: DEFAULT_UPDATE_PERIOD, store: TaskStore::SimpleCommaStore.new) raise ArgumentError, ':name cannot be nil' unless name raise ArgumentError, ':task_class cannot be nil' unless task_class raise ArgumentError, 'Task class must be initializable' unless task_class.respond_to? :new raise ArgumentError, ':timeout cannot be negative' if timeout&.negative? @name = name.to_s.strip.gsub(/[^A-Za-z0-9]+/, '_').to_sym @task_class = task_class @task_store = store @max_attempts = max_attempts @timeout = timeout @update_period = update_period validate! freeze end # Constructs the next available task on the queue. # # @param logger [Logger] logger to provide to the constructed task handler # @param container [Object, nil] container to provide to the constructed task handler # @param scheduler [Procrastinator::Scheduler, nil] the scheduler to provide to the constructed task handler # @return [LoggedTask, nil] A Task or nil if no task is found def next_task(logger: Logger.new(StringIO.new), container: nil, scheduler: nil) = .find(&:runnable?) 
return nil unless task = Task.new(, task_handler(data: .data, container: container, logger: logger, scheduler: scheduler)) LoggedTask.new(task, logger: logger) end # Fetch a task matching the given identifier # # @param identifier [Hash] attributes to match # # @raise [NoSuchTaskError] when no task matches the identifier. # @raise [AmbiguousTaskFilterError] when many tasks match the identifier, meaning you need to be more specific. def fetch_task(identifier) identifier[:data] = JSON.dump(identifier[:data]) if identifier[:data] tasks = read(**identifier) raise NoSuchTaskError, "no task found matching #{ identifier }" if tasks.nil? || tasks.empty? if tasks.size > 1 raise AmbiguousTaskFilterError, "too many (#{ tasks.size }) tasks match #{ identifier }. Found: #{ tasks }" end args = tasks.first.merge(queue: self) TaskMetaData.new(**args) end # Creates a task on the queue, saved using the Task Store strategy. # # @param run_at [Time] Earliest time to attempt running the task # @param expire_at [Time, nil] Time after which the task will not be attempted # @param data [Hash, String, Numeric, nil] The data to save # # @raise [ArgumentError] when the keyword `:data` is needed by the task handler, but is missing # @raise [MalformedTaskError] when the keyword `:data` is provided but not expected by the task handler. def create(run_at:, expire_at:, data:) if data.nil? && expects_data? raise ArgumentError, "task #{ @task_class } expects to receive :data. Provide :data to #delay." end unless data.nil? || expects_data? raise MalformedTaskError, " found unexpected :data argument. 
Either do not provide :data when scheduling a task,\n or add this in the \#{ @task_class } class definition:\n attr_accessor :data\n ERROR\n end\n\n # TODO: shorten to using slice once updated to Ruby 2.5+\n attrs = {queue: self, run_at: run_at, initial_run_at: run_at, expire_at: expire_at, data: JSON.dump(data)}\n\n create_data = TaskMetaData.new(**attrs).to_h\n create_data.delete(:id)\n create_data.delete(:attempts)\n create_data.delete(:last_fail_at)\n create_data.delete(:last_error)\n @task_store.create(**create_data)\n end\n\n # @return [Boolean] whether the task handler will accept data to be assigned via its :data attribute\n def expects_data?\n @task_class.method_defined?(:data=)\n end\n\n private\n\n def task_handler(data: nil, container: nil, logger: nil, scheduler: nil)\n handler = @task_class.new\n handler.data = data if handler.respond_to?(:data=)\n handler.container = container\n handler.logger = logger\n handler.scheduler = scheduler\n handler\n end\n\n def next_metas\n tasks = read(queue: @name).reject { |t| t[:run_at].nil? }.collect do |t|\n t.to_h.delete_if { |key| !TaskMetaData::EXPECTED_DATA.include?(key) }.merge(queue: self)\n end\n\n sort_tasks(tasks.collect { |t| TaskMetaData.new(**t) })\n end\n\n def sort_tasks(tasks)\n # TODO: improve this\n # shuffling and re-sorting to avoid worst case O(n^2) when receiving already sorted data\n # on quicksort (which is default ruby sort). 
It is not unreasonable that the persister could return sorted\n # results\n # Ideally, we'd use a better algo than qsort for this, but this will do for now\n tasks.shuffle.sort_by(&:run_at)\n end\n\n # Internal queue validator\n module QueueValidation\n private\n\n def validate!\n verify_task_class!\n verify_task_store!\n end\n\n def verify_task_class!\n verify_run_method!\n verify_accessors!\n verify_hooks!\n end\n\n # The interface compliance is checked on init because it's one of those rare cases where you want to know early;\n # otherwise, you wouldn't know until task execution and that could be far in the future.\n # UX is important for devs, too.\n # - R\n def verify_run_method!\n unless @task_class.method_defined? :run\n raise MalformedTaskError, \"task \#{ @task_class } does not support #run method\"\n end\n\n return unless @task_class.instance_method(:run).arity.positive?\n\n raise MalformedTaskError, \"task \#{ @task_class } cannot require parameters to its #run method\"\n end\n\n def verify_accessors!\n [:logger, :container, :scheduler].each do |method_name|\n next if @task_class.method_defined?(method_name) && @task_class.method_defined?(\"\#{ method_name }=\")\n\n raise MalformedTaskError, <<~ERR\n Task handler is missing a \#{ method_name } accessor. Add this to the \#{ @task_class } class definition:\n attr_accessor :logger, :container, :scheduler\n ERR\n end\n end\n\n def verify_hooks!\n expected_arity = 1\n\n [:success, :fail, :final_fail].each do |method_name|\n next unless @task_class.method_defined?(method_name)\n next if @task_class.instance_method(method_name).arity == expected_arity\n\n err = \"task \#{ @task_class } must accept \#{ expected_arity } parameter to its #\#{ method_name } method\"\n\n raise MalformedTaskError, err\n end\n end\n\n def verify_task_store!\n raise ArgumentError, ':store cannot be nil' if @task_store.nil?\n\n [:read, :create, :update, :delete].each do |method|\n unless @task_store.respond_to? 
method\n raise MalformedTaskStoreError, \"task store \#{ @task_store.class } must respond to #\#{ method }\"\n end\n end\n end\n end\n include QueueValidation\nend\n" |
#:name(: name) ⇒ Symbol (readonly)
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 |
# File 'lib/procrastinator/queue.rb', line 18 class Queue extend Forwardable # Default number of seconds to wait for a task to complete DEFAULT_TIMEOUT = 3600 # in seconds; one hour total # Default number of times to retry a task DEFAULT_MAX_ATTEMPTS = 20 # Default amount of time between checks for new Tasks DEFAULT_UPDATE_PERIOD = 10 # seconds attr_reader :name, :max_attempts, :timeout, :update_period, :task_store, :task_class alias store task_store alias storage task_store def_delegators :@task_store, :read, :update, :delete # Timeout is in seconds def initialize(name:, task_class:, max_attempts: DEFAULT_MAX_ATTEMPTS, timeout: DEFAULT_TIMEOUT, update_period: DEFAULT_UPDATE_PERIOD, store: TaskStore::SimpleCommaStore.new) raise ArgumentError, ':name cannot be nil' unless name raise ArgumentError, ':task_class cannot be nil' unless task_class raise ArgumentError, 'Task class must be initializable' unless task_class.respond_to? :new raise ArgumentError, ':timeout cannot be negative' if timeout&.negative? @name = name.to_s.strip.gsub(/[^A-Za-z0-9]+/, '_').to_sym @task_class = task_class @task_store = store @max_attempts = max_attempts @timeout = timeout @update_period = update_period validate! freeze end # Constructs the next available task on the queue. # # @param logger [Logger] logger to provide to the constructed task handler # @param container [Object, nil] container to provide to the constructed task handler # @param scheduler [Procrastinator::Scheduler, nil] the scheduler to provide to the constructed task handler # @return [LoggedTask, nil] A Task or nil if no task is found def next_task(logger: Logger.new(StringIO.new), container: nil, scheduler: nil) = .find(&:runnable?) 
return nil unless task = Task.new(, task_handler(data: .data, container: container, logger: logger, scheduler: scheduler)) LoggedTask.new(task, logger: logger) end # Fetch a task matching the given identifier # # @param identifier [Hash] attributes to match # # @raise [NoSuchTaskError] when no task matches the identifier. # @raise [AmbiguousTaskFilterError] when many tasks match the identifier, meaning you need to be more specific. def fetch_task(identifier) identifier[:data] = JSON.dump(identifier[:data]) if identifier[:data] tasks = read(**identifier) raise NoSuchTaskError, "no task found matching #{ identifier }" if tasks.nil? || tasks.empty? if tasks.size > 1 raise AmbiguousTaskFilterError, "too many (#{ tasks.size }) tasks match #{ identifier }. Found: #{ tasks }" end args = tasks.first.merge(queue: self) TaskMetaData.new(**args) end # Creates a task on the queue, saved using the Task Store strategy. # # @param run_at [Time] Earliest time to attempt running the task # @param expire_at [Time, nil] Time after which the task will not be attempted # @param data [Hash, String, Numeric, nil] The data to save # # @raise [ArgumentError] when the keyword `:data` is needed by the task handler, but is missing # @raise [MalformedTaskError] when the keyword `:data` is provided but not expected by the task handler. def create(run_at:, expire_at:, data:) if data.nil? && expects_data? raise ArgumentError, "task #{ @task_class } expects to receive :data. Provide :data to #delay." end unless data.nil? || expects_data? raise MalformedTaskError, " found unexpected :data argument. 
Either do not provide :data when scheduling a task,\n or add this in the \#{ @task_class } class definition:\n attr_accessor :data\n ERROR\n end\n\n # TODO: shorten to using slice once updated to Ruby 2.5+\n attrs = {queue: self, run_at: run_at, initial_run_at: run_at, expire_at: expire_at, data: JSON.dump(data)}\n\n create_data = TaskMetaData.new(**attrs).to_h\n create_data.delete(:id)\n create_data.delete(:attempts)\n create_data.delete(:last_fail_at)\n create_data.delete(:last_error)\n @task_store.create(**create_data)\n end\n\n # @return [Boolean] whether the task handler will accept data to be assigned via its :data attribute\n def expects_data?\n @task_class.method_defined?(:data=)\n end\n\n private\n\n def task_handler(data: nil, container: nil, logger: nil, scheduler: nil)\n handler = @task_class.new\n handler.data = data if handler.respond_to?(:data=)\n handler.container = container\n handler.logger = logger\n handler.scheduler = scheduler\n handler\n end\n\n def next_metas\n tasks = read(queue: @name).reject { |t| t[:run_at].nil? }.collect do |t|\n t.to_h.delete_if { |key| !TaskMetaData::EXPECTED_DATA.include?(key) }.merge(queue: self)\n end\n\n sort_tasks(tasks.collect { |t| TaskMetaData.new(**t) })\n end\n\n def sort_tasks(tasks)\n # TODO: improve this\n # shuffling and re-sorting to avoid worst case O(n^2) when receiving already sorted data\n # on quicksort (which is default ruby sort). 
It is not unreasonable that the persister could return sorted\n # results\n # Ideally, we'd use a better algo than qsort for this, but this will do for now\n tasks.shuffle.sort_by(&:run_at)\n end\n\n # Internal queue validator\n module QueueValidation\n private\n\n def validate!\n verify_task_class!\n verify_task_store!\n end\n\n def verify_task_class!\n verify_run_method!\n verify_accessors!\n verify_hooks!\n end\n\n # The interface compliance is checked on init because it's one of those rare cases where you want to know early;\n # otherwise, you wouldn't know until task execution and that could be far in the future.\n # UX is important for devs, too.\n # - R\n def verify_run_method!\n unless @task_class.method_defined? :run\n raise MalformedTaskError, \"task \#{ @task_class } does not support #run method\"\n end\n\n return unless @task_class.instance_method(:run).arity.positive?\n\n raise MalformedTaskError, \"task \#{ @task_class } cannot require parameters to its #run method\"\n end\n\n def verify_accessors!\n [:logger, :container, :scheduler].each do |method_name|\n next if @task_class.method_defined?(method_name) && @task_class.method_defined?(\"\#{ method_name }=\")\n\n raise MalformedTaskError, <<~ERR\n Task handler is missing a \#{ method_name } accessor. Add this to the \#{ @task_class } class definition:\n attr_accessor :logger, :container, :scheduler\n ERR\n end\n end\n\n def verify_hooks!\n expected_arity = 1\n\n [:success, :fail, :final_fail].each do |method_name|\n next unless @task_class.method_defined?(method_name)\n next if @task_class.instance_method(method_name).arity == expected_arity\n\n err = \"task \#{ @task_class } must accept \#{ expected_arity } parameter to its #\#{ method_name } method\"\n\n raise MalformedTaskError, err\n end\n end\n\n def verify_task_store!\n raise ArgumentError, ':store cannot be nil' if @task_store.nil?\n\n [:read, :create, :update, :delete].each do |method|\n unless @task_store.respond_to? 
method\n raise MalformedTaskStoreError, \"task store \#{ @task_store.class } must respond to #\#{ method }\"\n end\n end\n end\n end\n include QueueValidation\nend\n" |
#:task_class(: task_class) ⇒ Class (readonly)
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 |
# File 'lib/procrastinator/queue.rb', line 18 class Queue extend Forwardable # Default number of seconds to wait for a task to complete DEFAULT_TIMEOUT = 3600 # in seconds; one hour total # Default number of times to retry a task DEFAULT_MAX_ATTEMPTS = 20 # Default amount of time between checks for new Tasks DEFAULT_UPDATE_PERIOD = 10 # seconds attr_reader :name, :max_attempts, :timeout, :update_period, :task_store, :task_class alias store task_store alias storage task_store def_delegators :@task_store, :read, :update, :delete # Timeout is in seconds def initialize(name:, task_class:, max_attempts: DEFAULT_MAX_ATTEMPTS, timeout: DEFAULT_TIMEOUT, update_period: DEFAULT_UPDATE_PERIOD, store: TaskStore::SimpleCommaStore.new) raise ArgumentError, ':name cannot be nil' unless name raise ArgumentError, ':task_class cannot be nil' unless task_class raise ArgumentError, 'Task class must be initializable' unless task_class.respond_to? :new raise ArgumentError, ':timeout cannot be negative' if timeout&.negative? @name = name.to_s.strip.gsub(/[^A-Za-z0-9]+/, '_').to_sym @task_class = task_class @task_store = store @max_attempts = max_attempts @timeout = timeout @update_period = update_period validate! freeze end # Constructs the next available task on the queue. # # @param logger [Logger] logger to provide to the constructed task handler # @param container [Object, nil] container to provide to the constructed task handler # @param scheduler [Procrastinator::Scheduler, nil] the scheduler to provide to the constructed task handler # @return [LoggedTask, nil] A Task or nil if no task is found def next_task(logger: Logger.new(StringIO.new), container: nil, scheduler: nil) = .find(&:runnable?) 
return nil unless task = Task.new(, task_handler(data: .data, container: container, logger: logger, scheduler: scheduler)) LoggedTask.new(task, logger: logger) end # Fetch a task matching the given identifier # # @param identifier [Hash] attributes to match # # @raise [NoSuchTaskError] when no task matches the identifier. # @raise [AmbiguousTaskFilterError] when many tasks match the identifier, meaning you need to be more specific. def fetch_task(identifier) identifier[:data] = JSON.dump(identifier[:data]) if identifier[:data] tasks = read(**identifier) raise NoSuchTaskError, "no task found matching #{ identifier }" if tasks.nil? || tasks.empty? if tasks.size > 1 raise AmbiguousTaskFilterError, "too many (#{ tasks.size }) tasks match #{ identifier }. Found: #{ tasks }" end args = tasks.first.merge(queue: self) TaskMetaData.new(**args) end # Creates a task on the queue, saved using the Task Store strategy. # # @param run_at [Time] Earliest time to attempt running the task # @param expire_at [Time, nil] Time after which the task will not be attempted # @param data [Hash, String, Numeric, nil] The data to save # # @raise [ArgumentError] when the keyword `:data` is needed by the task handler, but is missing # @raise [MalformedTaskError] when the keyword `:data` is provided but not expected by the task handler. def create(run_at:, expire_at:, data:) if data.nil? && expects_data? raise ArgumentError, "task #{ @task_class } expects to receive :data. Provide :data to #delay." end unless data.nil? || expects_data? raise MalformedTaskError, " found unexpected :data argument. 
Either do not provide :data when scheduling a task,\n or add this in the \#{ @task_class } class definition:\n attr_accessor :data\n ERROR\n end\n\n # TODO: shorten to using slice once updated to Ruby 2.5+\n attrs = {queue: self, run_at: run_at, initial_run_at: run_at, expire_at: expire_at, data: JSON.dump(data)}\n\n create_data = TaskMetaData.new(**attrs).to_h\n create_data.delete(:id)\n create_data.delete(:attempts)\n create_data.delete(:last_fail_at)\n create_data.delete(:last_error)\n @task_store.create(**create_data)\n end\n\n # @return [Boolean] whether the task handler will accept data to be assigned via its :data attribute\n def expects_data?\n @task_class.method_defined?(:data=)\n end\n\n private\n\n def task_handler(data: nil, container: nil, logger: nil, scheduler: nil)\n handler = @task_class.new\n handler.data = data if handler.respond_to?(:data=)\n handler.container = container\n handler.logger = logger\n handler.scheduler = scheduler\n handler\n end\n\n def next_metas\n tasks = read(queue: @name).reject { |t| t[:run_at].nil? }.collect do |t|\n t.to_h.delete_if { |key| !TaskMetaData::EXPECTED_DATA.include?(key) }.merge(queue: self)\n end\n\n sort_tasks(tasks.collect { |t| TaskMetaData.new(**t) })\n end\n\n def sort_tasks(tasks)\n # TODO: improve this\n # shuffling and re-sorting to avoid worst case O(n^2) when receiving already sorted data\n # on quicksort (which is default ruby sort). 
It is not unreasonable that the persister could return sorted\n # results\n # Ideally, we'd use a better algo than qsort for this, but this will do for now\n tasks.shuffle.sort_by(&:run_at)\n end\n\n # Internal queue validator\n module QueueValidation\n private\n\n def validate!\n verify_task_class!\n verify_task_store!\n end\n\n def verify_task_class!\n verify_run_method!\n verify_accessors!\n verify_hooks!\n end\n\n # The interface compliance is checked on init because it's one of those rare cases where you want to know early;\n # otherwise, you wouldn't know until task execution and that could be far in the future.\n # UX is important for devs, too.\n # - R\n def verify_run_method!\n unless @task_class.method_defined? :run\n raise MalformedTaskError, \"task \#{ @task_class } does not support #run method\"\n end\n\n return unless @task_class.instance_method(:run).arity.positive?\n\n raise MalformedTaskError, \"task \#{ @task_class } cannot require parameters to its #run method\"\n end\n\n def verify_accessors!\n [:logger, :container, :scheduler].each do |method_name|\n next if @task_class.method_defined?(method_name) && @task_class.method_defined?(\"\#{ method_name }=\")\n\n raise MalformedTaskError, <<~ERR\n Task handler is missing a \#{ method_name } accessor. Add this to the \#{ @task_class } class definition:\n attr_accessor :logger, :container, :scheduler\n ERR\n end\n end\n\n def verify_hooks!\n expected_arity = 1\n\n [:success, :fail, :final_fail].each do |method_name|\n next unless @task_class.method_defined?(method_name)\n next if @task_class.instance_method(method_name).arity == expected_arity\n\n err = \"task \#{ @task_class } must accept \#{ expected_arity } parameter to its #\#{ method_name } method\"\n\n raise MalformedTaskError, err\n end\n end\n\n def verify_task_store!\n raise ArgumentError, ':store cannot be nil' if @task_store.nil?\n\n [:read, :create, :update, :delete].each do |method|\n unless @task_store.respond_to? 
method\n raise MalformedTaskStoreError, \"task store \#{ @task_store.class } must respond to #\#{ method }\"\n end\n end\n end\n end\n include QueueValidation\nend\n" |
#:timeout(: timeout) ⇒ Numeric (readonly)
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 |
# File 'lib/procrastinator/queue.rb', line 18 class Queue extend Forwardable # Default number of seconds to wait for a task to complete DEFAULT_TIMEOUT = 3600 # in seconds; one hour total # Default number of times to retry a task DEFAULT_MAX_ATTEMPTS = 20 # Default amount of time between checks for new Tasks DEFAULT_UPDATE_PERIOD = 10 # seconds attr_reader :name, :max_attempts, :timeout, :update_period, :task_store, :task_class alias store task_store alias storage task_store def_delegators :@task_store, :read, :update, :delete # Timeout is in seconds def initialize(name:, task_class:, max_attempts: DEFAULT_MAX_ATTEMPTS, timeout: DEFAULT_TIMEOUT, update_period: DEFAULT_UPDATE_PERIOD, store: TaskStore::SimpleCommaStore.new) raise ArgumentError, ':name cannot be nil' unless name raise ArgumentError, ':task_class cannot be nil' unless task_class raise ArgumentError, 'Task class must be initializable' unless task_class.respond_to? :new raise ArgumentError, ':timeout cannot be negative' if timeout&.negative? @name = name.to_s.strip.gsub(/[^A-Za-z0-9]+/, '_').to_sym @task_class = task_class @task_store = store @max_attempts = max_attempts @timeout = timeout @update_period = update_period validate! freeze end # Constructs the next available task on the queue. # # @param logger [Logger] logger to provide to the constructed task handler # @param container [Object, nil] container to provide to the constructed task handler # @param scheduler [Procrastinator::Scheduler, nil] the scheduler to provide to the constructed task handler # @return [LoggedTask, nil] A Task or nil if no task is found def next_task(logger: Logger.new(StringIO.new), container: nil, scheduler: nil) = .find(&:runnable?) 
return nil unless task = Task.new(, task_handler(data: .data, container: container, logger: logger, scheduler: scheduler)) LoggedTask.new(task, logger: logger) end # Fetch a task matching the given identifier # # @param identifier [Hash] attributes to match # # @raise [NoSuchTaskError] when no task matches the identifier. # @raise [AmbiguousTaskFilterError] when many tasks match the identifier, meaning you need to be more specific. def fetch_task(identifier) identifier[:data] = JSON.dump(identifier[:data]) if identifier[:data] tasks = read(**identifier) raise NoSuchTaskError, "no task found matching #{ identifier }" if tasks.nil? || tasks.empty? if tasks.size > 1 raise AmbiguousTaskFilterError, "too many (#{ tasks.size }) tasks match #{ identifier }. Found: #{ tasks }" end args = tasks.first.merge(queue: self) TaskMetaData.new(**args) end # Creates a task on the queue, saved using the Task Store strategy. # # @param run_at [Time] Earliest time to attempt running the task # @param expire_at [Time, nil] Time after which the task will not be attempted # @param data [Hash, String, Numeric, nil] The data to save # # @raise [ArgumentError] when the keyword `:data` is needed by the task handler, but is missing # @raise [MalformedTaskError] when the keyword `:data` is provided but not expected by the task handler. def create(run_at:, expire_at:, data:) if data.nil? && expects_data? raise ArgumentError, "task #{ @task_class } expects to receive :data. Provide :data to #delay." end unless data.nil? || expects_data? raise MalformedTaskError, " found unexpected :data argument. 
Either do not provide :data when scheduling a task,\n or add this in the \#{ @task_class } class definition:\n attr_accessor :data\n ERROR\n end\n\n # TODO: shorten to using slice once updated to Ruby 2.5+\n attrs = {queue: self, run_at: run_at, initial_run_at: run_at, expire_at: expire_at, data: JSON.dump(data)}\n\n create_data = TaskMetaData.new(**attrs).to_h\n create_data.delete(:id)\n create_data.delete(:attempts)\n create_data.delete(:last_fail_at)\n create_data.delete(:last_error)\n @task_store.create(**create_data)\n end\n\n # @return [Boolean] whether the task handler will accept data to be assigned via its :data attribute\n def expects_data?\n @task_class.method_defined?(:data=)\n end\n\n private\n\n def task_handler(data: nil, container: nil, logger: nil, scheduler: nil)\n handler = @task_class.new\n handler.data = data if handler.respond_to?(:data=)\n handler.container = container\n handler.logger = logger\n handler.scheduler = scheduler\n handler\n end\n\n def next_metas\n tasks = read(queue: @name).reject { |t| t[:run_at].nil? }.collect do |t|\n t.to_h.delete_if { |key| !TaskMetaData::EXPECTED_DATA.include?(key) }.merge(queue: self)\n end\n\n sort_tasks(tasks.collect { |t| TaskMetaData.new(**t) })\n end\n\n def sort_tasks(tasks)\n # TODO: improve this\n # shuffling and re-sorting to avoid worst case O(n^2) when receiving already sorted data\n # on quicksort (which is default ruby sort). 
It is not unreasonable that the persister could return sorted\n # results\n # Ideally, we'd use a better algo than qsort for this, but this will do for now\n tasks.shuffle.sort_by(&:run_at)\n end\n\n # Internal queue validator\n module QueueValidation\n private\n\n def validate!\n verify_task_class!\n verify_task_store!\n end\n\n def verify_task_class!\n verify_run_method!\n verify_accessors!\n verify_hooks!\n end\n\n # The interface compliance is checked on init because it's one of those rare cases where you want to know early;\n # otherwise, you wouldn't know until task execution and that could be far in the future.\n # UX is important for devs, too.\n # - R\n def verify_run_method!\n unless @task_class.method_defined? :run\n raise MalformedTaskError, \"task \#{ @task_class } does not support #run method\"\n end\n\n return unless @task_class.instance_method(:run).arity.positive?\n\n raise MalformedTaskError, \"task \#{ @task_class } cannot require parameters to its #run method\"\n end\n\n def verify_accessors!\n [:logger, :container, :scheduler].each do |method_name|\n next if @task_class.method_defined?(method_name) && @task_class.method_defined?(\"\#{ method_name }=\")\n\n raise MalformedTaskError, <<~ERR\n Task handler is missing a \#{ method_name } accessor. Add this to the \#{ @task_class } class definition:\n attr_accessor :logger, :container, :scheduler\n ERR\n end\n end\n\n def verify_hooks!\n expected_arity = 1\n\n [:success, :fail, :final_fail].each do |method_name|\n next unless @task_class.method_defined?(method_name)\n next if @task_class.instance_method(method_name).arity == expected_arity\n\n err = \"task \#{ @task_class } must accept \#{ expected_arity } parameter to its #\#{ method_name } method\"\n\n raise MalformedTaskError, err\n end\n end\n\n def verify_task_store!\n raise ArgumentError, ':store cannot be nil' if @task_store.nil?\n\n [:read, :create, :update, :delete].each do |method|\n unless @task_store.respond_to? 
method\n raise MalformedTaskStoreError, \"task store \#{ @task_store.class } must respond to #\#{ method }\"\n end\n end\n end\n end\n include QueueValidation\nend\n" |
#:update_period(: update_period) ⇒ Numeric (readonly)
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 |
# File 'lib/procrastinator/queue.rb', line 18

# A Queue defines how a certain type task will be processed.
class Queue
  extend Forwardable

  # Default number of seconds to wait for a task to complete
  DEFAULT_TIMEOUT = 3600 # in seconds; one hour total

  # Default number of times to retry a task
  DEFAULT_MAX_ATTEMPTS = 20

  # Default amount of time between checks for new Tasks
  DEFAULT_UPDATE_PERIOD = 10 # seconds

  # Each reader is documented on its own so that a trailing comment on the constant above
  # cannot be picked up as the description of every attribute.

  # @return [Symbol] the queue's identifier symbol
  attr_reader :name

  # @return [Integer] maximum number of attempts for tasks in this queue
  attr_reader :max_attempts

  # @return [Numeric] duration (seconds) after which tasks in this queue should fail for taking too long
  attr_reader :timeout

  # @return [Numeric] delay (seconds) between reloads of tasks from the task store
  attr_reader :update_period

  # @return [Object] the task store that #read, #update and #delete are delegated to
  attr_reader :task_store

  # @return [Class] class that defines the work to be done for jobs in this queue
  attr_reader :task_class

  alias store task_store
  alias storage task_store

  def_delegators :@task_store, :read, :update, :delete

  # Builds a frozen, validated queue definition. Timeout is in seconds.
  #
  # @param name [#to_s] queue identifier; runs of non-alphanumeric characters become a single underscore
  # @param task_class [Class] handler class; must respond to .new and pass the QueueValidation checks
  # @param max_attempts [Integer] maximum number of attempts for tasks in this queue
  # @param timeout [Numeric, nil] seconds after which a task should fail; must not be negative
  # @param update_period [Numeric] seconds between reloads of tasks from the task store
  # @param store [Object] task store; must respond to #read, #create, #update and #delete
  #
  # @raise [ArgumentError] when name or task_class is nil, task_class lacks .new, timeout is negative,
  #                        or store is nil
  # @raise [MalformedTaskError] when the task class does not satisfy the handler interface
  # @raise [MalformedTaskStoreError] when the store is missing a required method
  def initialize(name:, task_class:,
                 max_attempts: DEFAULT_MAX_ATTEMPTS,
                 timeout: DEFAULT_TIMEOUT,
                 update_period: DEFAULT_UPDATE_PERIOD,
                 store: TaskStore::SimpleCommaStore.new)
    raise ArgumentError, ':name cannot be nil' unless name

    raise ArgumentError, ':task_class cannot be nil' unless task_class
    raise ArgumentError, 'Task class must be initializable' unless task_class.respond_to? :new

    raise ArgumentError, ':timeout cannot be negative' if timeout&.negative?

    @name          = name.to_s.strip.gsub(/[^A-Za-z0-9]+/, '_').to_sym
    @task_class    = task_class
    @task_store    = store
    @max_attempts  = max_attempts
    @timeout       = timeout
    @update_period = update_period

    validate!

    freeze
  end

  # Constructs the next available task on the queue.
  #
  # @param logger [Logger] logger to provide to the constructed task handler
  # @param container [Object, nil] container to provide to the constructed task handler
  # @param scheduler [Procrastinator::Scheduler, nil] the scheduler to provide to the constructed task handler
  # @return [LoggedTask, nil] A Task or nil if no task is found
  def next_task(logger: Logger.new(StringIO.new), container: nil, scheduler: nil)
    metadata = next_metas.find(&:runnable?)

    return nil unless metadata

    task = Task.new(metadata, task_handler(data:      metadata.data,
                                           container: container,
                                           logger:    logger,
                                           scheduler: scheduler))

    LoggedTask.new(task, logger: logger)
  end

  # Fetch a task matching the given identifier
  #
  # @param identifier [Hash] attributes to match
  #
  # @raise [NoSuchTaskError] when no task matches the identifier.
  # @raise [AmbiguousTaskFilterError] when many tasks match the identifier, meaning you need to be more specific.
  def fetch_task(identifier)
    # Serialize :data on a copy so the caller's (possibly frozen) hash is never modified.
    filter = identifier[:data] ? identifier.merge(data: JSON.dump(identifier[:data])) : identifier

    tasks = read(**filter)

    raise NoSuchTaskError, "no task found matching #{ filter }" if tasks.nil? || tasks.empty?

    if tasks.size > 1
      raise AmbiguousTaskFilterError, "too many (#{ tasks.size }) tasks match #{ filter }. Found: #{ tasks }"
    end

    args = tasks.first.merge(queue: self)

    TaskMetaData.new(**args)
  end

  # Creates a task on the queue, saved using the Task Store strategy.
  #
  # @param run_at [Time] Earliest time to attempt running the task
  # @param expire_at [Time, nil] Time after which the task will not be attempted
  # @param data [Hash, String, Numeric, nil] The data to save
  #
  # @raise [ArgumentError] when the keyword `:data` is needed by the task handler, but is missing
  # @raise [MalformedTaskError] when the keyword `:data` is provided but not expected by the task handler.
  def create(run_at:, expire_at:, data:)
    if data.nil? && expects_data?
      raise ArgumentError, "task #{ @task_class } expects to receive :data. Provide :data to #delay."
    end

    unless data.nil? || expects_data?
      raise MalformedTaskError, <<~ERROR
        found unexpected :data argument. Either do not provide :data when scheduling a task,
        or add this in the #{ @task_class } class definition:
          attr_accessor :data
      ERROR
    end

    # TODO: shorten to using slice once updated to Ruby 2.5+
    attrs = {queue: self, run_at: run_at, initial_run_at: run_at, expire_at: expire_at, data: JSON.dump(data)}

    create_data = TaskMetaData.new(**attrs).to_h
    create_data.delete(:id)
    create_data.delete(:attempts)
    create_data.delete(:last_fail_at)
    create_data.delete(:last_error)
    @task_store.create(**create_data)
  end

  # @return [Boolean] whether the task handler will accept data to be assigned via its :data attribute
  def expects_data?
    @task_class.method_defined?(:data=)
  end

  private

  # Instantiates the task class and assigns its collaborators.
  # :data is only assigned when the handler exposes a data= writer.
  def task_handler(data: nil, container: nil, logger: nil, scheduler: nil)
    handler           = @task_class.new
    handler.data      = data if handler.respond_to?(:data=)
    handler.container = container
    handler.logger    = logger
    handler.scheduler = scheduler
    handler
  end

  # Reads this queue's stored tasks, drops those without a run_at, strips keys that are not in
  # TaskMetaData::EXPECTED_DATA, and returns them as TaskMetaData sorted by run_at.
  def next_metas
    tasks = read(queue: @name).reject { |t| t[:run_at].nil? }.collect do |t|
      t.to_h.delete_if { |key| !TaskMetaData::EXPECTED_DATA.include?(key) }.merge(queue: self)
    end

    sort_tasks(tasks.collect { |t| TaskMetaData.new(**t) })
  end

  def sort_tasks(tasks)
    # TODO: improve this
    # shuffling and re-sorting to avoid worst case O(n^2) when receiving already sorted data
    # on quicksort (which is default ruby sort). It is not unreasonable that the persister could return sorted
    # results
    # Ideally, we'd use a better algo than qsort for this, but this will do for now
    tasks.shuffle.sort_by(&:run_at)
  end

  # Internal queue validator
  module QueueValidation
    private

    def validate!
      verify_task_class!
      verify_task_store!
    end

    def verify_task_class!
      verify_run_method!
      verify_accessors!
      verify_hooks!
    end

    # The interface compliance is checked on init because it's one of those rare cases where you want to know early;
    # otherwise, you wouldn't know until task execution and that could be far in the future.
    # UX is important for devs, too.
    #    - R
    def verify_run_method!
      unless @task_class.method_defined? :run
        raise MalformedTaskError, "task #{ @task_class } does not support #run method"
      end

      return unless @task_class.instance_method(:run).arity.positive?

      raise MalformedTaskError, "task #{ @task_class } cannot require parameters to its #run method"
    end

    # Every handler needs both a reader and a writer for logger, container and scheduler.
    def verify_accessors!
      [:logger, :container, :scheduler].each do |method_name|
        next if @task_class.method_defined?(method_name) && @task_class.method_defined?("#{ method_name }=")

        raise MalformedTaskError, <<~ERR
          Task handler is missing a #{ method_name } accessor. Add this to the #{ @task_class } class definition:
            attr_accessor :logger, :container, :scheduler
        ERR
      end
    end

    # Hooks are optional, but any that are defined must take exactly one parameter.
    def verify_hooks!
      expected_arity = 1

      [:success, :fail, :final_fail].each do |method_name|
        next unless @task_class.method_defined?(method_name)
        next if @task_class.instance_method(method_name).arity == expected_arity

        err = "task #{ @task_class } must accept #{ expected_arity } parameter to its ##{ method_name } method"

        raise MalformedTaskError, err
      end
    end

    def verify_task_store!
      raise ArgumentError, ':store cannot be nil' if @task_store.nil?

      [:read, :create, :update, :delete].each do |method|
        unless @task_store.respond_to? method
          raise MalformedTaskStoreError, "task store #{ @task_store.class } must respond to ##{ method }"
        end
      end
    end
  end

  include QueueValidation
end
#max_attempts ⇒ Object (readonly)
Maximum number of attempts for tasks in this queue.
30 31 32 |
# File 'lib/procrastinator/queue.rb', line 30

# @return [Integer] maximum number of attempts for tasks in this queue
def max_attempts
  @max_attempts
end
#name ⇒ Object (readonly)
The queue's identifier symbol.
30 31 32 |
# File 'lib/procrastinator/queue.rb', line 30

# @return [Symbol] the queue's identifier symbol
def name
  @name
end
#task_class ⇒ Object (readonly)
Class that defines the work to be done for jobs in this queue.
30 31 32 |
# File 'lib/procrastinator/queue.rb', line 30

# @return [Class] class that defines the work to be done for jobs in this queue
def task_class
  @task_class
end
#task_store ⇒ Object (readonly) Also known as: store, storage
The task store that persists this queue's tasks.
30 31 32 |
# File 'lib/procrastinator/queue.rb', line 30

# Also available as #store and #storage.
#
# @return [Object] the task store assigned to this queue
def task_store
  @task_store
end
#timeout ⇒ Object (readonly)
Duration (seconds) after which tasks in this queue should fail for taking too long.
30 31 32 |
# File 'lib/procrastinator/queue.rb', line 30

# @return [Numeric] duration (seconds) after which tasks in this queue should fail for taking too long
def timeout
  @timeout
end
#update_period ⇒ Object (readonly)
Delay (seconds) between reloads of tasks from the task store.
30 31 32 |
# File 'lib/procrastinator/queue.rb', line 30

# @return [Numeric] delay (seconds) between reloads of tasks from the task store
def update_period
  @update_period
end
Instance Method Details
#create(run_at:, expire_at:, data:) ⇒ Object
Creates a task on the queue, saved using the Task Store strategy.
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/procrastinator/queue.rb', line 110

# Creates a task on the queue, saved using the Task Store strategy.
#
# @param run_at [Time] Earliest time to attempt running the task
# @param expire_at [Time, nil] Time after which the task will not be attempted
# @param data [Hash, String, Numeric, nil] The data to save
#
# @raise [ArgumentError] when the keyword `:data` is needed by the task handler, but is missing
# @raise [MalformedTaskError] when the keyword `:data` is provided but not expected by the task handler.
def create(run_at:, expire_at:, data:)
  if data.nil? && expects_data?
    raise ArgumentError, "task #{ @task_class } expects to receive :data. Provide :data to #delay."
  end

  unless data.nil? || expects_data?
    raise MalformedTaskError, <<~ERROR
      found unexpected :data argument. Either do not provide :data when scheduling a task,
      or add this in the #{ @task_class } class definition:
        attr_accessor :data
    ERROR
  end

  # TODO: shorten to using slice once updated to Ruby 2.5+
  attrs = {queue: self, run_at: run_at, initial_run_at: run_at, expire_at: expire_at, data: JSON.dump(data)}

  # The bookkeeping fields below are removed before the record is handed to the store.
  create_data = TaskMetaData.new(**attrs).to_h
  create_data.delete(:id)
  create_data.delete(:attempts)
  create_data.delete(:last_fail_at)
  create_data.delete(:last_error)
  @task_store.create(**create_data)
end
#expects_data? ⇒ Boolean
135 136 137 |
# File 'lib/procrastinator/queue.rb', line 135

# Reports whether instances of the task class define a data= writer.
#
# @return [Boolean] whether the task handler will accept data to be assigned via its :data attribute
def expects_data?
  writer = :data=

  @task_class.method_defined?(writer)
end
#fetch_task(identifier) ⇒ Object
Fetch a task matching the given identifier
87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/procrastinator/queue.rb', line 87

# Fetch a task matching the given identifier
#
# A truthy :data value is matched in its JSON-serialized form. The serialization is applied
# to a copy, so the hash passed in by the caller is left unchanged.
#
# @param identifier [Hash] attributes to match
# @return [TaskMetaData] metadata of the single matching task, bound to this queue
#
# @raise [NoSuchTaskError] when no task matches the identifier.
# @raise [AmbiguousTaskFilterError] when many tasks match the identifier, meaning you need to be more specific.
def fetch_task(identifier)
  # Serialize :data on a copy so the caller's (possibly frozen) hash is never modified.
  filter = identifier[:data] ? identifier.merge(data: JSON.dump(identifier[:data])) : identifier

  tasks = read(**filter)

  raise NoSuchTaskError, "no task found matching #{ filter }" if tasks.nil? || tasks.empty?

  if tasks.size > 1
    raise AmbiguousTaskFilterError, "too many (#{ tasks.size }) tasks match #{ filter }. Found: #{ tasks }"
  end

  args = tasks.first.merge(queue: self)

  TaskMetaData.new(**args)
end
#next_task(logger: Logger.new(StringIO.new), container: nil, scheduler: nil) ⇒ LoggedTask?
Constructs the next available task on the queue.
68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/procrastinator/queue.rb', line 68

# Constructs the next available task on the queue.
#
# Takes the first runnable entry from the queue's sorted task metadata and wraps it,
# together with a freshly built task handler, in a LoggedTask.
#
# @param logger [Logger] logger to provide to the constructed task handler
# @param container [Object, nil] container to provide to the constructed task handler
# @param scheduler [Procrastinator::Scheduler, nil] the scheduler to provide to the constructed task handler
# @return [LoggedTask, nil] A Task or nil if no task is found
def next_task(logger: Logger.new(StringIO.new), container: nil, scheduler: nil)
  metadata = next_metas.find(&:runnable?)

  return nil unless metadata

  task = Task.new(metadata, task_handler(data:      metadata.data,
                                         container: container,
                                         logger:    logger,
                                         scheduler: scheduler))

  LoggedTask.new(task, logger: logger)
end