Class: Elastictastic::BulkPersistenceStrategy

Inherits:
Object
  • Object
show all
Defined in:
lib/elastictastic/bulk_persistence_strategy.rb

Defined Under Namespace

Classes: Operation

Constant Summary collapse

DEFAULT_HANDLER =
proc { |e| raise(e) if e }

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ BulkPersistenceStrategy

Returns a new instance of BulkPersistenceStrategy.



8
9
10
11
12
# File 'lib/elastictastic/bulk_persistence_strategy.rb', line 8

def initialize(options)
  @operations = []
  @operations_by_id = {}
  @auto_flush = options.delete(:auto_flush)
end

Instance Method Details

#create(instance, params = {}, &block) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/elastictastic/bulk_persistence_strategy.rb', line 14

def create(instance, params = {}, &block)
  block ||= DEFAULT_HANDLER
  if instance.pending_save?
    raise Elastictastic::OperationNotAllowed,
      "Can't re-save transient document with pending save in bulk operation"
  end
  instance.pending_save!
  add(
    instance.index,
    instance.id,
    { 'create' => bulk_identifier_for_instance(instance) },
    instance.elasticsearch_doc
  ) do |response|
    if response['create']['error']
      block.call(ServerError[response['create']['error']])
    else
      instance.id = response['create']['_id']
      instance.version = response['create']['_version']
      instance.persisted!
      block.call
    end
  end
end

#destroy(instance, &block) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/elastictastic/bulk_persistence_strategy.rb', line 56

def destroy(instance, &block)
  block ||= DEFAULT_HANDLER
  instance.pending_destroy!
  add(instance.index, instance.id, :delete => bulk_identifier_for_instance(instance)) do |response|
    if response['delete']['error']
      block.call(ServerError[response['delete']['error']])
    else
      instance.transient!
      instance.version = response['delete']['_version']
      block.call
    end
  end
end

#destroy!(index, type, id, routing, parent) ⇒ Object



70
71
72
73
74
75
# File 'lib/elastictastic/bulk_persistence_strategy.rb', line 70

def destroy!(index, type, id, routing, parent)
  add(
    index, id,
    :delete => bulk_identifier(index, type, id, routing, parent, nil)
  )
end

#flushObject



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/elastictastic/bulk_persistence_strategy.rb', line 77

def flush
  return if @operations.empty?

  params = {}
  params[:refresh] = true if Elastictastic.config.auto_refresh
  io = StringIO.new
  operations = @operations.reject { |operation| operation.skip }
  @operations.clear

  operations.each do |operation|
    operation.commands.each do |command|
      io.puts Elastictastic.json_encode(command)
    end
  end
  response = Elastictastic.client.bulk(io.string, params)

  response['items'].each_with_index do |op_response, i|
    operation = operations[i]
    operation.handler.call(op_response) if operation.handler
  end
  response
end

#update(instance, &block) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/elastictastic/bulk_persistence_strategy.rb', line 38

def update(instance, &block)
  block ||= DEFAULT_HANDLER
  instance.pending_save!
  add(
    instance.index,
    instance.id,
    { 'index' => bulk_identifier_for_instance(instance) },
    instance.elasticsearch_doc
  ) do |response|
    if response['index']['error']
      block.call(ServerError[response['index']['error']])
    else
      instance.version = response['index']['_version']
      block.call
    end
  end
end