Module: Quark::CommandProcessor

Included in:
Server::WebsocketServer::WebsocketProcessor
Defined in:
lib/quark/commands/peek.rb,
lib/quark/commands/ping.rb,
lib/quark/commands/fetch.rb,
lib/quark/commands/observe.rb,
lib/quark/command_processor.rb,
lib/quark/commands/subscribe.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.process_command(data, userdata = nil) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/quark/command_processor.rb', line 29

# Parses one raw command line and dispatches it to the matching
# process_command_<name> handler on this object.
#
# Accepted input forms:
#   "<COMMAND> <arguments>"
#   "<COMMAND> <arguments> {key=value, key=value}"  (the braces are merged into userdata)
#   "<metric> <value> <timestamp>"                  (handled as OBSERVE, but only when the
#                                                    "quark.graphite_compat" setting is truthy)
#
# @param data [String] the raw line to process
# @param userdata [Hash, nil] extra data echoed back in the response; key=value
#   pairs found in a trailing {...} section are merged into it
# @return [Hash] on success :success, :command, :arguments, :userdata and, when
#   the handler returned a non-nil value, :results; on failure :success (false),
#   :userdata and :error (:command, :class, :message, plus :backtrace for errors
#   that are not Quark::Error)
def self.process_command(data, userdata=nil)
  # if the data perfectly resembles a graphite-formatted metric (without a command prefix)
  # then assume the command is "observe"
  if Quark::Config.get("quark.graphite_compat") && data =~ /^[\w\.\-]+ -?[\d\.]+ \d{10,13}$/
    command   = "OBSERVE"
    arguments = data.chomp
  else
    if data =~ /\}$/
      data, linedata = data.chomp.split('{', 2)

      # a line may end in '}' without ever opening a '{'; there is nothing to merge then
      unless linedata.nil?
        pairs = linedata.gsub(/(^\{|\}$)/, '').split(/,\s*/).reject { |pair| pair.empty? }.map do |pair|
          # always produce a two-element pair so an entry without '=' maps to nil
          # instead of making Hash[] raise
          key, value = pair.split('=', 2)
          [key, value]
        end

        userdata = (userdata || {}).merge(Hash[pairs])
      end
    end

    command, arguments = data.strip.chomp.split(' ', 2)
  end

  begin
    # command is nil for blank input; to_s turns that into an unknown (empty) command
    handler = :"process_command_#{command.to_s.downcase}"

    unless respond_to?(handler)
      raise Quark::InvalidCommand.new("Unknown command '#{command.to_s.upcase}'")
    end

    rv = send(handler, arguments)

    response = {
      :success   => true,
      :command   => command,
      :arguments => arguments
    }
    # :results is only present when the handler returned something
    response[:results]  = rv unless rv.nil?
    response[:userdata] = userdata
    return response

  rescue Quark::Error => e
    return {
      :success => false,
      :userdata  => userdata,
      :error => {
        :command => data.strip.chomp,
        :class   => e.class.name,
        :message => e.message
      }
    }

  # StandardError rather than Exception, so signals and exit requests still propagate
  rescue StandardError => e
    return {
      :success => false,
      :userdata  => userdata,
      :error => {
        :command   => data.strip.chomp,
        :class     => e.class.name,
        :message   => e.message,
        :backtrace => e.backtrace
      }
    }
  end
end

.process_command_fetch(data) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/quark/commands/fetch.rb', line 3

