Module: Wukong::Elasticsearch::HadoopInvocationOverride
- Defined in:
- lib/wonderdog/hadoop_invocation_override.rb
Overview
This module overrides some methods defined in Wukong::Hadoop::HadoopInvocation. The overrides come into play only if the job's input or output paths are URIs beginning with 'es://', indicating that the job reads from or writes to Elasticsearch indices.
Constant Summary
- ES_STREAMING_INPUT_FORMAT =
The input format when reading from Elasticsearch as defined in the Java code accompanying Wonderdog.
"com.infochimps.elasticsearch.ElasticSearchStreamingInputFormat"
- ES_STREAMING_OUTPUT_FORMAT =
The output format when writing to Elasticsearch as defined in the Java code accompanying Wonderdog.
"com.infochimps.elasticsearch.ElasticSearchStreamingOutputFormat"
Instance Method Summary
- #elasticsearch_hdfs_tmp_dir(io) ⇒ String
  Returns a temporary path on the HDFS in which to store log data while the Hadoop job runs.
- #elasticsearch_jars ⇒ Array<String>
  All Elasticsearch, Wonderdog, and other support jars needed to connect Hadoop streaming with the ElasticSearchStreamingInputFormat and ElasticSearchStreamingOutputFormat provided by the Wonderdog Java code.
- #hadoop_files ⇒ Object
  :nodoc:
- #hadoop_jobconf_options ⇒ Array<String>
  Adds Java options required to interact with the input/output formats defined by the Java code accompanying Wonderdog.
- #input_format ⇒ String
  The input format to use for this job.
- #input_index ⇒ IndexAndMapping
  The input index to use.
- #input_paths ⇒ String
  The input paths to use for this job.
- #output_format ⇒ String
  The output format to use for this job.
- #output_index ⇒ IndexAndMapping
  The output index to use.
- #output_path ⇒ String
  The output path to use for this job.
- #reads_from_elasticsearch? ⇒ true, false
  Does this job read from Elasticsearch?
- #writes_to_elasticsearch? ⇒ true, false
  Does this job write to Elasticsearch?
Instance Method Details
#elasticsearch_hdfs_tmp_dir(io) ⇒ String
Returns a temporary path on the HDFS in which to store log data while the Hadoop job runs.
(source lines 161–165)
# File 'lib/wonderdog/hadoop_invocation_override.rb', line 161
# Builds a timestamped scratch path on HDFS under which log data can be
# kept while the Hadoop job runs.
#
# Every character other than word characters, '/', '.', '-' and '+' is
# stripped from the index and mapping names before they become path
# components; a nil index or mapping is simply omitted.
#
# @param io [IndexAndMapping] anything responding to +index+ and +mapping+
# @return [String] "<settings[:es_tmp_dir] or '/'>/<index>/<mapping>/<YYYY-MM-DD-HH-MM-SS>"
def elasticsearch_hdfs_tmp_dir(io)
  unsafe_chars = %r{[^\w/\.\-\+]+}
  segments     = [io.index, io.mapping].reject(&:nil?).map { |name| name.gsub(unsafe_chars, '') }
  root         = settings[:es_tmp_dir] || '/'
  stamp        = Time.now.strftime("%Y-%m-%d-%H-%M-%S")
  File.join(root, segments.join('/'), stamp)
end
#elasticsearch_jars ⇒ Array<String>
All Elasticsearch, Wonderdog, and other support jars needed to connect Hadoop streaming with the ElasticSearchStreamingInputFormat and ElasticSearchStreamingOutputFormat provided by the Wonderdog Java code.
(source lines 152–154)
# File 'lib/wonderdog/hadoop_invocation_override.rb', line 152
# Lists the Elasticsearch, Lucene, JNA and Wonderdog jars found in the
# configured library directory (settings[:es_lib_dir], falling back to
# '/usr/lib/hadoop/lib'). These are the jars Hadoop streaming needs in order
# to use the Wonderdog input/output formats.
#
# @return [Array<String>] paths of the matching jar files (no duplicates)
def elasticsearch_jars
  lib_dir = settings[:es_lib_dir] || '/usr/lib/hadoop/lib'
  pattern = File.join(lib_dir, '{elasticsearch,lucene,jna,wonderdog}*.jar')
  Dir.glob(pattern).uniq
