Class: CI::Queue::Redis::Monitor

Inherits:
Object
  • Object
show all
Defined in:
lib/ci/queue/redis/monitor.rb

Constant Summary collapse

# Directories searched for "<name>.lua" script files, both resolved relative
# to this file. read_script tries DEV_SCRIPTS_ROOT first and falls back to
# RELEASE_SCRIPTS_ROOT when reading from the former fails.
DEV_SCRIPTS_ROOT = ::File.expand_path('../../../../../../redis', __FILE__)
RELEASE_SCRIPTS_ROOT = ::File.expand_path('../../redis', __FILE__)

# Array#pack / String#unpack1 directive used for the length prefix of every
# message read from the pipe ('L' = 32-bit unsigned integer, native endian).
HEADER = 'L'

# Number of bytes occupied by one packed length prefix.
HEADER_SIZE = [0].pack(HEADER).bytesize

Instance Method Summary collapse

Constructor Details

#initialize(pipe, logger, redis_url, zset_key, processed_key, owners_key, worker_queue_key) ⇒ Monitor

Returns a new instance of Monitor.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/ci/queue/redis/monitor.rb', line 16

# Sets up the monitor's state, its Redis connection, a self-pipe used to wake
# the event loop, and process-wide signal handlers.
#
# @param pipe [IO] readable end of the pipe that delivers length-prefixed
#   messages (see read_message)
# @param logger [#debug, #info] receives diagnostic output
# @param redis_url [String] URL handed to ::Redis.new
# @param zset_key [String] first key passed to the heartbeat script
# @param processed_key [String] second key passed to the heartbeat script
# @param owners_key [String] third key passed to the heartbeat script
# @param worker_queue_key [String] fourth key passed to the heartbeat script
def initialize(pipe, logger, redis_url, zset_key, processed_key, owners_key, worker_queue_key)
  @pipe = pipe
  @logger = logger
  @redis = ::Redis.new(url: redis_url, reconnect_attempts: [0, 0, 0.1, 0.5, 1, 3, 5])

  # Keys forwarded verbatim to the heartbeat script by process_tick!.
  @zset_key = zset_key
  @processed_key = processed_key
  @owners_key = owners_key
  @worker_queue_key = worker_queue_key

  @shutdown = false
  @queue = []
  @deadlines = {}

  # Self-pipe: signal handlers write a byte here so IO.select in
  # wait_for_events wakes up and the main loop can drain @queue.
  @self_pipe_reader, @self_pipe_writer = IO.pipe
  @self_pipe_writer.sync = true

  # NOTE(review): USR1 is trapped here, but monitor raises on any queued
  # signal other than INT/TERM.
  %i[TERM INT USR1].each do |name|
    Signal.trap(name) { soft_signal(name) }
  end
end

Instance Method Details

#eval_script(script, *args) ⇒ Object



49
50
51
# File 'lib/ci/queue/redis/monitor.rb', line 49

# Runs a named script on the Redis connection through EVALSHA.
#
# @param script [Symbol, String] script name, resolved to a SHA by load_script
# @param args extra arguments forwarded unchanged to @redis.evalsha
# @return [Object] whatever @redis.evalsha returns
def eval_script(script, *args)
  sha = load_script(script)
  @redis.evalsha(sha, *args)
end

#load_script(script) ⇒ Object



53
54
55
56
# File 'lib/ci/queue/redis/monitor.rb', line 53

# Returns the value of SCRIPT LOAD for the named script, loading it at most
# once per name and memoizing the result in @scripts_cache.
#
# @param script [Symbol, String] script name understood by read_script
# @return [Object] the result of @redis.script(:load, source)
def load_script(script)
  @scripts_cache ||= {}
  cached = @scripts_cache[script]
  return cached if cached

  @scripts_cache[script] = @redis.script(:load, read_script(script))
end

#monitor ⇒ Object



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/ci/queue/redis/monitor.rb', line 105

# Main loop: drains signals queued by soft_signal, then waits for I/O, until
# @shutdown becomes truthy.
#
# @return [Object] the final value of @shutdown (0 after INT/TERM)
# @raise [RuntimeError] when a queued signal is neither :INT nor :TERM
def monitor
  @logger.debug("Starting monitor")
  watched = [@self_pipe_reader, @pipe]

  until @shutdown
    while (signal = @queue.shift)
      raise "Unknown signal: #{signal.inspect}" unless %i[INT TERM].include?(signal)

      @logger.debug("Received #{signal}, exiting")
      # 0 is truthy in Ruby, so this both ends the loop and is the value
      # returned to the caller.
      @shutdown = 0
      break
    end

    # Still invoked after a shutdown signal; wait_for_events returns
    # immediately once @shutdown is set.
    wait_for_events(watched)
  end

  @logger.debug('Done')
  @shutdown
end

#process_messages(io) ⇒ Object



79
80
81
82
83
84
85
# File 'lib/ci/queue/redis/monitor.rb', line 79

