Class: Elastomer::Client::Bulk
- Inherits: Object
- Defined in: lib/elastomer/client/bulk.rb
Overview
The Bulk class provides some abstractions and helper methods for working with the ElasticSearch bulk API command. Instances of the Bulk class accumulate indexing and delete operations and then issue a single bulk API request to ElasticSearch. Those operations are then executed by the cluster.
A maximum request size can be set. As soon as the size of the request body hits this threshold, a bulk request will be made to the search cluster. This happens as operations are added.
Additionally, a maximum action count can be set. As soon as the number of accumulated actions reaches this count, a bulk request will be made.
You can also use the `call` method explicitly to send a bulk request immediately.
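The accumulate-and-flush behavior described above can be sketched in a few lines. This is a simplified illustration, not the real class: `RecordingClient` and `MiniBulk` are hypothetical stand-ins, and the standard library `JSON` module is used in place of MultiJson so the sketch runs without extra gems.

```ruby
require "json"

# Hypothetical stand-in for Elastomer::Client: records each bulk body it receives.
class RecordingClient
  attr_reader :requests

  def initialize
    @requests = []
  end

  def bulk(body, params = {})
    @requests << body
    { "items" => body.lines.size }
  end
end

# Simplified sketch of the accumulate-and-flush idea; not the real Bulk class.
class MiniBulk
  def initialize(client, action_count: nil, request_size: nil)
    @client = client
    @action_count = action_count
    @request_size = request_size
    reset
  end

  def index(document, params = {})
    add({ index: params }, document)
  end

  # Send whatever has accumulated, then start over with an empty list.
  def call
    return nil if @lines.empty?
    @client.bulk(@lines.join("\n") + "\n")
  ensure
    reset
  end

  private

  def add(action, document)
    [action, document].each do |part|
      line = JSON.generate(part)
      @lines << line
      @size += line.bytesize
    end
    @count += 1
    over_size  = @request_size && @size >= @request_size
    over_count = @action_count && @count >= @action_count
    (over_size || over_count) ? call : nil
  end

  def reset
    @lines = []
    @size = 0
    @count = 0
  end
end

client = RecordingClient.new
bulk = MiniBulk.new(client, action_count: 2)
first  = bulk.index({ "title" => "a" }, { _id: 1 })  # below the threshold, nothing sent
second = bulk.index({ "title" => "b" }, { _id: 2 })  # threshold reached, request sent
bulk.index({ "title" => "c" }, { _id: 3 })
bulk.call                                            # explicit flush of the remainder
```

The first `index` call returns nil because nothing was sent, the second returns the client's response, and the final `call` sends the one remaining action.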
Constant Summary
- SPECIAL_KEYS = %w[id type index version version_type routing parent timestamp ttl consistency refresh retry_on_conflict]
- SPECIAL_KEYS_HASH = SPECIAL_KEYS.inject({}) { |h, k| h[k] = "_#{k}"; h }
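To make the shape of `SPECIAL_KEYS_HASH` concrete, the snippet below rebuilds both constants outside the class and compares the `inject` form with an `each_with_object` form. The name `EQUIVALENT` is introduced here only for illustration.

```ruby
SPECIAL_KEYS = %w[id type index version version_type routing parent timestamp ttl consistency refresh retry_on_conflict]

# Maps each plain key to its underscore-prefixed form, e.g. "id" => "_id".
SPECIAL_KEYS_HASH = SPECIAL_KEYS.inject({}) { |h, k| h[k] = "_#{k}"; h }

# Same mapping built with each_with_object, which avoids the trailing `; h`.
EQUIVALENT = SPECIAL_KEYS.each_with_object({}) { |k, h| h[k] = "_#{k}" }

puts SPECIAL_KEYS_HASH["id"]
puts SPECIAL_KEYS_HASH["retry_on_conflict"]
```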
Instance Attribute Summary
- #action_count ⇒ Object
  Returns the value of attribute action_count.
- #client ⇒ Object (readonly)
  Returns the value of attribute client.
- #request_size ⇒ Object
  Returns the value of attribute request_size.
Instance Method Summary
- #add_to_actions(action, document = nil) ⇒ Object
  Internal: Add the given `action` to the list of actions that will be performed by this bulk request.
- #call ⇒ Object
  Immediately execute a bulk API call with the currently accumulated actions.
- #convert_special_keys(params) ⇒ Object
  Internal: Convert incoming Ruby symbol keys to their special underscore versions.
- #create(document, params) ⇒ Object
  Add a create action to the list of bulk actions to be performed when the bulk API call is made.
- #delete(params) ⇒ Object
  Add a delete action to the list of bulk actions to be performed when the bulk API call is made.
- #from_document(document) ⇒ Object
  Internal: Extract special keys for bulk indexing from the given `document`.
- #index(document, params = {}) ⇒ Object
  Add an index action to the list of bulk actions to be performed when the bulk API call is made.
- #initialize(client, params = {}) ⇒ Bulk (constructor)
  Create a new bulk client for handling some of the details of accumulating documents to index and then formatting them properly for the bulk API command.
- #prepare_params(document, params) ⇒ Object
  Internal: convert special key parameters to their wire representation and apply any override document parameters.
- #update(document, params) ⇒ Object
  Add an update action to the list of bulk actions to be performed when the bulk API call is made.
Constructor Details
#initialize(client, params = {}) ⇒ Bulk
Create a new bulk client for handling some of the details of accumulating documents to index and then formatting them properly for the bulk API command.
client - Elastomer::Client used for HTTP requests to the server
params - Parameters Hash to pass to the Client#bulk method
  :request_size - the maximum request size in bytes
  :action_count - the maximum number of actions
# File 'lib/elastomer/client/bulk.rb', line 73
def initialize( client, params = {} )
  @client = client
  @params = params

  @actions = []
  @current_request_size = 0
  @current_action_count = 0
  self.request_size = params.delete(:request_size)
  self.action_count = params.delete(:action_count)
end
Instance Attribute Details
#action_count ⇒ Object
Returns the value of attribute action_count.
# File 'lib/elastomer/client/bulk.rb', line 84
def action_count
  @action_count
end
#client ⇒ Object (readonly)
Returns the value of attribute client.
# File 'lib/elastomer/client/bulk.rb', line 84
def client
  @client
end
#request_size ⇒ Object
Returns the value of attribute request_size.
# File 'lib/elastomer/client/bulk.rb', line 84
def request_size
  @request_size
end
Instance Method Details
#add_to_actions(action, document = nil) ⇒ Object
Internal: Add the given `action` to the list of actions that will be performed by this bulk request. An optional `document` can also be given.
If the total size of the accumulated actions reaches the desired request size, or the number of accumulated actions reaches the desired action count, then a bulk API call will be performed. After the call the actions list is cleared and we'll start accumulating actions again.
action - The bulk action (as a Hash) to perform
document - Optional document for the action as a Hash or JSON encoded String
Returns the response from the bulk call if one was made or nil.
# File 'lib/elastomer/client/bulk.rb', line 275
def add_to_actions( action, document = nil )
  action = MultiJson.dump action
  @actions << action
  @current_request_size += action.bytesize
  @current_action_count += 1

  unless document.nil?
    document = MultiJson.dump document unless String === document
    @actions << document
    @current_request_size += document.bytesize
  end

  if (request_size && @current_request_size >= request_size) ||
     (action_count && @current_action_count >= action_count)
    call
  else
    nil
  end
end
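The request size is tracked with `bytesize`, not `length`, which matters once documents contain non-ASCII text. The short sketch below shows the difference; it uses the standard library `JSON` module as a stand-in for MultiJson, and the sample document is made up for illustration.

```ruby
require "json"

# A document with one non-ASCII character ("é" is two bytes in UTF-8).
line = JSON.generate({ "name" => "caf\u00e9" })

chars = line.length    # number of characters
bytes = line.bytesize  # number of bytes sent over the wire

puts "characters: #{chars}, bytes: #{bytes}"
```

Because the threshold is compared against bytes, a `:request_size` limit reflects the size of the request body on the wire and not the character count.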
#call ⇒ Object
Immediately execute a bulk API call with the currently accumulated actions. The accumulated actions list will be cleared after the call has been made.
If the accumulated actions list is empty then no action is taken.
Returns the response body Hash, or nil if no actions had been accumulated.
# File 'lib/elastomer/client/bulk.rb', line 198
def call
  return nil if @actions.empty?

  body = @actions.join("\n") + "\n"
  client.bulk(body, @params)
ensure
  @current_request_size = 0
  @current_action_count = 0
  @actions.clear
end
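The body built by `call` is newline-delimited JSON: one line per action, one line per document, and a trailing newline. A minimal sketch of that format follows, again using the standard library `JSON` module in place of MultiJson; the sample actions are illustrative only.

```ruby
require "json"

# Each action and each document is serialized to its own line.
actions = [
  JSON.generate({ index: { _id: 1, _type: "foo" } }),
  JSON.generate({ "foo" => "bar" }),
  JSON.generate({ delete: { _id: 2, _type: "foo" } })  # delete carries no document line
]

# Same join as in #call: newline between lines plus a final newline.
body = actions.join("\n") + "\n"
puts body
```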
#convert_special_keys(params) ⇒ Object
Internal: Convert incoming Ruby symbol keys to their special underscore versions. Maintains API compatibility with the `Docs` API for `index`, `create`, `update` and `delete`.
:id -> :_id
'id' -> '_id'
params - Hash.
Returns a new params Hash with the special keys replaced.
# File 'lib/elastomer/client/bulk.rb', line 252
def convert_special_keys(params)
  new_params = params.dup

  SPECIAL_KEYS_HASH.each do |k1, k2|
    new_params[k2] = new_params.delete k1 if new_params.key? k1
    new_params[k2.to_sym] = new_params.delete k1.to_sym if new_params.key? k1.to_sym
  end
  new_params
end
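As a quick illustration of the conversion, the sketch below runs the same logic against a small subset of the key map. `KEY_MAP` and the free-standing `convert_keys` method are names chosen for this example; the real method reads `SPECIAL_KEYS_HASH`.

```ruby
# Illustrative subset of SPECIAL_KEYS_HASH.
KEY_MAP = { "id" => "_id", "type" => "_type", "routing" => "_routing" }

# Same logic as convert_special_keys, written as a free-standing method.
def convert_keys(params)
  new_params = params.dup
  KEY_MAP.each do |plain, underscored|
    new_params[underscored] = new_params.delete(plain) if new_params.key?(plain)
    new_params[underscored.to_sym] = new_params.delete(plain.to_sym) if new_params.key?(plain.to_sym)
  end
  new_params
end

original  = { id: 1, "type" => "foo", other: true }
converted = convert_keys(original)

p converted  # symbol keys stay symbols, string keys stay strings
p original   # the input hash is left untouched because of the dup
```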
#create(document, params) ⇒ Object
Add a create action to the list of bulk actions to be performed when the bulk API call is made. Parameters can be provided in the parameters hash (underscore prefix optional) or in the document hash (underscore prefix required).
document - The document to create as a Hash or JSON encoded String
params - Parameters for the create action (as a Hash); the signature has no default for this argument, so pass {} when all parameters are in the document
Examples
create({"foo" => "bar"}, {:_id => 1})
create({"foo" => "bar"}, {:id => 1})
create({"foo" => "bar", "_id" => 1}, {})
Returns the response from the bulk call if one was made or nil.
# File 'lib/elastomer/client/bulk.rb', line 153
def create( document, params )
  params = prepare_params(document, params)
  add_to_actions({:create => params}, document)
end
#delete(params) ⇒ Object
Add a delete action to the list of bulk actions to be performed when the bulk API call is made.
params - Parameters for the delete action (as a Hash)
Examples
delete(:_id => 1, :_type => 'foo')
Returns the response from the bulk call if one was made or nil.
# File 'lib/elastomer/client/bulk.rb', line 186
def delete( params )
  params = prepare_params(nil, params)
  add_to_actions({:delete => params})
end
#from_document(document) ⇒ Object
Internal: Extract special keys for bulk indexing from the given `document`. The keys and their values are removed from the document and returned as a Hash from this method. If a value is `nil` then it will be ignored.
document - The document Hash
Returns extracted key/value pairs as a Hash.
# File 'lib/elastomer/client/bulk.rb', line 230
def from_document( document )
  opts = {}

  SPECIAL_KEYS_HASH.values.each do |field|
    key = field.to_sym
    opts[key] = document.delete field if document.key? field
    opts[key] = document.delete key if document.key? key
  end

  opts
end
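One consequence of the `document.delete` calls above is that the caller's document Hash is modified. The sketch below reproduces the extraction against a small subset of underscore keys; `UNDERSCORED` and `extract_from` are names introduced for this example only.

```ruby
# Illustrative subset of SPECIAL_KEYS_HASH.values.
UNDERSCORED = %w[_id _type _routing]

# Same logic as from_document, written as a free-standing method.
def extract_from(document)
  opts = {}
  UNDERSCORED.each do |field|
    key = field.to_sym
    opts[key] = document.delete(field) if document.key?(field)
    opts[key] = document.delete(key) if document.key?(key)
  end
  opts
end

doc  = { "_id" => 1, "_type" => "foo", "title" => "bar" }
opts = extract_from(doc)

p opts  # extracted keys, normalized to symbols
p doc   # the special keys are gone from the document itself
```

Callers that need the original document intact afterwards can pass a `dup` of it.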
#index(document, params = {}) ⇒ Object
Add an index action to the list of bulk actions to be performed when the bulk API call is made. Parameters can be provided in the parameters hash (underscore prefix optional) or in the document hash (underscore prefix required).
document - The document to index as a Hash or JSON encoded String
params - Parameters for the index action (as a Hash) (optional)
Examples
index({"foo" => "bar"}, {:_id => 1, :_type => "foo"})
index({"foo" => "bar"}, {:id => 1, :type => "foo"})
index("foo" => "bar", "_id" => 1, "_type" => "foo")
Returns the response from the bulk call if one was made or nil.
# File 'lib/elastomer/client/bulk.rb', line 134
def index( document, params = {} )
  params = prepare_params(document, params)
  add_to_actions({:index => params}, document)
end
#prepare_params(document, params) ⇒ Object
Internal: convert special key parameters to their wire representation and apply any override document parameters.
# File 'lib/elastomer/client/bulk.rb', line 214
def prepare_params(document, params)
  params = convert_special_keys(params)
  if document.is_a? Hash
    params = from_document(document).merge(params)
  end
  params.delete(:_id) if params[:_id].nil? || params[:_id].to_s.empty?
  params
end
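Two details of the listing above are easy to miss: values from `params` win over values extracted from the document, because of the direction of `merge`, and a nil or empty `:_id` is dropped. The sketch below isolates just those two steps; `merge_params` is a hypothetical helper that takes the already-extracted document options as its first argument.

```ruby
# Isolates the merge and the _id cleanup from prepare_params.
def merge_params(document_opts, params)
  merged = document_opts.merge(params)  # params override document values
  merged.delete(:_id) if merged[:_id].nil? || merged[:_id].to_s.empty?
  merged
end

overridden = merge_params({ _id: 1, _type: "doc" }, { _id: 2 })
blank_id   = merge_params({ _type: "doc" }, { _id: "" })

p overridden  # the explicit parameter replaces the document's _id
p blank_id    # the empty _id has been removed
```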
#update(document, params) ⇒ Object
Add an update action to the list of bulk actions to be performed when the bulk API call is made. Parameters can be provided in the parameters hash (underscore prefix optional) or in the document hash (underscore prefix required).
document - The document to update as a Hash or JSON encoded String
params - Parameters for the update action (as a Hash); the signature has no default for this argument, so pass {} when all parameters are in the document
Examples
update({"foo" => "bar"}, {:_id => 1})
update({"foo" => "bar"}, {:id => 1})
update({"foo" => "bar", "_id" => 1}, {})
Returns the response from the bulk call if one was made or nil.
# File 'lib/elastomer/client/bulk.rb', line 172
def update( document, params )
  params = prepare_params(document, params)
  add_to_actions({:update => params}, document)
end