Class: Pigeon::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/pigeon/queue.rb

Defined Under Namespace

Classes: BlockRequired, TaskNotQueued

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(&block) ⇒ Queue

Creates a new queue. If a block is given, it is used to compare two tasks and order them, so it should take two arguments and return -1, 0, or 1, in the same way as a block passed to Array#sort. If no block is given, tasks are ordered by comparing their priority values.



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/pigeon/queue.rb', line 48

# Builds an empty queue.
#
# The class-level filter table is copied so that filters added to this
# instance do not leak back into the class. Both the filter table and the
# observer table are extended with MonitorMixin so they can double as locks
# through #synchronize.
#
# An optional block becomes the comparator handed to Pigeon::SortedArray;
# when it is omitted, tasks are compared by their priority.
def initialize(&block)
  @filters = self.class.filters.dup.extend(MonitorMixin)
  @observers = { }.extend(MonitorMixin)
  @claimable_task = { }
  @processors = [ ]
  @next_task = { }

  @sort_by = block || lambda { |a,b| a.priority <=> b.priority }
  @tasks = Pigeon::SortedArray.new(&@sort_by)
end

Instance Attribute Details

#processorsObject (readonly)

Returns the list of processors that have been attached to this queue with #add_processor.



24
25
26
# File 'lib/pigeon/queue.rb', line 24

# Read-only accessor for the list of processors attached to this queue.
# The live Array is returned, not a copy.
def processors
  @processors
end

Class Method Details

.filter(name, &block) ⇒ Object

Defines a new filter with the given name and uses the supplied block to evaluate if a task qualifies or not.



39
40
41
# File 'lib/pigeon/queue.rb', line 39

# Registers a class-level filter.
#
# name  - key under which the filter is stored in the table returned by
#         .filters.
# block - predicate stored as the filter; it is kept as-is (nil when no
#         block is supplied).
#
# Returns the stored block.
def self.filter(name, &block)
  filters.store(name, block)
end

.filtersObject

Returns the current filter configuration. This is stored as a Hash with the key being the filter name, the value being the matching block. The nil key is the default filter which accepts all tasks.



31
32
33
34
35
# File 'lib/pigeon/queue.rb', line 31

# Returns the class-level filter table, creating it on first use.
#
# The table is a Hash mapping a filter name to its predicate. It starts out
# with a single entry under the nil key whose predicate accepts every task.
def self.filters
  return @filters if @filters

  @filters = { nil => lambda { |_task| true } }
end

Instance Method Details

#<<(task) ⇒ Object

Adds a task to the queue.



136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/pigeon/queue.rb', line 136

# Adds a task to the queue.
#
# task - must be a Pigeon::Task; anything else raises a RuntimeError.
#
# The insertion itself is delegated to #execute_add_task!, invoked through
# Pigeon::Engine.execute_in_main_thread.
#
# Returns the task that was passed in.
def <<(task)
  raise "Cannot add task of class #{task.class} to #{self.class}" unless task.is_a?(Pigeon::Task)

  Pigeon::Engine.execute_in_main_thread { execute_add_task!(task) }

  task
end

#add_processor(processor, &claim) ⇒ Object

Adds a processor to the queue and, if a block is given, registers that block as an observer through #observe.



106
107
108
109
110
111
112
# File 'lib/pigeon/queue.rb', line 106

# Attaches a processor to the queue.
#
# processor - object appended to the processors list (under the observers
#             lock).
# claim     - optional block; when present it is registered through #observe
#             with the default (nil) filter.
#
# Returns the result of #observe when a claim block is given, nil otherwise.
def add_processor(processor, &claim)
  @observers.synchronize { @processors << processor }

  return unless claim

  observe(&claim)
end

#claim(task) ⇒ Object

Claims a task. This is used to indicate that the task will be processed without having to be inserted into the queue.



292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
# File 'lib/pigeon/queue.rb', line 292

# Claims a task so that it is handled by the caller rather than by the queue.
#
# Two situations are accepted:
# * the task is flagged in @claimable_task (the flag #execute_add_task! sets
#   while a task is being offered to observers): the flag is cleared;
# * the task is already stored in @tasks: it is removed, and every
#   next-task slot that pointed at it is reset to nil.
#
# Raises TaskNotQueued when neither applies.
#
# Returns the task.
def claim(task)
  @filters.synchronize do
    if @claimable_task[task]
      @claimable_task[task] = false
    elsif @tasks.delete(task)
      @next_task.each_key do |name|
        @next_task[name] = nil if task == @next_task[name]
      end
    else
      raise TaskNotQueued, task
    end
  end

  task
