Class: Fluent::Plugin::SolrOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_solr.rb

Constant Summary collapse

# Constants of Fluent::Plugin::SolrOutput.
# NOTE(review): the DEFAULT_* values are presumably the defaults of the
# similarly named configuration parameters; their declarations are not
# visible here — confirm against lib/fluent/plugin/out_solr.rb.
DEFAULT_COLLECTION =
'collection1'
DEFAULT_TAG_FIELD =
'tag'
DEFAULT_TIME_FIELD =
'time'
DEFAULT_TIME_FORMAT =
'%FT%TZ'
DEFAULT_MILLISECOND =
false
DEFAULT_IGNORE_UNDEFINED_FIELDS =
false
DEFAULT_FLUSH_SIZE =
100
DEFAULT_BUFFER_TYPE =
'memory'
DEFAULT_COMMIT_WITH_FLUSH =
true
# Connection modes stored in @mode by #start: MODE_STANDALONE is chosen when
# base_url is set, otherwise MODE_SOLRCLOUD when zk_host is set.
MODE_STANDALONE =
'Standalone'
MODE_SOLRCLOUD =
'SolrCloud'

Instance Method Summary collapse

Constructor Details

#initialize ⇒ SolrOutput

Returns a new instance of SolrOutput.



66
67
68
# File 'lib/fluent/plugin/out_solr.rb', line 66

# Creates the plugin instance. No plugin-specific state is set up here; the
# call is forwarded unchanged to the parent class initializer.
def initialize
  super
end

Instance Method Details

#configure(conf) ⇒ Object



70
71
72
73
# File 'lib/fluent/plugin/out_solr.rb', line 70

# Configures the plugin from +conf+.
#
# Calls compat_parameters_convert for the :inject section first, then hands
# +conf+ to the parent class's configure.
#
# @param conf the configuration object passed to the plugin
def configure(conf)
  # NOTE(review): presumably translates legacy-style inject parameters into
  # the <inject> section form — confirm against Fluentd's compat_parameters helper.
  compat_parameters_convert(conf, :inject)
  super
end

#format(tag, time, record) ⇒ Object



111
112
113
# File 'lib/fluent/plugin/out_solr.rb', line 111

# Serializes one event for buffering.
#
# Only +time+ and +record+ are packed (as a two-element array, via
# to_msgpack); +tag+ is accepted but not written into the payload.
#
# @param tag the event tag (unused here)
# @param time the event time
# @param record the event record
# @return the msgpack-encoded [time, record] pair
def format(tag, time, record)
  [time, record].to_msgpack
end

#formatted_to_msgpack_binary ⇒ Object



115
116
117
# File 'lib/fluent/plugin/out_solr.rb', line 115

# Always returns true.
# NOTE(review): presumably tells Fluentd that #format emits msgpack, so that
# chunks can be read back with msgpack_each as #write does — confirm against
# the Fluentd output plugin API.
def formatted_to_msgpack_binary
  true
end

#get_fields ⇒ Object



213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# File 'lib/fluent/plugin/out_solr.rb', line 213

# Fetches the names of the fields defined in the Solr schema.
#
# Queries 'schema/fields' through @solr (passing the collection explicitly in
# SolrCloud mode) and collects the 'name' of every entry under 'fields'.
#
# The lookup is best-effort: any StandardError (including the case where no
# mode is selected and there is therefore no response) is logged as a warning
# together with its cause, and an empty list is returned. Exceptions outside
# StandardError (signals, exit requests, out-of-memory) are deliberately not
# rescued so they can stop the process as intended.
#
# @return [Array] the field names, or [] when the lookup failed
def get_fields
  response =
    case @mode
    when MODE_STANDALONE then @solr.get('schema/fields')
    when MODE_SOLRCLOUD then @solr.get('schema/fields', collection: @collection)
    end

  fields = response['fields'].map { |field| field['name'] }
  log.debug "Fields: #{fields}"
  fields
rescue StandardError => e
  log.warn "An error occurred while getting fields: #{e.class}: #{e.message}"
  []
end

#get_unique_key ⇒ Object



194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# File 'lib/fluent/plugin/out_solr.rb', line 194

# Fetches the name of the schema's unique key field from Solr.
#
# Queries 'schema/uniquekey' through @solr (passing the collection explicitly
# in SolrCloud mode) and reads 'uniqueKey' from the response.
#
# Falls back to 'id' when the response carries no 'uniqueKey' entry, and also
# when the lookup raises a StandardError (which is logged as a warning with
# its cause). Returning nil here would make #write use nil as a field name.
# Exceptions outside StandardError are deliberately not rescued.
#
# @return [String] the unique key field name, 'id' by default
def get_unique_key
  response =
    case @mode
    when MODE_STANDALONE then @solr.get('schema/uniquekey')
    when MODE_SOLRCLOUD then @solr.get('schema/uniquekey', collection: @collection)
    end

  unique_key = response['uniqueKey'] || 'id'
  log.debug "Unique key: #{unique_key}"
  unique_key
rescue StandardError => e
  log.warn "An error occurred while getting unique key: #{e.class}: #{e.message}"
  'id'
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


119
120
121
# File 'lib/fluent/plugin/out_solr.rb', line 119

# Always returns true.
# NOTE(review): presumably declares to Fluentd that this plugin may run under
# multiple workers — confirm against the Fluentd plugin API.
#
# @return [Boolean] true
def multi_workers_ready?
  true
end

#shutdown ⇒ Object



