Class: Faktory::Launcher

Inherits:
Object
  • Object
show all
Includes:
Util
Defined in:
lib/faktory/launcher.rb

Constant Summary collapse

PROCTITLES =
[]

Constants included from Util

Util::EXPIRY

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Util

#fire_event, #hostname, #identity, #logger, #process_nonce, #safe_thread, #server, #watchdog

Methods included from ExceptionHandler

#handle_exception

Constructor Details

#initialize(options) ⇒ Launcher

Returns a new instance of Launcher.



11
12
13
14
15
16
# File 'lib/faktory/launcher.rb', line 11

# Builds a launcher from the global Faktory options overlaid with the
# caller-supplied ones.
#
# @param options [Hash] overrides merged on top of Faktory.options
def initialize(options)
  # The same merged hash instance is kept here and handed to the manager.
  @options = Faktory.options.merge(options)
  @current_state = nil
  @manager = Faktory::Manager.new(@options)
end

Instance Attribute Details

#manager ⇒ Object

Returns the value of attribute manager.



9
10
11
# File 'lib/faktory/launcher.rb', line 9

# Reader for the @manager instance variable.
#
# @return [Object] whatever is currently stored in @manager
def manager
  @manager
end

Instance Method Details

#heartbeat ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/faktory/launcher.rb', line 52

# Runs the heartbeat loop: every 10 seconds it refreshes the process title
# ($0) from PROCTITLES and sends a BEAT carrying @current_state to the
# Faktory server. The loop has no exit condition of its own.
#
# Server replies are handled as follows:
#   "OK"        - nothing to do
#   "terminate" - sends TERM to this process
#   "quiet"     - sends TSTP to this process
#   anything else is logged as a warning
#
# NOTE(review): each call appends four more procs to the shared PROCTITLES
# constant, so calling this more than once per process would duplicate the
# title segments.
def heartbeat
  title = ['faktory-worker', Faktory::VERSION, @options[:tag]].compact.join(" ")
  PROCTITLES << proc { title }
  PROCTITLES << proc { "[#{Processor.busy_count} of #{@options[:concurrency]} busy]" }
  PROCTITLES << proc { "stopping" if stopping? }
  PROCTITLES << proc { "quiet" if quiet? }

  loop do
    # The state procs yield nil while inactive; drop those entries so the
    # title does not pick up stray trailing/double spaces.
    $0 = PROCTITLES.map(&:call).compact.join(" ")

    begin
      result = Faktory.server { |c| c.beat(@current_state) }
      case result
      when "OK"
        # all good
      when "terminate"
        ::Process.kill('TERM', $$)
      when "quiet"
        ::Process.kill('TSTP', $$)
      else
        Faktory.logger.warn "Got unexpected BEAT: #{result}"
      end
    rescue => ex
      # Best effort: note the failure for debugging and try again in a few secs.
      Faktory.logger.debug "Heartbeat failed: #{ex.class}: #{ex.message}"
    end
    sleep 10
  end
end

#quiet ⇒ Object

Stops this instance from processing any more jobs.



24
25
26
27
# File 'lib/faktory/launcher.rb', line 24

# Marks this launcher as quiet and forwards the request to the manager.
# The 'quiet' state is what #quiet? tests for and what #heartbeat passes to
# the server in its BEAT call.
#
# @return whatever @manager.quiet returns
def quiet
  # Record the state before delegating to the manager.
  @current_state = 'quiet'
  @manager.quiet
end

#quiet? ⇒ Boolean

Returns:

  • (Boolean)


44
45
46
# File 'lib/faktory/launcher.rb', line 44

# Reports whether the launcher has been put into the 'quiet' state.
#
# @return [Boolean] true only when @current_state is the string 'quiet'
def quiet?
  @current_state.eql?('quiet')
end

#run ⇒ Object



18
19
20
21
# File 'lib/faktory/launcher.rb', line 18

# Starts the launcher: hands #heartbeat to safe_thread and then starts the
# manager.
#
# @return whatever @manager.start returns
def run
  # safe_thread comes from Util; presumably it runs the given block on a
  # background thread named "heartbeat" and returns that thread — confirm
  # against Util.
  @thread = safe_thread("heartbeat", &method(:heartbeat))
  @manager.start
end

#stop ⇒ Object

Shuts down the process. This method does not return until all work is complete and cleaned up. It can take up to the timeout to complete.



32
33
34
35
36
37
38
# File 'lib/faktory/launcher.rb', line 32

# Shuts the launcher down: marks the state as 'terminate', quiets the
# manager, then asks it to stop by a deadline of now + @options[:timeout].
#
# @return whatever @manager.stop returns
def stop
  # Fix the cutoff up front so time spent quieting counts against the timeout.
  cutoff = Time.now + @options[:timeout]

  @current_state = 'terminate'
  @manager.quiet
  @manager.stop(cutoff)
end

#stopping? ⇒ Boolean

Returns:

  • (Boolean)


40
41
42
# File 'lib/faktory/launcher.rb', line 40

# Reports whether shutdown has begun, i.e. the state is 'terminate'.
#
# @return [Boolean] true only when @current_state is the string 'terminate'
def stopping?
  @current_state.eql?('terminate')
end