Class: Procrastinator::Queue

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
QueueValidation
Defined in:
lib/procrastinator/queue.rb

Overview

A Queue defines how a certain type task will be processed.

Author:

  • Robin Miller

Defined Under Namespace

Modules: QueueValidation

Constant Summary collapse

DEFAULT_TIMEOUT =

Default number of seconds to wait for a task to complete

3600
DEFAULT_MAX_ATTEMPTS =

Default number of times to retry a task

20
DEFAULT_UPDATE_PERIOD =

Default amount of time between checks for new Tasks

10

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name:, task_class:, max_attempts: DEFAULT_MAX_ATTEMPTS, timeout: DEFAULT_TIMEOUT, update_period: DEFAULT_UPDATE_PERIOD, store: TaskStore::SimpleCommaStore.new) ⇒ Queue

Timeout is in seconds

Raises:

  • (ArgumentError)


38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/procrastinator/queue.rb', line 38

def initialize(name:, task_class:,
               max_attempts: DEFAULT_MAX_ATTEMPTS,
               timeout: DEFAULT_TIMEOUT,
               update_period: DEFAULT_UPDATE_PERIOD,
               store: TaskStore::SimpleCommaStore.new)
   raise ArgumentError, ':name cannot be nil' unless name

   raise ArgumentError, ':task_class cannot be nil' unless task_class
   raise ArgumentError, 'Task class must be initializable' unless task_class.respond_to? :new

   raise ArgumentError, ':timeout cannot be negative' if timeout&.negative?

   @name          = name.to_s.strip.gsub(/[^A-Za-z0-9]+/, '_').to_sym
   @task_class    = task_class
   @task_store    = store
   @max_attempts  = max_attempts
   @timeout       = timeout
   @update_period = update_period

   validate!

   freeze
end

Instance Attribute Details

#:max_attempts(: max_attempts) ⇒ Integer (readonly)



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# File 'lib/procrastinator/queue.rb', line 18