# Reads messages from +io+ until read_message returns nil or false, and
# dispatches each one to the public method named "process_<type>".
#
# Each message is a two-element pair of type and arguments hash; the hash's
# keys are converted to symbols in place and passed as keyword arguments.
#
# @param io [IO] source handed to read_message
# @return [nil]
def process_messages(io)
  while (frame = read_message(io))
    type, payload = frame
    handler = "process_#{type}"
    public_send(handler, **payload.transform_keys!(&:to_sym))
  end
end

#process_tick!(id:) ⇒ Object



39
40
41
42
43
44
45
46
47
# File 'lib/ci/queue/redis/monitor.rb', line 39

# Handles a "tick!" message by running the :heartbeat script with the four
# configured keys and the current time plus +id+ as arguments.
#
# Any StandardError is logged at info level rather than propagated, so a
# failed heartbeat does not stop the monitor.
#
# @param id [Object] identifier forwarded as the script's second argument
# @return [Object] the script result, or the logger's return value on error
def process_tick!(id:)
  keys = [@zset_key, @processed_key, @owners_key, @worker_queue_key]
  eval_script(:heartbeat, keys: keys, argv: [Time.now.to_f, id])
rescue => error
  @logger.info(error)
end

#read_message(io) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/ci/queue/redis/monitor.rb', line 66

# Reads one length-prefixed JSON message from +io+.
#
# A message is a HEADER_SIZE-byte length (packed with HEADER) followed by
# that many bytes of JSON. The first read is non-blocking so the caller can
# poll; once a message has started, the rest is read with blocking reads.
#
# @param io [IO] readable stream carrying the framed messages
# @return [Object] the parsed JSON document when a full message was read
# @return [nil] when no data is currently available
# @return [false] when the stream ended (also sets @shutdown to 0)
# @raise [JSON::ParserError] when a complete payload is not valid JSON
def read_message(io)
  header = io.read_nonblock(HEADER_SIZE, exception: false)
  return nil if header == :wait_readable

  # A non-blocking read may return fewer bytes than requested. Unpacking a
  # short header yields nil, and io.read(nil) would then block until EOF and
  # swallow the whole stream, so finish the header with a blocking read.
  if header && header.bytesize < HEADER_SIZE
    header += io.read(HEADER_SIZE - header.bytesize).to_s
  end

  if header && header.bytesize == HEADER_SIZE
    size = header.unpack1(HEADER)
    payload = io.read(size)
    return JSON.parse(payload) if payload && payload.bytesize == size
  end

  # EOF before or in the middle of a frame: the writer is gone.
  @logger.debug('Broken pipe, exiting')
  @shutdown = 0
  false
end

#read_script(name) ⇒ Object



58
59
60
61
62
# File 'lib/ci/queue/redis/monitor.rb', line 58

# Returns the source of "<name>.lua", preferring the copy under
# DEV_SCRIPTS_ROOT and falling back to RELEASE_SCRIPTS_ROOT when the first
# read fails with a SystemCallError (for example a missing file).
#
# @param name [Symbol, String] script base name without extension
# @return [String] file contents
# @raise [SystemCallError] when the fallback read fails as well
def read_script(name)
  filename = "#{name}.lua"
  begin
    ::File.read(::File.join(DEV_SCRIPTS_ROOT, filename))
  rescue SystemCallError
    ::File.read(::File.join(RELEASE_SCRIPTS_ROOT, filename))
  end
end

#soft_signal(sig) ⇒ Object



34
35
36
37
# File 'lib/ci/queue/redis/monitor.rb', line 34

# Records a received signal for the main loop and wakes it up.
#
# The signal name is appended to @queue, then a single byte is written to
# the self-pipe so a pending IO.select in wait_for_events returns.
#
# @param sig [Symbol] signal name as passed by the trap handler
# @return [IO] the self-pipe writer
def soft_signal(sig)
  @queue.push(sig)
  @self_pipe_writer.write('.')
  @self_pipe_writer
end

#wait_for_events(ios) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/ci/queue/redis/monitor.rb', line 87

# Waits up to 10 seconds for any of +ios+ to become readable and handles
# each ready stream. Does nothing once @shutdown is set.
#
# @param ios [Array<IO>] streams to watch; expected to be the self-pipe
#   reader and the message pipe
# @return [Array<IO>, nil] the readable streams, or nil on shutdown/timeout
# @raise [RuntimeError] when a ready stream is neither of the known ones
def wait_for_events(ios)
  return if @shutdown

  readable, = IO.select(ios, nil, nil, 10)
  return unless readable

  readable.each do |io|
    if @self_pipe_reader == io
      # Only drain the wake-up bytes; the signals themselves are in @queue.
      io.read_nonblock(512, exception: false)
    elsif @pipe == io
      process_messages(@pipe)
    else
      @logger.debug("Unknown reader: #{io.inspect}")
      raise "Unknown reader: #{io.inspect}"
    end
  end
end