Module: Quark::CommandProcessor

Included in:
Server::WebsocketServer::WebsocketProcessor
Defined in:
lib/quark/commands/put.rb,
lib/quark/commands/list.rb,
lib/quark/commands/peek.rb,
lib/quark/commands/ping.rb,
lib/quark/commands/tree.rb,
lib/quark/commands/fetch.rb,
lib/quark/commands/observe.rb,
lib/quark/command_processor.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.process_command(data, userdata = nil) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/quark/command_processor.rb', line 31

def self.process_command(data, userdata=nil)
# if the data perfectly resembles a graphite-formatted metric (without a command prefix)
# then assume the command is "observe"
  if Quark::Config.get("quark.graphite_compat") and data =~ /^[\w\.\-]+ -?[\d\.]+ \d{10,13}$/
    command   = "PUT"
    arguments = data.chomp
  else
    if data =~ /\}$/
      data, linedata = data.chomp.split('{',2)

      userdata = (userdata || {}).merge(Hash[linedata.gsub(/(^\{|\}$)/,'').split(/,\s*/).collect{|i|
        i.split('=',2)
      }])
    end

    command, arguments = data.strip.chomp.split(' ',2)
  end

  begin
    if methods.include?(:"process_command_#{command.downcase}")
      rv = send(:"process_command_#{command.downcase}", arguments)

      if rv.nil?
        return {
          :success   => true,
          :command   => command,
          :arguments => arguments,
          :userdata  => userdata
        }
      else
        return {
          :success   => true,
          :command   => command,
          :arguments => arguments,
          :results   => rv,
          :userdata  => userdata
        }
      end
    else
      raise Quark::InvalidCommand.new("Unknown command '#{command.upcase}'")
    end

  rescue Quark::Error => e
    return {
      :success => false,
      :userdata  => userdata,
      :error => {
        :command => data.strip.chomp,
        :class   => e.class.name,
        :message => e.message
      }
    }

  rescue Exception => e
    return {
      :success => false,
      :userdata  => userdata,
      :error => {
        :command   => data.strip.chomp,
        :class     => e.class.name,
        :message   => e.message,
        :backtrace => e.backtrace
      }
    }
  end
end

.process_command_fetch(data) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/quark/commands/fetch.rb', line 3

# Returns the stored observations for every metric matching a key pattern.
#
# Arguments (space separated): "keys [from [to]]"
# * keys -- pattern interpolated into the Redis KEYS lookup
#   "<prefix>:observations:<keys>:*"
# * from -- lower timestamp bound; a negative value is taken as that many
#   seconds before now
# * to   -- upper timestamp bound, same convention; defaults to now
#
# @param data [String] the argument string
# @return [Hash{String => Array}] metric name => list of [timestamp, value]
#   pairs (Integer, Float) in ascending timestamp order
# @raise [Quark::ArgumentError] when no key pattern is given or when from/to
#   are not integers
def self.process_command_fetch(data)
  raise Quark::ArgumentError.new("FETCH command requires at least 1 argument") if data.nil? or data.empty?

  keys, from, to = data.split(' ')

  # whitespace-only input leaves no key pattern at all
  raise Quark::ArgumentError.new("FETCH command requires at least 1 argument") if keys.nil?

  now = (Time.now.to_f * 1000).to_i

  # negative from/to values are treated as relative number of seconds from now
  from = now + (from.to_i * 1000) if from.to_i < 0
  to   = now + (to.to_i * 1000)   if to.to_i < 0

  # if not set, to defaults to now
  to ||= now

  # leading :: matters: a bare ArgumentError could resolve to Quark::ArgumentError
  numeric = lambda{|v|
    begin
      Integer(v)
      true
    rescue ::ArgumentError, ::TypeError
      false
    end
  }

  raise Quark::ArgumentError.new("FETCH 'from' argument must be numeric") unless from.nil? or numeric.call(from)
  raise Quark::ArgumentError.new("FETCH 'to' argument must be numeric") unless numeric.call(to)

  from_ts   = from.to_i
  to_ts     = to.to_i
  blocksize = Quark::Config.get("quark.blocksize").to_i
  rv        = {}

  # find all matching keys from all blocks
  @redis.keys("#{@_prefix}:observations:#{keys}:*").each do |key|
    _namespace, _section, name, block = key.split(':',4)
    block = block.to_i

    # skip keys whose block falls outside of the given time range (if given)
    next if block > 0 and not from.nil? and block < (from_ts / blocksize)
    next if block > 0 and block > (to_ts / blocksize)

    # collect every value inside the requested range
    @redis.hgetall(key).each do |timestamp, value|
      timestamp = timestamp.to_i

      if timestamp >= from_ts and timestamp <= to_ts
        rv[name] ||= []
        rv[name] << [timestamp, value.to_f]
      end
    end
  end

  # blocks are visited in whatever order KEYS returned them and hash fields
  # are strings, so order each series numerically once it is complete
  rv.each_value do |points|
    points.sort!{|a,b| a[0] <=> b[0] }
  end

  return rv