# Returns the stored data points for every metric whose Redis key matches
# "<prefix>:<keys>:*", optionally limited to a time range.
#
# The argument string is "<keys> [from [to]]". Negative from/to values are
# taken as a number of seconds relative to now; "to" defaults to now. Each
# metric's points are returned in ascending timestamp order, even when they
# are spread over several block keys.
#
# NOTE(review): <keys> is interpolated straight into a Redis KEYS pattern, so
# glob characters supplied by the client are honoured.
#
# @param data [String] "<keys> [from [to]]"
# @return [Hash] metric name => array of [timestamp (Integer), value (Float)]
# @raise [Quark::ArgumentError] when no key pattern is given or from/to are not integers
def self.process_command_fetch(data)
  raise Quark::ArgumentError.new("FETCH command requires at least 1 argument") if data.nil? || data.strip.empty?

  rv = {}
  keys, from, to = data.split(' ')
  now = (Time.now.to_f * 1000).to_i

  # negative from/to values are treated as relative number of seconds from now
  from = now + (from.to_i * 1000) if from.to_i < 0
  to   = now + (to.to_i * 1000)   if to.to_i < 0

  # if not set, to defaults to now
  to ||= now

  [['from', from], ['to', to]].each do |label, bound|
    next if bound.nil?

    begin
      Integer(bound)
    # fully qualified so these never resolve to Quark's own error classes
    rescue ::ArgumentError, ::TypeError
      raise Quark::ArgumentError.new("FETCH '#{label}' argument must be numeric")
    end
  end

  from_ms   = from.to_i
  to_ms     = to.to_i
  blocksize = Quark::Config.get("quark.blocksize").to_i

  # find all matching keys from all blocks
  @redis.keys("#{@_prefix}:#{keys}:*").each do |key|
    _prefix, name, block = key.split(':', 3)
    block = block.to_i

    # skip keys whose block falls outside of the given time range (if given)
    if block > 0
      next if !from.nil? && block < (from_ms / blocksize)
      next if block > (to_ms / blocksize)
    end

    @redis.hgetall(key).each do |timestamp, value|
      timestamp = timestamp.to_i
      next unless timestamp >= from_ms && timestamp <= to_ms

      (rv[name] ||= []) << [timestamp, value.to_f]
    end
  end

  # KEYS returns blocks in no particular order and hash fields are strings, so
  # order each series numerically once everything has been collected
  rv.each_value { |points| points.sort_by! { |point| point[0] } }

  return rv
end

.process_command_observe(data) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/quark/commands/observe.rb', line 3

# Stores one observation in the Redis hash "<prefix>:<metric>:<block>", using
# the timestamp as the hash field and the value as the field's value.
#
# The argument string is "<metric> <value> [timestamp]". A timestamp below
# 2147483647 is multiplied by 1000 (seconds to milliseconds); when it is
# omitted, the current time in milliseconds is used instead of 0.
#
# @param data [String] "<metric> <value> [timestamp]"
# @return [Hash] :metric (the metric name) and :block (timestamp / "quark.blocksize")
# @raise [Quark::ArgumentError] when the metric name or the value is missing
def self.process_command_observe(data)
  id, value, timestamp = data.to_s.chomp.split(' ')

  if id.nil? || value.nil?
    raise Quark::ArgumentError.new("OBSERVE command requires a metric name and a value")
  end

  timestamp = timestamp.nil? ? (Time.now.to_f * 1000).to_i : timestamp.to_i

  # automatically convert epoch seconds to milliseconds to provide Graphite metric compatibility
  timestamp *= 1000 if timestamp < 2147483647

  block = Integer(timestamp / Quark::Config.get("quark.blocksize").to_i)

  if Quark::Config.get("debug")
    puts "OBSERVE #{@_prefix}:#{id}:#{block} #{value} #{timestamp}"
  end

  @redis.hset("#{@_prefix}:#{id}:#{block}", timestamp, value)

  return {
    :metric => id,
    :block  => block
  }
end

.process_command_peek(data) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/quark/commands/peek.rb', line 3