103
104
105
106
107
108
109
# File 'lib/fluent/plugin/out_solr.rb', line 103

# Shuts the plugin down.
#
# The parent class's shutdown runs first; afterwards the object held in @zk
# (assigned by #start in SolrCloud mode, nil otherwise) is closed if present.
def shutdown
  super

  @zk.close unless @zk.nil?
end

#start ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/fluent/plugin/out_solr.rb', line 75

# Starts the plugin: selects the connection mode, creates the Solr client and
# caches schema information.
#
# Mode selection:
# * base_url set            -> MODE_STANDALONE, client built with RSolr.connect
#                              on "<base_url>/<collection>"
# * otherwise zk_host set   -> MODE_SOLRCLOUD, client built on a ZooKeeper
#                              backed RSolr::Cloud::Connection
# * neither set             -> @mode, @solr and @zk stay nil and an error is
#                              logged, because nothing can be sent to Solr in
#                              that state. Startup itself is not aborted.
#
# Afterwards @unique_key and @fields are filled from #get_unique_key and
# #get_fields.
def start
  super

  @solr = nil
  @zk = nil
  @mode =
    if !@base_url.nil?
      MODE_STANDALONE
    elsif !@zk_host.nil?
      MODE_SOLRCLOUD
    end

  case @mode
  when MODE_STANDALONE
    # Join base URL and collection with exactly one separating slash added
    # only when the base URL does not already end with one.
    base = @base_url.end_with?('/') ? @base_url : "#{@base_url}/"
    @solr = RSolr.connect(url: base + @collection)
  when MODE_SOLRCLOUD
    @zk = ZK.new(@zk_host)
    cloud_connection = RSolr::Cloud::Connection.new(@zk)
    @solr = RSolr::Client.new(cloud_connection, read_timeout: 60, open_timeout: 60)
  else
    log.error 'Neither base_url nor zk_host is configured; no Solr connection was created'
  end

  # Get unique key field from Solr
  @unique_key = get_unique_key

  # Get fields from Solr
  @fields = get_fields
end

#update(documents) ⇒ Object



181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/fluent/plugin/out_solr.rb', line 181

# Sends +documents+ to Solr through @solr.add.
#
# The commit parameter is taken from @commit_with_flush; in SolrCloud mode the
# target collection is passed explicitly.
#
# Sending is best-effort: a StandardError is logged as a warning together with
# its cause and is not re-raised. Exceptions outside StandardError (signals,
# exit requests, out-of-memory) are deliberately not rescued. When no mode is
# selected nothing is sent, and a warning says so instead of reporting success.
#
# @param documents [Array] the documents to add
def update(documents)
  case @mode
  when MODE_STANDALONE
    @solr.add documents, params: { commit: @commit_with_flush }
  when MODE_SOLRCLOUD
    @solr.add documents, collection: @collection, params: { commit: @commit_with_flush }
  else
    log.warn "No Solr connection mode is selected; #{documents.count} document(s) were not sent"
    return
  end

  log.debug "Sent #{documents.count} document(s) to Solr"
rescue StandardError => e
  log.warn "An error occurred while sending #{documents.count} document(s) to Solr: #{e.class}: #{e.message}"
end

#write(chunk) ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/fluent/plugin/out_solr.rb', line 123

# Converts every event of +chunk+ into a Solr document and sends the documents
# to Solr in batches of @flush_size via #update.
#
# For each event the record is passed through inject_values_to_record and then:
# * gets a random UUID under @unique_key when that key is absent;
# * gets the chunk's tag under @tag_field when that key is absent;
# * gets @time_field overwritten with a UTC timestamp string
#   ('%FT%TZ', or with milliseconds when @millisecond is set). The timestamp
#   comes from the record's own @time_field parsed with @time_format when that
#   is present and parsable, otherwise from the event time;
# * loses every key not listed in @fields when @ignore_undefined_fields is set.
#
# @param chunk the buffer chunk; must provide metadata.tag and msgpack_each
def write(chunk)
  documents = []

  # Get fluentd tag from the chunk's metadata
  tag = chunk.metadata.tag

  chunk.msgpack_each do |time, record|
    record = inject_values_to_record(tag, time, record)

    # Set unique key and value
    record[@unique_key] = SecureRandom.uuid unless record.key?(@unique_key)

    # Set Fluentd tag to Solr tag field
    record[@tag_field] = tag unless record.key?(@tag_field)

    # Set time: start from the event time, prefer the record's own time field
    event_time = Time.at(time).utc
    if record.key?(@time_field)
      # Parsing the time field in the record by the specified format.
      # NOTE(review): with a format that has no zone directive (such as
      # '%FT%TZ', where 'Z' is a literal) the value looks like it is parsed as
      # local time before the .utc conversion — confirm this is intended.
      begin
        event_time = Time.strptime(record[@time_field], @time_format).utc
      rescue StandardError => e
        log.warn "An error occurred in parsing the time field: #{e.message}"
      end
    end
    # %L is the zero-padded, truncated millisecond of the second.
    record[@time_field] = event_time.strftime(@millisecond ? '%FT%T.%LZ' : '%FT%TZ')

    # Ignore undefined fields
    record.keep_if { |key, _value| @fields.include?(key) } if @ignore_undefined_fields

    # Add record to documents
    documents << record

    # Update when flush size is reached
    next if documents.count < @flush_size

    update documents
    documents.clear
  end

  # Update remaining documents
  update documents unless documents.empty?
end