end

.process_command_list(data) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/quark/commands/list.rb', line 3

# Lists the distinct metric names matching a pattern, optionally reduced to
# selected components of each name.
#
# Arguments (space separated): "pattern [fields [delimiter]]"
# * pattern   -- interpolated into the Redis KEYS lookup
#   "<prefix>:observations:<pattern>:*"
# * fields    -- comma-separated list of zero-based component indexes
#   ("1") or inclusive index ranges ("0-2") to keep from each name
# * delimiter -- string used to split and re-join name components;
#   defaults to "."
#
# @param data [String] the argument string
# @return [Array<String>] sorted, de-duplicated names
# @raise [Quark::ArgumentError] when no pattern is given
def self.process_command_list(data)
  pattern, fields, delimiter = data.to_s.strip.chomp.split(' ', 3)
  raise Quark::ArgumentError.new("LIST command requires at least 1 argument") if pattern.nil?

  delimiter ||= '.'

  # each field becomes ["n"] (single index) or ["a", "b"] (range a..b)
  ranges = fields.to_s.split(',').collect{|f| f.split('-',2) }

  names = @redis.keys("#{@_prefix}:observations:#{pattern}:*").collect{|key|
    name = key.split(':')[2]

    if name.nil? or ranges.empty?
      name
    else
      parts    = name.split(delimiter)
      selected = []

      ranges.each do |range|
        if range.length == 1
          selected << parts[range.first.to_i]
        elsif range.length == 2
          # the slice is pushed as a nested array; join flattens it below
          selected << parts[(range.first.to_i)..(range.last.to_i)]
        end
      end

      selected.compact.join(delimiter)
    end
  }

  return names.compact.sort.uniq
end

.process_command_observe(data) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/quark/commands/observe.rb', line 3

# Stores a single observation and reports where it was written.
#
# Arguments (space separated): "id value [timestamp]". The value is written
# to the Redis hash "<prefix>:observations:<id>:<block>" under the timestamp,
# and the block number is added to the set "<prefix>:blocks:<blocksize>".
#
# NOTE(review): an omitted timestamp is converted with to_i and therefore
# stored as 0 -- confirm whether that is intended.
#
# @param data [String] the argument string
# @return [Hash] :metric (the id) and :block (the block number written to)
# @raise [Quark::ArgumentError] when the id or the value is missing
def self.process_command_observe(data)
  id, value, timestamp = data.to_s.chomp.split(' ')
  raise Quark::ArgumentError.new("OBSERVE command requires a metric name and a value") if id.nil? or value.nil?

  timestamp = timestamp.to_i

  # automatically convert epoch seconds to milliseconds to provide Graphite metric compatibility
  timestamp *= 1000 if timestamp < 2147483647

  blocksize = Quark::Config.get("quark.blocksize").to_i
  block     = timestamp / blocksize

  if Quark::Config.get("debug")
    puts "OBSERVE #{@_prefix}:#{id}:#{block} #{value} #{timestamp}"
  end

  @redis.hset("#{@_prefix}:observations:#{id}:#{block}", timestamp, value)
  @redis.sadd("#{@_prefix}:blocks:#{blocksize}", block)

  return ({
    :metric => id,
    :block  => block
  })
end

.process_command_peek(data) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/quark/commands/peek.rb', line 3

# Returns the most recent observation of every metric matching a pattern.
#
# The pattern is interpolated into the Redis KEYS lookup
# "<prefix>:observations:<data>:*". For each metric name the key with the
# highest block number is read, and from it the entry with the highest
# timestamp is reported. Both comparisons are numeric.
#
# @param data [String] the key pattern
# @return [Hash{String => Array}] metric name => [timestamp, value]
#   (Integer, Float); metrics whose latest bucket is empty are omitted
def self.process_command_peek(data)
  rv = {}
  keys = @redis.keys("#{@_prefix}:observations:#{data}:*")

  # bucket the keys by metric name in a single pass
  keys.group_by{|key|
    key.split(':')[2]
  }.each do |name, buckets|
    # most recent bucket: compare block numbers as integers, since as
    # strings "999" would sort after "1000"
    latest = buckets.max_by{|key| key.split(':',4)[3].to_i }

    # most recent value in that bucket, again compared numerically
    timestamp, value = @redis.hgetall(latest).max_by{|ts, _value| ts.to_i }

    # the bucket can be empty (e.g. removed between the two lookups)
    next if timestamp.nil?

    rv[name] = [timestamp.to_i, value.to_f]
  end

  return rv
end

.process_command_ping(data) ⇒ Object



5
6
7
8
9
10
11
12
# File 'lib/quark/commands/ping.rb', line 5

