Class: Mamiya::Agent::TaskQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/mamiya/agent/task_queue.rb

Defined Under Namespace

Classes: Stopped

Constant Summary collapse

GRACEFUL_TIMEOUT =
30
JOIN_TIMEOUT =
30

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(agent, task_classes: [], logger: Mamiya::Logger.new) ⇒ TaskQueue

Returns a new instance of TaskQueue.



12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/mamiya/agent/task_queue.rb', line 12

# Builds a task queue bound to +agent+ in its not-yet-started state.
#
# No threads are created here: the worker threads, the per-task statuses
# and the queueing thread all start out as nil.
#
# @param agent [Object] stored as-is and exposed through #agent
# @param task_classes [Array] stored as-is; #start! calls +identifier+ on each entry
# @param logger [#with_clean_progname] the queue logs through
#   +logger.with_clean_progname['task_queue']+
def initialize(agent, task_classes: [], logger: Mamiya::Logger.new)
  @agent = agent
  @task_classes = task_classes

  # Lifecycle bookkeeping.
  @lifecycle_mutex = Mutex.new
  @terminate = false

  # Queues exist from the beginning; threads and statuses do not.
  @external_queue = Queue.new
  @queues = {}
  @worker_threads = @statuses = @queueing_thread = nil

  @logger = logger.with_clean_progname['task_queue']
end

Instance Attribute Details

#agent ⇒ Object (readonly)

Returns the value of attribute agent.



25
26
27
# File 'lib/mamiya/agent/task_queue.rb', line 25

# Reader for @agent, the object passed as the first constructor argument.
def agent
  @agent
end

#task_classes ⇒ Object (readonly)

Returns the value of attribute task_classes.



25
26
27
# File 'lib/mamiya/agent/task_queue.rb', line 25

# Reader for @task_classes, the value given to the constructor as +task_classes:+.
def task_classes
  @task_classes
end

#worker_threads ⇒ Object (readonly)

Returns the value of attribute worker_threads.



25
26
27
# File 'lib/mamiya/agent/task_queue.rb', line 25

# Reader for @worker_threads: nil after construction and after #stop!,
# otherwise the Hash built by #start!, mapping each task class's
# identifier (as a Symbol) to its worker Thread.
def worker_threads
  @worker_threads
end

Instance Method Details

#enqueue(task_name, task) ⇒ Object

Raises:

  • (Stopped) — when the queue is not running



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/mamiya/agent/task_queue.rb', line 85

# Hands a task to the queue for later dispatch.
#
# A task carrying '_labels' that the agent does not match is dropped
# without being queued. Otherwise the '_labels' entry is removed, the
# 'task' entry is replaced with +task_name+, and the [task_name, task]
# pair is pushed onto the external queue. +task+ is modified in place.
#
# @param task_name [Object] value recorded under the 'task' key
# @param task [Hash] task payload; mutated by this call
# @return [self]
# @raise [Stopped] when the queue is not running
def enqueue(task_name, task)
  raise Stopped, 'this task queue is stopped' unless running?

  labels = task['_labels']
  if labels && !agent.match?(labels)
    @logger.debug "skipping enqueue #{task_name.inspect}, #{task.inspect}, because agent doesn't match"
    return self
  end

  # Deleting first means an existing 'task' key is re-inserted at the end.
  %w[_labels task].each { |key| task.delete(key) }
  task['task'] = task_name

  @logger.debug "enqueue #{task_name.inspect}, #{task.inspect}, #{@external_queue.inspect}"
  @external_queue.push([task_name, task])
  self
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


77
78
79
# File 'lib/mamiya/agent/task_queue.rb', line 77

# Whether worker threads exist and termination has not been requested.
#
# @return [Boolean, nil] nil while there are no worker threads,
#   otherwise the negation of the terminate flag
def running?
  workers = @worker_threads
  return workers unless workers

  !@terminate
end

#start! ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/mamiya/agent/task_queue.rb', line 27

# Starts one worker thread per task class plus the queueing thread.
#
# Runs under the lifecycle mutex and does nothing when the queue is
# already running. Every thread gets abort_on_exception enabled.
def start!
  @lifecycle_mutex.synchronize do
    return if running?

    threads = {}
    task_queues = {}
    task_statuses = {}

    @task_classes.each do |klass|
      key = klass.identifier.to_sym
      task_queue = Queue.new
      task_queues[key] = task_queue
      task_status = { pending: [], lock: Mutex.new }
      task_statuses[key] = task_status

      # Each worker receives its class, its own queue and its status record.
      worker = Thread.new(klass, task_queue, task_status, &method(:worker_loop))
      threads[key] = worker
      worker.abort_on_exception = true
    end

    @terminate = false
    @statuses = task_statuses
    @queues = task_queues

    # A fresh external queue replaces the previous one on every start.
    inbox = Queue.new
    @external_queue = inbox
    @queueing_thread = Thread.new(task_queues, inbox, task_statuses, &method(:queueing_loop))
    @queueing_thread.abort_on_exception = true

    # Published last: #running? keys off this variable.
    @worker_threads = threads
  end
end

#status ⇒ Object



102
103
104
105
106
107
108
109
110
# File 'lib/mamiya/agent/task_queue.rb', line 102

# Snapshot of the pending list and in-progress entry of every task class.
#
# @return [Hash, nil] nil when the queue is not running; otherwise a Hash
#   mapping each status name to +{queue: ..., working: ...}+, where
#   +queue+ is a copy of the pending list and +working+ is a copy of the
#   :working value (nil when that value is falsy)
def status
  return unless running?

  @statuses.each_with_object({}) do |(name, st), snapshot|
    pending_copy = st[:pending].dup
    current = st[:working]
    snapshot[name] = {
      queue: pending_copy,
      working: current ? current.dup : nil,
    }
  end
end

#stop!(graceful = false) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/mamiya/agent/task_queue.rb', line 57

# Stops the queueing thread and every worker thread.
#
# Runs under the lifecycle mutex and does nothing unless the queue is
# running. The terminate flag is raised first, the queueing thread is
# killed, and any worker still alive afterwards is killed and joined for
# up to JOIN_TIMEOUT seconds. Finally the queues and worker threads are
# cleared.
#
# @param graceful [Boolean] when true, each worker is first joined for up
#   to GRACEFUL_TIMEOUT seconds so it gets a chance to finish before
#   being killed
# @return [nil]
def stop!(graceful = false)
  @lifecycle_mutex.synchronize do
    return unless running?

    @terminate = true
    @queueing_thread.kill if @queueing_thread.alive?

    # @worker_threads is a Hash of name => Thread. Iterate over the values:
    # a single block parameter on Hash#each receives the [name, thread]
    # pair, and calling join on that pair is Array#join, not Thread#join.
    @worker_threads.each_value { |th| th.join(GRACEFUL_TIMEOUT) } if graceful

    @worker_threads.each_value do |th|
      next unless th.alive?

      th.kill
      th.join(JOIN_TIMEOUT)
    end

    @queues = nil
    @worker_threads = nil
  end
end

#working? ⇒ Boolean

Returns:

  • (Boolean)


81
82
83
# File 'lib/mamiya/agent/task_queue.rb', line 81

# Whether any task class currently has an entry being worked on.
#
# @return [Boolean, nil] the falsy result of #running? when the queue is
#   not running; otherwise true if any entry of #status has a truthy
#   :working value
def working?
  active = running?
  return active unless active

  status.each_value.any? { |stat| stat[:working] }
end