end

#eachObject

Iterates over each of the tasks in the queue.



200
201
202
203
204
205
206
207
208
# File 'lib/pigeon/queue.rb', line 200

# Iterates over each of the tasks in the queue.
#
# A copy of the task list is taken while holding the filters lock, and the
# iteration then runs over that copy outside the lock, so the block may add
# or remove tasks without disturbing the traversal.
#
# Yields each task in queue order.
#
# Returns an Enumerator when called without a block, otherwise the result of
# iterating the copied list.
def each
  return to_enum(:each) unless block_given?

  # The assignment must live outside the synchronize block: a local that is
  # first assigned inside a block is not visible once the block ends.
  snapshot = @filters.synchronize { @tasks.dup }

  snapshot.each do |task|
    yield(task)
  end
end

#empty?(filter_name = nil, &block) ⇒ Boolean

Returns true if the queue is empty, false otherwise. If filter_name is given, then will return true if there are no matching tasks, false otherwise. An optional block can further restrict qualifying tasks.

Returns:

  • (Boolean)


320
321
322
323
324
325
326
327
328
# File 'lib/pigeon/queue.rb', line 320

# Reports whether the queue has no matching task.
#
# filter_name - name of the filter to consult; only used when no block is
#               given, in which case the answer is derived from #peek.
# block       - optional predicate; when given, the queue is searched with it
#               directly (under the filters lock).
#
# Returns true when nothing matches, false otherwise.
def empty?(filter_name = nil, &block)
  return !peek(filter_name) unless block_given?

  @filters.synchronize { !@tasks.find(&block) }
end

#execute_add_task!(task) ⇒ Object



151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/pigeon/queue.rb', line 151

# Performs the actual insertion of a task.
#
# The task is first offered to the registered observers, filter by filter.
# An observer may take the task for itself through #claim, which clears the
# task's entry in @claimable_task. As soon as that happens no further
# observer is called, regardless of which filter it was registered under.
# Observers whose filter name has no matching filter are skipped.
#
# Observers with an arity of 2 are called with (queue, task); all others are
# called with (task).
#
# If no observer claimed the task, it is appended to the task list and
# becomes the cached next task of every filter that currently has an empty
# slot and accepts it.
#
# Returns nil when the task was claimed by an observer.
def execute_add_task!(task)
  # Flag the task as claimable: it is not in the task list yet, so #claim
  # needs another way of recognising it.
  @claimable_task[task] = true

  @observers.synchronize do
    @observers.each do |filter_name, callbacks|
      # Stop offering the task once any observer has claimed it; leaving
      # only the inner loop would hand an already-claimed task to the
      # observers of the remaining filters.
      break unless @claimable_task[task]

      filter_proc = @filters[filter_name]
      next unless filter_proc && filter_proc.call(task)

      callbacks.each do |callback|
        if callback.arity == 2
          callback.call(self, task)
        else
          callback.call(task)
        end

        break unless @claimable_task[task]
      end
    end
  end

  # Hash#delete returns the stored flag: true if unclaimed, false if claimed.
  if @claimable_task.delete(task)
    @filters.synchronize do
      @tasks << task

      # Fill the empty next-task slots of the filters this task satisfies.
      @next_task.each do |filter_name, next_task|
        next if next_task

        filter_proc = @filters[filter_name]
        @next_task[filter_name] = task if filter_proc && filter_proc.call(task)
      end
    end
  end
end

#filter(filter_name, &block) ⇒ Object

Creates a named filter for the queue using the provided block to select the tasks which should match.

Raises:

  • (BlockRequired) — if no block is given



125
126
127
128
129
130
131
132
133
# File 'lib/pigeon/queue.rb', line 125

# Defines (or replaces) a named filter on this queue instance.
#
# filter_name - key under which the predicate is stored.
# block       - predicate deciding whether a task matches; mandatory.
#
# Raises BlockRequired when called without a block.
#
# Returns whatever #assign_next_task returns for the filter.
def filter(filter_name, &block)
  raise BlockRequired if block.nil?

  @filters.synchronize { @filters[filter_name] = block }

  assign_next_task(filter_name)
end

#include?(task) ⇒ Boolean

