Class: Embulk::Input::Elasticsearch::Connection
- Inherits:
-
Object
- Object
- Embulk::Input::Elasticsearch::Connection
- Defined in:
- lib/embulk/input/elasticsearch/connection.rb
Instance Method Summary collapse
- #create_client(nodes:, reload_connections:, reload_on_failure:, retry_on_failure:, request_timeout:) ⇒ Object
-
#initialize(task) ⇒ Connection
constructor
A new instance of Connection.
- #search_with_query(query) ⇒ Object
Constructor Details
#initialize(task) ⇒ Connection
Returns a new instance of Connection.
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/embulk/input/elasticsearch/connection.rb', line 8 def initialize(task) @scroll = task['scroll'] @index = task['index'] @index_type = task['index_type'] @size = task['per_size'] @fields = task['fields'] @sort = task['sort'] @limit_size = task['limit_size'] @retry_on_failure = task['retry_on_failure'] @client = create_client( nodes: task['nodes'], reload_connections: task['reload_connections'], reload_on_failure: task['reload_on_failure'], retry_on_failure: task['retry_on_failure'], request_timeout: task['request_timeout'] ) end |
Instance Method Details
#create_client(nodes:, reload_connections:, reload_on_failure:, retry_on_failure:, request_timeout:) ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/embulk/input/elasticsearch/connection.rb', line 26 def create_client(nodes: ,reload_connections: ,reload_on_failure: ,retry_on_failure: ,request_timeout:) transport = ::Elasticsearch::Transport::Transport::HTTP::Faraday.new( { hosts: nodes.map{ |node| Hash[node.map{ |k, v| [k.to_sym, v] }] }, options: { reload_connections: reload_connections, reload_on_failure: reload_on_failure, retry_on_failure: retry_on_failure, transport_options: { request: { timeout: request_timeout } } } } ) ::Elasticsearch::Client.new transport: transport end |
#search_with_query(query) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/embulk/input/elasticsearch/connection.rb', line 44 def search_with_query(query) search_option = get_search_option(query) Embulk.logger.info("#{search_option}") r = search_with_retry { @client.search(search_option) } i = 0 Converter.get_sources(r, @fields).each do |result| yield(result) if block_given? return if @limit_size == (i += 1) end while r = (search_with_retry { @client.scroll(scroll_id: r['_scroll_id'], scroll: @scroll) }) and (not r['hits']['hits'].empty?) do Converter.get_sources(r, @fields).each do |result| yield(result) if block_given? return if @limit_size == (i += 1) end end end |