# Returns the most recent data point of every metric whose Redis key matches
# "<prefix>:<data>:*".
#
# For each metric the key with the numerically highest block number is read,
# and from it the entry with the numerically highest timestamp is returned.
# Metrics whose newest bucket holds no entries are left out of the result.
#
# @param data [String] metric name or glob pattern, interpolated into the KEYS pattern
# @return [Hash] metric name => [timestamp (Integer), value (Float)]
def self.process_command_peek(data)
  rv = {}
  keys = @redis.keys("#{@_prefix}:#{data}:*")

  # group the keys by metric name (second ':'-separated field) in one pass
  keys.group_by { |key| key.split(':')[1] }.each do |name, buckets|
    # compare block numbers as integers; a plain string sort ranks "9" above "10"
    newest = buckets.max_by { |key| key.split(':', 3)[2].to_i }

    # hash fields come back as strings, so compare timestamps numerically as well
    latest = @redis.hgetall(newest).max_by { |timestamp, _value| timestamp.to_i }

    # the bucket can be empty (or gone) by the time it is read
    next if latest.nil?

    rv[name] = [latest[0].to_i, latest[1].to_f]
  end

  return rv
end

.process_command_ping(data) ⇒ Object



5
6
7
8
9
10
11
# File 'lib/quark/commands/ping.rb', line 5

# Liveness check: reports this host's name and the current time.
#
# @param data [Object] not used
# @return [Hash] :alive (always true), :hostname (Socket.gethostname) and
#   :time (current time in epoch milliseconds)
def self.process_command_ping(data)
  host   = Socket.gethostname
  now_ms = (Time.now.to_f * 1000).to_i

  { :alive => true, :hostname => host, :time => now_ms }
end

.process_command_subscribe(data) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
# File 'lib/quark/commands/subscribe.rb', line 3

def self.process_command_subscribe(data)
  channel, patterns = data.chomp.split(' ', 2)
  channel = channel.to_sym

  Quark::Server.register_subscription(channel, Quark::Subscription.new({
    :channel  => channel,
    :patterns => patterns.split(/\s+/)
  }))

  return Quark::Server.subscriptions[channel]
end

.setup(redis, prefix = 'quark') ⇒ Object



24
25
26
27
# File 'lib/quark/command_processor.rb', line 24

# Stores the Redis client and the key prefix that the command handlers read
# through @redis and @_prefix.
#
# @param redis [Object] client object saved as @redis
# @param prefix [String] key prefix saved as @_prefix (defaults to 'quark')
# @return [Object] the prefix that was stored
def self.setup(redis, prefix='quark')
  @redis, @_prefix = redis, prefix
  prefix
end

Instance Method Details

#post_init ⇒ Object



8
9
# File 'lib/quark/command_processor.rb', line 8

# Does nothing and returns nil.
# NOTE(review): presumably the connection-established callback of the class
# that includes this module; confirm against the including class.
def post_init
end

#receive_data(data) ⇒ Object



11
12
13
14
15
16
# File 'lib/quark/command_processor.rb', line 11

# Runs one received line through Quark::CommandProcessor.process_command,
# sends the resulting hash back with send_json and returns that hash.
# When the "debug" setting is truthy the stripped input is printed first.
#
# @param data [String] raw input line
# @return [Hash] the response produced by process_command
def receive_data(data)
  if Quark::Config.get("debug")
    puts "DEBUG: Received '#{data.chomp.strip}'"
  end

  response = Quark::CommandProcessor.process_command(data)
  send_json(response)
  response
end

#send_json(data) ⇒ Object



18
19
20
21
22
# File 'lib/quark/command_processor.rb', line 18

# Serialises data with MultiJson.dump, appends a newline and hands the
# resulting string to send_data. When the "debug" setting is truthy the
# payload (without the trailing newline) is printed first.
#
# @param data [Object] value to serialise
# @return [Object] whatever send_data returns
def send_json(data)
  payload = "#{MultiJson.dump(data)}\n"

  if Quark::Config.get("debug")
    puts "DEBUG: Replying with #{payload.chomp}"
  end

  send_data(payload)
end

#unbind ⇒ Object



95
96
# File 'lib/quark/command_processor.rb', line 95

# Does nothing and returns nil.
# NOTE(review): presumably the connection-closed callback of the class that
# includes this module; confirm against the including class.
def unbind
end