Class: Fluent::CassandraSelector

Inherits:
Filter
  • Object
show all
Includes:
CassandraConnection
Defined in:
lib/fluent/plugin/filter_cassandra_selector.rb

Overview

module Fluent

class Plugin::CassandraSelector < Plugin::Filter

Instance Method Summary collapse

Methods included from CassandraConnection

#get_session

Instance Method Details

#configure(conf) ⇒ Object

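Validates the plugin configuration: after delegating to the parent class, it requires at least one of `field` or `field_json` to be set.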

Raises:

  • (ConfigError)


36
37
38
39
40
41
# File 'lib/fluent/plugin/filter_cassandra_selector.rb', line 36

# Validates the plugin configuration.
#
# Delegates to the parent implementation first, then insists that at least
# one of +field+ / +field_json+ has been set.
#
# @param conf the configuration object, handed to +super+ unchanged
# @raise [ConfigError] when both +field_json+ and +field+ are nil
def configure(conf)
  super

  no_selector = field_json.nil? && field.nil?
  return unless no_selector

  raise ConfigError, "params 'field' or 'field_json' is require least once"
end

#filter(tag, time, record) ⇒ Object

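Runs a CQL query built from the record against Cassandra and adds the selected rows to the record: a single row is merged into the record itself, while several rows are stored under the `data_cassandra` key.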



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/fluent/plugin/filter_cassandra_selector.rb', line 43

# Enriches a record with rows selected from Cassandra.
#
# The CQL statement is built from the record by +getCql+ and executed on
# +@session+. What happens next depends on the number of rows returned:
#
# * no rows   - the record is returned untouched;
# * one row   - the row and the record are passed to +prepareRowToHash+
#   and its result replaces the record;
# * many rows - the rows are stored under the "data_cassandra" key: as-is
#   when +field_json+ is nil or empty, otherwise each row is first passed
#   through +prepareRowToHash+ together with a fresh empty hash.
#
# @param tag event tag (not used by this method)
# @param time event time (not used by this method)
# @param record the event record; must support +[]=+ for the multi-row case
# @return the enriched record
# @raise [StandardError] whatever the query raised, re-raised after logging
def filter(tag, time, record)
  cql = getCql(record)

  begin
    result = @session.execute(cql)
  rescue StandardError => e
    # Deliberately not `rescue Exception`: signals and SystemExit must pass
    # through without being reported as Cassandra failures.
    $log.error "Cannot select Cassandra: #{e.message}\nTrace: #{e.backtrace}"
    raise
  end

  row_count = result.length
  if row_count == 1
    record = prepareRowToHash(result.rows.first, record)
  elsif row_count > 1
    record["data_cassandra"] =
      if field_json.nil? || field_json.empty?
        result.rows.to_a
      else
        result.rows.map { |row| prepareRowToHash(row, {}) }
      end
  end

  record
end

#shutdown ⇒ Object

Closes the Cassandra session, if one was opened, after delegating to the parent class.



31
32
33
34
# File 'lib/fluent/plugin/filter_cassandra_selector.rb', line 31

# Stops the plugin: runs the parent shutdown first, then closes the
# Cassandra session when one is held in +@session+.
#
# @return the result of closing the session, or nil when there is none
def shutdown
  super
  return unless @session

  @session.close
end

#start ⇒ Object



26
27
28
29
# File 'lib/fluent/plugin/filter_cassandra_selector.rb', line 26

# Starts the plugin: runs the parent start-up, then makes sure a session
# is stored in +@session+.
#
# An existing session is reused; otherwise one is requested from
# +get_session+ using the host, port, keyspace, connect_timeout, username
# and password values.
#
# @return the session held in +@session+
def start
  super
  return @session if @session

  @session = get_session(host, port, keyspace, connect_timeout, username, password)
end