Class: Fluent::Plugin::CouchOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::Plugin::CouchOutput
- Defined in:
- lib/fluent/plugin/out_couch.rb
Constant Summary collapse
- DEFAULT_BUFFER_TYPE =
"memory"
Instance Attribute Summary collapse
-
#db ⇒ Object
readonly
for tests.
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #formatted_to_msgpack_binary ⇒ Object
-
#initialize ⇒ CouchOutput
constructor
A new instance of CouchOutput.
- #shutdown ⇒ Object
- #start ⇒ Object
- #update_docs(records) ⇒ Object
- #update_view_index ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ CouchOutput
Returns a new instance of CouchOutput.
36 37 38 39 40 41 42 43 44 |
# File 'lib/fluent/plugin/out_couch.rb', line 36 def initialize super require 'msgpack' require 'jsonpath' Encoding.default_internal = 'UTF-8' require 'couchrest' Encoding.default_internal = 'ASCII-8BIT' end |
Instance Attribute Details
#db ⇒ Object (readonly)
for tests
5 6 7 |
# File 'lib/fluent/plugin/out_couch.rb', line 5 def db @db end |
Instance Method Details
#configure(conf) ⇒ Object
46 47 48 49 50 51 |
# File 'lib/fluent/plugin/out_couch.rb', line 46 def configure(conf) compat_parameters_convert(conf, :buffer, :inject) super account = "#{@user}:#{@password}@" if @user && @password @db = CouchRest.database!("#{@protocol}://#{account}#{@host}:#{@port}/#{@database}") end |
#format(tag, time, record) ⇒ Object
71 72 73 74 |
# File 'lib/fluent/plugin/out_couch.rb', line 71 def format(tag, time, record) record = inject_values_to_record(tag, time, record) record.to_msgpack end |
#formatted_to_msgpack_binary ⇒ Object
76 77 78 |
# File 'lib/fluent/plugin/out_couch.rb', line 76 def formatted_to_msgpack_binary true end |
#shutdown ⇒ Object
67 68 69 |
# File 'lib/fluent/plugin/out_couch.rb', line 67 def shutdown super end |
#start ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/fluent/plugin/out_couch.rb', line 53 def start super @views = [] if @refresh_view_index begin @db.get("_design/#{@refresh_view_index}")['views'].each do |view_name,func| @views.push([@refresh_view_index,view_name]) end rescue log.error 'design document not found!' end end end |
#update_docs(records) ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/fluent/plugin/out_couch.rb', line 98 def update_docs(records) if records.length > 0 records.each{|record| doc = nil begin doc = @db.get(record['_id']) rescue end record['_rev']=doc['_rev'] unless doc.nil? log.debug record @db.save_doc(record) } end end |
#update_view_index ⇒ Object
113 114 115 116 117 |
# File 'lib/fluent/plugin/out_couch.rb', line 113 def update_view_index() @views.each do |design,view| @db.view("#{design}/#{view}",{"limit"=>"0"}) end end |
#write(chunk) ⇒ Object
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/fluent/plugin/out_couch.rb', line 80 def write(chunk) records = [] doc_key_field, doc_key_jsonpath = (chunk.) chunk.msgpack_each {|record| id = record[doc_key_field] id = JsonPath.new(doc_key_jsonpath).first(record) if id.nil? && !doc_key_jsonpath.nil? record['_id'] = id unless id.nil? records << record } unless @update_docs @db.bulk_save(records) else update_docs(records) end update_view_index end |