Returns true if the task is queued, false otherwise.

Returns:

  • (Boolean)


311
312
313
314
315
# File 'lib/pigeon/queue.rb', line 311

# Tells whether the given task is currently stored in the task list.
# The check runs while holding the filters lock.
def include?(task)
  @filters.synchronize { @tasks.include?(task) }
end

#length(filter_name = nil, &block) ⇒ Object Also known as: size, count

Returns the number of entries in the queue. If filter_name is given, then will return the number of matching tasks. An optional block can further restrict qualifying tasks.



333
334
335
336
337
338
339
# File 'lib/pigeon/queue.rb', line 333

# Counts the tasks in the queue.
#
# filter_name - name of the filter the tasks must satisfy; the default (nil)
#               selects the filter stored under the nil key.
# block       - optional predicate that further restricts which tasks are
#               counted: a task is counted only if it satisfies both the
#               filter and the block.
#
# Returns the number of matching tasks, or nil when no filter is defined
# under filter_name.
def length(filter_name = nil, &block)
  @filters.synchronize do
    # Look the filter up under the same lock that guards the task list.
    filter_proc = @filters[filter_name]

    if filter_proc.nil?
      nil
    elsif block
      @tasks.count { |task| filter_proc.call(task) && block.call(task) }
    else
      @tasks.count(&filter_proc)
    end
  end
end

#observe(filter_name = nil, &block) ⇒ Object

Sets up a callback for the queue that will execute the block if new tasks are added to the queue. If filter_name is specified, this block will be run for tasks matching that filtered subset.

Raises:

  • (BlockRequired) — if no block is given



82
83
84
85
86
87
88
89
90
91
92
# File 'lib/pigeon/queue.rb', line 82

# Registers a callback to run when tasks are added to the queue.
#
# filter_name - filter the callback is attached to (nil, the default, is the
#               filter stored under the nil key).
# block       - the callback; mandatory. It is appended to the list of
#               observers kept for that filter name.
#
# Raises BlockRequired when called without a block.
#
# Returns whatever #assign_next_task returns for the filter.
def observe(filter_name = nil, &block)
  raise BlockRequired if block.nil?

  @observers.synchronize do
    (@observers[filter_name] ||= [ ]) << block
  end

  assign_next_task(filter_name)
end

#peek(filter_name = nil, &block) ⇒ Object

Peeks at the next task in the queue, or if filter_name is provided, then the next task meeting those filter conditions. An optional block can also be used to further restrict the qualifying tasks.



213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# File 'lib/pigeon/queue.rb', line 213

# Returns the next task without removing it from the queue.
#
# block       - when given, the first task for which the block is truthy is
#               returned and filter_name is not consulted.
# filter_name - when given (and no block), the next task for that filter is
#               returned. The result is cached in @next_task, so later calls
#               return the cached task until the slot is reset.
#
# With neither argument, the first task of the list is returned.
#
# Returns the task, or nil when nothing matches or the filter is not defined.
def peek(filter_name = nil, &block)
  # The whole lookup, including the cache update, runs under the filters
  # lock. #pop, #claim and #pull reset @next_task under the same lock, so a
  # task they have just removed cannot be written back into the cache.
  @filters.synchronize do
    if block_given?
      @tasks.find(&block)
    elsif filter_name
      @next_task[filter_name] ||= begin
        filter_proc = @filters[filter_name]

        filter_proc && @tasks.find(&filter_proc)
      end
    else
      @tasks.first
    end
  end
end

#pop(filter_name = nil, &block) ⇒ Object

Returns the next task from the queue. If a filter_name is given, then will only select tasks matching that filter’s conditions. An optional block can also be used to further restrict the qualifying tasks. The task will be removed from the queue and must be re-inserted if it is to be scheduled again.



261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
# File 'lib/pigeon/queue.rb', line 261

# Removes and returns the next task.
#
# block       - when given, the first task for which the block is truthy is
#               taken and filter_name is not consulted.
# filter_name - when given (and no block), the cached next task for that
#               filter is taken if there is one; otherwise the task list is
#               searched with the filter, if that filter is defined.
#
# With neither argument, the first task of the list is taken.
#
# The chosen task is deleted from the task list, and every next-task slot
# that pointed at it is reset to nil. Everything runs under the filters lock.
#
# Returns the removed task, or nil when nothing matched.
def pop(filter_name = nil, &block)
  @filters.synchronize do
    task =
      if block_given?
        @tasks.find(&block)
      elsif filter_name
        @next_task[filter_name] || begin
          filter_proc = @filters[filter_name]

          filter_proc && @tasks.find(&filter_proc)
        end
      else
        @tasks.first
      end

    if task
      @tasks.delete(task)

      @next_task.each_key do |name|
        @next_task[name] = nil if task == @next_task[name]
      end
    end

    task
  end