class Queue
   extend Forwardable

   # Default number of seconds to wait for a task to complete
   DEFAULT_TIMEOUT = 3600 # in seconds; one hour total

   # Default number of times to retry a task
   DEFAULT_MAX_ATTEMPTS = 20

   # Default amount of time between checks for new Tasks
   DEFAULT_UPDATE_PERIOD = 10 # seconds

   attr_reader :name, :max_attempts, :timeout, :update_period, :task_store, :task_class

   alias store task_store
   alias storage task_store

   def_delegators :@task_store, :read, :update, :delete

   # Timeout is in seconds
   def initialize(name:, task_class:,
                  max_attempts: DEFAULT_MAX_ATTEMPTS,
                  timeout: DEFAULT_TIMEOUT,
                  update_period: DEFAULT_UPDATE_PERIOD,
                  store: TaskStore::SimpleCommaStore.new)
      raise ArgumentError, ':name cannot be nil' unless name

      raise ArgumentError, ':task_class cannot be nil' unless task_class
      raise ArgumentError, 'Task class must be initializable' unless task_class.respond_to? :new

      raise ArgumentError, ':timeout cannot be negative' if timeout&.negative?

      @name          = name.to_s.strip.gsub(/[^A-Za-z0-9]+/, '_').to_sym
      @task_class    = task_class
      @task_store    = store
      @max_attempts  = max_attempts
      @timeout       = timeout
      @update_period = update_period

      validate!

      freeze
   end

   # Constructs the next available task on the queue.
   #
   # @param logger [Logger] logger to provide to the constructed task handler
   # @param container [Object, nil] container to provide to the constructed task handler
   # @param scheduler [Procrastinator::Scheduler, nil] the scheduler to provide to the constructed task handler
   # @return [LoggedTask, nil] A Task or nil if no task is found
   def next_task(logger: Logger.new(StringIO.new), container: nil, scheduler: nil)
       = next_metas.find(&:runnable?)

      return nil unless 

      task = Task.new(, task_handler(data:      .data,
                                             container: container,
                                             logger:    logger,
                                             scheduler: scheduler))

      LoggedTask.new(task, logger: logger)
   end

   # Fetch a task matching the given identifier
   #
   # @param identifier [Hash] attributes to match
   #
   # @raise [NoSuchTaskError] when no task matches the identifier.
   # @raise [AmbiguousTaskFilterError] when many tasks match the identifier, meaning you need to be more specific.
   def fetch_task(identifier)
      identifier[:data] = JSON.dump(identifier[:data]) if identifier[:data]

      tasks = read(**identifier)

      raise NoSuchTaskError, "no task found matching #{ identifier }" if tasks.nil? || tasks.empty?
      if tasks.size > 1
         raise AmbiguousTaskFilterError, "too many (#{ tasks.size }) tasks match #{ identifier }. Found: #{ tasks }"
      end

      args = tasks.first.merge(queue: self)

      .new(**args)
   end

   # Creates a task on the queue, saved using the Task Store strategy.
   #
   # @param run_at [Time] Earliest time to attempt running the task
   # @param expire_at [Time, nil] Time after which the task will not be attempted
   # @param data [Hash, String, Numeric, nil] The data to save
   #
   # @raise [ArgumentError] when the keyword `:data` is needed by the task handler, but is missing
   # @raise [MalformedTaskError] when the keyword `:data` is provided but not expected by the task handler.
   def create(run_at:, expire_at:, data:)
      if data.nil? && expects_data?
         raise ArgumentError, "task #{ @task_class } expects to receive :data. Provide :data to #delay."
      end

      unless data.nil? || expects_data?
         raise MalformedTaskError, "            found unexpected :data argument. Either do not provide :data when scheduling a task,\n            or add this in the \#{ @task_class } class definition:\n                  attr_accessor :data\n         ERROR\n      end\n\n      # TODO: shorten to using slice once updated to Ruby 2.5+\n      attrs = {queue: self, run_at: run_at, initial_run_at: run_at, expire_at: expire_at, data: JSON.dump(data)}\n\n      create_data = TaskMetaData.new(**attrs).to_h\n      create_data.delete(:id)\n      create_data.delete(:attempts)\n      create_data.delete(:last_fail_at)\n      create_data.delete(:last_error)\n      @task_store.create(**create_data)\n   end\n\n   # @return [Boolean] whether the task handler will accept data to be assigned via its :data attribute\n   def expects_data?\n      @task_class.method_defined?(:data=)\n   end\n\n   private\n\n   def task_handler(data: nil, container: nil, logger: nil, scheduler: nil)\n      handler           = @task_class.new\n      handler.data      = data if handler.respond_to?(:data=)\n      handler.container = container\n      handler.logger    = logger\n      handler.scheduler = scheduler\n      handler\n   end\n\n   def next_metas\n      tasks = read(queue: @name).reject { |t| t[:run_at].nil? }.collect do |t|\n         t.to_h.delete_if { |key| !TaskMetaData::EXPECTED_DATA.include?(key) }.merge(queue: self)\n      end\n\n      sort_tasks(tasks.collect { |t| TaskMetaData.new(**t) })\n   end\n\n   def sort_tasks(tasks)\n      # TODO: improve this\n      # shuffling and re-sorting to avoid worst case O(n^2) when receiving already sorted data\n      # on quicksort (which is default ruby sort). It is not unreasonable that the persister could return sorted\n      # results\n      # Ideally, we'd use a better algo than qsort for this, but this will do for now\n      tasks.shuffle.sort_by(&:run_at)\n   end\n\n   # Internal queue validator\n   module QueueValidation\n      private\n\n      def validate!\n         verify_task_class!\n         verify_task_store!\n      end\n\n      def verify_task_class!\n         verify_run_method!\n         verify_accessors!\n         verify_hooks!\n      end\n\n      # The interface compliance is checked on init because it's one of those rare cases where you want to know early;\n      # otherwise, you wouldn't know until task execution and that could be far in the future.\n      # UX is important for devs, too.\n      #    - R\n      def verify_run_method!\n         unless @task_class.method_defined? :run\n            raise MalformedTaskError, \"task \#{ @task_class } does not support #run method\"\n         end\n\n         return unless @task_class.instance_method(:run).arity.positive?\n\n         raise MalformedTaskError, \"task \#{ @task_class } cannot require parameters to its #run method\"\n      end\n\n      def verify_accessors!\n         [:logger, :container, :scheduler].each do |method_name|\n            next if @task_class.method_defined?(method_name) && @task_class.method_defined?(\"\#{ method_name }=\")\n\n            raise MalformedTaskError, <<~ERR\n               Task handler is missing a \#{ method_name } accessor. Add this to the \#{ @task_class } class definition:\n                  attr_accessor :logger, :container, :scheduler\n            ERR\n         end\n      end\n\n      def verify_hooks!\n         expected_arity = 1\n\n         [:success, :fail, :final_fail].each do |method_name|\n            next unless @task_class.method_defined?(method_name)\n            next if @task_class.instance_method(method_name).arity == expected_arity\n\n            err = \"task \#{ @task_class } must accept \#{ expected_arity } parameter to its #\#{ method_name } method\"\n\n            raise MalformedTaskError, err\n         end\n      end\n\n      def verify_task_store!\n         raise ArgumentError, ':store cannot be nil' if @task_store.nil?\n\n         [:read, :create, :update, :delete].each do |method|\n            unless @task_store.respond_to? method\n               raise MalformedTaskStoreError, \"task store \#{ @task_store.class } must respond to #\#{ method }\"\n            end\n         end\n      end\n   end\n   include QueueValidation\nend\n"

#:name(: name) ⇒ Symbol (readonly)



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# File 'lib/procrastinator/queue.rb', line 18

