Class: Swarm::Engine::Volatile::Queue

Inherits:
Queue
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/swarm/engine/volatile/queue.rb

Defined Under Namespace

Classes: Tube

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Queue

#bury_job, #clean_up_job, #remove_worker

Constructor Details

#initialize(name:) ⇒ Queue

Returns a new instance of Queue.



26
27
28
29
# File 'lib/swarm/engine/volatile/queue.rb', line 26

# Builds a queue bound to the tube registered under +name+.
#
# The bare +super+ forwards the same +name:+ keyword to the parent
# initializer (not shown here). The tube is then obtained through the
# class-level +get_tube+ and kept in +@tube+.
#
# @param name [Object] key handed to +get_tube+ to select the tube
def initialize(name:)
  super
  @tube = self.class.get_tube(name)
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



14
15
16
# File 'lib/swarm/engine/volatile/queue.rb', line 14

# Reader for the +@name+ instance variable.
#
# @return [Object] the current value of +@name+
def name
  @name
end

#tube ⇒ Object (readonly)

Returns the value of attribute tube.



14
15
16
# File 'lib/swarm/engine/volatile/queue.rb', line 14

# Reader for the +@tube+ instance variable.
#
# @return [Object] the current value of +@tube+
def tube
  @tube
end

Class Method Details

.get_tube(name) ⇒ Object



21
22
23
# File 'lib/swarm/engine/volatile/queue.rb', line 21

# Fetches the tube stored under +name+, building and registering a new one
# the first time a given name is requested.
#
# @param name [Object] key used to look the tube up in +tubes+
# @return [Tube] the tube already stored for +name+, or a new
#   +Tube.new([], [])+ that has just been stored under that key
def get_tube(name)
  registry = tubes
  existing = registry[name]
  return existing if existing

  registry[name] = Tube.new([], [])
end

.tubes ⇒ Object



17
18
19
# File 'lib/swarm/engine/volatile/queue.rb', line 17

# Registry of tubes, created as an empty Hash on first access and stored in
# +@tubes+ so later calls return the same object.
#
# @return [Hash] the memoized registry
def tubes
  @tubes = {} unless @tubes
  @tubes
end

Instance Method Details

#add_job(data) ⇒ Object



36
37
38
39
40
# File 'lib/swarm/engine/volatile/queue.rb', line 36

# Wraps +data+ in a new Job owned by this queue and appends it to +jobs+.
#
# @param data [Object] payload passed to +Job.new+ as +data:+
# @return [Job] the job that was just appended
def add_job(data)
  Job.new(queue: self, data: data).tap { |job| jobs << job }
end

#add_worker(worker) ⇒ Object



78
79
80
# File 'lib/swarm/engine/volatile/queue.rb', line 78

# Appends +worker+ to +workers+. No duplicate check is performed here.
#
# @param worker [Object] the worker to register
# @return [Array] the +workers+ collection after the append
#   (assumes +workers+ is an Array, as built by +Tube.new([], [])+)
def add_worker(worker)
  workers.push(worker)
end

#clear ⇒ Object



70
71
72
# File 'lib/swarm/engine/volatile/queue.rb', line 70

# Drops every job by pointing the tube's job list at a new empty array.
# The previous array object is replaced, not emptied in place.
#
# @return [Array] the new empty array assigned to +tube.jobs+
def clear
  tube.jobs = []
end

#delete_job(job_to_delete) ⇒ Object



62
63
64
# File 'lib/swarm/engine/volatile/queue.rb', line 62

# Removes every entry in +jobs+ that compares equal (+==+) to
# +job_to_delete+. Does nothing when no entry matches.
#
# @param job_to_delete [Object] the job to remove
# @return [Array] the +jobs+ collection, whether or not anything was removed
def delete_job(job_to_delete)
  jobs.tap { |queued| queued.delete(job_to_delete) }
end

#idle? ⇒ Boolean

Returns:

  • (Boolean)


82
83
84
# File 'lib/swarm/engine/volatile/queue.rb', line 82

# Reports whether the queue currently holds no jobs at all.
# Only emptiness of +jobs+ is checked; job availability is not consulted.
#
# @return [Boolean] true when +jobs+ is empty
def idle?
  jobs.empty?
end

#job_exists?(job_to_find) ⇒ Boolean

Returns:

  • (Boolean)


66
67
68
# File 'lib/swarm/engine/volatile/queue.rb', line 66

# Tells whether +jobs+ contains an entry equal (+==+) to +job_to_find+.
#
# @param job_to_find [Object] the job to look for
# @return [Boolean] true when a matching entry is present
def job_exists?(job_to_find)
  jobs.include?(job_to_find)
end

#prepare_for_work(worker) ⇒ Object



31
32
33
34
# File 'lib/swarm/engine/volatile/queue.rb', line 31

# Makes sure +worker+ is registered on this queue, adding it through
# +add_worker+ only when +workers+ does not already include it.
#
# @param worker [Object] the worker about to start working
# @return [self] the receiver
def prepare_for_work(worker)
  already_registered = workers.include?(worker)
  add_worker(worker) unless already_registered
  self
end

#reserve_job(worker) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
# File 'lib/swarm/engine/volatile/queue.rb', line 50

# Waits (via +wait_for_job+) until the queue holds at least one job, then
# reserves the first job that reports +available?+ for +worker+.
#
# Both failure paths are reported uniformly: a missing available job and a
# +Job::AlreadyReservedError+ from +reserve!+ are re-raised as
# +JobReservationFailed+.
#
# @param worker [Object] the worker passed to +Job#reserve!+
# @return [Object] the job that was reserved
# @raise [JobReservationFailed] when no job is available or the chosen job
#   is already reserved
def reserve_job(worker)
  wait_for_job
  candidate = jobs.find(&:available?)
  raise JobNotFoundError unless candidate

  candidate.reserve!(worker)
  candidate
rescue JobNotFoundError, Job::AlreadyReservedError
  raise JobReservationFailed
end

#wait_for_job ⇒ Object



42
43
44
45
46
47
48
# File 'lib/swarm/engine/volatile/queue.rb', line 42

# Polls +jobs+ until it is non-empty, sleeping between checks.
#
# The pause starts at 0.01s and grows by 0.01s per pass; it stops growing
# once it exceeds 1.0s. There is no timeout: the loop keeps running for as
# long as +jobs+ stays empty.
#
# @return [nil]
def wait_for_job
  pause = 0
  while jobs.empty?
    pause += 0.01 if pause <= 1.0
    sleep(pause)
  end
end

#worker_count ⇒ Object



74
75
76
# File 'lib/swarm/engine/volatile/queue.rb', line 74

# Number of entries currently held in +workers+.
#
# @return [Integer] how many workers are registered
def worker_count
  workers.size
end