Class: Fluent::CassandraCqlOutput
- Inherits:
- BufferedOutput
- Inheritance chain: Object → BufferedOutput → Fluent::CassandraCqlOutput
- Defined in:
- lib/fluent/plugin/out_cassandra_driver.rb
Instance Method Summary
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #session ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/fluent/plugin/out_cassandra_driver.rb', line 24
# Validates the plugin configuration and converts the raw string parameters
# into the structures the rest of the plugin works with.
#
# After a successful call:
# * +schema+ holds the object obtained by evaluating the configured schema
#   string (it must respond to +keys+, since #write uses +schema.keys+ as the
#   column list),
# * +data_keys+ is an array of whitespace-trimmed key names,
# * +hosts+ is an array of whitespace-trimmed, non-empty host names.
#
# @param conf the configuration object; passed through to +super+ untouched
# @raise [ConfigError] when a required parameter is missing, when the schema
#   string has fewer than two comma-separated parts, or when the evaluated
#   schema does not respond to +keys+
def configure(conf)
  super

  # perform validations
  raise ConfigError, "'Hosts' is required by Cassandra output (ex: localhost, 127.0.0.1, ec2-54-242-141-252.compute-1.amazonaws.com)" if self.hosts.nil?
  raise ConfigError, "'Keyspace' is required by Cassandra output (ex: FluentdLoggers)" if self.keyspace.nil?
  raise ConfigError, "'ColumnFamily' is required by Cassandra output (ex: events)" if self.columnfamily.nil?
  raise ConfigError, "'Schema' is required by Cassandra output (ex: id,ts,payload)" if self.schema.nil?
  raise ConfigError, "'Schema' must contain at least two column names (ex: id,ts,payload)" if self.schema.split(',').count < 2
  raise ConfigError, "'DataKeys' is required by Cassandra output (ex: tag,created_at,data)" if self.data_keys.nil?

  # convert schema from string to hash
  # NOTE(review): eval executes whatever Ruby the configuration contains, so
  # anyone able to edit the plugin configuration can run code in this process.
  # Kept for compatibility; consider replacing it with a restricted parser.
  parsed_schema = eval(self.schema)
  # #write relies on schema.keys, so reject anything that cannot provide it
  # here instead of failing later in the middle of a flush.
  raise ConfigError, "'Schema' must evaluate to a Hash keyed by column name" unless parsed_schema.respond_to?(:keys)
  self.schema = parsed_schema

  # convert data keys from string to array (trimmed so "a, b" works like "a,b")
  self.data_keys = self.data_keys.split(',').map(&:strip)

  # split hosts to array; the documented example separates hosts with ", ",
  # so trim each entry and drop blanks left by stray commas
  self.hosts = self.hosts.split(',').map(&:strip).reject(&:empty?)
end
#format(tag, time, record) ⇒ Object
57 58 59 |
# File 'lib/fluent/plugin/out_cassandra_driver.rb', line 57
# Serializes one event for buffering.
#
# Only the record is serialized; +tag+ and +time+ are accepted but not used.
#
# @param tag unused
# @param time unused
# @param record an object responding to +to_msgpack+
# @return the value returned by +record.to_msgpack+
def format(tag, time, record)
  packed_record = record.to_msgpack
  packed_record
end
#session ⇒ Object
20 21 22 |
# File 'lib/fluent/plugin/out_cassandra_driver.rb', line 20
# Returns the memoized session, creating it on first use.
#
# The session is obtained from +get_session+ using the configured hosts and
# keyspace, and cached in +@session+ so later calls reuse the same object.
#
# @return the object returned by +get_session+
def session
  return @session if @session

  @session = get_session(self.hosts, self.keyspace)
end
#shutdown ⇒ Object
52 53 54 55 |
# File 'lib/fluent/plugin/out_cassandra_driver.rb', line 52
# Runs the parent shutdown, then closes the session if one was opened.
#
# @return the result of +@session.close+, or nil when no session exists
def shutdown
  super

  # Nothing to close when no session was ever created.
  return unless @session

  @session.close
end
#start ⇒ Object
47 48 49 50 |
# File 'lib/fluent/plugin/out_cassandra_driver.rb', line 47
# Runs the parent start, then calls #session so the session is created
# at startup rather than on first use.
#
# @return the value returned by #session
def start
  super

  opened_session = session
  opened_session
end
#write(chunk) ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/fluent/plugin/out_cassandra_driver.rb', line 61
# Inserts every record of a buffered chunk into the configured column family,
# issuing one parameterized INSERT per record.
#
# For each record the column values come from +build_insert_values+; the
# statement uses one +?+ placeholder per value and the configured TTL.
#
# @param chunk an object responding to +msgpack_each+, yielding one record
#   per iteration
# @return whatever +chunk.msgpack_each+ returns
# @raise [StandardError] any error raised while executing the statement is
#   logged and then re-raised unchanged
def write(chunk)
  # The column list does not depend on the record, so build it once.
  column_list = self.schema.keys.join(',')

  chunk.msgpack_each do |record|
    $log.debug "Sending a new record to Cassandra: #{record.to_json}"

    values = build_insert_values(self.schema.keys, self.data_keys, record, self.pop_data_keys)
    placeholders = Array.new(values.length, '?').join(',')
    cql = "INSERT INTO #{self.columnfamily} (#{column_list}) VALUES (#{placeholders}) USING TTL #{self.ttl}"

    $log.debug "CQL query: #{cql}"
    $log.debug "Running with values: #{values.to_json}"

    begin
      # Use the memoizing accessor rather than @session so a session is
      # opened on demand if none exists yet.
      session.execute(cql, arguments: values)
    rescue StandardError => e
      # Rescue StandardError only: signals and exit requests should not be
      # reported as Cassandra write failures.
      $log.error "Cannot send record to Cassandra: #{e.message}\nTrace: #{e.backtrace.to_s}"
      raise
    end
  end
end