class Queue
   extend Forwardable

   # Default number of seconds to wait for a task to complete
   DEFAULT_TIMEOUT = 3600 # in seconds; one hour total

   # Default number of times to retry a task
   DEFAULT_MAX_ATTEMPTS = 20

   # Default amount of time between checks for new Tasks
   DEFAULT_UPDATE_PERIOD = 10 # seconds

   attr_reader :name, :max_attempts, :timeout, :update_period, :task_store, :task_class

   alias store task_store
   alias storage task_store

   def_delegators :@task_store, :read, :update, :delete

   # Timeout is in seconds
   def initialize(name:, task_class:,
                  max_attempts: DEFAULT_MAX_ATTEMPTS,
                  timeout: DEFAULT_TIMEOUT,
                  update_period: DEFAULT_UPDATE_PERIOD,
                  store: TaskStore::SimpleCommaStore.new)
      raise ArgumentError, ':name cannot be nil' unless name

      raise ArgumentError, ':task_class cannot be nil' unless task_class
      raise ArgumentError, 'Task class must be initializable' unless task_class.respond_to? :new

      raise ArgumentError, ':timeout cannot be negative' if timeout&.negative?

      @name          = name.to_s.strip.gsub(/[^A-Za-z0-9]+/, '_').to_sym
      @task_class    = task_class
      @task_store    = store
      @max_attempts  = max_attempts
      @timeout       = timeout
      @update_period = update_period

      validate!

      freeze
   end

   # Constructs the next available task on the queue.
   #
   # @param logger [Logger] logger to provide to the constructed task handler
   # @param container [Object, nil] container to provide to the constructed task handler
   # @param scheduler [Procrastinator::Scheduler, nil] the scheduler to provide to the constructed task handler
   # @return [LoggedTask, nil] A Task or nil if no task is found
   def next_task(logger: Logger.new(StringIO.new), container: nil, scheduler: nil)
       = next_metas.find(&:runnable?)

      return nil unless 

      task = Task.new(, task_handler(data:      .data,
                                             container: container,
                                             logger:    logger,
                                             scheduler: scheduler))

      LoggedTask.new(task, logger: logger)
   end

   # Fetch a task matching the given identifier
   #
   # @param identifier [Hash] attributes to match
   #
   # @raise [NoSuchTaskError] when no task matches the identifier.
   # @raise [AmbiguousTaskFilterError] when many tasks match the identifier, meaning you need to be more specific.
   def fetch_task(identifier)
      identifier[:data] = JSON.dump(identifier[:data]) if identifier[:data]

      tasks = read(**identifier)

      raise NoSuchTaskError, "no task found matching #{ identifier }" if tasks.nil? || tasks.empty?
      if tasks.size > 1
         raise AmbiguousTaskFilterError, "too many (#{ tasks.size }) tasks match #{ identifier }. Found: #{ tasks }"
      end

      args = tasks.first.merge(queue: self)

      .new(**args)
   end

   # Creates a task on the queue, saved using the Task Store strategy.
   #
   # @param run_at [Time] Earliest time to attempt running the task
   # @param expire_at [Time, nil] Time after which the task will not be attempted
   # @param data [Hash, String, Numeric, nil] The data to save
   #
   # @raise [ArgumentError] when the keyword `:data` is needed by the task handler, but is missing
   # @raise [MalformedTaskError] when the keyword `:data` is provided but not expected by the task handler.
   def create(run_at:, expire_at:, data:)
      if data.nil? && expects_data?
         raise ArgumentError, "task #{ @task_class } expects to receive :data. Provide :data to #delay."
      end

      unless data.nil? || expects_data?
         raise MalformedTaskError, "            found unexpected :data argument. Either do not provide :data when scheduling a task,\n            or add this in the \#{ @task_class } class definition:\n                  attr_accessor :data\n         ERROR\n      end\n\n      # TODO: shorten to using slice once updated to Ruby 2.5+\n      attrs = {queue: self, run_at: run_at, initial_run_at: run_at, expire_at: expire_at, data: JSON.dump(data)}\n\n      create_data = TaskMetaData.new(**attrs).to_h\n      create_data.delete(:id)\n      create_data.delete(:attempts)\n      create_data.delete(:last_fail_at)\n      create_data.delete(:last_error)\n      @task_store.create(**create_data)\n   end\n\n   # @return [Boolean] whether the task handler will accept data to be assigned via its :data attribute\n   def expects_data?\n      @task_class.method_defined?(:data=)\n   end\n\n   private\n\n   def task_handler(data: nil, container: nil, logger: nil, scheduler: nil)\n      handler           = @task_class.new\n      handler.data      = data if handler.respond_to?(:data=)\n      handler.container = container\n      handler.logger    = logger\n      handler.scheduler = scheduler\n      handler\n   end\n\n   def next_metas\n      tasks = read(queue: @name).reject { |t| t[:run_at].nil? }.collect do |t|\n         t.to_h.delete_if { |key| !TaskMetaData::EXPECTED_DATA.include?(key) }.merge(queue: self)\n      end\n\n      sort_tasks(tasks.collect { |t| TaskMetaData.new(**t) })\n   end\n\n   def sort_tasks(tasks)\n      # TODO: improve this\n      # shuffling and re-sorting to avoid worst case O(n^2) when receiving already sorted data\n      # on quicksort (which is default ruby sort). It is not unreasonable that the persister could return sorted\n      # results\n      # Ideally, we'd use a better algo than qsort for this, but this will do for now\n      tasks.shuffle.sort_by(&:run_at)\n   end\n\n   # Internal queue validator\n   module QueueValidation\n      private\n\n      def validate!\n         verify_task_class!\n         verify_task_store!\n      end\n\n      def verify_task_class!\n         verify_run_method!\n         verify_accessors!\n         verify_hooks!\n      end\n\n      # The interface compliance is checked on init because it's one of those rare cases where you want to know early;\n      # otherwise, you wouldn't know until task execution and that could be far in the future.\n      # UX is important for devs, too.\n      #    - R\n      def verify_run_method!\n         unless @task_class.method_defined? :run\n            raise MalformedTaskError, \"task \#{ @task_class } does not support #run method\"\n         end\n\n         return unless @task_class.instance_method(:run).arity.positive?\n\n         raise MalformedTaskError, \"task \#{ @task_class } cannot require parameters to its #run method\"\n      end\n\n      def verify_accessors!\n         [:logger, :container, :scheduler].each do |method_name|\n            next if @task_class.method_defined?(method_name) && @task_class.method_defined?(\"\#{ method_name }=\")\n\n            raise MalformedTaskError, <<~ERR\n               Task handler is missing a \#{ method_name } accessor. Add this to the \#{ @task_class } class definition:\n                  attr_accessor :logger, :container, :scheduler\n            ERR\n         end\n      end\n\n      def verify_hooks!\n         expected_arity = 1\n\n         [:success, :fail, :final_fail].each do |method_name|\n            next unless @task_class.method_defined?(method_name)\n            next if @task_class.instance_method(method_name).arity == expected_arity\n\n            err = \"task \#{ @task_class } must accept \#{ expected_arity } parameter to its #\#{ method_name } method\"\n\n            raise MalformedTaskError, err\n         end\n      end\n\n      def verify_task_store!\n         raise ArgumentError, ':store cannot be nil' if @task_store.nil?\n\n         [:read, :create, :update, :delete].each do |method|\n            unless @task_store.respond_to? method\n               raise MalformedTaskStoreError, \"task store \#{ @task_store.class } must respond to #\#{ method }\"\n            end\n         end\n      end\n   end\n   include QueueValidation\nend\n"