# Liveness check.
#
# @param data [Object] ignored
# @return [Hash] :alive (always true), :version (Quark::VERSION),
#   :hostname (from Socket.gethostname) and :time (current time in
#   milliseconds since the epoch)
def self.process_command_ping(data)
  status = { :alive => true }

  status[:version]  = Quark::VERSION
  status[:hostname] = Socket.gethostname
  status[:time]     = (Time.now.to_f * 1000).to_i

  status
end

.process_command_put(data) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/quark/commands/put.rb', line 3

# Stores a single observation without reporting a result.
#
# Arguments (space separated): "id value [timestamp]". The value is written
# to the Redis hash "<prefix>:observations:<id>:<block>" under the timestamp,
# and the block number is added to the set "<prefix>:blocks:<blocksize>".
#
# NOTE(review): an omitted timestamp is converted with to_i and therefore
# stored as 0 -- confirm whether that is intended.
#
# @param data [String] the argument string
# @return [nil]
# @raise [Quark::ArgumentError] when the id or the value is missing
def self.process_command_put(data)
  id, value, timestamp = data.to_s.chomp.split(' ')
  raise Quark::ArgumentError.new("PUT command requires a metric name and a value") if id.nil? or value.nil?

  timestamp = timestamp.to_i

  # automatically convert epoch seconds to milliseconds to provide Graphite metric compatibility
  timestamp *= 1000 if timestamp < 2147483647

  blocksize = Quark::Config.get("quark.blocksize").to_i
  block     = timestamp / blocksize

  if Quark::Config.get("debug")
    puts "PUT #{@_prefix}:#{id}:#{block} #{value} #{timestamp}"
  end

  @redis.hset("#{@_prefix}:observations:#{id}:#{block}", timestamp, value)
  @redis.sadd("#{@_prefix}:blocks:#{blocksize}", block)

  return nil
end

.process_command_tree(data) ⇒ Object



3
4
5
6
7
8
9
10
11
# File 'lib/quark/commands/tree.rb', line 3

# Lists the metric-name prefixes found one level below a root.
#
# Arguments (space separated): "[root [max_age]]". With no root every metric
# is matched and the first dot-separated component of each name is returned;
# with a root of N components, names matching "<root>.*" are cut to their
# first N+1 components. A second argument is accepted but ignored.
#
# @param data [String, nil] the argument string
# @return [Array<String>] sorted, de-duplicated name prefixes
def self.process_command_tree(data)
  root, _max_age = data.to_s.strip.chomp.split(' ', 2)

  # an absent root behaves like an empty one: depth 0, match everything
  root    = root.to_s
  depth   = root.split('.').length
  pattern = (root.empty? ? '*' : "#{root}.*")

  return @redis.keys("#{@_prefix}:observations:#{pattern}:*").collect{|i|
    i.split(':')[2].split('.')[0..depth].join('.')
  }.compact.sort.uniq()
end

.setup(redis, prefix = 'quark') ⇒ Object



26
27
28
29
# File 'lib/quark/command_processor.rb', line 26

# Stores the Redis handle and the key prefix that the process_command_*
# methods read from @redis and @_prefix.
#
# @param redis [Object] the Redis client to use
# @param prefix [String] first segment of every key; defaults to 'quark'
# @return [String] the prefix
def self.setup(redis, prefix = 'quark')
  @redis = redis
  @_prefix = prefix
end

Instance Method Details

#post_init ⇒ Object



8
9
# File 'lib/quark/command_processor.rb', line 8

# Intentionally empty.
# NOTE(review): presumably the connection-opened hook of the including
# connection class -- confirm against the includer.
def post_init
end

#receive_data(data) ⇒ Object



11
12
13
14
15
16
17
18
# File 'lib/quark/command_processor.rb', line 11

# Runs every newline-separated line of +data+ through
# Quark::CommandProcessor.process_command. A response is sent back with
# send_json only when it carries a non-nil :results entry.
#
# @param data [String] one or more command lines
# @return [Array<Hash>] the response hash of every line, in input order
def receive_data(data)
  responses = []

  data.split("\n").each do |line|
    if Quark::Config.get("debug")
      puts "DEBUG: Received '#{line.chomp.strip}'"
    end

    response = Quark::CommandProcessor.process_command(line)
    send_json(response) unless response[:results].nil?

    responses << response
  end

  return responses
end

#send_json(data) ⇒ Object



20
21
22
23
24
# File 'lib/quark/command_processor.rb', line 20

# Serializes +data+ with MultiJson, appends a newline and hands the result
# to send_data.
#
# @param data [Object] the value to serialize
# @return [Object] whatever send_data returns
def send_json(data)
  encoded = MultiJson.dump(data)
  message = encoded + "\n"

  if Quark::Config.get("debug")
    puts "DEBUG: Replying with #{message.chomp}"
  end

  send_data(message)
end

#unbind ⇒ Object



98
99
# File 'lib/quark/command_processor.rb', line 98

# Intentionally empty.
# NOTE(review): presumably the connection-closed hook of the including
# connection class -- confirm against the includer.
def unbind
end