Class: Swarm::Engine::Volatile::Queue

Inherits:
Queue
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/swarm/engine/volatile/queue.rb

Defined Under Namespace

Classes: Tube

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Queue

#bury_job, #clean_up_job, #remove_worker

Constructor Details

#initialize(name:) ⇒ Queue

Returns a new instance of Queue.



24
25
26
27
# File 'lib/swarm/engine/volatile/queue.rb', line 24

def initialize(name:)
  @name = name
  @tube = self.class.get_tube(name)
end

Instance Attribute Details

#nameObject (readonly)

Returns the value of attribute name.



12
13
14
# File 'lib/swarm/engine/volatile/queue.rb', line 12

def name
  @name
end

#tubeObject (readonly)

Returns the value of attribute tube.



12
13
14
# File 'lib/swarm/engine/volatile/queue.rb', line 12

def tube
  @tube
end

Class Method Details

.get_tube(name) ⇒ Object



19
20
21
# File 'lib/swarm/engine/volatile/queue.rb', line 19

def get_tube(name)
  tubes[name] ||= Tube.new([], [])
end

.tubesObject



15
16
17
# File 'lib/swarm/engine/volatile/queue.rb', line 15

def tubes
  @tubes ||= {}
end

Instance Method Details

#add_job(data) ⇒ Object



34
35
36
37
38
# File 'lib/swarm/engine/volatile/queue.rb', line 34

def add_job(data)
  new_job = Job.new(queue: self, data: data)
  jobs << new_job
  new_job
end

#add_worker(worker) ⇒ Object



75
76
77
# File 'lib/swarm/engine/volatile/queue.rb', line 75

def add_worker(worker)
  workers << worker
end

#clearObject



67
68
69
# File 'lib/swarm/engine/volatile/queue.rb', line 67

def clear
  tube.jobs = []
end

#delete_job(job_to_delete) ⇒ Object



59
60
61
# File 'lib/swarm/engine/volatile/queue.rb', line 59

def delete_job(job_to_delete)
  jobs.delete_if { |job| job == job_to_delete }
end

#has_job?(job_to_find) ⇒ Boolean

Returns:

  • (Boolean)


63
64
65
# File 'lib/swarm/engine/volatile/queue.rb', line 63

def has_job?(job_to_find)
  jobs.any? { |job| job == job_to_find }
end

#idle?Boolean

Returns:

  • (Boolean)


79
80
81
# File 'lib/swarm/engine/volatile/queue.rb', line 79

def idle?
  jobs.empty?
end

#prepare_for_work(worker) ⇒ Object



29
30
31
32
# File 'lib/swarm/engine/volatile/queue.rb', line 29

def prepare_for_work(worker)
  add_worker(worker) unless workers.include?(worker)
  self
end

#reserve_job(worker) ⇒ Object



48
49
50
51
52
53
54
55
56
57
# File 'lib/swarm/engine/volatile/queue.rb', line 48

def reserve_job(worker)
  wait_for_job
  index = jobs.index { |job| job.available? }
  raise JobNotFoundError unless index
  job = jobs[index]
  job.reserve!(worker)
  job
rescue JobNotFoundError, Job::AlreadyReservedError
  raise JobReservationFailed
end

#wait_for_jobObject



40
41
42
43
44
45
46
# File 'lib/swarm/engine/volatile/queue.rb', line 40

def wait_for_job
  delay_time = 0
  until jobs.count > 0
    delay_time += 0.01 unless delay_time > 1.0
    sleep(delay_time)
  end
end

#worker_countObject



71
72
73
# File 'lib/swarm/engine/volatile/queue.rb', line 71

def worker_count
  workers.count
end