#:task_class(: task_class) ⇒ Class (readonly)



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# File 'lib/procrastinator/queue.rb', line 18

class Queue
   extend Forwardable

   # Default number of seconds to wait for a task to complete
   DEFAULT_TIMEOUT = 3600 # in seconds; one hour total

   # Default number of times to retry a task
   DEFAULT_MAX_ATTEMPTS = 20

   # Default amount of time between checks for new Tasks
   DEFAULT_UPDATE_PERIOD = 10 # seconds

   attr_reader :name, :max_attempts, :timeout, :update_period, :task_store, :task_class

   alias store task_store
   alias storage task_store

   def_delegators :@task_store, :read, :update, :delete

   # Timeout is in seconds
   def initialize(name:, task_class:,
                  max_attempts: DEFAULT_MAX_ATTEMPTS,
                  timeout: DEFAULT_TIMEOUT,
                  update_period: DEFAULT_UPDATE_PERIOD,
                  store: TaskStore::SimpleCommaStore.new)
      raise ArgumentError, ':name cannot be nil' unless name

      raise ArgumentError, ':task_class cannot be nil' unless task_class
      raise ArgumentError, 'Task class must be initializable' unless task_class.respond_to? :new

      raise ArgumentError, ':timeout cannot be negative' if timeout&.negative?

      @name          = name.to_s.strip.gsub(/[^A-Za-z0-9]+/, '_').to_sym
      @task_class    = task_class
      @task_store    = store
      @max_attempts  = max_attempts
      @timeout       = timeout
      @update_period = update_period

      validate!

      freeze
   end

   # Constructs the next available task on the queue.
   #
   # @param logger [Logger] logger to provide to the constructed task handler
   # @param container [Object, nil] container to provide to the constructed task handler
   # @param scheduler [Procrastinator::Scheduler, nil] the scheduler to provide to the constructed task handler
   # @return [LoggedTask, nil] A Task or nil if no task is found
   def next_task(logger: Logger.new(StringIO.new), container: nil, scheduler: nil)
       = next_metas.find(&:runnable?)

      return nil unless 

      task = Task.new(, task_handler(data:      .data,
                                             container: container,
                                             logger:    logger,
                                             scheduler: scheduler))

      LoggedTask.new(task, logger: logger)
   end

   # Fetch a task matching the given identifier
   #
   # @param identifier [Hash] attributes to match
   #
   # @raise [NoSuchTaskError] when no task matches the identifier.
   # @raise [AmbiguousTaskFilterError] when many tasks match the identifier, meaning you need to be more specific.
   def fetch_task(identifier)
      identifier[:data] = JSON.dump(identifier[:data]) if identifier[:data]

      tasks = read(**identifier)

      raise NoSuchTaskError, "no task found matching #{ identifier }" if tasks.nil? || tasks.empty?
      if tasks.size > 1
         raise AmbiguousTaskFilterError, "too many (#{ tasks.size }) tasks match #{ identifier }. Found: #{ tasks }"
      end

      args = tasks.first.merge(queue: self)

      .new(**args)
   end

   # Creates a task on the queue, saved using the Task Store strategy.
   #
   # @param run_at [Time] Earliest time to attempt running the task
   # @param expire_at [Time, nil] Time after which the task will not be attempted
   # @param data [Hash, String, Numeric, nil] The data to save
   #
   # @raise [ArgumentError] when the keyword `:data` is needed by the task handler, but is missing
   # @raise [MalformedTaskError] when the keyword `:data` is provided but not expected by the task handler.
   def create(run_at:, expire_at:, data:)
      if data.nil? && expects_data?
         raise ArgumentError, "task #{ @task_class } expects to receive :data. Provide :data to #delay."
      end

      unless data.nil? || expects_data?
         raise MalformedTaskError, "            found unexpected :data argument. Either do not provide :data when scheduling a task,\n            or add this in the \#{ @task_class } class definition:\n                  attr_accessor :data\n         ERROR\n      end\n\n      # TODO: shorten to using slice once updated to Ruby 2.5+\n      attrs = {queue: self, run_at: run_at, initial_run_at: run_at, expire_at: expire_at, data: JSON.dump(data)}\n\n      create_data = TaskMetaData.new(**attrs).to_h\n      create_data.delete(:id)\n      create_data.delete(:attempts)\n      create_data.delete(:last_fail_at)\n      create_data.delete(:last_error)\n      @task_store.create(**create_data)\n   end\n\n   # @return [Boolean] whether the task handler will accept data to be assigned via its :data attribute\n   def expects_data?\n      @task_class.method_defined?(:data=)\n   end\n\n   private\n\n   def task_handler(data: nil, container: nil, logger: nil, scheduler: nil)\n      handler           = @task_class.new\n      handler.data      = data if handler.respond_to?(:data=)\n      handler.container = container\n      handler.logger    = logger\n      handler.scheduler = scheduler\n      handler\n   end\n\n   def next_metas\n      tasks = read(queue: @name).reject { |t| t[:run_at].nil? }.collect do |t|\n         t.to_h.delete_if { |key| !TaskMetaData::EXPECTED_DATA.include?(key) }.merge(queue: self)\n      end\n\n      sort_tasks(tasks.collect { |t| TaskMetaData.new(**t) })\n   end\n\n   def sort_tasks(tasks)\n      # TODO: improve this\n      # shuffling and re-sorting to avoid worst case O(n^2) when receiving already sorted data\n      # on quicksort (which is default ruby sort). It is not unreasonable that the persister could return sorted\n      # results\n      # Ideally, we'd use a better algo than qsort for this, but this will do for now\n      tasks.shuffle.sort_by(&:run_at)\n   end\n\n   # Internal queue validator\n   module QueueValidation\n      private\n\n      def validate!\n         verify_task_class!\n         verify_task_store!\n      end\n\n      def verify_task_class!\n         verify_run_method!\n         verify_accessors!\n         verify_hooks!\n      end\n\n      # The interface compliance is checked on init because it's one of those rare cases where you want to know early;\n      # otherwise, you wouldn't know until task execution and that could be far in the future.\n      # UX is important for devs, too.\n      #    - R\n      def verify_run_method!\n         unless @task_class.method_defined? :run\n            raise MalformedTaskError, \"task \#{ @task_class } does not support #run method\"\n         end\n\n         return unless @task_class.instance_method(:run).arity.positive?\n\n         raise MalformedTaskError, \"task \#{ @task_class } cannot require parameters to its #run method\"\n      end\n\n      def verify_accessors!\n         [:logger, :container, :scheduler].each do |method_name|\n            next if @task_class.method_defined?(method_name) && @task_class.method_defined?(\"\#{ method_name }=\")\n\n            raise MalformedTaskError, <<~ERR\n               Task handler is missing a \#{ method_name } accessor. Add this to the \#{ @task_class } class definition:\n                  attr_accessor :logger, :container, :scheduler\n            ERR\n         end\n      end\n\n      def verify_hooks!\n         expected_arity = 1\n\n         [:success, :fail, :final_fail].each do |method_name|\n            next unless @task_class.method_defined?(method_name)\n            next if @task_class.instance_method(method_name).arity == expected_arity\n\n            err = \"task \#{ @task_class } must accept \#{ expected_arity } parameter to its #\#{ method_name } method\"\n\n            raise MalformedTaskError, err\n         end\n      end\n\n      def verify_task_store!\n         raise ArgumentError, ':store cannot be nil' if @task_store.nil?\n\n         [:read, :create, :update, :delete].each do |method|\n            unless @task_store.respond_to? method\n               raise MalformedTaskStoreError, \"task store \#{ @task_store.class } must respond to #\#{ method }\"\n            end\n         end\n      end\n   end\n   include QueueValidation\nend\n"

