Class: Fluent::CassandraCqlOutput
- Inherits: BufferedOutput
- Inheritance chain: Object → BufferedOutput → Fluent::CassandraCqlOutput
- Defined in:
- lib/fluent/plugin/out_cassandra_driver.rb
Instance Method Summary
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #session ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/fluent/plugin/out_cassandra_driver.rb', line 26

# Validates the plugin settings and normalizes them from raw strings.
#
# After the parent class has processed +conf+, this checks that every
# required setting is present, then rewrites three of them in place:
# +schema+ becomes a Hash, while +data_keys+ and +hosts+ become Arrays
# (split on commas, with surrounding whitespace removed from each entry).
#
# @param conf [Object] configuration object, forwarded to the parent class
# @return [Array<String>] the normalized host list (value of the last assignment)
# @raise [ConfigError] if a required setting is missing, if +schema+ has
#   fewer than two comma-separated parts, or if +schema+ does not evaluate
#   to a Hash
def configure(conf)
  super

  # perform validations
  raise ConfigError, "'Hosts' is required by Cassandra output (ex: localhost, 127.0.0.1, ec2-54-242-141-252.compute-1.amazonaws.com)" if hosts.nil?
  raise ConfigError, "'Keyspace' is required by Cassandra output (ex: FluentdLoggers)" if keyspace.nil?
  raise ConfigError, "'ColumnFamily' is required by Cassandra output (ex: events)" if columnfamily.nil?
  raise ConfigError, "'Schema' is required by Cassandra output (ex: id,ts,payload)" if schema.nil?
  raise ConfigError, "'Schema' must contain at least two column names (ex: id,ts,payload)" if schema.split(',').count < 2
  raise ConfigError, "'DataKeys' is required by Cassandra output (ex: tag,created_at,data)" if data_keys.nil?

  # convert schema from string to hash
  # SECURITY NOTE(review): the schema string is executed as Ruby code via
  # eval. That is only acceptable while the configuration comes from a
  # trusted operator; anyone able to edit it can run arbitrary code here.
  parsed_schema =
    begin
      eval(schema)
    rescue SyntaxError, StandardError => e
      # Report unparsable schemas as a configuration problem instead of
      # leaking a raw SyntaxError/NameError out of configure.
      raise ConfigError, "'Schema' could not be evaluated as a Ruby hash: #{e.message}"
    end
  # write() calls schema.keys, so anything other than a Hash would only
  # fail later, once records are already being flushed.
  raise ConfigError, "'Schema' must evaluate to a Hash keyed by column name" unless parsed_schema.is_a?(Hash)
  self.schema = parsed_schema

  # convert data keys from string to array; strip so "a, b" works like "a,b"
  self.data_keys = data_keys.split(',').map(&:strip)

  # split hosts to array; strip so "host1, host2" does not yield " host2"
  self.hosts = hosts.split(',').map(&:strip)
end
#format(tag, time, record) ⇒ Object
59 60 61 |
# File 'lib/fluent/plugin/out_cassandra_driver.rb', line 59

# Serializes one event record by calling +to_msgpack+ on it.
#
# @param tag [Object] event tag; not used by this implementation
# @param time [Object] event time; not used by this implementation
# @param record [#to_msgpack] the event payload
# @return [Object] whatever +record.to_msgpack+ returns
def format(tag, time, record)
  record.to_msgpack
end
#session ⇒ Object
22 23 24 |
# File 'lib/fluent/plugin/out_cassandra_driver.rb', line 22

# Returns the memoized session, creating it on first use.
#
# The session is built by +get_session+ from the configured +hosts+ and
# +keyspace+ and cached in +@session+ for subsequent calls.
#
# @return [Object] the value returned by +get_session+
def session
  return @session if @session

  @session = get_session(self.hosts, self.keyspace)
end
#shutdown ⇒ Object
54 55 56 57 |
# File 'lib/fluent/plugin/out_cassandra_driver.rb', line 54

# Shuts the plugin down and releases the session, if one was opened.
#
# The parent shutdown runs first. The session is closed afterwards in an
# +ensure+ clause so it is released even when the parent raises, and the
# memoized reference is cleared so a later call to +session+ opens a fresh
# one instead of handing back a closed handle. Calling this when no session
# exists is a no-op beyond the parent call.
#
# @return [Object] the parent +shutdown+ result
def shutdown
  super
ensure
  # Clear the memo before closing so the reference is dropped even if
  # close itself raises.
  closing = @session
  @session = nil
  closing.close if closing
end
#start ⇒ Object
49 50 51 52 |
# File 'lib/fluent/plugin/out_cassandra_driver.rb', line 49

# Starts the plugin: runs the parent +start+, then calls +session+ so the
# session is created up front rather than on first use.
#
# @return [Object] the value returned by +session+
def start
  super
  session
end
#write(chunk) ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/fluent/plugin/out_cassandra_driver.rb', line 63

# Inserts every record of a buffered chunk, one INSERT per record.
#
# For each record yielded by +chunk.msgpack_each+ a CQL INSERT is assembled
# from the schema's keys (used as the column list), the value string
# produced by +build_insert_values_string+, and the configured TTL, and is
# executed on the session.
#
# Failures are handled per record: an error while executing one statement
# is logged and the remaining records are still attempted. Only
# StandardError is rescued, so Interrupt, SystemExit, NoMemoryError and
# similar are no longer swallowed and propagate to the caller.
#
# NOTE(review): the statement is built by string interpolation; whether the
# values are safely quoted depends on +build_insert_values_string+, which is
# not visible here - confirm before feeding untrusted data.
#
# @param chunk [#msgpack_each] buffered chunk yielding one record at a time
# @return [Object] whatever +chunk.msgpack_each+ returns
def write(chunk)
  chunk.msgpack_each do |record|
    $log.debug "Sending a new record to Cassandra: #{record.to_json}"

    values = build_insert_values_string(self.schema.keys, self.data_keys, record, self.pop_data_keys)
    cql = "INSERT INTO #{self.columnfamily} (#{self.schema.keys.join(',')}) VALUES (#{values}) USING TTL #{self.ttl}"

    begin
      # Go through the memoizing accessor (as start does) rather than the
      # raw instance variable, so a missing session is created on demand.
      session.execute(cql)
    rescue StandardError => e
      $log.error "Cannot send record to Cassandra: #{e.message}\nTrace: #{e.backtrace.to_s}"
    end
  end
end