Module: Qs::Daemon::InstanceMethods

Defined in:
lib/qs/daemon.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#daemon_data ⇒ Object (readonly)

Returns the value of attribute daemon_data.



31
32
33
# File 'lib/qs/daemon.rb', line 31

# Read-only accessor for the daemon's data object.
#
# @return [DaemonData, nil] the value of +@daemon_data+, which +initialize+
#   builds with +DaemonData.new+ from the class configuration hash
def daemon_data
  @daemon_data
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



31
32
33
# File 'lib/qs/daemon.rb', line 31

# Read-only accessor for the daemon's logger.
#
# @return [Object, nil] the value of +@logger+, which +initialize+ takes
#   from +@daemon_data.logger+
def logger
  @logger
end

#queue_redis_keys ⇒ Object (readonly)

Returns the value of attribute queue_redis_keys.



32
33
34
# File 'lib/qs/daemon.rb', line 32

# Read-only accessor for the redis keys of the queues this daemon handles.
#
# @return [Object, nil] the value of +@queue_redis_keys+, which +initialize+
#   copies from +daemon_data.queue_redis_keys+ (presumably a collection of
#   key strings -- confirm against DaemonData)
def queue_redis_keys
  @queue_redis_keys
end

#signals_redis_key ⇒ Object (readonly)

Returns the value of attribute signals_redis_key.



32
33
34
# File 'lib/qs/daemon.rb', line 32

# Read-only accessor for this daemon process's signals key.
#
# @return [String, nil] the value of +@signals_redis_key+, which
#   +initialize+ builds as "signals:<daemon name>-<hostname>-<pid>"
def signals_redis_key
  @signals_redis_key
end

Instance Method Details

#halt(wait = false) ⇒ Object



94
95
96
97
98
99
# File 'lib/qs/daemon.rb', line 94

# Ask a running daemon to halt.
#
# When the daemon is not running this is a no-op. Otherwise the signal is
# switched to +:halt+, the work loop thread is woken so it can observe the
# new signal, and -- only if +wait+ is truthy -- the call waits for shutdown.
#
# @param wait [Boolean] whether to call +wait_for_shutdown+ before returning
# @return [Object, nil] +nil+ when not running or when +wait+ is falsy,
#   otherwise whatever +wait_for_shutdown+ returns
def halt(wait = false)
  if running?
    @signal.set(:halt)
    wakeup_work_loop_thread
    wait_for_shutdown if wait
  end
end

#initialize ⇒ Object

  • Set the size of the client to the max workers + 1. This ensures we have 1 connection for fetching work from redis and at least 1 connection for each worker to requeue its message during a hard shutdown.



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/qs/daemon.rb', line 37

# Build a daemon instance from the including class's configuration.
#
# Steps, in order: validate the class configuration, run +Qs.init+, snapshot
# the configuration into a +DaemonData+, create the redis client, and set up
# the initial (stopped) runtime state.
#
# @raise [InvalidError] re-raised with its backtrace replaced by +caller+,
#   so the trace starts at the code that constructed the daemon
#   (presumably raised by +validate!+ -- confirm)
def initialize
  self.class.configuration.validate!
  Qs.init

  @daemon_data = DaemonData.new(self.class.configuration.to_hash)
  @logger      = @daemon_data.logger

  # One connection for fetching work plus one per worker, so every worker
  # can still requeue its message on a hard shutdown.
  client_options = Qs.redis_config.merge({
    :timeout => 1,
    :size    => daemon_data.max_workers + 1
  })
  @client           = QsClient.new(client_options)
  @queue_redis_keys = daemon_data.queue_redis_keys

  # Nothing is running until the daemon is started.
  @work_loop_thread = nil
  @worker_pool      = nil

  # Key made unique per daemon name, host and process.
  daemon_name = @daemon_data.name
  hostname    = Socket.gethostname
  process_id  = ::Process.pid
  @signals_redis_key = "signals:#{daemon_name}-#{hostname}-#{process_id}"

  @worker_available_io = IOPipe.new
  @signal              = Signal.new(:stop)
rescue InvalidError => error
  error.set_backtrace(caller)
  raise error
end

#name ⇒ Object



62
63
64
# File 'lib/qs/daemon.rb', line 62

# The daemon's name, delegated to the daemon data object.
#
# @return [Object] whatever +@daemon_data.name+ returns
def name
  @daemon_data.name
end

#pid_file ⇒ Object



70
71
72
# File 'lib/qs/daemon.rb', line 70

# The daemon's pid file, delegated to the daemon data object.
#
# @return [Object] whatever +@daemon_data.pid_file+ returns
def pid_file
  @daemon_data.pid_file
end

#process_label ⇒ Object



66
67
68
# File 'lib/qs/daemon.rb', line 66

# The daemon's process label, delegated to the daemon data object.
#
# @return [Object] whatever +@daemon_data.process_label+ returns
def process_label
  @daemon_data.process_label
end

#running? ⇒ Boolean



74
75
76
# File 'lib/qs/daemon.rb', line 74

# Whether the daemon currently has a live work loop thread.
#
# @return [Boolean] +true+ only when a work loop thread exists and reports
#   itself alive; +false+ otherwise (always a strict boolean)
def running?
  return false unless @work_loop_thread

  @work_loop_thread.alive? ? true : false
end

#start ⇒ Object

  • Ping redis to check that it can communicate with redis before running; this is friendlier than starting and continuously erroring because it can’t dequeue.



81
82
83
84
85
# File 'lib/qs/daemon.rb', line 81

# Start the daemon's work loop.
#
# Redis is pinged first so a connectivity problem surfaces here instead of
# as repeated dequeue errors later. The signal is then set to +:start+ and
# a work loop thread is spawned unless one is already assigned.
#
# @return [Thread] the work loop thread (existing or newly created)
def start
  @client.ping
  @signal.set(:start)
  # An already-assigned thread is reused; no second thread is created.
  @work_loop_thread = Thread.new { work_loop } unless @work_loop_thread
  @work_loop_thread
end

#stop(wait = false) ⇒ Object



87
88
89
90
91
92
# File 'lib/qs/daemon.rb', line 87

# Ask a running daemon to stop.
#
# When the daemon is not running this is a no-op. Otherwise the signal is
# switched to +:stop+, the work loop thread is woken so it can observe the
# new signal, and -- only if +wait+ is truthy -- the call waits for shutdown.
#
# @param wait [Boolean] whether to call +wait_for_shutdown+ before returning
# @return [Object, nil] +nil+ when not running or when +wait+ is falsy,
#   otherwise whatever +wait_for_shutdown+ returns
def stop(wait = false)
  if running?
    @signal.set(:stop)
    wakeup_work_loop_thread
    wait_for_shutdown if wait
  end
end