Class: Fluent::CassandraCqlOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_cassandra_driver.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (ConfigError)


24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/fluent/plugin/out_cassandra_driver.rb', line 24

def configure(conf)
  super

  # perform validations
  raise ConfigError, "'Hosts' is required by Cassandra output (ex: localhost, 127.0.0.1, ec2-54-242-141-252.compute-1.amazonaws.com" if self.hosts.nil?
  raise ConfigError, "'Keyspace' is required by Cassandra output (ex: FluentdLoggers)" if self.keyspace.nil?
  raise ConfigError, "'ColumnFamily' is required by Cassandra output (ex: events)" if self.columnfamily.nil?
  raise ConfigError, "'Schema' is required by Cassandra output (ex: id,ts,payload)" if self.schema.nil?
  raise ConfigError, "'Schema' must contain at least two column names (ex: id,ts,payload)" if self.schema.split(',').count < 2
  raise ConfigError, "'DataKeys' is required by Cassandra output (ex: tag,created_at,data)" if self.data_keys.nil?

  # convert schema from string to hash
  # NOTE: ok to use eval b/c this isn't this isn't a user
  #       supplied string
  self.schema = eval(self.schema)

  # convert data keys from string to array
  self.data_keys = self.data_keys.split(',')

  # split hosts to array
  self.hosts = self.hosts.split(',')
end

#format(tag, time, record) ⇒ Object



57
58
59
# File 'lib/fluent/plugin/out_cassandra_driver.rb', line 57

def format(tag, time, record)
  record.to_msgpack
end

#sessionObject



20
21
22
# File 'lib/fluent/plugin/out_cassandra_driver.rb', line 20

def session
  @session ||= get_session(self.hosts, self.keyspace)
end

#shutdownObject



52
53
54
55
# File 'lib/fluent/plugin/out_cassandra_driver.rb', line 52

def shutdown
  super
  @session.close if @session
end

#startObject



47
48
49
50
# File 'lib/fluent/plugin/out_cassandra_driver.rb', line 47

def start
  super
  session
end

#write(chunk) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/fluent/plugin/out_cassandra_driver.rb', line 61

def write(chunk)
  chunk.msgpack_each { |record|
    $log.debug "Sending a new record to Cassandra: #{record.to_json}"

    values = build_insert_values(self.schema.keys, self.data_keys, record, self.pop_data_keys)

    cql = "INSERT INTO #{self.columnfamily} (#{self.schema.keys.join(',')}) VALUES (#{values.length.times.map { '?' }.join(',')}) USING TTL #{self.ttl}"

    $log.debug "CQL query: #{cql}"
    $log.debug "Running with values: #{values.to_json}"

    begin
      @session.execute(cql, arguments: values)
    rescue Exception => e
      $log.error "Cannot send record to Cassandra: #{e.message}\nTrace: #{e.backtrace.to_s}"

      raise e
    end
  }
end