end

#pull(filter_name = nil, &block) ⇒ Object

Removes all tasks from the queue. If a filter_name is given, then will only remove tasks matching that filter’s conditions. An optional block can also be used to further restrict the qualifying tasks.



236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
# File 'lib/pigeon/queue.rb', line 236

# Removes a batch of tasks from the queue and returns them.
#
# block       - when given, the tasks for which the block is truthy are
#               removed and filter_name is not consulted.
# filter_name - when given (and no block), the tasks accepted by that filter
#               are removed. A name with no filter defined matches nothing,
#               in line with how #peek and #pop treat unknown filters.
#
# With neither argument, every task is removed.
#
# Every next-task slot that pointed at a removed task is reset to nil.
#
# Returns the collection of removed tasks.
def pull(filter_name = nil, &block)
  @filters.synchronize do
    selector = block

    if selector.nil? && filter_name
      # Falling through with a nil selector would empty the whole queue, so
      # an unknown filter gets a selector that rejects everything.
      selector = @filters[filter_name] || lambda { |_task| false }
    end

    pulled = selector ? @tasks.select(&selector) : @tasks

    @tasks -= pulled

    @next_task.each_key do |name|
      @next_task[name] = nil if pulled.include?(@next_task[name])
    end

    pulled
  end
end

#remove_observer(filter_name = nil, &block) ⇒ Object

Removes references to the callback function specified. Note that the same Proc must be passed in, as a block with an identical function will not be considered equivalent.



97
98
99
100
101
102
103
# File 'lib/pigeon/queue.rb', line 97

# Unregisters a callback previously added with #observe.
#
# filter_name - filter whose observer list is searched (nil by default).
# block       - the callback to remove; it is matched with Array#delete, so
#               the same Proc that was registered has to be passed.
#
# Returns the removed callback, or nil when the filter has no observer list
# or the callback is not in it.
def remove_observer(filter_name = nil, &block)
  @observers.synchronize do
    callbacks = @observers[filter_name]

    callbacks.delete(block) if callbacks
  end
end

#remove_processor(processor, &claim) ⇒ Object

Removes a processor from the queue and removes an observer claim method.



115
116
117
118
119
120
121
# File 'lib/pigeon/queue.rb', line 115

# Detaches a processor from the queue.
#
# processor - object deleted from the processors list (under the observers
#             lock).
# claim     - optional block; when present it is unregistered through
#             #remove_observer with the default (nil) filter.
#
# Returns the result of #remove_observer when a claim block is given, nil
# otherwise.
def remove_processor(processor, &claim)
  @observers.synchronize { @processors.delete(processor) }

  return unless claim

  remove_observer(&claim)
end

#sort_by(&block) ⇒ Object

Re-sorts the queued tasks using the given block, which replaces the queue's current ordering, and clears the cached next task of every filter. The block will be passed a single Task and the results are sorted by the return value.

Raises:

  • (BlockRequired) — if no block is given



68
69
70
71
72
73
74
75
76
77
# File 'lib/pigeon/queue.rb', line 68

# Changes the ordering of the queue.
#
# block - the new sort block; mandatory. It is remembered in @sort_by and
#         used to build a fresh Pigeon::SortedArray to which the current
#         tasks are added.
#
# The next-task cache is emptied, as the cached entries were computed under
# the previous ordering.
#
# Raises BlockRequired when called without a block.
def sort_by(&block)
  raise BlockRequired if block.nil?

  @sort_by = block

  @filters.synchronize do
    resorted = Pigeon::SortedArray.new(&@sort_by)
    @tasks = resorted + @tasks

    @next_task = { }
  end
end

#to_aObject

Copies the list of queued tasks to a new Array.



344
345
346
347
348
# File 'lib/pigeon/queue.rb', line 344

# Returns a copy (dup) of the task list, taken while holding the filters
# lock. Changes to the copy do not affect the queue.
def to_a
  @filters.synchronize { @tasks.dup }
end