#:timeout(: timeout) ⇒ Numeric (readonly)



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# File 'lib/procrastinator/queue.rb', line 18

class Queue
   extend Forwardable

   # Default number of seconds to wait for a task to complete
   DEFAULT_TIMEOUT = 3600 # in seconds; one hour total

   # Default number of times to retry a task
   DEFAULT_MAX_ATTEMPTS = 20

   # Default amount of time between checks for new Tasks
   DEFAULT_UPDATE_PERIOD = 10 # seconds

   attr_reader :name, :max_attempts, :timeout, :update_period, :task_store, :task_class

   alias store task_store
   alias storage task_store

   def_delegators :@task_store, :read, :update, :delete

   # Timeout is in seconds
   def initialize(name:, task_class:,
                  max_attempts: DEFAULT_MAX_ATTEMPTS,
                  timeout: DEFAULT_TIMEOUT,
                  update_period: DEFAULT_UPDATE_PERIOD,
                  store: TaskStore::SimpleCommaStore.new)
      raise ArgumentError, ':name cannot be nil' unless name

      raise ArgumentError, ':task_class cannot be nil' unless task_class
      raise ArgumentError, 'Task class must be initializable' unless task_class.respond_to? :new

      raise ArgumentError, ':timeout cannot be negative' if timeout&.negative?

      @name          = name.to_s.strip.gsub(/[^A-Za-z0-9]+/, '_').to_sym
      @task_class    = task_class
      @task_store    = store
      @max_attempts  = max_attempts
      @timeout       = timeout
      @update_period = update_period

      validate!

      freeze
   end

   # Constructs the next available task on the queue.
   #
   # @param logger [Logger] logger to provide to the constructed task handler
   # @param container [Object, nil] container to provide to the constructed task handler
   # @param scheduler [Procrastinator::Scheduler, nil] the scheduler to provide to the constructed task handler
   # @return [LoggedTask, nil] A Task or nil if no task is found
   def next_task(logger: Logger.new(StringIO.new), container: nil, scheduler: nil)
       = next_metas.find(&:runnable?)

      return nil unless 

      task = Task.new(, task_handler(data:      .data,
                                             container: container,
                                             logger:    logger,
                                             scheduler: scheduler))

      LoggedTask.new(task, logger: logger)
   end

   # Fetch a task matching the given identifier
   #
   # @param identifier [Hash] attributes to match
   #
   # @raise [NoSuchTaskError] when no task matches the identifier.
   # @raise [AmbiguousTaskFilterError] when many tasks match the identifier, meaning you need to be more specific.
   def fetch_task(identifier)
      identifier[:data] = JSON.dump(identifier[:data]) if identifier[:data]

      tasks = read(**identifier)

      raise NoSuchTaskError, "no task found matching #{ identifier }" if tasks.nil? || tasks.empty?
      if tasks.size > 1
         raise AmbiguousTaskFilterError, "too many (#{ tasks.size }) tasks match #{ identifier }. Found: #{ tasks }"
      end

      args = tasks.first.merge(queue: self)

      .new(**args)
   end

   # Creates a task on the queue, saved using the Task Store strategy.
   #
   # @param run_at [Time] Earliest time to attempt running the task
   # @param expire_at [Time, nil] Time after which the task will not be attempted
   # @param data [Hash, String, Numeric, nil] The data to save
   #
   # @raise [ArgumentError] when the keyword `:data` is needed by the task handler, but is missing
   # @raise [MalformedTaskError] when the keyword `:data` is provided but not expected by the task handler.
   def create(run_at:, expire_at:, data:)
      if data.nil? && expects_data?
         raise ArgumentError, "task #{ @task_class } expects to receive :data. Provide :data to #delay."
      end

      unless data.nil? || expects_data?
         raise MalformedTaskError, "            found unexpected :data argument. Either do not provide :data when scheduling a task,\n            or add this in the \#{ @task_class } class definition:\n                  attr_accessor :data\n         ERROR\n      end\n\n      # TODO: shorten to using slice once updated to Ruby 2.5+\n      attrs = {queue: self, run_at: run_at, initial_run_at: run_at, expire_at: expire_at, data: JSON.dump(data)}\n\n      create_data = TaskMetaData.new(**attrs).to_h\n      create_data.delete(:id)\n      create_data.delete(:attempts)\n      create_data.delete(:last_fail_at)\n      create_data.delete(:last_error)\n      @task_store.create(**create_data)\n   end\n\n   # @return [Boolean] whether the task handler will accept data to be assigned via its :data attribute\n   def expects_data?\n      @task_class.method_defined?(:data=)\n   end\n\n   private\n\n   def task_handler(data: nil, container: nil, logger: nil, scheduler: nil)\n      handler           = @task_class.new\n      handler.data      = data if handler.respond_to?(:data=)\n      handler.container = container\n      handler.logger    = logger\n      handler.scheduler = scheduler\n      handler\n   end\n\n   def next_metas\n      tasks = read(queue: @name).reject { |t| t[:run_at].nil? }.collect do |t|\n         t.to_h.delete_if { |key| !TaskMetaData::EXPECTED_DATA.include?(key) }.merge(queue: self)\n      end\n\n      sort_tasks(tasks.collect { |t| TaskMetaData.new(**t) })\n   end\n\n   def sort_tasks(tasks)\n      # TODO: improve this\n      # shuffling and re-sorting to avoid worst case O(n^2) when receiving already sorted data\n      # on quicksort (which is default ruby sort). It is not unreasonable that the persister could return sorted\n      # results\n      # Ideally, we'd use a better algo than qsort for this, but this will do for now\n      tasks.shuffle.sort_by(&:run_at)\n   end\n\n   # Internal queue validator\n   module QueueValidation\n      private\n\n      def validate!\n         verify_task_class!\n         verify_task_store!\n      end\n\n      def verify_task_class!\n         verify_run_method!\n         verify_accessors!\n         verify_hooks!\n      end\n\n      # The interface compliance is checked on init because it's one of those rare cases where you want to know early;\n      # otherwise, you wouldn't know until task execution and that could be far in the future.\n      # UX is important for devs, too.\n      #    - R\n      def verify_run_method!\n         unless @task_class.method_defined? :run\n            raise MalformedTaskError, \"task \#{ @task_class } does not support #run method\"\n         end\n\n         return unless @task_class.instance_method(:run).arity.positive?\n\n         raise MalformedTaskError, \"task \#{ @task_class } cannot require parameters to its #run method\"\n      end\n\n      def verify_accessors!\n         [:logger, :container, :scheduler].each do |method_name|\n            next if @task_class.method_defined?(method_name) && @task_class.method_defined?(\"\#{ method_name }=\")\n\n            raise MalformedTaskError, <<~ERR\n               Task handler is missing a \#{ method_name } accessor. Add this to the \#{ @task_class } class definition:\n                  attr_accessor :logger, :container, :scheduler\n            ERR\n         end\n      end\n\n      def verify_hooks!\n         expected_arity = 1\n\n         [:success, :fail, :final_fail].each do |method_name|\n            next unless @task_class.method_defined?(method_name)\n            next if @task_class.instance_method(method_name).arity == expected_arity\n\n            err = \"task \#{ @task_class } must accept \#{ expected_arity } parameter to its #\#{ method_name } method\"\n\n            raise MalformedTaskError, err\n         end\n      end\n\n      def verify_task_store!\n         raise ArgumentError, ':store cannot be nil' if @task_store.nil?\n\n         [:read, :create, :update, :delete].each do |method|\n            unless @task_store.respond_to? method\n               raise MalformedTaskStoreError, \"task store \#{ @task_store.class } must respond to #\#{ method }\"\n            end\n         end\n      end\n   end\n   include QueueValidation\nend\n"