end
#hadoop_files ⇒ Object
:nodoc:
If the job reads from or writes to Elasticsearch and no jars have been configured, set the jars to the Elasticsearch and Wonderdog support jars; then call super().
(source lines 138–143)
# File 'lib/wonderdog/hadoop_invocation_override.rb', line 138
# :nodoc:
#
# When the job reads from or writes to Elasticsearch and no jars have been
# configured (the :jars setting is nil or empty), fills settings[:jars] with
# #elasticsearch_jars. Jars the user configured explicitly are left alone.
# Then delegates to super.
#
# @return [Object] whatever super returns
def hadoop_files
  if reads_from_elasticsearch? || writes_to_elasticsearch?
    configured = settings[:jars]
    # An unset :jars setting is treated like an empty one instead of
    # raising NoMethodError on nil.empty?.
    settings[:jars] = elasticsearch_jars if configured.nil? || configured.empty?
  end
  super()
end
#hadoop_jobconf_options ⇒ Array<String>
Adds Java options required to interact with the input/output formats defined by the Java code accompanying Wonderdog.
Will not change the default Hadoop jobconf options unless it has to.
(source lines 100–132)
# File 'lib/wonderdog/hadoop_invocation_override.rb', line 100
# Adds the Java options required to interact with the input/output formats
# defined by the Java code accompanying Wonderdog.
#
# Does not change the default Hadoop jobconf options unless it has to. When
# the job reads from or writes to Elasticsearch, map and reduce speculative
# execution are set to 'false' unless the user already configured them
# (NOTE(review): presumably so duplicate task attempts do not hit the index
# twice -- confirm).
#
# @return [Array<String>] the options returned by super, followed by the
#   Elasticsearch-specific options
def hadoop_jobconf_options
  touches_es = reads_from_elasticsearch? || writes_to_elasticsearch?

  if touches_es
    settings[:map_speculative]    = 'false' if settings[:map_speculative].nil?
    settings[:reduce_speculative] = 'false' if settings[:reduce_speculative].nil?
  end

  # super is evaluated before the Elasticsearch options are assembled.
  base_options = super()

  es_options = []
  if touches_es
    es_options << java_opt('es.config',                    settings[:es_config])
    es_options << java_opt('elasticsearch.transport',      settings[:es_transport])
    # Host/port only matter when a transport client was requested.
    es_options << java_opt('elasticsearch.transport.host', settings[:es_transport_host]) if settings[:es_transport] && settings[:es_transport_host]
    es_options << java_opt('elasticsearch.transport.port', settings[:es_transport_port]) if settings[:es_transport] && settings[:es_transport_port]
  end
  if reads_from_elasticsearch?
    es_options << java_opt('elasticsearch.input.index',          input_index.index)
    es_options << java_opt('elasticsearch.input.mapping',        input_index.mapping)
    es_options << java_opt('elasticsearch.input.splits',         settings[:es_input_splits])
    es_options << java_opt('elasticsearch.input.query',          settings[:es_query])
    es_options << java_opt('elasticsearch.input.request_size',   settings[:es_request_size])
    es_options << java_opt('elasticsearch.input.scroll_timeout', settings[:es_scroll_timeout])
  end
  if writes_to_elasticsearch?
    es_options << java_opt('elasticsearch.output.index',         output_index.index)
    es_options << java_opt('elasticsearch.output.mapping',       output_index.mapping)
    es_options << java_opt('elasticsearch.output.index.field',   settings[:es_index_field])
    es_options << java_opt('elasticsearch.output.mapping.field', settings[:es_mapping_field])
    es_options << java_opt('elasticsearch.output.id.field',      settings[:es_id_field])
    es_options << java_opt('elasticsearch.output.bulk_size',     settings[:es_bulk_size])
  end

  # The options from super pass through as-is; only the Elasticsearch
  # additions are flattened and stripped of nil entries.
  base_options + es_options.flatten.compact
