Class: Fluent::Plugin::CouchOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_couch.rb

Constant Summary collapse

DEFAULT_BUFFER_TYPE =
"memory"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeCouchOutput

Returns a new instance of CouchOutput.



36
37
38
39
40
41
42
43
44
# File 'lib/fluent/plugin/out_couch.rb', line 36

def initialize
    super

    require 'msgpack'
    require 'jsonpath'
    Encoding.default_internal = 'UTF-8'
    require 'couchrest'
    Encoding.default_internal = 'ASCII-8BIT'
end

Instance Attribute Details

#dbObject (readonly)

for tests



5
6
7
# File 'lib/fluent/plugin/out_couch.rb', line 5

def db
  @db
end

Instance Method Details

#configure(conf) ⇒ Object



46
47
48
49
50
51
# File 'lib/fluent/plugin/out_couch.rb', line 46

def configure(conf)
    compat_parameters_convert(conf, :buffer, :inject)
    super
     = "#{@user}:#{@password}@" if @user && @password
    @db = CouchRest.database!("#{@protocol}://#{}#{@host}:#{@port}/#{@database}")
end

#format(tag, time, record) ⇒ Object



71
72
73
74
# File 'lib/fluent/plugin/out_couch.rb', line 71

def format(tag, time, record)
    record = inject_values_to_record(tag, time, record)
    record.to_msgpack
end

#formatted_to_msgpack_binaryObject



76
77
78
# File 'lib/fluent/plugin/out_couch.rb', line 76

def formatted_to_msgpack_binary
    true
end

#shutdownObject



67
68
69
# File 'lib/fluent/plugin/out_couch.rb', line 67

def shutdown
    super
end

#startObject



53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/fluent/plugin/out_couch.rb', line 53

def start
    super
    @views = []
    if @refresh_view_index
        begin
            @db.get("_design/#{@refresh_view_index}")['views'].each do |view_name,func|
                @views.push([@refresh_view_index,view_name])
            end
            rescue
            log.error 'design document not found!'
        end
    end
end

#update_docs(records) ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/fluent/plugin/out_couch.rb', line 98

def update_docs(records)
    if records.length > 0
        records.each{|record|
            doc = nil
            begin
                doc = @db.get(record['_id'])
                rescue
            end
            record['_rev']=doc['_rev'] unless doc.nil?
            log.debug record
            @db.save_doc(record)
        }
    end
end

#update_view_indexObject



113
114
115
116
117
# File 'lib/fluent/plugin/out_couch.rb', line 113

def update_view_index()
    @views.each do |design,view|
        @db.view("#{design}/#{view}",{"limit"=>"0"})
    end
end

#write(chunk) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/fluent/plugin/out_couch.rb', line 80

def write(chunk)
    records = []
    doc_key_field, doc_key_jsonpath = expand_placeholders(chunk.)
    chunk.msgpack_each {|record|

        id = record[doc_key_field]
        id = JsonPath.new(doc_key_jsonpath).first(record) if id.nil? && !doc_key_jsonpath.nil?
        record['_id'] = id unless id.nil?
        records << record
    }
    unless @update_docs
        @db.bulk_save(records)
        else
        update_docs(records)
    end
    update_view_index
end