#:update_period(: update_period) ⇒ Numeric (readonly)



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# File 'lib/procrastinator/queue.rb', line 18

class Queue
   extend Forwardable

   # Default number of seconds to wait for a task to complete
   DEFAULT_TIMEOUT = 3600 # in seconds; one hour total

   # Default number of times to retry a task
   DEFAULT_MAX_ATTEMPTS = 20

   # Default amount of time between checks for new Tasks
   DEFAULT_UPDATE_PERIOD = 10 # seconds

   attr_reader :name, :max_attempts, :timeout, :update_period, :task_store, :task_class

   alias store task_store
   alias storage task_store

   def_delegators :@task_store, :read, :update, :delete

   # Timeout is in seconds
   def initialize(name:, task_class:,
                  max_attempts: DEFAULT_MAX_ATTEMPTS,
                  timeout: DEFAULT_TIMEOUT,
                  update_period: DEFAULT_UPDATE_PERIOD,
                  store: TaskStore::SimpleCommaStore.new)
      raise ArgumentError, ':name cannot be nil' unless name

      raise ArgumentError, ':task_class cannot be nil' unless task_class
      raise ArgumentError, 'Task class must be initializable' unless task_class.respond_to? :new

      raise ArgumentError, ':timeout cannot be negative' if timeout&.negative?

      @name          = name.to_s.strip.gsub(/[^A-Za-z0-9]+/, '_').to_sym
      @task_class    = task_class
      @task_store    = store
      @max_attempts  = max_attempts
      @timeout       = timeout
      @update_period = update_period

      validate!

      freeze
   end

   # Constructs the next available task on the queue.
   #
   # @param logger [Logger] logger to provide to the constructed task handler
   # @param container [Object, nil] container to provide to the constructed task handler
   # @param scheduler [Procrastinator::Scheduler, nil] the scheduler to provide to the constructed task handler
   # @return [LoggedTask, nil] A Task or nil if no task is found
   def next_task(logger: Logger.new(StringIO.new), container: nil, scheduler: nil)
       = next_metas.find(&:runnable?)

      return nil unless 

      task = Task.new(, task_handler(data:      .data,
                                             container: container,
                                             logger:    logger,
                                             scheduler: scheduler))

      LoggedTask.new(task, logger: logger)
   end

   # Fetch a task matching the given identifier
   #
   # @param identifier [Hash] attributes to match
   #
   # @raise [NoSuchTaskError] when no task matches the identifier.
   # @raise [AmbiguousTaskFilterError] when many tasks match the identifier, meaning you need to be more specific.
   def fetch_task(identifier)
      identifier[:data] = JSON.dump(identifier[:data]) if identifier[:data]

      tasks = read(**identifier)

      raise NoSuchTaskError, "no task found matching #{ identifier }" if tasks.nil? || tasks.empty?
      if tasks.size > 1
         raise AmbiguousTaskFilterError, "too many (#{ tasks.size }) tasks match #{ identifier }. Found: #{ tasks }"
      end

      args = tasks.first.merge(queue: self)

      .new(**args)
   end

   # Creates a task on the queue, saved using the Task Store strategy.
   #
   # @param run_at [Time] Earliest time to attempt running the task
   # @param expire_at [Time, nil] Time after which the task will not be attempted
   # @param data [Hash, String, Numeric, nil] The data to save
   #
   # @raise [ArgumentError] when the keyword `:data` is needed by the task handler, but is missing
   # @raise [MalformedTaskError] when the keyword `:data` is provided but not expected by the task handler.
   def create(run_at:, expire_at:, data:)
      if data.nil? && expects_data?
         raise ArgumentError, "task #{ @task_class } expects to receive :data. Provide :data to #delay."
      end

      unless data.nil? || expects_data?
         raise MalformedTaskError, "            found unexpected :data argument. Either do not provide :data when scheduling a task,\n            or add this in the \#{ @task_class } class definition:\n                  attr_accessor :data\n         ERROR\n      end\n\n      # TODO: shorten to using slice once updated to Ruby 2.5+\n      attrs = {queue: self, run_at: run_at, initial_run_at: run_at, expire_at: expire_at, data: JSON.dump(data)}\n\n      create_data = TaskMetaData.new(**attrs).to_h\n      create_data.delete(:id)\n      create_data.delete(:attempts)\n      create_data.delete(:last_fail_at)\n      create_data.delete(:last_error)\n      @task_store.create(**create_data)\n   end\n\n   # @return [Boolean] whether the task handler will accept data to be assigned via its :data attribute\n   def expects_data?\n      @task_class.method_defined?(:data=)\n   end\n\n   private\n\n   def task_handler(data: nil, container: nil, logger: nil, scheduler: nil)\n      handler           = @task_class.new\n      handler.data      = data if handler.respond_to?(:data=)\n      handler.container = container\n      handler.logger    = logger\n      handler.scheduler = scheduler\n      handler\n   end\n\n   def next_metas\n      tasks = read(queue: @name).reject { |t| t[:run_at].nil? }.collect do |t|\n         t.to_h.delete_if { |key| !TaskMetaData::EXPECTED_DATA.include?(key) }.merge(queue: self)\n      end\n\n      sort_tasks(tasks.collect { |t| TaskMetaData.new(**t) })\n   end\n\n   def sort_tasks(tasks)\n      # TODO: improve this\n      # shuffling and re-sorting to avoid worst case O(n^2) when receiving already sorted data\n      # on quicksort (which is default ruby sort). It is not unreasonable that the persister could return sorted\n      # results\n      # Ideally, we'd use a better algo than qsort for this, but this will do for now\n      tasks.shuffle.sort_by(&:run_at)\n   end\n\n   # Internal queue validator\n   module QueueValidation\n      private\n\n      def validate!\n         verify_task_class!\n         verify_task_store!\n      end\n\n      def verify_task_class!\n         verify_run_method!\n         verify_accessors!\n         verify_hooks!\n      end\n\n      # The interface compliance is checked on init because it's one of those rare cases where you want to know early;\n      # otherwise, you wouldn't know until task execution and that could be far in the future.\n      # UX is important for devs, too.\n      #    - R\n      def verify_run_method!\n         unless @task_class.method_defined? :run\n            raise MalformedTaskError, \"task \#{ @task_class } does not support #run method\"\n         end\n\n         return unless @task_class.instance_method(:run).arity.positive?\n\n         raise MalformedTaskError, \"task \#{ @task_class } cannot require parameters to its #run method\"\n      end\n\n      def verify_accessors!\n         [:logger, :container, :scheduler].each do |method_name|\n            next if @task_class.method_defined?(method_name) && @task_class.method_defined?(\"\#{ method_name }=\")\n\n            raise MalformedTaskError, <<~ERR\n               Task handler is missing a \#{ method_name } accessor. Add this to the \#{ @task_class } class definition:\n                  attr_accessor :logger, :container, :scheduler\n            ERR\n         end\n      end\n\n      def verify_hooks!\n         expected_arity = 1\n\n         [:success, :fail, :final_fail].each do |method_name|\n            next unless @task_class.method_defined?(method_name)\n            next if @task_class.instance_method(method_name).arity == expected_arity\n\n            err = \"task \#{ @task_class } must accept \#{ expected_arity } parameter to its #\#{ method_name } method\"\n\n            raise MalformedTaskError, err\n         end\n      end\n\n      def verify_task_store!\n         raise ArgumentError, ':store cannot be nil' if @task_store.nil?\n\n         [:read, :create, :update, :delete].each do |method|\n            unless @task_store.respond_to? method\n               raise MalformedTaskStoreError, \"task store \#{ @task_store.class } must respond to #\#{ method }\"\n            end\n         end\n      end\n   end\n   include QueueValidation\nend\n"

