Class: BackgroundQueue::ServerLib::BalancedQueue

Inherits:
PriorityQueue show all
Includes:
QueueRegistry
Defined in:
lib/background_queue/server_lib/balanced_queue.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from QueueRegistry

#add_item, #finish_item, #next_item, #remove_item

Methods inherited from PriorityQueue

#each_item, #empty?, #number_if_items_at_priority, #number_of_priorities, #peek, #pop, #priority, #push, #remove, #stalled=, #stalled?

Constructor Details

#initialize(server) ⇒ BalancedQueue

Returns a new instance of BalancedQueue.



9
10
11
12
13
14
15
16
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 9

def initialize(server)
  @task_registry = BackgroundQueue::ServerLib::TaskRegistry.new
  @condvar = ConditionVariable.new
  @mutex = Mutex.new
  @server = server
  @thread_manager = server.thread_manager
  super()
end

Instance Attribute Details

#serverObject (readonly)

Returns the value of attribute server.



7
8
9
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 7

def server
  @server
end

Class Method Details

.queue_classObject



58
59
60
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 58

def self.queue_class
  BackgroundQueue::ServerLib::Owner
end

Instance Method Details

#add_task(task) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 18

def add_task(task)
  @thread_manager.protect_access {
    status, existing_task = @task_registry.register(task)
    if status != :waiting
      if status == :existing
        remove_item(existing_task)
      end
      add_item(task)
      @thread_manager.signal_access #wake anything reading from the queue
    end
  }
end

#finish_task(task) ⇒ Object



37
38
39
40
41
42
43
44
45
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 37

def finish_task(task)
  @thread_manager.protect_access {
    finish_item(task)
    existing_task = @task_registry.de_register(task.id)
    if existing_task
      add_item(task)
    end
  }
end

#load_from_file(io) ⇒ Object



86
87
88
89
90
91
92
93
94
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 86

def load_from_file(io)
  @server.logger.debug("Loading task queue from file")
  tasks = JSON.parse(io.read, :symbolize_names=>true)
  @server.logger.debug("Adding #{tasks.length} tasks from file")
  for task_data in tasks
    task = Task.new(task_data[:owner_id], task_data[:job_id], task_data[:id], task_data[:priority], task_data[:worker], task_data[:params], task_data[:options])
    add_task(task)
  end
end

#next_taskObject



47
48
49
50
51
52
53
54
55
56
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 47

def next_task
  task = nil
  @thread_manager.control_access {
    task = next_item
    if task.nil?
      @thread_manager.wait_on_access
    end
  }
  task
end

#register_job(job) ⇒ Object



62
63
64
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 62

def register_job(job)
  @server.jobs.register(job)
end

#remove_task(task) ⇒ Object



31
32
33
34
35
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 31

def remove_task(task)
  @thread_manager.protect_access {
    remove_item(task)
  }
end

#save_to_file(io) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 70

def save_to_file(io)
  data = []
  @server.logger.debug("Saving task queue to file")
  @thread_manager.protect_access {
    each_item { |owner|
      owner.each_item { |job|
        job.each_item { |task|
          data << task.to_json_object(true)
        }
      }
    }
  }
  @server.logger.debug("Writing #{data.length} entries to file")
  io.write(JSON.fast_generate(data))
end

#synchronous?Boolean

Returns:

  • (Boolean)


66
67
68
# File 'lib/background_queue/server_lib/balanced_queue.rb', line 66

def synchronous?
  false
end