Module: Wukong::Elasticsearch::HadoopInvocationOverride

Defined in:
lib/wonderdog/hadoop_invocation_override.rb

Overview

This module overrides some methods defined in Wukong::Hadoop::HadoopInvocation. The overrides take effect only when the job's input or output path is a URI beginning with 'es://', meaning the job reads from or writes to an Elasticsearch index.
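The override-with-fallback pattern works through Ruby's method lookup: a module included after another sits ahead of it in the ancestor chain, so its methods run first and can defer with super(). The sketch below assumes HadoopInvocation is itself a module mixed into a runner class; BaseInvocation, EsOverride, Runner and the "DefaultInputFormat" string are hypothetical stand-ins, not how Wonderdog actually wires the module in.

```ruby
# Hypothetical stand-in for Wukong::Hadoop::HadoopInvocation.
module BaseInvocation
  def input_format
    'DefaultInputFormat' # placeholder default, not Wukong's real value
  end
end

# Hypothetical stand-in for this override module.
module EsOverride
  # Take over only for es:// inputs; otherwise fall back to the base module.
  def input_format
    if settings[:input].to_s.start_with?('es://')
      'com.infochimps.elasticsearch.ElasticSearchStreamingInputFormat'
    else
      super()
    end
  end
end

class Runner
  include BaseInvocation
  include EsOverride # included last, so it is found first during lookup

  attr_reader :settings

  def initialize(settings)
    @settings = settings
  end
end

Runner.new({ input: 'es://tweets/tweet' }).input_format
# => "com.infochimps.elasticsearch.ElasticSearchStreamingInputFormat"
Runner.new({ input: '/data/tweets' }).input_format
# => "DefaultInputFormat"
```

Because the override only calls super() when the path is not an es:// URI, jobs that never touch Elasticsearch behave exactly as they would without the module.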

Constant Summary

ES_STREAMING_INPUT_FORMAT = "com.infochimps.elasticsearch.ElasticSearchStreamingInputFormat"

The input format used when reading from Elasticsearch, as defined in the Java code accompanying Wonderdog.

ES_STREAMING_OUTPUT_FORMAT = "com.infochimps.elasticsearch.ElasticSearchStreamingOutputFormat"

The output format used when writing to Elasticsearch, as defined in the Java code accompanying Wonderdog.

Instance Method Details

#elasticsearch_hdfs_tmp_dir(io) ⇒ String

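Returns a temporary HDFS path in which to store log data while the Hadoop job runs. The path joins settings[:es_tmp_dir] (default '/'), the sanitized index and mapping of io, and a timestamp.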

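Parameters:

  • io (IndexAndMapping): the input or output index whose index and mapping names form part of the path
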
Returns:

  • (String)


# File 'lib/wonderdog/hadoop_invocation_override.rb', line 161

def elasticsearch_hdfs_tmp_dir io
  cleaner  = %r{[^\w/\.\-\+]+}
  io_part  = [io.index, io.mapping].compact.map { |s| s.gsub(cleaner, '') }.join('/')
  File.join(settings[:es_tmp_dir] || '/', io_part || '', Time.now.strftime("%Y-%m-%d-%H-%M-%S"))
end
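To make the path layout concrete, here is a standalone copy of the method's logic. IoStub is a hypothetical stand-in for IndexAndMapping (only #index and #mapping are used), and the temporary directory and clock are passed as arguments so the output is reproducible.

```ruby
# Hypothetical stand-in for IndexAndMapping.
IoStub = Struct.new(:index, :mapping)

# Same logic as elasticsearch_hdfs_tmp_dir, with tmp dir and time injected.
def es_hdfs_tmp_dir(io, tmp_dir: nil, now: Time.now)
  cleaner = %r{[^\w/\.\-\+]+} # strips runs of characters unsafe in a path
  io_part = [io.index, io.mapping].compact.map { |s| s.gsub(cleaner, '') }.join('/')
  File.join(tmp_dir || '/', io_part, now.strftime('%Y-%m-%d-%H-%M-%S'))
end

t = Time.new(2013, 1, 2, 3, 4, 5)
es_hdfs_tmp_dir(IoStub.new('tweets', 'tweet'), tmp_dir: '/tmp/es', now: t)
# => "/tmp/es/tweets/tweet/2013-01-02-03-04-05"
es_hdfs_tmp_dir(IoStub.new('my index!', nil), now: t)
# => "/myindex/2013-01-02-03-04-05"
```

A nil mapping is dropped by compact, and the space and '!' are removed by the cleaner regex.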

#elasticsearch_jars ⇒ Array<String>

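All Elasticsearch, Wonderdog, and other support jars needed to connect Hadoop streaming with the ElasticSearchStreamingInputFormat and ElasticSearchStreamingOutputFormat provided by the Wonderdog Java code. The jars are found by globbing settings[:es_lib_dir] (default /usr/lib/hadoop/lib) for files named elasticsearch*.jar, lucene*.jar, jna*.jar, or wonderdog*.jar.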

Returns:

  • (Array<String>)


# File 'lib/wonderdog/hadoop_invocation_override.rb', line 152

def elasticsearch_jars
  Dir[File.join(settings[:es_lib_dir] || '/usr/lib/hadoop/lib', '{elasticsearch,lucene,jna,wonderdog}*.jar')].compact.uniq
end
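The brace pattern in the glob selects the four jar families and ignores everything else in the directory. This sketch runs the same glob against a throwaway directory standing in for settings[:es_lib_dir]; the jar file names are hypothetical.

```ruby
require 'tmpdir'
require 'fileutils'

# Same glob as elasticsearch_jars, with the library directory passed in.
def support_jars(lib_dir)
  Dir[File.join(lib_dir, '{elasticsearch,lucene,jna,wonderdog}*.jar')].compact.uniq
end

# Populate a temporary directory with hypothetical jars, then glob it.
jar_names = Dir.mktmpdir do |dir|
  %w[elasticsearch.jar lucene-core.jar jna.jar wonderdog.jar hadoop-core.jar].each do |f|
    FileUtils.touch(File.join(dir, f))
  end
  support_jars(dir).map { |path| File.basename(path) }.sort
end
# jar_names => ["elasticsearch.jar", "jna.jar", "lucene-core.jar", "wonderdog.jar"]
```

hadoop-core.jar is not picked up because its name matches none of the brace alternatives.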

#hadoop_files ⇒ Object

If the job reads from or writes to Elasticsearch and settings[:jars] is empty, sets settings[:jars] to elasticsearch_jars, then calls super(). Jars that were configured explicitly are left alone.

# File 'lib/wonderdog/hadoop_invocation_override.rb', line 138

def hadoop_files
  if reads_from_elasticsearch? || writes_to_elasticsearch?
    settings[:jars] = elasticsearch_jars if settings[:jars].empty?
  end
  super()
end

#hadoop_jobconf_options ⇒ Array<String>

Adds Java options required to interact with the input/output formats defined by the Java code accompanying Wonderdog.

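When the job reads from or writes to Elasticsearch, speculative execution is turned off for map and reduce tasks, but only if settings[:map_speculative] or settings[:reduce_speculative] has not already been set. The Elasticsearch options are appended to, not substituted for, the options returned by super().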

Returns:

  • (Array<String>)


# File 'lib/wonderdog/hadoop_invocation_override.rb', line 100

def hadoop_jobconf_options
  if reads_from_elasticsearch? || writes_to_elasticsearch?
    settings[:map_speculative]    = 'false' if settings[:map_speculative].nil?
    settings[:reduce_speculative] = 'false' if settings[:reduce_speculative].nil?
  end
  
  super() + [].tap do |o|
    if (reads_from_elasticsearch? || writes_to_elasticsearch?)
      o << java_opt('es.config',                    settings[:es_config])
      o << java_opt('elasticsearch.transport',      settings[:es_transport])
      o << java_opt('elasticsearch.transport.host', settings[:es_transport_host]) if settings[:es_transport] && settings[:es_transport_host]
      o << java_opt('elasticsearch.transport.port', settings[:es_transport_port]) if settings[:es_transport] && settings[:es_transport_port]
    end
    
    if reads_from_elasticsearch?
      o << java_opt('elasticsearch.input.index',          input_index.index)
      o << java_opt('elasticsearch.input.mapping',        input_index.mapping)
      o << java_opt('elasticsearch.input.splits',         settings[:es_input_splits])
      o << java_opt('elasticsearch.input.query',          settings[:es_query])
      o << java_opt('elasticsearch.input.request_size',   settings[:es_request_size])
      o << java_opt('elasticsearch.input.scroll_timeout', settings[:es_scroll_timeout])
    end

    if writes_to_elasticsearch?
      o << java_opt('elasticsearch.output.index',         output_index.index)
      o << java_opt('elasticsearch.output.mapping',       output_index.mapping)
      o << java_opt('elasticsearch.output.index.field',   settings[:es_index_field])
      o << java_opt('elasticsearch.output.mapping.field', settings[:es_mapping_field])
      o << java_opt('elasticsearch.output.id.field',      settings[:es_id_field])
      o << java_opt('elasticsearch.output.bulk_size',     settings[:es_bulk_size])
    end
  end.flatten.compact
end
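The method follows a "fill in defaults, then append only what is set" pattern. The condensed sketch below shows that flow with a few of the options. sketch_java_opt is a hypothetical helper that renders "-D name=value" or nil; the real java_opt lives in Wukong's Hadoop code and its exact output format is not shown on this page.

```ruby
# Hypothetical helper: one -D option, or nil when the value is unset.
def sketch_java_opt(name, value)
  value.nil? ? nil : "-D #{name}=#{value}"
end

# Condensed version of the override for illustration only.
def sketch_es_jobconf_options(settings, reading:, writing:)
  if reading || writing
    # Respect any explicit user choice; only fill in missing values.
    settings[:map_speculative]    = 'false' if settings[:map_speculative].nil?
    settings[:reduce_speculative] = 'false' if settings[:reduce_speculative].nil?
  end

  opts = []
  if reading || writing
    opts << sketch_java_opt('es.config',               settings[:es_config])
    opts << sketch_java_opt('elasticsearch.transport', settings[:es_transport])
  end
  opts << sketch_java_opt('elasticsearch.input.query', settings[:es_query]) if reading
  opts << sketch_java_opt('elasticsearch.output.bulk_size', settings[:es_bulk_size]) if writing
  opts.compact # drop options whose settings were nil
end

settings = { es_config: '/etc/elasticsearch/elasticsearch.yml', es_bulk_size: 1000, reduce_speculative: 'true' }
opts = sketch_es_jobconf_options(settings, reading: false, writing: true)
# opts => ["-D es.config=/etc/elasticsearch/elasticsearch.yml", "-D elasticsearch.output.bulk_size=1000"]
# settings[:map_speculative] => "false" (filled in); settings[:reduce_speculative] => "true" (kept)
```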

#input_format ⇒ String

The input format to use for this job.

Overrides the default value with ES_STREAMING_INPUT_FORMAT when reading from Elasticsearch.

Returns:

  • (String)


# File 'lib/wonderdog/hadoop_invocation_override.rb', line 38

def input_format
  reads_from_elasticsearch? ? ES_STREAMING_INPUT_FORMAT : super()
end

#input_index ⇒ IndexAndMapping

The input index to use.

Returns:

  • (IndexAndMapping)

# File 'lib/wonderdog/hadoop_invocation_override.rb', line 45

def input_index
  @input_index ||= IndexAndMapping.new(settings[:input])
end

#input_paths ⇒ String

The input paths to use for this job.

Will override the default value with a temporary HDFS path when reading from Elasticsearch.

Returns:

  • (String)


# File 'lib/wonderdog/hadoop_invocation_override.rb', line 55

def input_paths
  reads_from_elasticsearch? ? elasticsearch_hdfs_tmp_dir(input_index) : super()
end

#output_format ⇒ String

The output format to use for this job.

Overrides the default value with ES_STREAMING_OUTPUT_FORMAT when writing to Elasticsearch.

Returns:

  • (String)


# File 'lib/wonderdog/hadoop_invocation_override.rb', line 72

def output_format
  writes_to_elasticsearch? ? ES_STREAMING_OUTPUT_FORMAT : super()
end

#output_index ⇒ IndexAndMapping

The output index to use.

Returns:

  • (IndexAndMapping)

# File 'lib/wonderdog/hadoop_invocation_override.rb', line 79

def output_index
  @output_index ||= IndexAndMapping.new(settings[:output])
end

#output_path ⇒ String

The output path to use for this job.

Will override the default value with a temporary HDFS path when writing to Elasticsearch.

Returns:

  • (String)


# File 'lib/wonderdog/hadoop_invocation_override.rb', line 89

def output_path
  writes_to_elasticsearch? ? elasticsearch_hdfs_tmp_dir(output_index) : super()
end

#reads_from_elasticsearch? ⇒ true, false

Does this job read from Elasticsearch?

Returns:

  • (true, false)


# File 'lib/wonderdog/hadoop_invocation_override.rb', line 28

def reads_from_elasticsearch?
  IndexAndMapping.matches?(settings[:input])
end

#writes_to_elasticsearch? ⇒ true, false

Does this job write to Elasticsearch?

Returns:

  • (true, false)


# File 'lib/wonderdog/hadoop_invocation_override.rb', line 62

def writes_to_elasticsearch?
  IndexAndMapping.matches?(settings[:output])
end
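Both predicates delegate to IndexAndMapping.matches?, whose source is not shown on this page. As a rough sketch, assuming "matches" means "is a String beginning with es://" (consistent with the module overview), the detection reduces to a prefix check on settings[:input] and settings[:output]. es_uri? is a hypothetical name.

```ruby
# Hypothetical stand-in for IndexAndMapping.matches?; assumes only Strings
# starting with es:// count as Elasticsearch locations.
def es_uri?(path)
  path.is_a?(String) && path.start_with?('es://')
end

settings = { input: 'es://tweets/tweet', output: '/data/tweets_out' }
reads_from_es = es_uri?(settings[:input])   # => true
writes_to_es  = es_uri?(settings[:output])  # => false
```

Under this assumption a job that reads from HDFS and writes to es://... gets only the output-side overrides, since each predicate is checked independently.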