#max_attemptsObject (readonly)

seconds



30
31
32
# File 'lib/procrastinator/queue.rb', line 30

def max_attempts
  @max_attempts
end

#nameObject (readonly)

seconds



30
31
32
# File 'lib/procrastinator/queue.rb', line 30

def name
  @name
end

#task_classObject (readonly)

seconds



30
31
32
# File 'lib/procrastinator/queue.rb', line 30

def task_class
  @task_class
end

#task_storeObject (readonly) Also known as: store, storage

seconds



30
31
32
# File 'lib/procrastinator/queue.rb', line 30

def task_store
  @task_store
end

#timeoutObject (readonly)

seconds



30
31
32
# File 'lib/procrastinator/queue.rb', line 30

def timeout
  @timeout
end

#update_periodObject (readonly)

seconds



30
31
32
# File 'lib/procrastinator/queue.rb', line 30

def update_period
  @update_period
end

Instance Method Details

#create(run_at:, expire_at:, data:) ⇒ Object

Creates a task on the queue, saved using the Task Store strategy.

Raises:

  • (ArgumentError)

    when the keyword ‘:data` is needed by the task handler, but is missing

  • (MalformedTaskError)

    when the keyword ‘:data` is provided but not expected by the task handler.



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/procrastinator/queue.rb', line 110

