Class: Elastomer::Client::Bulk

Inherits:
Object
Defined in:
lib/elastomer/client/bulk.rb

Overview

The Bulk class provides some abstractions and helper methods for working with the ElasticSearch bulk API command. Instances of the Bulk class accumulate indexing and delete operations and then issue a single bulk API request to ElasticSearch. Those operations are then executed by the cluster.

A maximum request size, in bytes, can be set. As soon as the accumulated size of the request body reaches this threshold, a bulk request is made to the search cluster. This check happens each time an operation is added.

Additionally, a maximum action count can be set. As soon as the number of accumulated actions reaches the action count, a bulk request is made.

You can also use the `call` method explicitly to send a bulk request immediately.

Constant Summary

SPECIAL_KEYS =
%w[id type index version version_type routing parent percolator timestamp ttl retry_on_conflict]
SPECIAL_KEYS_HASH =
SPECIAL_KEYS.inject({}) { |h, k| h[k] = "_#{k}"; h }
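`SPECIAL_KEYS_HASH` maps each bare key name to its underscore-prefixed wire name. Evaluating the same expressions on their own shows the result; `each_with_object` is an equivalent spelling that avoids returning the accumulator from the block by hand.

```ruby
SPECIAL_KEYS = %w[id type index version version_type routing parent percolator timestamp ttl retry_on_conflict]

# Same construction as in Bulk: {"id" => "_id", "type" => "_type", ...}
SPECIAL_KEYS_HASH = SPECIAL_KEYS.inject({}) { |h, k| h[k] = "_#{k}"; h }

# Equivalent formulation without the explicit `; h` return
alt = SPECIAL_KEYS.each_with_object({}) { |k, h| h[k] = "_#{k}" }

p SPECIAL_KEYS_HASH["id"]                # prints "_id"
p SPECIAL_KEYS_HASH["retry_on_conflict"] # prints "_retry_on_conflict"
p alt == SPECIAL_KEYS_HASH               # prints true
```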


Constructor Details

#initialize(client, params = {}) ⇒ Bulk

Create a new bulk client for handling some of the details of accumulating documents to index and then formatting them properly for the bulk API command.

client - Elastomer::Client used for HTTP requests to the server
params - Parameters Hash to pass to the Client#bulk method; the constructor removes these two options from it:

:request_size - the maximum request size in bytes
:action_count - the maximum number of actions


# File 'lib/elastomer/client/bulk.rb', line 73

def initialize( client, params = {} )
  @client  = client
  @params  = params

  @actions = []
  @current_request_size = 0
  @current_action_count = 0
  self.request_size = params.delete(:request_size)
  self.action_count = params.delete(:action_count)
end
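Because the constructor reads the two size options with `params.delete`, they are removed from the very Hash the caller passed in, and whatever remains is forwarded to `Client#bulk` on every flush. The sketch below repeats just that step on a plain Hash (the `:index` option and the numbers are illustrative assumptions); pass `opts.dup` if the same options Hash is reused elsewhere.

```ruby
# Options a caller might pass to Bulk.new (values are illustrative)
opts = { :index => "blog", :request_size => 10_000, :action_count => 500 }

# The constructor stores this same Hash as @params...
params = opts
# ...then removes the size options from it in place
request_size = params.delete(:request_size)
action_count = params.delete(:action_count)

p request_size # prints 10000
p action_count # prints 500
p opts.keys    # prints [:index] (the caller's Hash lost both size options)
```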

Instance Attribute Details

#action_count ⇒ Object

Returns the value of attribute action_count.



# File 'lib/elastomer/client/bulk.rb', line 84

def action_count
  @action_count
end

#client ⇒ Object (readonly)

Returns the value of attribute client.



# File 'lib/elastomer/client/bulk.rb', line 84

def client
  @client
end

#request_size ⇒ Object

Returns the value of attribute request_size.



# File 'lib/elastomer/client/bulk.rb', line 84

def request_size
  @request_size
end

Instance Method Details

#add_to_actions(action, document = nil) ⇒ Object

Internal: Add the given `action` to the list of actions that will be performed by this bulk request. An optional `document` can also be given.

If the total size of the accumulated actions reaches the configured `request_size`, or the number of accumulated actions reaches `action_count`, then a bulk API call is performed. After the call the actions list is cleared and we start accumulating actions again.

action - The bulk action (as a Hash) to perform
document - Optional document for the action as a Hash or JSON encoded String

Returns the response from the bulk call if one was made or nil.



# File 'lib/elastomer/client/bulk.rb', line 276

def add_to_actions( action, document = nil )
  action = MultiJson.dump action
  @actions << action
  @current_request_size += action.bytesize
  @current_action_count += 1

  unless document.nil?
    document = MultiJson.dump document unless String === document
    @actions << document
    @current_request_size += document.bytesize
  end

  if (request_size && @current_request_size >= request_size) ||
     (action_count && @current_action_count >= action_count)
    call
  else
    nil
  end
end
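The accumulate-and-flush logic above can be exercised without a cluster. This is a simplified model, not the library class: `RecordingClient` and `BulkSketch` are hypothetical names, stdlib `json` stands in for `MultiJson`, and the thresholds are passed as keyword arguments for brevity.

```ruby
require "json"

# Hypothetical stand-in for Elastomer::Client that records each bulk body
class RecordingClient
  attr_reader :bodies

  def initialize
    @bodies = []
  end

  def bulk(body, _params)
    @bodies << body
    { "items" => [] } # placeholder response Hash
  end
end

# Simplified model of Bulk's accumulate-and-flush logic
class BulkSketch
  def initialize(client, request_size: nil, action_count: nil)
    @client = client
    @request_size = request_size
    @action_count = action_count
    @actions = []
    @size = 0
    @count = 0
  end

  def add_to_actions(action, document = nil)
    action = JSON.generate(action)
    @actions << action
    @size += action.bytesize
    @count += 1

    unless document.nil?
      document = JSON.generate(document) unless document.is_a?(String)
      @actions << document
      @size += document.bytesize
    end

    # Flush as soon as either configured threshold is reached
    if (@request_size && @size >= @request_size) ||
       (@action_count && @count >= @action_count)
      call
    end
  end

  def call
    return nil if @actions.empty?
    @client.bulk(@actions.join("\n") + "\n", {})
  ensure
    @actions.clear
    @size = 0
    @count = 0
  end
end

client = RecordingClient.new
bulk = BulkSketch.new(client, action_count: 2)
bulk.add_to_actions({ "index" => { "_id" => 1 } }, { "foo" => "bar" }) # 1 action, buffered
bulk.add_to_actions({ "delete" => { "_id" => 2 } })                    # 2 actions, flushes
bulk.add_to_actions({ "delete" => { "_id" => 3 } })                    # buffered again
bulk.call                                                             # explicit flush
p client.bodies.size # prints 2
```

Note that the tracked size counts only the encoded actions and documents, not the newline separators, so the body actually sent is slightly larger than the value compared against `request_size`.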

#call ⇒ Object

Immediately execute a bulk API call with the currently accumulated actions. The accumulated actions list will be cleared after the call has been made.

If the accumulated actions list is empty then no action is taken.

Returns the response body Hash, or nil if there were no accumulated actions.



# File 'lib/elastomer/client/bulk.rb', line 199

def call
  return nil if @actions.empty?

  body = @actions.join("\n") + "\n"
  client.bulk(body, @params)
ensure
  @current_request_size = 0
  @current_action_count = 0
  @actions.clear
end
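The request body built by `call` is newline-delimited JSON: the queued lines joined with "\n", plus the trailing newline the bulk API requires after the final line. The sketch below (stdlib `json` in place of `MultiJson`; ids and types illustrative) builds the same string for one index and one delete action. Because the counters and the action list are reset in an `ensure` clause, they are cleared even if `client.bulk` raises, so a failed batch is discarded rather than kept for a retry.

```ruby
require "json"

# Lines as Bulk accumulates them: each action and document already JSON-encoded
actions = [
  JSON.generate({ "index" => { "_id" => 1, "_type" => "foo" } }),
  JSON.generate({ "foo" => "bar" }),
  JSON.generate({ "delete" => { "_id" => 2, "_type" => "foo" } })
]

# Same join as Bulk#call: newline-separated, plus a newline after the last line
body = actions.join("\n") + "\n"
print body
# {"index":{"_id":1,"_type":"foo"}}
# {"foo":"bar"}
# {"delete":{"_id":2,"_type":"foo"}}
```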

#convert_special_keys(params) ⇒ Object

Internal: Convert incoming bare keys, as Symbols or Strings, to their special underscore versions. Maintains API compatibility with the `Docs` API for `index`, `create`, `update` and `delete`.

:id -> :_id
'id' -> '_id'

params - Hash.

Returns a new params Hash with the special keys replaced.



# File 'lib/elastomer/client/bulk.rb', line 253

def convert_special_keys(params)
  new_params = params.dup

  SPECIAL_KEYS_HASH.each do |k1, k2|
    new_params[k2] = new_params.delete k1 if new_params.key? k1
    new_params[k2.to_sym] = new_params.delete k1.to_sym if new_params.key? k1.to_sym
  end

  new_params
end
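A standalone copy of `convert_special_keys` shows that the key type is preserved (Symbol stays Symbol, String stays String), keys outside `SPECIAL_KEYS` pass through untouched, and the caller's Hash is left alone because the method works on a `dup`. The `:refresh` key is just an illustrative non-special parameter.

```ruby
SPECIAL_KEYS = %w[id type index version version_type routing parent percolator timestamp ttl retry_on_conflict]
SPECIAL_KEYS_HASH = SPECIAL_KEYS.each_with_object({}) { |k, h| h[k] = "_#{k}" }

# Copy of Bulk#convert_special_keys
def convert_special_keys(params)
  new_params = params.dup
  SPECIAL_KEYS_HASH.each do |k1, k2|
    new_params[k2] = new_params.delete k1 if new_params.key? k1
    new_params[k2.to_sym] = new_params.delete k1.to_sym if new_params.key? k1.to_sym
  end
  new_params
end

input  = { :id => 1, "type" => "foo", :refresh => true }
output = convert_special_keys(input)

p output[:_id]     # prints 1
p output["_type"]  # prints "foo"
p output[:refresh] # prints true (non-special keys are untouched)
p input.key?(:id)  # prints true (the original Hash is not modified)
```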

#create(document, params) ⇒ Object

Add a create action to the list of bulk actions to be performed when the bulk API call is made. Parameters can be provided in the parameters hash (underscore prefix optional) or in the document hash (underscore prefix required).

document - The document to create as a Hash or JSON encoded String
params - Parameters for the create action (as a Hash); this argument has no default, so pass {} when all parameters come from the document

Examples

create({"foo" => "bar"}, {:_id => 1})
create({"foo" => "bar"}, {:id => 1})
create({"foo" => "bar", "_id" => 1}, {})

Returns the response from the bulk call if one was made or nil.



# File 'lib/elastomer/client/bulk.rb', line 154

def create( document, params )
  params = prepare_params(document, params)
  add_to_actions({:create => params}, document)
end
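For `create`, the bulk API receives an action line followed by the document line; in Elasticsearch a `create` fails when a document with the same id already exists, whereas `index` replaces it. The sketch below hand-builds the two lines queued by `create({"foo" => "bar", "_id" => 1}, {})`, assuming `prepare_params` has already moved `"_id"` out of the document (stdlib `json` in place of `MultiJson`).

```ruby
require "json"

document = { "foo" => "bar", "_id" => 1 }

# What prepare_params does here: pull "_id" out of the document (mutating it)
# and store it under the Symbol key :_id in the action metadata
params = { :_id => document.delete("_id") }

# The two lines add_to_actions queues for this create call
lines = [JSON.generate({ :create => params }), JSON.generate(document)]
puts lines
# {"create":{"_id":1}}
# {"foo":"bar"}
```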

#delete(params) ⇒ Object

Add a delete action to the list of bulk actions to be performed when the bulk API call is made.

params - Parameters for the delete action (as a Hash)

Examples

delete(:_id => 1, :_type => 'foo')

Returns the response from the bulk call if one was made or nil.



# File 'lib/elastomer/client/bulk.rb', line 187

def delete( params )
  params = prepare_params(nil, params)
  add_to_actions({:delete => params})
end
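A delete carries no document, so `add_to_actions` queues a single line and counts it as one action. Assuming the parameters have already passed through `prepare_params` (which turns `:id`/`:type` into `:_id`/`:_type`), the queued line for the example above looks like this; stdlib `json` stands in for `MultiJson`.

```ruby
require "json"

# Parameters after prepare_params; delete(:id => 1, :type => 'foo')
# ends up with the same underscore-prefixed keys
params = { :_id => 1, :_type => "foo" }

# No document follows a delete action, so this is the only queued line
line = JSON.generate({ :delete => params })
puts line # prints {"delete":{"_id":1,"_type":"foo"}}
```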

#from_document(document) ⇒ Object

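Internal: Extract special keys for bulk indexing from the given `document`. The keys are deleted from the document and returned, with their values, as a Hash with Symbol keys. Both String (`"_id"`) and Symbol (`:_id`) forms are recognized. A `nil` value is extracted like any other; `prepare_params` later drops an `:_id` that is `nil` or empty.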

document - The document Hash

Returns extracted key/value pairs as a Hash.



# File 'lib/elastomer/client/bulk.rb', line 231

def from_document( document )
  opts = {}

  SPECIAL_KEYS_HASH.values.each do |field|
    key = field.to_sym
    opts[key] = document.delete field if document.key? field
    opts[key] = document.delete key   if document.key? key
  end

  opts
end
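A standalone copy of `from_document` makes its side effects visible: the special keys are deleted from the document passed in, String and Symbol forms both match, and a `nil` value is carried over rather than skipped. The sample document is illustrative.

```ruby
SPECIAL_KEYS = %w[id type index version version_type routing parent percolator timestamp ttl retry_on_conflict]
SPECIAL_KEYS_HASH = SPECIAL_KEYS.each_with_object({}) { |k, h| h[k] = "_#{k}" }

# Copy of Bulk#from_document
def from_document(document)
  opts = {}
  SPECIAL_KEYS_HASH.values.each do |field|
    key = field.to_sym
    opts[key] = document.delete field if document.key? field
    opts[key] = document.delete key   if document.key? key
  end
  opts
end

doc  = { "_id" => 7, :_routing => "user-1", "_version" => nil, "title" => "hello" }
meta = from_document(doc)

p meta[:_id]           # prints 7
p meta[:_routing]      # prints "user-1"
p meta.key?(:_version) # prints true (the nil value was extracted, not ignored)
p doc.keys             # prints ["title"]
```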

#index(document, params = {}) ⇒ Object Also known as: add

Add an index action to the list of bulk actions to be performed when the bulk API call is made. Parameters can be provided in the parameters hash (underscore prefix optional) or in the document hash (underscore prefix required).

document - The document to index as a Hash or JSON encoded String
params - Parameters for the index action (as a Hash) (optional)

Examples

index({"foo" => "bar"}, {:_id => 1, :_type => "foo"})
index({"foo" => "bar"}, {:id => 1, :type => "foo"})
index("foo" => "bar", "_id" => 1, "_type" => "foo")

Returns the response from the bulk call if one was made or nil.



# File 'lib/elastomer/client/bulk.rb', line 134

def index( document, params = {} )
  params = prepare_params(document, params)
  add_to_actions({:index => params}, document)
end
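When `document` is already a JSON String, `prepare_params` skips extraction and `add_to_actions` queues the string verbatim, so any special keys must be supplied through `params`. A sketch of the two queued lines (stdlib `json` in place of `MultiJson`; the values are illustrative). `index` is also available under the alias `add`.

```ruby
require "json"

# Pre-encoded document: an "_id" inside this text would NOT be extracted
document = '{"foo":"bar"}'
# Special keys therefore go in params (already underscore-prefixed here)
params = { :_id => 1, :_type => "foo" }

# Mirrors add_to_actions: encode the action, keep a String document as-is
queued = [
  JSON.generate({ :index => params }),
  document.is_a?(String) ? document : JSON.generate(document)
]
puts queued
# {"index":{"_id":1,"_type":"foo"}}
# {"foo":"bar"}
```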

#prepare_params(document, params) ⇒ Object

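Internal: Convert special-key parameters to their underscore-prefixed wire representation, then merge in any special keys found in a Hash `document` (those keys are removed from the document). Explicit `params` take precedence over values taken from the document. An `:_id` that is `nil` or empty is dropped.

document - The document as a Hash or JSON encoded String, or nil
params - Parameters Hash

Returns the prepared params Hash.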



# File 'lib/elastomer/client/bulk.rb', line 215

def prepare_params(document, params)
  params = convert_special_keys(params)
  unless document.nil? || String === document
    params = from_document(document).merge(params)
  end
  params.delete(:_id) if params[:_id].nil? || params[:_id].to_s.empty?
  params
end
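Running copies of the three helpers shows the precedence rule: the special keys taken from the document are merged *under* the converted params, so an explicit `:id` overrides an `"_id"` stored in the document, and an empty id is removed rather than sent. The sample values are illustrative.

```ruby
SPECIAL_KEYS = %w[id type index version version_type routing parent percolator timestamp ttl retry_on_conflict]
SPECIAL_KEYS_HASH = SPECIAL_KEYS.each_with_object({}) { |k, h| h[k] = "_#{k}" }

# Copies of the three Bulk helpers involved
def convert_special_keys(params)
  new_params = params.dup
  SPECIAL_KEYS_HASH.each do |k1, k2|
    new_params[k2] = new_params.delete k1 if new_params.key? k1
    new_params[k2.to_sym] = new_params.delete k1.to_sym if new_params.key? k1.to_sym
  end
  new_params
end

def from_document(document)
  opts = {}
  SPECIAL_KEYS_HASH.values.each do |field|
    key = field.to_sym
    opts[key] = document.delete field if document.key? field
    opts[key] = document.delete key   if document.key? key
  end
  opts
end

def prepare_params(document, params)
  params = convert_special_keys(params)
  unless document.nil? || String === document
    params = from_document(document).merge(params)
  end
  params.delete(:_id) if params[:_id].nil? || params[:_id].to_s.empty?
  params
end

doc = { "foo" => "bar", "_id" => 1, "_type" => "doc" }
merged = prepare_params(doc, { :id => 2 }) # explicit :id wins over "_id" in doc
p merged[:_id]   # prints 2
p merged[:_type] # prints "doc"

blank = prepare_params({ "foo" => "bar" }, { :id => "" })
p blank.key?(:_id) # prints false (empty ids are dropped)
```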

#update(document, params) ⇒ Object

Add an update action to the list of bulk actions to be performed when the bulk API call is made. Parameters can be provided in the parameters hash (underscore prefix optional) or in the document hash (underscore prefix required).

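document - The update body as a Hash or JSON encoded String; the bulk API expects the partial document under a "doc" key, or a "script"
params - Parameters for the update action (as a Hash); this argument has no default, so pass {} when all parameters come from the document

Examples

update({"doc" => {"foo" => "bar"}}, {:_id => 1})
update({"doc" => {"foo" => "bar"}}, {:id => 1})
update({"doc" => {"foo" => "bar"}, "_id" => 1}, {})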

Returns the response from the bulk call if one was made or nil.



# File 'lib/elastomer/client/bulk.rb', line 173

def update( document, params )
  params = prepare_params(document, params)
  add_to_actions({:update => params}, document)
end
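For an update, the line after the action is the update request body rather than a full document; the Elasticsearch bulk API expects it to contain either a `doc` (partial document) or a `script`. The sketch below builds the two lines for a partial update, assuming `prepare_params` has extracted `"_id"` from the Hash and converted an illustrative `:retry_on_conflict => 3` parameter (stdlib `json` in place of `MultiJson`).

```ruby
require "json"

# Partial update body: the fields to change go under "doc"
document = { "doc" => { "foo" => "bar" }, "_id" => 1 }

# What prepare_params produces here: "_id" moved into the action metadata,
# plus :retry_on_conflict renamed to :_retry_on_conflict (illustrative value)
params = { :_id => document.delete("_id"), :_retry_on_conflict => 3 }

lines = [JSON.generate({ :update => params }), JSON.generate(document)]
puts lines
# {"update":{"_id":1,"_retry_on_conflict":3}}
# {"doc":{"foo":"bar"}}
```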