Class: Emque::Consuming::Runner

Inherits:
Object
  • Object
show all
Includes:
Helpers
Defined in:
lib/emque/consuming/runner.rb

Class Attribute Summary collapse

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Runner

Returns a new instance of Runner.



20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/emque/consuming/runner.rb', line 20

# Builds a runner, wires up its collaborators and registers it as the
# class-level instance.
#
# @param options [Hash] runner settings. +:daemon+ (default false) is passed
#   to the application's logger setup as +daemonized+; +:pidfile+ overrides
#   the value returned by +default_pidfile+.
def initialize(options = {})
  self.control   = Emque::Consuming::Control.new
  self.options   = options
  self.receivers = []
  self.status    = Emque::Consuming::Status.new
  apply_options

  application = Emque::Consuming.application
  application.initialize_logger(daemonized: options.fetch(:daemon, false))

  # Register this runner on the class before resolving the pidfile.
  self.class.instance = self
  self.pidfile = options.fetch(:pidfile, default_pidfile)
  self.pid     = Emque::Consuming::Pidfile.new(pidfile)
end

Class Attribute Details

.instance ⇒ Object

Returns the value of attribute instance.



15
16
17
# File 'lib/emque/consuming/runner.rb', line 15

# Class-level reader for the registered runner. The constructor assigns each
# newly built runner here via `self.class.instance = self`.
def instance
  @instance
end

Instance Attribute Details

#control ⇒ Object

Returns the value of attribute control.



18
19
20
# File 'lib/emque/consuming/runner.rb', line 18

# Reader for the control object; the constructor assigns an
# Emque::Consuming::Control instance to it.
def control
  @control
end

#pidfile ⇒ Object

Returns the value of attribute pidfile.



18
19
20
# File 'lib/emque/consuming/runner.rb', line 18

# Reader for the pidfile setting; the constructor takes it from
# options[:pidfile], falling back to default_pidfile.
def pidfile
  @pidfile
end

#status ⇒ Object

Returns the value of attribute status.



18
19
20
# File 'lib/emque/consuming/runner.rb', line 18

# Reader for the status object; the constructor assigns an
# Emque::Consuming::Status instance to it.
def status
  @status
end

Instance Method Details

#app ⇒ Object



34
35
36
# File 'lib/emque/consuming/runner.rb', line 34

# Forwards to the next #app implementation in the ancestor chain.
# NOTE(review): presumably re-declared here so the method mixed in from
# Helpers is publicly callable on a runner — confirm against Helpers.
def app
  super
end

#console ⇒ Object



38
39
40
41
# File 'lib/emque/consuming/runner.rb', line 38

# Opens an interactive Pry session.
def console
  # Loaded lazily so pry is only required when a console is actually requested.
  require "pry"
  Pry.start
end

#http? ⇒ Boolean

Returns:

  • (Boolean)


43
44
45
# File 'lib/emque/consuming/runner.rb', line 43

# True when the configuration's +status+ setting equals +:on+.
# NOTE(review): presumably +status+ toggles an HTTP status endpoint — confirm
# against the configuration class.
#
# @return [Boolean]
def http?
  config.status == :on
end

#phased_restart ⇒ Object



47
48
49
# File 'lib/emque/consuming/runner.rb', line 47

# Restarts the receivers one at a time, in order.
#
# A receiver is started again only when its #stop returned a truthy value.
#
# @return [Object] the receivers collection, as returned by +each+
def phased_restart
  receivers.each do |receiver|
    receiver.start if receiver.stop
  end
end

#restart ⇒ Object



51
52
53
# File 'lib/emque/consuming/runner.rb', line 51

# Stops the runner and, if #stop returned a truthy value, starts it again.
#
# @return [Object] the result of #start, or the falsy result of #stop when
#   the start was skipped
def restart
  stopped = stop
  return stopped unless stopped

  start
end

#restart_application ⇒ Object



55
56
57
# File 'lib/emque/consuming/runner.rb', line 55

# Asks the first receiver to restart; the other receivers are not touched.
#
# @return [Object] whatever the first receiver's #restart returns
def restart_application
  primary = receivers.first
  primary.restart
end

#sock? ⇒ Boolean

Returns:

  • (Boolean)


59
60
61
# File 'lib/emque/consuming/runner.rb', line 59

# Always true.
# NOTE(review): presumably signals that the socket-based receiver is always
# enabled, in contrast to #http? which depends on configuration — confirm.
#
# @return [Boolean]
def sock?
  true
end

#start ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/emque/consuming/runner.rb', line 63

# Boots the runner and blocks the calling thread until the keep-alive thread
# ends.
#
# The helper calls below run in a fixed order; the keep-alive thread is
# created before the receivers are set up and started. An Interrupt raised
# anywhere in the sequence is handled by calling #stop.
def start
  exit_if_already_running!
  daemonize! if daemonize?
  write_pidfile!
  # Idle thread that does nothing but sleep; joining it below keeps this call
  # blocked until #stop ends the thread.
  @persist = Thread.new { loop { sleep 1 } }
  set_process_title
  setup_receivers
  receivers.each(&:start)
  persist.join
rescue Interrupt
  stop
end

#stop(timeout: 5) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/emque/consuming/runner.rb', line 76

# Stops the runner.
#
# When this process holds a keep-alive thread (+persist+), every receiver is
# stopped and the keep-alive thread is then ended. While the receivers are
# stopping, a watchdog thread waits +timeout+ seconds and, if it is still
# running by then, logs an error and ends the keep-alive thread itself.
# When there is no keep-alive thread, a +:stop+ command is sent through the
# transmitter to the configured socket path instead.
#
# @param timeout [Numeric] seconds the watchdog waits before forcing shutdown
# @return [Object] the keep-alive thread if it was still alive and has been
#   ended, nil if it was already dead, or the transmitter's result
def stop(timeout: 5)
  # Snapshot the thread once so the watchdog can only ever act on the thread
  # that was current when this stop began, never on one created by a later
  # #start.
  keep_alive = persist

  if keep_alive
    watchdog = Thread.new do
      sleep timeout
      logger.error("Timeout Exceeded. Forcing Shutdown.")
      keep_alive.exit if keep_alive.alive?
    end
    receivers.each(&:stop)
    # The receivers stopped in time: cancel the watchdog so it does not log a
    # spurious timeout error later. It is deliberately left running if a
    # receiver raises, so the forced shutdown still happens.
    watchdog.kill
    logger.info("Graceful shutdown successful.")
    logger.info("#{config.app_name.capitalize} stopped.")
    keep_alive.exit if keep_alive.alive?
  else
    Emque::Consuming::Transmitter.send(
      :command => :stop,
      :socket_path => config.socket_path
    )
  end
end