Class: Fluent::MysqlReplicatorElasticsearchOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::MysqlReplicatorElasticsearchOutput
- Defined in:
- lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb
Constant Summary collapse
- DEFAULT_TAG_FORMAT =
/(?<index_name>[^\.]+)\.(?<type_name>[^\.]+)\.(?<event>[^\.]+)\.(?<primary_key>[^\.]+)$/
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
-
#initialize ⇒ MysqlReplicatorElasticsearchOutput
constructor
A new instance of MysqlReplicatorElasticsearchOutput.
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ MysqlReplicatorElasticsearchOutput
Returns a new instance of MysqlReplicatorElasticsearchOutput.
13 14 15 |
# File 'lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb', line 13
#
# Builds the output plugin instance. No plugin-specific state is set up here;
# construction is handed straight to the superclass.
#
# @return [MysqlReplicatorElasticsearchOutput] a new instance
def initialize
  super
end
Instance Method Details
#configure(conf) ⇒ Object
17 18 19 20 21 22 23 24 25 |
# File 'lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb', line 17
#
# Applies the plugin configuration and settles on the tag-parsing pattern.
#
# After the superclass has processed +conf+, @tag_format ends up as either
# DEFAULT_TAG_FORMAT (when it is unset or already equal to the default) or a
# Regexp compiled from conf['tag_format'].
#
# @param conf [#[]] configuration object; read here only via conf['tag_format']
# @return [Regexp] the pattern stored in @tag_format
def configure(conf)
  super

  # Same short-circuit order as before: the nil check runs first.
  keep_default = @tag_format.nil? || @tag_format == DEFAULT_TAG_FORMAT
  @tag_format = keep_default ? DEFAULT_TAG_FORMAT : Regexp.new(conf['tag_format'])
end
#format(tag, time, record) ⇒ Object
31 32 33 |
# File 'lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb', line 31
#
# Packs one event as a [tag, time, record] triple by calling #to_msgpack on
# the array. #write reads the same three fields back via chunk.msgpack_each.
#
# @param tag [Object] event tag
# @param time [Object] event time
# @param record [Object] event payload
# @return [Object] whatever Array#to_msgpack returns (presumably the packed
#   String - confirm against the msgpack library in use)
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
#shutdown ⇒ Object
35 36 37 |
# File 'lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb', line 35
#
# Shutdown hook. This plugin adds no teardown of its own and simply defers
# to the superclass implementation.
def shutdown
  super
end
#start ⇒ Object
27 28 29 |
# File 'lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb', line 27
#
# Start hook. This plugin adds no startup work of its own and simply defers
# to the superclass implementation.
def start
  super
end
#write(chunk) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb', line 39
#
# Sends every event in +chunk+ to the '/_bulk' endpoint at @host:@port in a
# single HTTP POST.
#
# Each event's tag is matched against @tag_format to obtain the named
# captures index_name, type_name, event and primary_key:
# * event == 'delete' emits one action line deleting the document whose _id
#   is record[primary_key];
# * any other event emits an "index" action line followed by the record
#   itself, with _id set only when record[primary_key] is present.
#
# @param chunk [#msgpack_each] buffered chunk yielding (tag, time, record)
# @return [nil] result of Net::HTTPResponse#value on success
# @raise [Net::HTTPExceptions] (via Net::HTTPResponse#value) when the response
#   is not a 2xx
def write(chunk)
  bulk_message = []

  chunk.msgpack_each do |tag, time, record|
    # NOTE(review): String#match returns nil when the tag does not fit
    # @tag_format, which would raise NoMethodError on the lookups below.
    # Confirm that upstream routing guarantees a matching tag.
    tag_parts = tag.match(@tag_format)
    target_index = tag_parts['index_name']
    target_type = tag_parts['type_name']
    id_key = tag_parts['primary_key']

    if tag_parts['event'] == 'delete'
      meta = { "delete" => {"_index" => target_index, "_type" => target_type, "_id" => record[id_key]} }
      bulk_message << Yajl::Encoder.encode(meta)
    else
      meta = { "index" => {"_index" => target_index, "_type" => target_type} }
      # Without an explicit _id the action line carries none.
      if id_key && record[id_key]
        meta['index']['_id'] = record[id_key]
      end
      bulk_message << Yajl::Encoder.encode(meta)
      bulk_message << Yajl::Encoder.encode(record)
    end
  end

  # The trailing empty element makes join("\n") end the body with a newline.
  bulk_message << ""

  http = Net::HTTP.new(@host, @port.to_i)
  request = Net::HTTP::Post.new('/_bulk', {'content-type' => 'application/json; charset=utf-8'})
  request.body = bulk_message.join("\n")
  # #value raises unless the response is 2xx, so failures surface to the caller.
  http.request(request).value
end