Class: LogStash::Outputs::ElasticSearchJavaPlugins::Protocols::NodeClient

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/outputs/elasticsearch_java/protocol.rb

Direct Known Subclasses

TransportClient

Constant Summary collapse

CLIENT_MUTEX =
Mutex.new

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ NodeClient

Returns a new instance of NodeClient.



17
18
19
20
21
# File 'lib/logstash/outputs/elasticsearch_java/protocol.rb', line 17

# Sets up the client state: a Cabin logger channel, the caller's options
# merged over DEFAULT_OPTIONS, and the settings prepared by create_settings.
#
# @param options [Hash] values merged on top of DEFAULT_OPTIONS
def initialize(options={})
  @logger = Cabin::Channel.get
  # Keys supplied by the caller win over the defaults.
  @client_options = DEFAULT_OPTIONS.merge(options)
  # Populates @settings from @client_options.
  create_settings
end

Instance Attribute Details

#client_options ⇒ Object (readonly)

Returns the value of attribute client_options.



13
14
15
# File 'lib/logstash/outputs/elasticsearch_java/protocol.rb', line 13

# Reader for the options stored in @client_options.
#
# @return the value of @client_options
def client_options
  @client_options
end

#settings ⇒ Object (readonly)

Returns the value of attribute settings.



13
14
15
# File 'lib/logstash/outputs/elasticsearch_java/protocol.rb', line 13

# Reader for the settings object stored in @settings.
#
# @return the value of @settings
def settings
  @settings
end

Class Method Details

.clear_node_client ⇒ Object

For use in test helpers



32
33
34
# File 'lib/logstash/outputs/elasticsearch_java/protocol.rb', line 32

# Forgets the memoized shared client (@@client) while holding CLIENT_MUTEX.
# Intended for use in test helpers.
#
# The mutex is locked directly because client_mutex_synchronize is an
# instance method and cannot be called from this class-level method.
#
# @return [nil]
def self.clear_node_client
  CLIENT_MUTEX.synchronize { @@client = nil }
end

Instance Method Details

#build_request(action, args, source) ⇒ Object

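Builds the Elasticsearch Java request object for a single bulk action ("index", "delete", "create", "create_unless_exists" or "update").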



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/logstash/outputs/elasticsearch_java/protocol.rb', line 129

# Builds the Elasticsearch Java request object for one bulk action.
#
# @param action [String] one of "index", "delete", "create",
#   "create_unless_exists" or "update"
# @param args [Hash] action metadata; reads :_index, :_id, :_routing,
#   :_type and, for updates, :_upsert
# @param source the document body handed to the request
# @return the populated IndexRequest, DeleteRequest or UpdateRequest
# @raise [LogStash::ConfigurationError] if the action is not supported, or
#   if "create_unless_exists" / "update" is requested without an :_id
def build_request(action, args, source)
  case action
    when "index"
      request = org.elasticsearch.action.index.IndexRequest.new(args[:_index])
      request.id(args[:_id]) if args[:_id]
      request.routing(args[:_routing]) if args[:_routing]
      request.source(source)
    when "delete"
      request = org.elasticsearch.action.delete.DeleteRequest.new(args[:_index])
      request.id(args[:_id])
      request.routing(args[:_routing]) if args[:_routing]
    when "create"
      request = org.elasticsearch.action.index.IndexRequest.new(args[:_index])
      request.id(args[:_id]) if args[:_id]
      request.routing(args[:_routing]) if args[:_routing]
      request.source(source)
      request.opType("create")
    when "create_unless_exists"
      # Without an explicit id there is nothing to check existence against.
      if args[:_id].nil?
        raise(LogStash::ConfigurationError, "Specifying action => 'create_unless_exists' without a document '_id' is not supported.")
      end

      request = org.elasticsearch.action.index.IndexRequest.new(args[:_index])
      request.id(args[:_id])
      request.routing(args[:_routing]) if args[:_routing]
      request.source(source)
      request.opType("create")
    when "update"
      if args[:_id].nil?
        raise(LogStash::ConfigurationError, "Specifying action => 'update' without a document '_id' is not supported.")
      end

      request = org.elasticsearch.action.update.UpdateRequest.new(args[:_index], args[:_type], args[:_id])
      request.routing(args[:_routing]) if args[:_routing]
      request.doc(source)
      # doc_as_upsert takes priority over an explicitly supplied upsert document.
      if @client_options[:doc_as_upsert]
        request.docAsUpsert(true)
      else
        request.upsert(args[:_upsert]) if args[:_upsert]
      end
    else
      # Report the offending action via the `action` parameter; interpolating
      # an undefined name here would raise NameError instead of the intended
      # configuration error.
      raise(LogStash::ConfigurationError, "action => '#{action}' is not currently supported.")
  end # case action

  request.type(args[:_type]) if args[:_type]
  request
end

#bulk(actions) ⇒ Object



116
117
118
119
120
121
122
123
124
125
# File 'lib/logstash/outputs/elasticsearch_java/protocol.rb', line 116

# Sends a batch of actions to Elasticsearch as a single bulk request.
#
# @param actions [Array] entries of the form [action, action_metadata, source]
# @return the bulk response as reshaped by normalize_bulk_response
def bulk(actions)
  bulk_builder = client.prepareBulk
  actions.each { |action, args, source| bulk_builder.add(build_request(action, args, source)) }

  raw_response = bulk_builder.execute.actionGet
  normalize_bulk_response(raw_response)
end

#client ⇒ Object



27
28
29
# File 'lib/logstash/outputs/elasticsearch_java/protocol.rb', line 27

# Returns the shared client stored in @@client, building it with make_client
# on first use. The lookup and the assignment both happen inside
# client_mutex_synchronize.
def client
  client_mutex_synchronize do
    @@client ||= make_client
  end
end

#client_mutex_synchronize ⇒ Object



23
24
25
# File 'lib/logstash/outputs/elasticsearch_java/protocol.rb', line 23

# Runs the given block while holding CLIENT_MUTEX.
#
# @yield the work to perform under the lock
# @return whatever the block returns
def client_mutex_synchronize
  CLIENT_MUTEX.synchronize do
    yield
  end
end

#create_settings ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/logstash/outputs/elasticsearch_java/protocol.rb', line 36

# Creates the Elasticsearch settings builder for this client and stores it
# in @settings.
#
# When :hosts is configured, multicast discovery is switched off and the
# unicast host list comes from #hosts. "node.client" is then set to true,
# "http.enabled" to false and "path.home" to the current working directory.
# Entries from :client_settings are applied last.
#
# @return the settings builder, which is also kept in @settings
def create_settings
  @settings = org.elasticsearch.common.settings.Settings.settingsBuilder()
  builder = @settings

  if @client_options[:hosts]
    builder.put("discovery.zen.ping.multicast.enabled", false)
    builder.put("discovery.zen.ping.unicast.hosts", hosts(@client_options))
  end

  builder.put("node.client", true)
  builder.put("http.enabled", false)
  builder.put("path.home", Dir.pwd)

  extra_settings = @client_options[:client_settings]
  extra_settings.each { |key, value| builder.put(key, value) } if extra_settings

  builder
end

#hosts(options) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/logstash/outputs/elasticsearch_java/protocol.rb', line 56

# Builds the comma-separated "host:port" list used for unicast discovery.
# See http://www.elasticsearch.org/guide/reference/modules/discovery/zen/
#
# options[:hosts] may be a single host or an Array of hosts. For each host:
# * a host already written as "host:port" is used as-is and options[:port]
#   is ignored;
# * when options[:port] is a range such as "9300-9305", one entry per port
#   is generated. The documented 'host[port1-port2]' form only seems to
#   query the first port, so the list is expanded explicitly;
# * otherwise the single options[:port] is appended.
#
# @param options [Hash] reads :hosts and :port
# @return [String] the endpoints joined with ","
def hosts(options)
  # Treat a lone host exactly like a one-element list.
  candidates = options[:hosts].class == Array ? options[:hosts] : [options[:hosts]]

  endpoints = candidates.map do |host|
    if host.to_s =~ /^.+:.+$/
      host
    elsif options[:port].to_s =~ /^\d+-\d+$/
      Range.new(*options[:port].split("-")).collect { |p| "#{host}:#{p}" }
    else
      "#{host}:#{options[:port]}"
    end
  end

  # Port ranges contribute nested arrays, so flatten before joining.
  endpoints.flatten.join(",")
end

#make_client ⇒ Object



111
112
113
114
# File 'lib/logstash/outputs/elasticsearch_java/protocol.rb', line 111

# Builds a node from the current settings via NodeBuilder and returns that
# node's client.
def make_client
  node_builder = org.elasticsearch.node.NodeBuilder.nodeBuilder
  built_settings = settings.build
  node_builder.settings(built_settings).node.client
end

#normalize_bulk_response(bulk_response) ⇒ Object

Normalizes the Java response to a reasonable approximation of the HTTP datastructure for interop with the HTTP code



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/logstash/outputs/elasticsearch_java/protocol.rb', line 95

# Converts the Java bulk response into an approximation of the structure the
# HTTP client returns, for interop with the HTTP code.
#
# @param bulk_response the Java bulk response; it is iterated item by item
#   and asked whether it has failures
# @return [Hash] {"errors" => false} when nothing failed, otherwise
#   {"errors" => true, "items" => [...]} with one
#   [[op_type, {"status" => ..., "message" => ...}]] entry per item
def normalize_bulk_response(bulk_response)
  # TODO(talevy): parse item response objects to retrieve correct 200 (OK) or 201 (created) status codes
  items = bulk_response.map do |item|
    failed = item.is_failed
    op_type = item.get_op_type
    outcome =
      if failed
        {"status" => item.get_failure.get_status.get_status, "message" => item.failureMessage}
      else
        {"status" => 200, "message" => "OK"}
      end
    [[op_type, outcome]]
  end

  return {"errors" => false} unless bulk_response.has_failures

  {"errors" => true, "items" => items}
end

#template_exists?(name) ⇒ Boolean

Checks whether at least one index template matching the given name is returned by Elasticsearch.

Returns:

  • (Boolean)


179
180
181
182
183
184
185
186
# File 'lib/logstash/outputs/elasticsearch_java/protocol.rb', line 179

# Asks the indices admin client for templates matching +name+.
#
# @param name the template name to look up
# @return [Boolean] true when the returned template list is not empty
def template_exists?(name)
  lookup = client.admin.indices.prepareGetTemplates(name)
  templates = lookup.execute.actionGet.getIndexTemplates
  !templates.isEmpty
end

#template_install(name, template, force = false) ⇒ Object



188
189
190
191
192
193
194
# File 'lib/logstash/outputs/elasticsearch_java/protocol.rb', line 188

# Installs an index template unless one with the same name already exists.
#
# @param name the template name
# @param template the template definition handed to template_put
# @param force when truthy, the template is written even if it already exists
# @return [nil] when the existing template is left untouched; otherwise the
#   result of template_put
def template_install(name, template, force = false)
  # The existence check always runs, whether or not force is set.
  already_present = template_exists?(name)
  return template_put(name, template) if force || !already_present

  @logger.debug("Found existing Elasticsearch template. Skipping template management", :name => name)
  nil
end

#template_put(name, template) ⇒ Object



196
197
198
199
200
201
202
203
204
# File 'lib/logstash/outputs/elasticsearch_java/protocol.rb', line 196

# Writes an index template through the indices admin client.
#
# @param name the template name
# @param template the template definition; serialized with LogStash::Json.dump
# @return [nil]
# @raise [RuntimeError] when the response is not acknowledged
def template_put(name, template)
  put_builder = client.admin.indices.preparePutTemplate(name)
  payload = LogStash::Json.dump(template)
  response = put_builder.setSource(payload).execute.actionGet

  raise "Could not index template!" unless response.isAcknowledged
end