def create(run_at:, expire_at:, data:)
   if data.nil? && expects_data?
      raise ArgumentError, "task #{ @task_class } expects to receive :data. Provide :data to #delay."
   end

   unless data.nil? || expects_data?
      raise MalformedTaskError, "         found unexpected :data argument. Either do not provide :data when scheduling a task,\n         or add this in the \#{ @task_class } class definition:\n               attr_accessor :data\n      ERROR\n   end\n\n   # TODO: shorten to using slice once updated to Ruby 2.5+\n   attrs = {queue: self, run_at: run_at, initial_run_at: run_at, expire_at: expire_at, data: JSON.dump(data)}\n\n   create_data = TaskMetaData.new(**attrs).to_h\n   create_data.delete(:id)\n   create_data.delete(:attempts)\n   create_data.delete(:last_fail_at)\n   create_data.delete(:last_error)\n   @task_store.create(**create_data)\nend\n"

#expects_data?Boolean



135
136
137
# File 'lib/procrastinator/queue.rb', line 135

def expects_data?
   @task_class.method_defined?(:data=)
end

#fetch_task(identifier) ⇒ Object

Fetch a task matching the given identifier

Raises:



87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/procrastinator/queue.rb', line 87

def fetch_task(identifier)
   identifier[:data] = JSON.dump(identifier[:data]) if identifier[:data]

   tasks = read(**identifier)

   raise NoSuchTaskError, "no task found matching #{ identifier }" if tasks.nil? || tasks.empty?
   if tasks.size > 1
      raise AmbiguousTaskFilterError, "too many (#{ tasks.size }) tasks match #{ identifier }. Found: #{ tasks }"
   end

   args = tasks.first.merge(queue: self)

   .new(**args)
end

#next_task(logger: Logger.new(StringIO.new), container: nil, scheduler: nil) ⇒ LoggedTask?

Constructs the next available task on the queue.



68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/procrastinator/queue.rb', line 68

def next_task(logger: Logger.new(StringIO.new), container: nil, scheduler: nil)
    = next_metas.find(&:runnable?)

   return nil unless 

   task = Task.new(, task_handler(data:      .data,
                                          container: container,
                                          logger:    logger,
                                          scheduler: scheduler))

   LoggedTask.new(task, logger: logger)
end