Class: ElasticSearch::BulkStream
- Inherits: Object
  - Object
  - ElasticSearch::BulkStream
- Defined in: lib/jruby-elasticsearch/bulkstream.rb
Instance Method Summary
- #flush ⇒ Object
- #index(*args) ⇒ Object
- #initialize(client, queue_size = 10, flush_interval = 1) ⇒ BulkStream (constructor): Create a new bulk stream.
- #stop ⇒ Object
Constructor Details
#initialize(client, queue_size = 10, flush_interval = 1) ⇒ BulkStream
Create a new bulk stream. A bulk stream lets you send index and other bulk requests asynchronously, using the ElasticSearch bulk API in a streaming fashion.
The ‘queue_size’ is the maximum number of unflushed requests. When the queue reaches this size, new requests block until there is room.
# File 'lib/jruby-elasticsearch/bulkstream.rb', line 12

def initialize(client, queue_size=10, flush_interval=1)
  @bulkthread = Thread.new { run }
  @client = client
  @queue_size = queue_size
  @queue = SizedQueue.new(@queue_size)
  @flush_interval = flush_interval
end
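The blocking behaviour described for ‘queue_size’ comes from Ruby's SizedQueue. The following is a minimal sketch of that behaviour on its own, not code from this library: the queue size of 2 and the "doc-N" payloads are made up for illustration, and the non-blocking push is used only so the example can observe a full queue without hanging.

```ruby
require "thread"

# Stand-in for the stream's internal queue, with a tiny capacity.
queue = SizedQueue.new(2)

queue << [:index, "doc-1"]
queue << [:index, "doc-2"]

# A plain `queue << ...` would block here because the queue is full.
# The non-blocking form raises ThreadError instead, which lets us detect it.
full = begin
  queue.push([:index, "doc-3"], true)
  false
rescue ThreadError
  true
end

# Once a consumer removes one request, there is room for a new one.
queue.pop
queue << [:index, "doc-3"]
```

In the real class, the thread started in the constructor plays the consumer role, so producers calling #index are released as flushes drain the queue.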
Instance Method Details
#flush ⇒ Object
# File 'lib/jruby-elasticsearch/bulkstream.rb', line 61

def flush
  bulk = @client.bulk

  flush_one = proc do
    # block if no data.
    method, *args = @queue.pop
    return if args.nil? # probably we are now stopping.
    bulk.send(method, *args)
  end

  flush_one.call

  1.upto([@queue.size, @queue_size - 1].min) do
    flush_one.call
  end

  # Block until this finishes
  bulk.execute!
end
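The batching pattern in #flush (one blocking pop, then drain whatever is already queued, then execute) can be tried without an ElasticSearch client. This is a sketch under assumptions: FakeBulk is a hypothetical stand-in for the object returned by client.bulk, and the arguments passed to index are invented for illustration rather than taken from the library's API.

```ruby
require "thread"

# Hypothetical stand-in for the bulk request object; it only records calls.
class FakeBulk
  attr_reader :calls

  def initialize
    @calls = []
  end

  def index(*args)
    @calls << args
  end

  # Pretend to execute the batch and report how many requests it held.
  def execute!
    @calls.size
  end
end

queue_size = 10
queue = SizedQueue.new(queue_size)
3.times { |i| queue << [:index, "myindex", "doc", i.to_s] }

bulk = FakeBulk.new

# First request: this pop would block if the queue were empty.
method, *args = queue.pop
bulk.send(method, *args)

# Remaining requests: take only what is already waiting, capped at
# queue_size - 1, so the batch never exceeds queue_size entries.
[queue.size, queue_size - 1].min.times do
  method, *args = queue.pop
  bulk.send(method, *args)
end

flushed = bulk.execute!
```

Taking the first item with a blocking pop means an idle stream simply waits, while the capped drain keeps each bulk request bounded.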
#index(*args) ⇒ Object
# File 'lib/jruby-elasticsearch/bulkstream.rb', line 22

def index(*args)
  # TODO(sissel): It's not clear I need to queue this up, I could just
  # call BulkRequest#index() and when we have 10 or whatnot, flush, but
  # Queue gives us a nice blocking mechanism anyway.
  @queue << [:index, *args]
end
#stop ⇒ Object
# File 'lib/jruby-elasticsearch/bulkstream.rb', line 53

def stop
  @queue << nil
  @stop = true
end
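#stop pushes nil onto the queue as a shutdown sentinel. The sketch below shows how a consumer thread might react to such a sentinel. It is an illustration, not the library's run loop (which is not shown on this page), and the "doc-1" payload is made up. One detail of plain Ruby worth knowing here: destructuring nil with a splat yields nil for the first variable and an empty array for the rest, so this sketch tests the first variable.

```ruby
require "thread"

queue = SizedQueue.new(10)
seen = []

consumer = Thread.new do
  loop do
    method, *args = queue.pop   # blocks while the queue is empty
    break if method.nil?        # nil sentinel: args is [] here, not nil
    seen << [method, args]
  end
end

queue << [:index, "doc-1"]
queue << nil                    # the same sentinel that #stop enqueues
consumer.join
```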