end
#input_format ⇒ String
The input format to use for this job.
Will override the default value to ES_STREAMING_INPUT_FORMAT if reading from Elasticsearch.
(source lines 38–40)
# File 'lib/wonderdog/hadoop_invocation_override.rb', line 38
# The input format to use for this job.
#
# Jobs that read from Elasticsearch use ES_STREAMING_INPUT_FORMAT; every
# other job keeps whatever format super chooses.
#
# @return [String]
def input_format
  return ES_STREAMING_INPUT_FORMAT if reads_from_elasticsearch?
  super()
end
#input_index ⇒ IndexAndMapping
The input index to use.
(source lines 45–47)
# File 'lib/wonderdog/hadoop_invocation_override.rb', line 45
# The input index to use, parsed from settings[:input].
#
# The IndexAndMapping is built on first call and cached in @input_index for
# subsequent calls.
#
# @return [IndexAndMapping]
def input_index
  return @input_index if @input_index
  @input_index = IndexAndMapping.new(settings[:input])
end
#input_paths ⇒ String
The input paths to use for this job.
Will override the default value with a temporary HDFS path when reading from Elasticsearch.
(source lines 55–57)
# File 'lib/wonderdog/hadoop_invocation_override.rb', line 55
# The input paths to use for this job.
#
# When reading from Elasticsearch, the paths are replaced by a temporary HDFS
# directory derived from the input index (see #elasticsearch_hdfs_tmp_dir).
# Otherwise super decides.
#
# @return [String]
def input_paths
  if reads_from_elasticsearch?
    elasticsearch_hdfs_tmp_dir(input_index)
  else
    super()
  end
end
#output_format ⇒ String
The output format to use for this job.
Will override the default value to ES_STREAMING_OUTPUT_FORMAT if writing to Elasticsearch.
(source lines 72–74)
# File 'lib/wonderdog/hadoop_invocation_override.rb', line 72
# The output format to use for this job.
#
# Jobs that write to Elasticsearch use ES_STREAMING_OUTPUT_FORMAT; every
# other job keeps whatever format super chooses.
#
# @return [String]
def output_format
  return ES_STREAMING_OUTPUT_FORMAT if writes_to_elasticsearch?
  super()
end
#output_index ⇒ IndexAndMapping
The output index to use.
(source lines 79–81)
# File 'lib/wonderdog/hadoop_invocation_override.rb', line 79
# The output index to use, parsed from settings[:output].
#
# The IndexAndMapping is built on first call and cached in @output_index for
# subsequent calls.
#
# @return [IndexAndMapping]
def output_index
  return @output_index if @output_index
  @output_index = IndexAndMapping.new(settings[:output])
end
#output_path ⇒ String
The output path to use for this job.
Will override the default value with a temporary HDFS path when writing to Elasticsearch.
(source lines 89–91)
# File 'lib/wonderdog/hadoop_invocation_override.rb', line 89
# The output path to use for this job.
#
# When writing to Elasticsearch, the path is replaced by a temporary HDFS
# directory derived from the output index (see #elasticsearch_hdfs_tmp_dir).
# Otherwise super decides.
#
# @return [String]
def output_path
  if writes_to_elasticsearch?
    elasticsearch_hdfs_tmp_dir(output_index)
  else
    super()
  end
end
#reads_from_elasticsearch? ⇒ true, false
Does this job read from Elasticsearch?
(source lines 28–30)
# File 'lib/wonderdog/hadoop_invocation_override.rb', line 28
# Does this job read from Elasticsearch?
#
# Delegates to IndexAndMapping.matches? on the configured input location.
#
# @return [true, false]
def reads_from_elasticsearch?
  input_location = settings[:input]
  IndexAndMapping.matches?(input_location)
end
#writes_to_elasticsearch? ⇒ true, false
Does this job write to Elasticsearch?
(source lines 62–64)
# File 'lib/wonderdog/hadoop_invocation_override.rb', line 62
# Does this job write to Elasticsearch?
#
# Delegates to IndexAndMapping.matches? on the configured output location.
#
# @return [true, false]
def writes_to_elasticsearch?
  output_location = settings[:output]
  IndexAndMapping.matches?(output_location)
end