Class: Procrastinator::Queue

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
QueueValidation
Defined in:
lib/procrastinator/queue.rb

Overview

A Queue defines how a certain type of task will be processed.

Author:

  • Robin Miller

Defined Under Namespace

Modules: QueueValidation

Constant Summary collapse

DEFAULT_TIMEOUT =

Default number of seconds to wait for a task to complete

3600
DEFAULT_MAX_ATTEMPTS =

Default number of times to retry a task

20
DEFAULT_UPDATE_PERIOD =

Default amount of time between checks for new Tasks

10

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name:, task_class:, max_attempts: DEFAULT_MAX_ATTEMPTS, timeout: DEFAULT_TIMEOUT, update_period: DEFAULT_UPDATE_PERIOD, store: TaskStore::SimpleCommaStore.new) ⇒ Queue

Timeout is in seconds

Raises:

  • (ArgumentError)


38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/procrastinator/queue.rb', line 38

# Builds a queue definition. Timeout is in seconds.
#
# The name is normalised into a Symbol: surrounding whitespace is stripped and every run of
# characters other than ASCII letters and digits becomes a single underscore.
# Once the arguments pass validation the instance is frozen.
#
# @raise [ArgumentError] when name or task_class is missing, when task_class does not respond to
#   `new`, or when timeout is negative
def initialize(name:, task_class:,
               max_attempts: DEFAULT_MAX_ATTEMPTS,
               timeout: DEFAULT_TIMEOUT,
               update_period: DEFAULT_UPDATE_PERIOD,
               store: TaskStore::SimpleCommaStore.new)
   # Presence checks come first so later checks can safely call methods on the arguments
   name || raise(ArgumentError, ':name cannot be nil')
   task_class || raise(ArgumentError, ':task_class cannot be nil')
   task_class.respond_to?(:new) || raise(ArgumentError, 'Task class must be initializable')

   # A nil timeout is allowed; only an explicit negative value is rejected
   raise ArgumentError, ':timeout cannot be negative' if !timeout.nil? && timeout.negative?

   trimmed_name = name.to_s.strip
   @name        = trimmed_name.gsub(/[^A-Za-z0-9]+/, '_').to_sym

   @task_store    = store
   @task_class    = task_class
   @update_period = update_period
   @timeout       = timeout
   @max_attempts  = max_attempts

   validate!
   freeze
end

Instance Attribute Details

#max_attempts ⇒ Integer (readonly)

Returns Maximum number of attempts for tasks in this queue.

Returns:

  • (Integer)

    Maximum number of attempts for tasks in this queue.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# File 'lib/procrastinator/queue.rb', line 18

# A Queue defines how a certain type of task will be processed: the handler class that runs it, the retry and
# timeout limits applied to it, and the task store used to persist it.
#
# The task class and task store are validated on construction, after which the instance is frozen.
class Queue
   extend Forwardable

   # Default number of seconds to wait for a task to complete
   DEFAULT_TIMEOUT = 3600 # in seconds; one hour total

   # Default number of times to retry a task
   DEFAULT_MAX_ATTEMPTS = 20

   # Default amount of time between checks for new Tasks
   DEFAULT_UPDATE_PERIOD = 10 # seconds

   attr_reader :name, :max_attempts, :timeout, :update_period, :task_store, :task_class

   alias store task_store
   alias storage task_store

   # Reading, updating and deleting are handed straight to the task store
   def_delegators :@task_store, :read, :update, :delete

   # Timeout is in seconds
   #
   # @param name [#to_s] queue identifier; stripped, runs of non-alphanumeric characters are replaced with a
   #   single underscore, and the result is converted to a Symbol
   # @param task_class [Class] handler class; must respond to `new` and pass the task class validation
   # @param max_attempts [Integer] number of times to retry a task
   # @param timeout [Numeric, nil] seconds to wait for a task to complete; must not be negative
   # @param update_period [Numeric] amount of time between checks for new tasks
   # @param store [Object] task store; must respond to #read, #create, #update and #delete
   #
   # @raise [ArgumentError] when name, task_class or store is nil, when task_class does not respond to `new`,
   #   or when timeout is negative
   # @raise [MalformedTaskError] when the task class does not satisfy the handler interface
   # @raise [MalformedTaskStoreError] when the store is missing one of the required methods
   def initialize(name:, task_class:,
                  max_attempts: DEFAULT_MAX_ATTEMPTS,
                  timeout: DEFAULT_TIMEOUT,
                  update_period: DEFAULT_UPDATE_PERIOD,
                  store: TaskStore::SimpleCommaStore.new)
      raise ArgumentError, ':name cannot be nil' unless name

      raise ArgumentError, ':task_class cannot be nil' unless task_class
      raise ArgumentError, 'Task class must be initializable' unless task_class.respond_to? :new

      raise ArgumentError, ':timeout cannot be negative' if timeout&.negative?

      @name          = name.to_s.strip.gsub(/[^A-Za-z0-9]+/, '_').to_sym
      @task_class    = task_class
      @task_store    = store
      @max_attempts  = max_attempts
      @timeout       = timeout
      @update_period = update_period

      validate!

      freeze
   end

   # Constructs the next available task on the queue.
   #
   # @param logger [Logger] logger to provide to the constructed task handler
   # @param container [Object, nil] container to provide to the constructed task handler
   # @param scheduler [Procrastinator::Scheduler, nil] the scheduler to provide to the constructed task handler
   # @return [LoggedTask, nil] A Task or nil if no task is found
   def next_task(logger: Logger.new(StringIO.new), container: nil, scheduler: nil)
      # Candidates arrive sorted by run_at, so the first runnable one is the earliest
      metadata = next_metas.find(&:runnable?)

      return nil unless metadata

      task = Task.new(metadata, task_handler(data:      metadata.data,
                                             container: container,
                                             logger:    logger,
                                             scheduler: scheduler))

      LoggedTask.new(task, logger: logger)
   end

   # Fetch a task matching the given identifier
   #
   # @param identifier [Hash] attributes to match; it is not modified
   # @return [TaskMetaData] metadata of the single matching task, bound to this queue
   #
   # @raise [NoSuchTaskError] when no task matches the identifier.
   # @raise [AmbiguousTaskFilterError] when many tasks match the identifier, meaning you need to be more specific.
   def fetch_task(identifier)
      # Data is stored serialized, so it is matched serialized. The filter is a new hash so that the
      # caller's identifier is left untouched.
      filter = identifier[:data] ? identifier.merge(data: JSON.dump(identifier[:data])) : identifier

      tasks = read(**filter)

      raise NoSuchTaskError, "no task found matching #{ filter }" if tasks.nil? || tasks.empty?
      if tasks.size > 1
         raise AmbiguousTaskFilterError, "too many (#{ tasks.size }) tasks match #{ filter }. Found: #{ tasks }"
      end

      args = tasks.first.merge(queue: self)

      TaskMetaData.new(**args)
   end

   # Creates a task on the queue, saved using the Task Store strategy.
   #
   # @param run_at [Time] Earliest time to attempt running the task
   # @param expire_at [Time, nil] Time after which the task will not be attempted
   # @param data [Hash, String, Numeric, nil] The data to save
   #
   # @raise [ArgumentError] when the keyword `:data` is needed by the task handler, but is missing
   # @raise [MalformedTaskError] when the keyword `:data` is provided but not expected by the task handler.
   def create(run_at:, expire_at:, data:)
      if data.nil? && expects_data?
         raise ArgumentError, "task #{ @task_class } expects to receive :data. Provide :data to #delay."
      end

      unless data.nil? || expects_data?
         raise MalformedTaskError, <<~ERROR
            found unexpected :data argument. Either do not provide :data when scheduling a task,
            or add this in the #{ @task_class } class definition:
                  attr_accessor :data
         ERROR
      end

      # TODO: shorten to using slice once updated to Ruby 2.5+
      attrs = {queue: self, run_at: run_at, initial_run_at: run_at, expire_at: expire_at, data: JSON.dump(data)}

      # The id and failure bookkeeping fields are dropped before the row is handed to the store
      create_data = TaskMetaData.new(**attrs).to_h
      create_data.delete(:id)
      create_data.delete(:attempts)
      create_data.delete(:last_fail_at)
      create_data.delete(:last_error)
      @task_store.create(**create_data)
   end

   # @return [Boolean] whether the task handler will accept data to be assigned via its :data attribute
   def expects_data?
      @task_class.method_defined?(:data=)
   end

   private

   # Instantiates the task class and assigns the given collaborators to it.
   # Data is only assigned when the handler exposes a data writer.
   #
   # @return [Object] the configured task handler instance
   def task_handler(data: nil, container: nil, logger: nil, scheduler: nil)
      handler           = @task_class.new
      handler.data      = data if handler.respond_to?(:data=)
      handler.container = container
      handler.logger    = logger
      handler.scheduler = scheduler
      handler
   end

   # Reads this queue's tasks from the store and wraps them in TaskMetaData, ordered by run_at.
   # Rows without a run_at are skipped and keys outside TaskMetaData::EXPECTED_DATA are ignored.
   #
   # @return [Array<TaskMetaData>] metadata sorted by run_at
   def next_metas
      scheduled = read(queue: @name).reject { |row| row[:run_at].nil? }

      metas = scheduled.map do |row|
         # select builds a new hash, leaving the row object handed over by the store unmodified
         known = row.to_h.select { |key, _value| TaskMetaData::EXPECTED_DATA.include?(key) }

         TaskMetaData.new(**known.merge(queue: self))
      end

      sort_tasks(metas)
   end

   # @param tasks [Array<TaskMetaData>] tasks to order
   # @return [Array<TaskMetaData>] the tasks ordered by ascending run_at
   def sort_tasks(tasks)
      # TODO: improve this
      # shuffling and re-sorting to avoid worst case O(n^2) when receiving already sorted data
      # on quicksort (which is default ruby sort). It is not unreasonable that the persister could return sorted
      # results
      # Ideally, we'd use a better algo than qsort for this, but this will do for now
      tasks.shuffle.sort_by(&:run_at)
   end

   # Internal queue validator
   module QueueValidation
      private

      # Runs every construction-time check.
      def validate!
         verify_task_class!
         verify_task_store!
      end

      # Checks that the task class satisfies the handler interface.
      def verify_task_class!
         verify_run_method!
         verify_accessors!
         verify_hooks!
      end

      # The interface compliance is checked on init because it's one of those rare cases where you want to know early;
      # otherwise, you wouldn't know until task execution and that could be far in the future.
      # UX is important for devs, too.
      #    - R
      #
      # @raise [MalformedTaskError] when #run is missing or has required parameters
      def verify_run_method!
         unless @task_class.method_defined? :run
            raise MalformedTaskError, "task #{ @task_class } does not support #run method"
         end

         # A positive arity means #run has required parameters
         return unless @task_class.instance_method(:run).arity.positive?

         raise MalformedTaskError, "task #{ @task_class } cannot require parameters to its #run method"
      end

      # @raise [MalformedTaskError] when a reader or writer for logger, container or scheduler is missing
      def verify_accessors!
         [:logger, :container, :scheduler].each do |method_name|
            next if @task_class.method_defined?(method_name) && @task_class.method_defined?("#{ method_name }=")

            raise MalformedTaskError, <<~ERR
               Task handler is missing a #{ method_name } accessor. Add this to the #{ @task_class } class definition:
                  attr_accessor :logger, :container, :scheduler
            ERR
         end
      end

      # Hooks are optional, but any that are defined must have an arity of exactly one.
      #
      # @raise [MalformedTaskError] when a defined hook has a different arity
      def verify_hooks!
         expected_arity = 1

         [:success, :fail, :final_fail].each do |method_name|
            next unless @task_class.method_defined?(method_name)
            next if @task_class.instance_method(method_name).arity == expected_arity

            err = "task #{ @task_class } must accept #{ expected_arity } parameter to its ##{ method_name } method"

            raise MalformedTaskError, err
         end
      end

      # @raise [ArgumentError] when the store is nil
      # @raise [MalformedTaskStoreError] when the store lacks #read, #create, #update or #delete
      def verify_task_store!
         raise ArgumentError, ':store cannot be nil' if @task_store.nil?

         [:read, :create, :update, :delete].each do |method|
            unless @task_store.respond_to? method
               raise MalformedTaskStoreError, "task store #{ @task_store.class } must respond to ##{ method }"
            end
         end
      end
   end
   include QueueValidation
end

#name ⇒ Symbol (readonly)

Returns The queue’s identifier symbol.

Returns:

  • (Symbol)

    The queue’s identifier symbol



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# File 'lib/procrastinator/queue.rb', line 18

# A Queue defines how a certain type of task will be processed: the handler class that runs it, the retry and
# timeout limits applied to it, and the task store used to persist it.
#
# The task class and task store are validated on construction, after which the instance is frozen.
class Queue
   extend Forwardable

   # Default number of seconds to wait for a task to complete
   DEFAULT_TIMEOUT = 3600 # in seconds; one hour total

   # Default number of times to retry a task
   DEFAULT_MAX_ATTEMPTS = 20

   # Default amount of time between checks for new Tasks
   DEFAULT_UPDATE_PERIOD = 10 # seconds

   attr_reader :name, :max_attempts, :timeout, :update_period, :task_store, :task_class

   alias store task_store
   alias storage task_store

   # Reading, updating and deleting are handed straight to the task store
   def_delegators :@task_store, :read, :update, :delete

   # Timeout is in seconds
   #
   # @param name [#to_s] queue identifier; stripped, runs of non-alphanumeric characters are replaced with a
   #   single underscore, and the result is converted to a Symbol
   # @param task_class [Class] handler class; must respond to `new` and pass the task class validation
   # @param max_attempts [Integer] number of times to retry a task
   # @param timeout [Numeric, nil] seconds to wait for a task to complete; must not be negative
   # @param update_period [Numeric] amount of time between checks for new tasks
   # @param store [Object] task store; must respond to #read, #create, #update and #delete
   #
   # @raise [ArgumentError] when name, task_class or store is nil, when task_class does not respond to `new`,
   #   or when timeout is negative
   # @raise [MalformedTaskError] when the task class does not satisfy the handler interface
   # @raise [MalformedTaskStoreError] when the store is missing one of the required methods
   def initialize(name:, task_class:,
                  max_attempts: DEFAULT_MAX_ATTEMPTS,
                  timeout: DEFAULT_TIMEOUT,
                  update_period: DEFAULT_UPDATE_PERIOD,
                  store: TaskStore::SimpleCommaStore.new)
      raise ArgumentError, ':name cannot be nil' unless name

      raise ArgumentError, ':task_class cannot be nil' unless task_class
      raise ArgumentError, 'Task class must be initializable' unless task_class.respond_to? :new

      raise ArgumentError, ':timeout cannot be negative' if timeout&.negative?

      @name          = name.to_s.strip.gsub(/[^A-Za-z0-9]+/, '_').to_sym
      @task_class    = task_class
      @task_store    = store
      @max_attempts  = max_attempts
      @timeout       = timeout
      @update_period = update_period

      validate!

      freeze
   end

   # Constructs the next available task on the queue.
   #
   # @param logger [Logger] logger to provide to the constructed task handler
   # @param container [Object, nil] container to provide to the constructed task handler
   # @param scheduler [Procrastinator::Scheduler, nil] the scheduler to provide to the constructed task handler
   # @return [LoggedTask, nil] A Task or nil if no task is found
   def next_task(logger: Logger.new(StringIO.new), container: nil, scheduler: nil)
      # Candidates arrive sorted by run_at, so the first runnable one is the earliest
      metadata = next_metas.find(&:runnable?)

      return nil unless metadata

      task = Task.new(metadata, task_handler(data:      metadata.data,
                                             container: container,
                                             logger:    logger,
                                             scheduler: scheduler))

      LoggedTask.new(task, logger: logger)
   end

   # Fetch a task matching the given identifier
   #
   # @param identifier [Hash] attributes to match; it is not modified
   # @return [TaskMetaData] metadata of the single matching task, bound to this queue
   #
   # @raise [NoSuchTaskError] when no task matches the identifier.
   # @raise [AmbiguousTaskFilterError] when many tasks match the identifier, meaning you need to be more specific.
   def fetch_task(identifier)
      # Data is stored serialized, so it is matched serialized. The filter is a new hash so that the
      # caller's identifier is left untouched.
      filter = identifier[:data] ? identifier.merge(data: JSON.dump(identifier[:data])) : identifier

      tasks = read(**filter)

      raise NoSuchTaskError, "no task found matching #{ filter }" if tasks.nil? || tasks.empty?
      if tasks.size > 1
         raise AmbiguousTaskFilterError, "too many (#{ tasks.size }) tasks match #{ filter }. Found: #{ tasks }"
      end

      args = tasks.first.merge(queue: self)

      TaskMetaData.new(**args)
   end

   # Creates a task on the queue, saved using the Task Store strategy.
   #
   # @param run_at [Time] Earliest time to attempt running the task
   # @param expire_at [Time, nil] Time after which the task will not be attempted
   # @param data [Hash, String, Numeric, nil] The data to save
   #
   # @raise [ArgumentError] when the keyword `:data` is needed by the task handler, but is missing
   # @raise [MalformedTaskError] when the keyword `:data` is provided but not expected by the task handler.
   def create(run_at:, expire_at:, data:)
      if data.nil? && expects_data?
         raise ArgumentError, "task #{ @task_class } expects to receive :data. Provide :data to #delay."
      end

      unless data.nil? || expects_data?
         raise MalformedTaskError, <<~ERROR
            found unexpected :data argument. Either do not provide :data when scheduling a task,
            or add this in the #{ @task_class } class definition:
                  attr_accessor :data
         ERROR
      end

      # TODO: shorten to using slice once updated to Ruby 2.5+
      attrs = {queue: self, run_at: run_at, initial_run_at: run_at, expire_at: expire_at, data: JSON.dump(data)}

      # The id and failure bookkeeping fields are dropped before the row is handed to the store
      create_data = TaskMetaData.new(**attrs).to_h
      create_data.delete(:id)
      create_data.delete(:attempts)
      create_data.delete(:last_fail_at)
      create_data.delete(:last_error)
      @task_store.create(**create_data)
   end

   # @return [Boolean] whether the task handler will accept data to be assigned via its :data attribute
   def expects_data?
      @task_class.method_defined?(:data=)
   end

   private

   # Instantiates the task class and assigns the given collaborators to it.
   # Data is only assigned when the handler exposes a data writer.
   #
   # @return [Object] the configured task handler instance
   def task_handler(data: nil, container: nil, logger: nil, scheduler: nil)
      handler           = @task_class.new
      handler.data      = data if handler.respond_to?(:data=)
      handler.container = container
      handler.logger    = logger
      handler.scheduler = scheduler
      handler
   end

   # Reads this queue's tasks from the store and wraps them in TaskMetaData, ordered by run_at.
   # Rows without a run_at are skipped and keys outside TaskMetaData::EXPECTED_DATA are ignored.
   #
   # @return [Array<TaskMetaData>] metadata sorted by run_at
   def next_metas
      scheduled = read(queue: @name).reject { |row| row[:run_at].nil? }

      metas = scheduled.map do |row|
         # select builds a new hash, leaving the row object handed over by the store unmodified
         known = row.to_h.select { |key, _value| TaskMetaData::EXPECTED_DATA.include?(key) }

         TaskMetaData.new(**known.merge(queue: self))
      end

      sort_tasks(metas)
   end

   # @param tasks [Array<TaskMetaData>] tasks to order
   # @return [Array<TaskMetaData>] the tasks ordered by ascending run_at
   def sort_tasks(tasks)
      # TODO: improve this
      # shuffling and re-sorting to avoid worst case O(n^2) when receiving already sorted data
      # on quicksort (which is default ruby sort). It is not unreasonable that the persister could return sorted
      # results
      # Ideally, we'd use a better algo than qsort for this, but this will do for now
      tasks.shuffle.sort_by(&:run_at)
   end

   # Internal queue validator
   module QueueValidation
      private

      # Runs every construction-time check.
      def validate!
         verify_task_class!
         verify_task_store!
      end

      # Checks that the task class satisfies the handler interface.
      def verify_task_class!
         verify_run_method!
         verify_accessors!
         verify_hooks!
      end

      # The interface compliance is checked on init because it's one of those rare cases where you want to know early;
      # otherwise, you wouldn't know until task execution and that could be far in the future.
      # UX is important for devs, too.
      #    - R
      #
      # @raise [MalformedTaskError] when #run is missing or has required parameters
      def verify_run_method!
         unless @task_class.method_defined? :run
            raise MalformedTaskError, "task #{ @task_class } does not support #run method"
         end

         # A positive arity means #run has required parameters
         return unless @task_class.instance_method(:run).arity.positive?

         raise MalformedTaskError, "task #{ @task_class } cannot require parameters to its #run method"
      end

      # @raise [MalformedTaskError] when a reader or writer for logger, container or scheduler is missing
      def verify_accessors!
         [:logger, :container, :scheduler].each do |method_name|
            next if @task_class.method_defined?(method_name) && @task_class.method_defined?("#{ method_name }=")

            raise MalformedTaskError, <<~ERR
               Task handler is missing a #{ method_name } accessor. Add this to the #{ @task_class } class definition:
                  attr_accessor :logger, :container, :scheduler
            ERR
         end
      end

      # Hooks are optional, but any that are defined must have an arity of exactly one.
      #
      # @raise [MalformedTaskError] when a defined hook has a different arity
      def verify_hooks!
         expected_arity = 1

         [:success, :fail, :final_fail].each do |method_name|
            next unless @task_class.method_defined?(method_name)
            next if @task_class.instance_method(method_name).arity == expected_arity

            err = "task #{ @task_class } must accept #{ expected_arity } parameter to its ##{ method_name } method"

            raise MalformedTaskError, err
         end
      end

      # @raise [ArgumentError] when the store is nil
      # @raise [MalformedTaskStoreError] when the store lacks #read, #create, #update or #delete
      def verify_task_store!
         raise ArgumentError, ':store cannot be nil' if @task_store.nil?

         [:read, :create, :update, :delete].each do |method|
            unless @task_store.respond_to? method
               raise MalformedTaskStoreError, "task store #{ @task_store.class } must respond to ##{ method }"
            end
         end
      end
   end
   include QueueValidation
end

#task_class ⇒ Class (readonly)

Returns Class that defines the work to be done for jobs in this queue.

Returns:

  • (Class)

    Class that defines the work to be done for jobs in this queue.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# File 'lib/procrastinator/queue.rb', line 18

# A Queue defines how a certain type of task will be processed: the handler class that runs it, the retry and
# timeout limits applied to it, and the task store used to persist it.
#
# The task class and task store are validated on construction, after which the instance is frozen.
class Queue
   extend Forwardable

   # Default number of seconds to wait for a task to complete
   DEFAULT_TIMEOUT = 3600 # in seconds; one hour total

   # Default number of times to retry a task
   DEFAULT_MAX_ATTEMPTS = 20

   # Default amount of time between checks for new Tasks
   DEFAULT_UPDATE_PERIOD = 10 # seconds

   attr_reader :name, :max_attempts, :timeout, :update_period, :task_store, :task_class

   alias store task_store
   alias storage task_store

   # Reading, updating and deleting are handed straight to the task store
   def_delegators :@task_store, :read, :update, :delete

   # Timeout is in seconds
   #
   # @param name [#to_s] queue identifier; stripped, runs of non-alphanumeric characters are replaced with a
   #   single underscore, and the result is converted to a Symbol
   # @param task_class [Class] handler class; must respond to `new` and pass the task class validation
   # @param max_attempts [Integer] number of times to retry a task
   # @param timeout [Numeric, nil] seconds to wait for a task to complete; must not be negative
   # @param update_period [Numeric] amount of time between checks for new tasks
   # @param store [Object] task store; must respond to #read, #create, #update and #delete
   #
   # @raise [ArgumentError] when name, task_class or store is nil, when task_class does not respond to `new`,
   #   or when timeout is negative
   # @raise [MalformedTaskError] when the task class does not satisfy the handler interface
   # @raise [MalformedTaskStoreError] when the store is missing one of the required methods
   def initialize(name:, task_class:,
                  max_attempts: DEFAULT_MAX_ATTEMPTS,
                  timeout: DEFAULT_TIMEOUT,
                  update_period: DEFAULT_UPDATE_PERIOD,
                  store: TaskStore::SimpleCommaStore.new)
      raise ArgumentError, ':name cannot be nil' unless name

      raise ArgumentError, ':task_class cannot be nil' unless task_class
      raise ArgumentError, 'Task class must be initializable' unless task_class.respond_to? :new

      raise ArgumentError, ':timeout cannot be negative' if timeout&.negative?

      @name          = name.to_s.strip.gsub(/[^A-Za-z0-9]+/, '_').to_sym
      @task_class    = task_class
      @task_store    = store
      @max_attempts  = max_attempts
      @timeout       = timeout
      @update_period = update_period

      validate!

      freeze
   end

   # Constructs the next available task on the queue.
   #
   # @param logger [Logger] logger to provide to the constructed task handler
   # @param container [Object, nil] container to provide to the constructed task handler
   # @param scheduler [Procrastinator::Scheduler, nil] the scheduler to provide to the constructed task handler
   # @return [LoggedTask, nil] A Task or nil if no task is found
   def next_task(logger: Logger.new(StringIO.new), container: nil, scheduler: nil)
      # Candidates arrive sorted by run_at, so the first runnable one is the earliest
      metadata = next_metas.find(&:runnable?)

      return nil unless metadata

      task = Task.new(metadata, task_handler(data:      metadata.data,
                                             container: container,
                                             logger:    logger,
                                             scheduler: scheduler))

      LoggedTask.new(task, logger: logger)
   end

   # Fetch a task matching the given identifier
   #
   # @param identifier [Hash] attributes to match; it is not modified
   # @return [TaskMetaData] metadata of the single matching task, bound to this queue
   #
   # @raise [NoSuchTaskError] when no task matches the identifier.
   # @raise [AmbiguousTaskFilterError] when many tasks match the identifier, meaning you need to be more specific.
   def fetch_task(identifier)
      # Data is stored serialized, so it is matched serialized. The filter is a new hash so that the
      # caller's identifier is left untouched.
      filter = identifier[:data] ? identifier.merge(data: JSON.dump(identifier[:data])) : identifier

      tasks = read(**filter)

      raise NoSuchTaskError, "no task found matching #{ filter }" if tasks.nil? || tasks.empty?
      if tasks.size > 1
         raise AmbiguousTaskFilterError, "too many (#{ tasks.size }) tasks match #{ filter }. Found: #{ tasks }"
      end

      args = tasks.first.merge(queue: self)

      TaskMetaData.new(**args)
   end

   # Creates a task on the queue, saved using the Task Store strategy.
   #
   # @param run_at [Time] Earliest time to attempt running the task
   # @param expire_at [Time, nil] Time after which the task will not be attempted
   # @param data [Hash, String, Numeric, nil] The data to save
   #
   # @raise [ArgumentError] when the keyword `:data` is needed by the task handler, but is missing
   # @raise [MalformedTaskError] when the keyword `:data` is provided but not expected by the task handler.
   def create(run_at:, expire_at:, data:)
      if data.nil? && expects_data?
         raise ArgumentError, "task #{ @task_class } expects to receive :data. Provide :data to #delay."
      end

      unless data.nil? || expects_data?
         raise MalformedTaskError, <<~ERROR
            found unexpected :data argument. Either do not provide :data when scheduling a task,
            or add this in the #{ @task_class } class definition:
                  attr_accessor :data
         ERROR
      end

      # TODO: shorten to using slice once updated to Ruby 2.5+
      attrs = {queue: self, run_at: run_at, initial_run_at: run_at, expire_at: expire_at, data: JSON.dump(data)}

      # The id and failure bookkeeping fields are dropped before the row is handed to the store
      create_data = TaskMetaData.new(**attrs).to_h
      create_data.delete(:id)
      create_data.delete(:attempts)
      create_data.delete(:last_fail_at)
      create_data.delete(:last_error)
      @task_store.create(**create_data)
   end

   # @return [Boolean] whether the task handler will accept data to be assigned via its :data attribute
   def expects_data?
      @task_class.method_defined?(:data=)
   end

   private

   # Instantiates the task class and assigns the given collaborators to it.
   # Data is only assigned when the handler exposes a data writer.
   #
   # @return [Object] the configured task handler instance
   def task_handler(data: nil, container: nil, logger: nil, scheduler: nil)
      handler           = @task_class.new
      handler.data      = data if handler.respond_to?(:data=)
      handler.container = container
      handler.logger    = logger
      handler.scheduler = scheduler
      handler
   end

   # Reads this queue's tasks from the store and wraps them in TaskMetaData, ordered by run_at.
   # Rows without a run_at are skipped and keys outside TaskMetaData::EXPECTED_DATA are ignored.
   #
   # @return [Array<TaskMetaData>] metadata sorted by run_at
   def next_metas
      scheduled = read(queue: @name).reject { |row| row[:run_at].nil? }

      metas = scheduled.map do |row|
         # select builds a new hash, leaving the row object handed over by the store unmodified
         known = row.to_h.select { |key, _value| TaskMetaData::EXPECTED_DATA.include?(key) }

         TaskMetaData.new(**known.merge(queue: self))
      end

      sort_tasks(metas)
   end

   # @param tasks [Array<TaskMetaData>] tasks to order
   # @return [Array<TaskMetaData>] the tasks ordered by ascending run_at
   def sort_tasks(tasks)
      # TODO: improve this
      # shuffling and re-sorting to avoid worst case O(n^2) when receiving already sorted data
      # on quicksort (which is default ruby sort). It is not unreasonable that the persister could return sorted
      # results
      # Ideally, we'd use a better algo than qsort for this, but this will do for now
      tasks.shuffle.sort_by(&:run_at)
   end

   # Internal queue validator
   module QueueValidation
      private

      # Runs every construction-time check.
      def validate!
         verify_task_class!
         verify_task_store!
      end

      # Checks that the task class satisfies the handler interface.
      def verify_task_class!
         verify_run_method!
         verify_accessors!
         verify_hooks!
      end

      # The interface compliance is checked on init because it's one of those rare cases where you want to know early;
      # otherwise, you wouldn't know until task execution and that could be far in the future.
      # UX is important for devs, too.
      #    - R
      #
      # @raise [MalformedTaskError] when #run is missing or has required parameters
      def verify_run_method!
         unless @task_class.method_defined? :run
            raise MalformedTaskError, "task #{ @task_class } does not support #run method"
         end

         # A positive arity means #run has required parameters
         return unless @task_class.instance_method(:run).arity.positive?

         raise MalformedTaskError, "task #{ @task_class } cannot require parameters to its #run method"
      end

      # @raise [MalformedTaskError] when a reader or writer for logger, container or scheduler is missing
      def verify_accessors!
         [:logger, :container, :scheduler].each do |method_name|
            next if @task_class.method_defined?(method_name) && @task_class.method_defined?("#{ method_name }=")

            raise MalformedTaskError, <<~ERR
               Task handler is missing a #{ method_name } accessor. Add this to the #{ @task_class } class definition:
                  attr_accessor :logger, :container, :scheduler
            ERR
         end
      end

      # Hooks are optional, but any that are defined must have an arity of exactly one.
      #
      # @raise [MalformedTaskError] when a defined hook has a different arity
      def verify_hooks!
         expected_arity = 1

         [:success, :fail, :final_fail].each do |method_name|
            next unless @task_class.method_defined?(method_name)
            next if @task_class.instance_method(method_name).arity == expected_arity

            err = "task #{ @task_class } must accept #{ expected_arity } parameter to its ##{ method_name } method"

            raise MalformedTaskError, err
         end
      end

      # @raise [ArgumentError] when the store is nil
      # @raise [MalformedTaskStoreError] when the store lacks #read, #create, #update or #delete
      def verify_task_store!
         raise ArgumentError, ':store cannot be nil' if @task_store.nil?

         [:read, :create, :update, :delete].each do |method|
            unless @task_store.respond_to? method
               raise MalformedTaskStoreError, "task store #{ @task_store.class } must respond to ##{ method }"
            end
         end
      end
   end
   include QueueValidation
end

#timeout ⇒ Numeric (readonly)

Returns Duration (seconds) after which tasks in this queue should fail for taking too long.

Returns:

  • (Numeric)

    Duration (seconds) after which tasks in this queue should fail for taking too long.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# File 'lib/procrastinator/queue.rb', line 18

# A Queue defines how a certain type of task will be processed: it pairs a task handler class with a task
# store and the retry, timeout and polling settings used for tasks of that type.
#
# The task class and task store are validated on construction, after which the instance is frozen.
class Queue
   extend Forwardable

   # Default number of seconds to wait for a task to complete
   DEFAULT_TIMEOUT = 3600 # in seconds; one hour total

   # Default number of times to retry a task
   DEFAULT_MAX_ATTEMPTS = 20

   # Default amount of time between checks for new Tasks
   DEFAULT_UPDATE_PERIOD = 10 # seconds

   # Read-only settings captured by #initialize.
   attr_reader :name, :max_attempts, :timeout, :update_period, :task_store, :task_class

   alias store task_store
   alias storage task_store

   # #read, #update and #delete are forwarded directly to the task store.
   def_delegators :@task_store, :read, :update, :delete

   # Builds, validates and freezes a queue.
   #
   # @param name [#to_s] queue name; stripped, each run of non-alphanumeric characters replaced by "_", then symbolized
   # @param task_class [#new] handler class instantiated for each task
   # @param max_attempts [Integer] maximum number of attempts for tasks in this queue
   # @param timeout [Numeric, nil] seconds a task may run; must not be negative
   # @param update_period [Numeric] seconds between checks for new tasks
   # @param store [#read, #create, #update, #delete] task persistence strategy
   #
   # @raise [ArgumentError] when name, task_class or store is nil, task_class lacks .new, or timeout is negative
   # @raise [MalformedTaskError] when task_class does not satisfy the task handler interface
   # @raise [MalformedTaskStoreError] when store is missing a required method
   def initialize(name:, task_class:,
                  max_attempts: DEFAULT_MAX_ATTEMPTS,
                  timeout: DEFAULT_TIMEOUT,
                  update_period: DEFAULT_UPDATE_PERIOD,
                  store: TaskStore::SimpleCommaStore.new)
      raise ArgumentError, ':name cannot be nil' unless name

      raise ArgumentError, ':task_class cannot be nil' unless task_class
      raise ArgumentError, 'Task class must be initializable' unless task_class.respond_to? :new

      raise ArgumentError, ':timeout cannot be negative' if timeout&.negative?

      @name          = name.to_s.strip.gsub(/[^A-Za-z0-9]+/, '_').to_sym
      @task_class    = task_class
      @task_store    = store
      @max_attempts  = max_attempts
      @timeout       = timeout
      @update_period = update_period

      validate!

      freeze
   end

   # Constructs the next available task on the queue.
   #
   # @param logger [Logger] logger to provide to the constructed task handler
   # @param container [Object, nil] container to provide to the constructed task handler
   # @param scheduler [Procrastinator::Scheduler, nil] the scheduler to provide to the constructed task handler
   # @return [LoggedTask, nil] A Task or nil if no task is found
   def next_task(logger: Logger.new(StringIO.new), container: nil, scheduler: nil)
      # First runnable entry, in ascending run_at order (see #next_metas)
      metadata = next_metas.find(&:runnable?)

      return nil unless metadata

      task = Task.new(metadata, task_handler(data:      metadata.data,
                                             container: container,
                                             logger:    logger,
                                             scheduler: scheduler))

      LoggedTask.new(task, logger: logger)
   end

   # Fetch a task matching the given identifier
   #
   # @param identifier [Hash] attributes to match; a truthy :data value is JSON-encoded before querying the store.
   #    The hash itself is not modified.
   # @return [TaskMetaData] metadata of the single matching task, bound to this queue
   #
   # @raise [NoSuchTaskError] when no task matches the identifier.
   # @raise [AmbiguousTaskFilterError] when many tasks match the identifier, meaning you need to be more specific.
   def fetch_task(identifier)
      # Encode into a copy: writing back into the caller's hash would double-encode :data if the
      # same hash were reused, and would raise on a frozen hash.
      filter        = identifier.dup
      filter[:data] = JSON.dump(filter[:data]) if filter[:data]

      tasks = read(**filter)

      raise NoSuchTaskError, "no task found matching #{ filter }" if tasks.nil? || tasks.empty?
      if tasks.size > 1
         raise AmbiguousTaskFilterError, "too many (#{ tasks.size }) tasks match #{ filter }. Found: #{ tasks }"
      end

      args = tasks.first.merge(queue: self)

      TaskMetaData.new(**args)
   end

   # Creates a task on the queue, saved using the Task Store strategy.
   #
   # @param run_at [Time] Earliest time to attempt running the task
   # @param expire_at [Time, nil] Time after which the task will not be attempted
   # @param data [Hash, String, Numeric, nil] The data to save
   #
   # @raise [ArgumentError] when the keyword `:data` is needed by the task handler, but is missing
   # @raise [MalformedTaskError] when the keyword `:data` is provided but not expected by the task handler.
   def create(run_at:, expire_at:, data:)
      if data.nil? && expects_data?
         raise ArgumentError, "task #{ @task_class } expects to receive :data. Provide :data to #delay."
      end

      unless data.nil? || expects_data?
         raise MalformedTaskError, <<~ERROR
            found unexpected :data argument. Either do not provide :data when scheduling a task,
            or add this in the #{ @task_class } class definition:
                  attr_accessor :data
         ERROR
      end

      # TODO: shorten to using slice once updated to Ruby 2.5+
      attrs = {queue: self, run_at: run_at, initial_run_at: run_at, expire_at: expire_at, data: JSON.dump(data)}

      # These fields are stripped before the record is handed to the store
      create_data = TaskMetaData.new(**attrs).to_h
      create_data.delete(:id)
      create_data.delete(:attempts)
      create_data.delete(:last_fail_at)
      create_data.delete(:last_error)
      @task_store.create(**create_data)
   end

   # @return [Boolean] whether the task handler will accept data to be assigned via its :data attribute
   def expects_data?
      @task_class.method_defined?(:data=)
   end

   private

   # Instantiates the task class and assigns it the per-run collaborators.
   # Data is only assigned when the handler has a data= writer.
   def task_handler(data: nil, container: nil, logger: nil, scheduler: nil)
      handler           = @task_class.new
      handler.data      = data if handler.respond_to?(:data=)
      handler.container = container
      handler.logger    = logger
      handler.scheduler = scheduler
      handler
   end

   # Reads this queue's tasks from the store, drops those without a run_at, keeps only the keys listed in
   # TaskMetaData::EXPECTED_DATA, and returns them as TaskMetaData sorted by run_at.
   def next_metas
      tasks = read(queue: @name).reject { |t| t[:run_at].nil? }.collect do |t|
         t.to_h.delete_if { |key| !TaskMetaData::EXPECTED_DATA.include?(key) }.merge(queue: self)
      end

      sort_tasks(tasks.collect { |t| TaskMetaData.new(**t) })
   end

   # Orders tasks by ascending run_at.
   def sort_tasks(tasks)
      # TODO: improve this
      # shuffling and re-sorting to avoid worst case O(n^2) when receiving already sorted data
      # on quicksort (which is default ruby sort). It is not unreasonable that the persister could return sorted
      # results
      # Ideally, we'd use a better algo than qsort for this, but this will do for now
      tasks.shuffle.sort_by(&:run_at)
   end

   # Internal queue validator
   module QueueValidation
      private

      # Runs every construction-time check; raises on the first failure.
      def validate!
         verify_task_class!
         verify_task_store!
      end

      # Checks the task class's #run method, required accessors and optional hooks.
      def verify_task_class!
         verify_run_method!
         verify_accessors!
         verify_hooks!
      end

      # The interface compliance is checked on init because it's one of those rare cases where you want to know early;
      # otherwise, you wouldn't know until task execution and that could be far in the future.
      # UX is important for devs, too.
      #    - R
      def verify_run_method!
         unless @task_class.method_defined? :run
            raise MalformedTaskError, "task #{ @task_class } does not support #run method"
         end

         return unless @task_class.instance_method(:run).arity.positive?

         raise MalformedTaskError, "task #{ @task_class } cannot require parameters to its #run method"
      end

      # Requires both a reader and a writer for :logger, :container and :scheduler on the task class.
      def verify_accessors!
         [:logger, :container, :scheduler].each do |method_name|
            next if @task_class.method_defined?(method_name) && @task_class.method_defined?("#{ method_name }=")

            raise MalformedTaskError, <<~ERR
               Task handler is missing a #{ method_name } accessor. Add this to the #{ @task_class } class definition:
                  attr_accessor :logger, :container, :scheduler
            ERR
         end
      end

      # Requires any defined #success, #fail or #final_fail hook to take exactly one parameter.
      def verify_hooks!
         expected_arity = 1

         [:success, :fail, :final_fail].each do |method_name|
            next unless @task_class.method_defined?(method_name)
            next if @task_class.instance_method(method_name).arity == expected_arity

            err = "task #{ @task_class } must accept #{ expected_arity } parameter to its ##{ method_name } method"

            raise MalformedTaskError, err
         end
      end

      # Requires a non-nil task store that responds to #read, #create, #update and #delete.
      def verify_task_store!
         raise ArgumentError, ':store cannot be nil' if @task_store.nil?

         [:read, :create, :update, :delete].each do |method|
            unless @task_store.respond_to? method
               raise MalformedTaskStoreError, "task store #{ @task_store.class } must respond to ##{ method }"
            end
         end
      end
   end
   include QueueValidation
end

#update_period ⇒ Numeric (readonly)

Returns the delay (seconds) between reloads of tasks from the task store.

Returns:

  • (Numeric)

    Delay (seconds) between reloads of tasks from the task store.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# File 'lib/procrastinator/queue.rb', line 18

# A Queue defines how a certain type of task will be processed: it pairs a task handler class with a task
# store and the retry, timeout and polling settings used for tasks of that type.
#
# The task class and task store are validated on construction, after which the instance is frozen.
class Queue
   extend Forwardable

   # Default number of seconds to wait for a task to complete
   DEFAULT_TIMEOUT = 3600 # in seconds; one hour total

   # Default number of times to retry a task
   DEFAULT_MAX_ATTEMPTS = 20

   # Default amount of time between checks for new Tasks
   DEFAULT_UPDATE_PERIOD = 10 # seconds

   # Read-only settings captured by #initialize.
   attr_reader :name, :max_attempts, :timeout, :update_period, :task_store, :task_class

   alias store task_store
   alias storage task_store

   # #read, #update and #delete are forwarded directly to the task store.
   def_delegators :@task_store, :read, :update, :delete

   # Builds, validates and freezes a queue.
   #
   # @param name [#to_s] queue name; stripped, each run of non-alphanumeric characters replaced by "_", then symbolized
   # @param task_class [#new] handler class instantiated for each task
   # @param max_attempts [Integer] maximum number of attempts for tasks in this queue
   # @param timeout [Numeric, nil] seconds a task may run; must not be negative
   # @param update_period [Numeric] seconds between checks for new tasks
   # @param store [#read, #create, #update, #delete] task persistence strategy
   #
   # @raise [ArgumentError] when name, task_class or store is nil, task_class lacks .new, or timeout is negative
   # @raise [MalformedTaskError] when task_class does not satisfy the task handler interface
   # @raise [MalformedTaskStoreError] when store is missing a required method
   def initialize(name:, task_class:,
                  max_attempts: DEFAULT_MAX_ATTEMPTS,
                  timeout: DEFAULT_TIMEOUT,
                  update_period: DEFAULT_UPDATE_PERIOD,
                  store: TaskStore::SimpleCommaStore.new)
      raise ArgumentError, ':name cannot be nil' unless name

      raise ArgumentError, ':task_class cannot be nil' unless task_class
      raise ArgumentError, 'Task class must be initializable' unless task_class.respond_to? :new

      raise ArgumentError, ':timeout cannot be negative' if timeout&.negative?

      @name          = name.to_s.strip.gsub(/[^A-Za-z0-9]+/, '_').to_sym
      @task_class    = task_class
      @task_store    = store
      @max_attempts  = max_attempts
      @timeout       = timeout
      @update_period = update_period

      validate!

      freeze
   end

   # Constructs the next available task on the queue.
   #
   # @param logger [Logger] logger to provide to the constructed task handler
   # @param container [Object, nil] container to provide to the constructed task handler
   # @param scheduler [Procrastinator::Scheduler, nil] the scheduler to provide to the constructed task handler
   # @return [LoggedTask, nil] A Task or nil if no task is found
   def next_task(logger: Logger.new(StringIO.new), container: nil, scheduler: nil)
      # First runnable entry, in ascending run_at order (see #next_metas)
      metadata = next_metas.find(&:runnable?)

      return nil unless metadata

      task = Task.new(metadata, task_handler(data:      metadata.data,
                                             container: container,
                                             logger:    logger,
                                             scheduler: scheduler))

      LoggedTask.new(task, logger: logger)
   end

   # Fetch a task matching the given identifier
   #
   # @param identifier [Hash] attributes to match; a truthy :data value is JSON-encoded before querying the store.
   #    The hash itself is not modified.
   # @return [TaskMetaData] metadata of the single matching task, bound to this queue
   #
   # @raise [NoSuchTaskError] when no task matches the identifier.
   # @raise [AmbiguousTaskFilterError] when many tasks match the identifier, meaning you need to be more specific.
   def fetch_task(identifier)
      # Encode into a copy: writing back into the caller's hash would double-encode :data if the
      # same hash were reused, and would raise on a frozen hash.
      filter        = identifier.dup
      filter[:data] = JSON.dump(filter[:data]) if filter[:data]

      tasks = read(**filter)

      raise NoSuchTaskError, "no task found matching #{ filter }" if tasks.nil? || tasks.empty?
      if tasks.size > 1
         raise AmbiguousTaskFilterError, "too many (#{ tasks.size }) tasks match #{ filter }. Found: #{ tasks }"
      end

      args = tasks.first.merge(queue: self)

      TaskMetaData.new(**args)
   end

   # Creates a task on the queue, saved using the Task Store strategy.
   #
   # @param run_at [Time] Earliest time to attempt running the task
   # @param expire_at [Time, nil] Time after which the task will not be attempted
   # @param data [Hash, String, Numeric, nil] The data to save
   #
   # @raise [ArgumentError] when the keyword `:data` is needed by the task handler, but is missing
   # @raise [MalformedTaskError] when the keyword `:data` is provided but not expected by the task handler.
   def create(run_at:, expire_at:, data:)
      if data.nil? && expects_data?
         raise ArgumentError, "task #{ @task_class } expects to receive :data. Provide :data to #delay."
      end

      unless data.nil? || expects_data?
         raise MalformedTaskError, <<~ERROR
            found unexpected :data argument. Either do not provide :data when scheduling a task,
            or add this in the #{ @task_class } class definition:
                  attr_accessor :data
         ERROR
      end

      # TODO: shorten to using slice once updated to Ruby 2.5+
      attrs = {queue: self, run_at: run_at, initial_run_at: run_at, expire_at: expire_at, data: JSON.dump(data)}

      # These fields are stripped before the record is handed to the store
      create_data = TaskMetaData.new(**attrs).to_h
      create_data.delete(:id)
      create_data.delete(:attempts)
      create_data.delete(:last_fail_at)
      create_data.delete(:last_error)
      @task_store.create(**create_data)
   end

   # @return [Boolean] whether the task handler will accept data to be assigned via its :data attribute
   def expects_data?
      @task_class.method_defined?(:data=)
   end

   private

   # Instantiates the task class and assigns it the per-run collaborators.
   # Data is only assigned when the handler has a data= writer.
   def task_handler(data: nil, container: nil, logger: nil, scheduler: nil)
      handler           = @task_class.new
      handler.data      = data if handler.respond_to?(:data=)
      handler.container = container
      handler.logger    = logger
      handler.scheduler = scheduler
      handler
   end

   # Reads this queue's tasks from the store, drops those without a run_at, keeps only the keys listed in
   # TaskMetaData::EXPECTED_DATA, and returns them as TaskMetaData sorted by run_at.
   def next_metas
      tasks = read(queue: @name).reject { |t| t[:run_at].nil? }.collect do |t|
         t.to_h.delete_if { |key| !TaskMetaData::EXPECTED_DATA.include?(key) }.merge(queue: self)
      end

      sort_tasks(tasks.collect { |t| TaskMetaData.new(**t) })
   end

   # Orders tasks by ascending run_at.
   def sort_tasks(tasks)
      # TODO: improve this
      # shuffling and re-sorting to avoid worst case O(n^2) when receiving already sorted data
      # on quicksort (which is default ruby sort). It is not unreasonable that the persister could return sorted
      # results
      # Ideally, we'd use a better algo than qsort for this, but this will do for now
      tasks.shuffle.sort_by(&:run_at)
   end

   # Internal queue validator
   module QueueValidation
      private

      # Runs every construction-time check; raises on the first failure.
      def validate!
         verify_task_class!
         verify_task_store!
      end

      # Checks the task class's #run method, required accessors and optional hooks.
      def verify_task_class!
         verify_run_method!
         verify_accessors!
         verify_hooks!
      end

      # The interface compliance is checked on init because it's one of those rare cases where you want to know early;
      # otherwise, you wouldn't know until task execution and that could be far in the future.
      # UX is important for devs, too.
      #    - R
      def verify_run_method!
         unless @task_class.method_defined? :run
            raise MalformedTaskError, "task #{ @task_class } does not support #run method"
         end

         return unless @task_class.instance_method(:run).arity.positive?

         raise MalformedTaskError, "task #{ @task_class } cannot require parameters to its #run method"
      end

      # Requires both a reader and a writer for :logger, :container and :scheduler on the task class.
      def verify_accessors!
         [:logger, :container, :scheduler].each do |method_name|
            next if @task_class.method_defined?(method_name) && @task_class.method_defined?("#{ method_name }=")

            raise MalformedTaskError, <<~ERR
               Task handler is missing a #{ method_name } accessor. Add this to the #{ @task_class } class definition:
                  attr_accessor :logger, :container, :scheduler
            ERR
         end
      end

      # Requires any defined #success, #fail or #final_fail hook to take exactly one parameter.
      def verify_hooks!
         expected_arity = 1

         [:success, :fail, :final_fail].each do |method_name|
            next unless @task_class.method_defined?(method_name)
            next if @task_class.instance_method(method_name).arity == expected_arity

            err = "task #{ @task_class } must accept #{ expected_arity } parameter to its ##{ method_name } method"

            raise MalformedTaskError, err
         end
      end

      # Requires a non-nil task store that responds to #read, #create, #update and #delete.
      def verify_task_store!
         raise ArgumentError, ':store cannot be nil' if @task_store.nil?

         [:read, :create, :update, :delete].each do |method|
            unless @task_store.respond_to? method
               raise MalformedTaskStoreError, "task store #{ @task_store.class } must respond to ##{ method }"
            end
         end
      end
   end
   include QueueValidation
end

#max_attempts ⇒ Object (readonly)

Returns the maximum number of attempts for tasks in this queue.



30
31
32
# File 'lib/procrastinator/queue.rb', line 30

# @return [Integer] the maximum number of attempts for tasks in this queue, as stored in @max_attempts
def max_attempts
  @max_attempts
end

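#name ⇒ Object (readonly)

Returns the queue's name as a Symbol, normalized so that each run of non-alphanumeric characters becomes an underscore.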



30
31
32
# File 'lib/procrastinator/queue.rb', line 30

# @return [Symbol] the queue name stored in @name
def name
  @name
end

#task_class ⇒ Object (readonly)

Returns the task handler class that this queue instantiates for each task.



30
31
32
# File 'lib/procrastinator/queue.rb', line 30

# @return [Object] the task handler class stored in @task_class
def task_class
  @task_class
end

#task_store ⇒ Object (readonly) Also known as: store, storage

Returns the task store used to persist this queue's tasks.



30
31
32
# File 'lib/procrastinator/queue.rb', line 30

# @return [Object] the task store stored in @task_store
def task_store
  @task_store
end

#timeout ⇒ Object (readonly)

Returns the duration (seconds) after which tasks in this queue should fail for taking too long.



30
31
32
# File 'lib/procrastinator/queue.rb', line 30

# @return [Numeric] the task timeout in seconds, as stored in @timeout
def timeout
  @timeout
end

#update_period ⇒ Object (readonly)

Returns the delay (seconds) between reloads of tasks from the task store.



30
31
32
# File 'lib/procrastinator/queue.rb', line 30

# @return [Numeric] the delay in seconds between checks for new tasks, as stored in @update_period
def update_period
  @update_period
end

Instance Method Details

#create(run_at:, expire_at:, data:) ⇒ Object

Creates a task on the queue, saved using the Task Store strategy.

Parameters:

  • run_at (Time)

    Earliest time to attempt running the task

  • expire_at (Time, nil)

    Time after which the task will not be attempted

  • data (Hash, String, Numeric, nil)

    The data to save

Raises:

  • (ArgumentError)

    when the keyword `:data` is needed by the task handler, but is missing

  • (MalformedTaskError)

    when the keyword `:data` is provided but not expected by the task handler.



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/procrastinator/queue.rb', line 110

# Persists a new task for this queue through the task store.
#
# @param run_at [Time] Earliest time to attempt running the task
# @param expire_at [Time, nil] Time after which the task will not be attempted
# @param data [Hash, String, Numeric, nil] The data to save; stored JSON-encoded
#
# @raise [ArgumentError] when the task handler expects :data but none was given
# @raise [MalformedTaskError] when :data was given but the task handler has no data= writer
def create(run_at:, expire_at:, data:)
   handler_takes_data = expects_data?

   if data.nil?
      if handler_takes_data
         raise ArgumentError, "task #{ @task_class } expects to receive :data. Provide :data to #delay."
      end
   elsif !handler_takes_data
      raise MalformedTaskError, <<~ERROR
         found unexpected :data argument. Either do not provide :data when scheduling a task,
         or add this in the #{ @task_class } class definition:
               attr_accessor :data
      ERROR
   end

   # TODO: shorten to using slice once updated to Ruby 2.5+
   record = TaskMetaData.new(queue:          self,
                             run_at:         run_at,
                             initial_run_at: run_at,
                             expire_at:      expire_at,
                             data:           JSON.dump(data)).to_h

   # These fields are stripped before the record is handed to the store
   [:id, :attempts, :last_fail_at, :last_error].each { |field| record.delete(field) }

   @task_store.create(**record)
end

#expects_data? ⇒ Boolean

Returns whether the task handler will accept data to be assigned via its :data attribute.

Returns:

  • (Boolean)

    whether the task handler will accept data to be assigned via its :data attribute



135
136
137
# File 'lib/procrastinator/queue.rb', line 135

# @return [Boolean] whether the task class defines a data= writer, i.e. whether the handler accepts :data
def expects_data?
   @task_class.method_defined?(:data=)
end

#fetch_task(identifier) ⇒ Object

Fetch a task matching the given identifier

Parameters:

  • identifier (Hash)

    attributes to match

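Raises:

  • (NoSuchTaskError)

    when no task matches the identifier.

  • (AmbiguousTaskFilterError)

    when many tasks match the identifier, meaning you need to be more specific.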



87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/procrastinator/queue.rb', line 87

# Fetch a task matching the given identifier
#
# @param identifier [Hash] attributes to match; a truthy :data value is JSON-encoded before querying the store.
#    The hash itself is not modified.
# @return [TaskMetaData] metadata of the single matching task, bound to this queue
#
# @raise [NoSuchTaskError] when no task matches the identifier.
# @raise [AmbiguousTaskFilterError] when many tasks match the identifier, meaning you need to be more specific.
def fetch_task(identifier)
   # Encode into a copy: writing back into the caller's hash would double-encode :data if the
   # same hash were reused, and would raise on a frozen hash.
   filter        = identifier.dup
   filter[:data] = JSON.dump(filter[:data]) if filter[:data]

   tasks = read(**filter)

   raise NoSuchTaskError, "no task found matching #{ filter }" if tasks.nil? || tasks.empty?
   if tasks.size > 1
      raise AmbiguousTaskFilterError, "too many (#{ tasks.size }) tasks match #{ filter }. Found: #{ tasks }"
   end

   args = tasks.first.merge(queue: self)

   TaskMetaData.new(**args)
end

#next_task(logger: Logger.new(StringIO.new), container: nil, scheduler: nil) ⇒ LoggedTask?

Constructs the next available task on the queue.

Parameters:

  • logger (Logger) (defaults to: Logger.new(StringIO.new))

    logger to provide to the constructed task handler

  • container (Object, nil) (defaults to: nil)

    container to provide to the constructed task handler

  • scheduler (Procrastinator::Scheduler, nil) (defaults to: nil)

    the scheduler to provide to the constructed task handler

Returns:

  • (LoggedTask, nil)

    A Task or nil if no task is found



68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/procrastinator/queue.rb', line 68

# Constructs the next available task on the queue.
#
# @param logger [Logger] logger to provide to the constructed task handler
# @param container [Object, nil] container to provide to the constructed task handler
# @param scheduler [Procrastinator::Scheduler, nil] the scheduler to provide to the constructed task handler
# @return [LoggedTask, nil] A Task or nil if no task is found
def next_task(logger: Logger.new(StringIO.new), container: nil, scheduler: nil)
   # First entry from next_metas that reports itself runnable
   metadata = next_metas.find(&:runnable?)

   return nil unless metadata

   task = Task.new(metadata, task_handler(data:      metadata.data,
                                          container: container,
                                          logger:    logger,
                                          scheduler: scheduler))

   LoggedTask.new(task, logger: logger)
end