Class: Epi::Job

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/epi/job.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(job_description, state) ⇒ Job

Returns a new instance of Job.



16
17
18
19
20
21
22
# File 'lib/epi/job.rb', line 16

def initialize(job_description, state)
  @job_description = job_description
  @triggers = job_description.triggers.map { |t| Trigger.make self, *t }
  @expected_count = state['expected_count'] || job_description.initial_processes
  @pids = state['pids']
  @dying_pids = state['dying_pids']
end

Instance Attribute Details

#expected_countObject

Returns the value of attribute expected_count.



8
9
10
# File 'lib/epi/job.rb', line 8

def expected_count
  @expected_count
end

#job_descriptionObject (readonly)

Returns the value of attribute job_description.



7
8
9
# File 'lib/epi/job.rb', line 7

def job_description
  @job_description
end

Instance Method Details

#dying_countObject



114
115
116
# File 'lib/epi/job.rb', line 114

def dying_count
  dying_pids.count
end

#dying_pidsHash

Get a hash of PIDs, with internal process IDs as keys and PIDs as values, for process that are dying

Examples:

{'1a2v3c4d' => 4820}

Returns:

  • (Hash)


44
45
46
# File 'lib/epi/job.rb', line 44

def dying_pids
  @dying_pids ||= {}
end

#loggerObject



12
13
14
# File 'lib/epi/job.rb', line 12

def logger
  Epi.logger
end

#pidsHash

Get a hash of PIDs, with internal process IDs as keys and PIDs as values

Examples:

{'1a2v3c4d' => 4820}

Returns:

  • (Hash)


36
37
38
# File 'lib/epi/job.rb', line 36

def pids
  @pids ||= {}
end

#replace(pid, &callback) ⇒ Object

Replace a running process with a new one

Parameters:

  • pid (Fixnum)

    PID of the process to replace



120
121
122
123
124
125
# File 'lib/epi/job.rb', line 120

def replace(pid, &callback)
  stop_one pid do
    start_one while running_count < expected_count
    callback.call if callback
  end
end

#restart!Object



95
96
97
98
99
100
101
102
103
104
# File 'lib/epi/job.rb', line 95

def restart!
  count = expected_count
  if count > 0
    self.expected_count = 0
    sync!
    self.expected_count = count
    sync!
  end
  self
end

#run_triggers!Object



72
73
74
# File 'lib/epi/job.rb', line 72

def run_triggers!
  @triggers.each &:try
end

#running_countObject



110
111
112
# File 'lib/epi/job.rb', line 110

def running_count
  pids.count
end

#running_processesObject



106
107
108
# File 'lib/epi/job.rb', line 106

def running_processes
  pids.map { |proc_id, pid| [proc_id, ProcessStatus[pid] || RunningProcess.new(pid)] }.select { |_, v| v.was_alive? }.to_h
end

#shutdown!(&callback) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/epi/job.rb', line 76

def shutdown!(&callback)
  count = running_count
  if count > 0
    count.times do
      stop_one do
        count -= 1
        callback.call if callback && count == 0
      end
    end
  else
    callback.call if callback
  end
end

#stateObject

noinspection RubyStringKeysInHashInspection



25
26
27
28
29
30
31
# File 'lib/epi/job.rb', line 25

def state
  {
      'expected_count' => expected_count,
      'pids' => pids,
      'dying_pids' => dying_pids
  }
end

#sync!Object

Stops processes that shouldn't run, starts process that should run, and fires event handlers



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/epi/job.rb', line 50

def sync!

  # Remove non-running PIDs from the list
  pids.reject { |_, pid| ProcessStatus.pids.include? pid }.each do |proc_id, pid|
    logger.debug "Lost process #{pid}"
    pids.delete proc_id
  end

  # Remove non-running PIDs from the dying list. This is just in case
  # the daemon crashed before it was able to clean up a dying worker
  # (i.e. it sent a TERM but didn't get around to sending a KILL)
  dying_pids.select! { |_, pid| ProcessStatus.pids.include? pid }

  # TODO: clean up processes that never died how they should have

  # Run new processes
  start_one while running_count < expected_count

  # Kill old processes
  stop_one while running_count > expected_count
end

#terminate!Object



90
91
92
93
# File 'lib/epi/job.rb', line 90

def terminate!
  self.expected_count = 0
  sync!
end