Module: Wukong::Storm::StormInvocation

Included in: StormRunner
Defined in: lib/wukong-storm/storm_invocation.rb

Overview

This module defines methods that generate command lines for interacting with Storm through the storm program.

Constant Summary

TOPOLOGY_SUBMITTER_CLASS = "com.infochimps.wukong.storm.TopologySubmitter"

The default Java submitter class.

Instance Method Details

#blob_input? ⇒ true, false

Does this topology read from a filesystem?

Returns:

  • (true, false)


# File 'lib/wukong-storm/storm_invocation.rb', line 50

def blob_input?
  s3_input? || file_input?
end
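The input predicates are all driven by the scheme of the parsed input URI. `s3` and `file` count as blob input. Anything else is treated as Kafka input, for example a bare Kafka topic name, for which `URI.parse` returns a `nil` scheme. The following minimal sketch reproduces that classification outside the module. `input_kind` is a hypothetical helper, not part of wukong-storm, and the example inputs are illustrative.

```ruby
require 'uri'

# Hypothetical helper: classify a raw --input value the same way
# s3_input?, file_input?, blob_input? and kafka_input? do.
def input_kind(input)
  case URI.parse(input).scheme
  when 's3'   then :s3     # s3_input? (and therefore blob_input?)
  when 'file' then :file   # file_input? (and therefore blob_input?)
  else             :kafka  # kafka_input? is simply !blob_input?
  end
end

p input_kind('s3://my-bucket/logs/2013')  # => :s3
p input_kind('file:///tmp/events')        # => :file
p input_kind('events_topic')              # => :kafka
```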

#blob_spout_options ⇒ Array<Array>

Return Java -D option key-value pairs related to the topology's spout if it is reading from a blob store (S3 or a local filesystem).

Returns:

  • (Array<Array>)

    an Array of key-value pairs



# File 'lib/wukong-storm/storm_invocation.rb', line 217

def blob_spout_options
  [
   ["wukong.input.type", "blob"],
  ].tap do |so|
    so << ["wukong.input.blob.marker",       settings[:offset]] if settings[:offset]
    so << case
    when settings[:from_beginning]
      ["wukong.input.blob.start",        "EARLIEST"]
    when settings[:from_end]
      ["wukong.input.blob.start",        "LATEST"]
    when settings[:offset]
      ["wukong.input.blob.start",        "EXPLICIT"]
    else
      ["wukong.input.blob.start",        "RESUME"]
    end
  end
end
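The start mode is chosen by checking the settings in a fixed order. This sketch reproduces that logic so the precedence can be tried directly. It assumes a plain Hash can stand in for the module's `settings` object, and the marker value `"some-marker"` is purely illustrative.

```ruby
# Sketch of blob_spout_options taking a plain Hash in place of the
# module's `settings` object (an assumption made for illustration).
def blob_spout_options(settings)
  [
    ["wukong.input.type", "blob"],
  ].tap do |so|
    so << ["wukong.input.blob.marker", settings[:offset]] if settings[:offset]
    so << case
          when settings[:from_beginning] then ["wukong.input.blob.start", "EARLIEST"]
          when settings[:from_end]       then ["wukong.input.blob.start", "LATEST"]
          when settings[:offset]         then ["wukong.input.blob.start", "EXPLICIT"]
          else                                ["wukong.input.blob.start", "RESUME"]
          end
  end
end

p blob_spout_options({})
# => [["wukong.input.type", "blob"], ["wukong.input.blob.start", "RESUME"]]
p blob_spout_options(from_beginning: true, offset: "some-marker")
# => [["wukong.input.type", "blob"], ["wukong.input.blob.marker", "some-marker"], ["wukong.input.blob.start", "EARLIEST"]]
```

Because the `when` clauses are checked in order, from_beginning overrides an explicit offset. The offset is still passed along as the marker in that case.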

#dataflow_name ⇒ String

Name of the Wukong dataflow to be launched.

Obtained from either the first non-option argument passed to wu-storm or the --run option.

Returns:

  • (String)


# File 'lib/wukong-storm/storm_invocation.rb', line 28

def dataflow_name
  args.first || settings[:run]
end

#dataflow_options ⇒ Array<Array>

Return Java -D option key-value pairs related to the Wukong dataflow run by the topology.

Returns:

  • (Array<Array>)

    an Array of key-value pairs



# File 'lib/wukong-storm/storm_invocation.rb', line 311

def dataflow_options
  [
   ["wukong.directory",         Dir.pwd],
   ["wukong.dataflow",          dataflow_name],
   ["wukong.command",           wu_bolt_commandline],
   ["wukong.parallelism",       settings[:parallelism]],
  ].tap do |opts|
    opts << ["wukong.environment", settings[:environment]] if settings[:environment]
  end
end

#file_input? ⇒ true, false

Does this topology read from a local filesystem?

Returns:

  • (true, false)


# File 'lib/wukong-storm/storm_invocation.rb', line 64

def file_input?
  input_uri.scheme == 'file'
end

#file_spout_options ⇒ Array<Array>

Return Java -D option key-value pairs related to the topology's spout if it is reading from a local file.

Returns:

  • (Array<Array>)

    an Array of key-value pairs



# File 'lib/wukong-storm/storm_invocation.rb', line 273

def file_spout_options
  [
   ["wukong.input.blob.type", "file"],
   ["wukong.input.blob.path", input_uri.path],
  ]
end

#fully_qualified_class_name ⇒ String

Returns the fully qualified name of the Java submitter class, TOPOLOGY_SUBMITTER_CLASS.



# File 'lib/wukong-storm/storm_invocation.rb', line 153

def fully_qualified_class_name
  TOPOLOGY_SUBMITTER_CLASS
end

#input_uri ⇒ URI

The input URI for the topology, parsed once from the input setting and memoized. Its scheme determines which Trident spout is used.

Returns:

  • (URI)


# File 'lib/wukong-storm/storm_invocation.rb', line 36

def input_uri
  @input_uri ||= URI.parse(settings[:input])
end

#kafka_input? ⇒ true, false

Does this topology read from Kafka?

Returns:

  • (true, false)


# File 'lib/wukong-storm/storm_invocation.rb', line 43

def kafka_input?
  ! blob_input?
end

#kafka_output? ⇒ true, false

Does this topology write to Kafka? Currently always true, since Kafka is the only supported output.

Returns:

  • (true, false)


# File 'lib/wukong-storm/storm_invocation.rb', line 79

def kafka_output?
  true                    # only option right now
end

#kafka_spout_options ⇒ Array<Array>

Return Java -D option key-value pairs related to the topology's spout if it is reading from Kafka.

Returns:

  • (Array<Array>)

    an Array of key-value pairs



# File 'lib/wukong-storm/storm_invocation.rb', line 284

def kafka_spout_options
  [
   ["wukong.input.type",              'kafka'],
   ["wukong.input.kafka.topic",       settings[:input]],
   ["wukong.input.kafka.partitions",  settings[:kafka_partitions]],
   ["wukong.input.kafka.batch",       settings[:kafka_batch]],
   
   ["wukong.input.parallelism",       settings[:input_parallelism]],
   case
   when settings[:from_beginning]
     ["wukong.input.kafka.offset",      "-2"]
   when settings[:from_end]
     ["wukong.input.kafka.offset",      "-1"]
   when settings[:offset]
     ["wukong.input.kafka.offset",      settings[:offset]]
   else
     # Do *not* set anything and the spout will attempt to
     # resume and, finding no prior offset, will start from the
     # end, as though we'd passed "-1"
   end
  ]
end
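When no offset-related setting is given, the `case` expression evaluates to `nil`, so a `nil` element ends up in the returned array. That element is harmless because `storm_topology_options` later rejects any pair whose value is `nil`. The values "-2" and "-1" follow Kafka's offset-request convention for the earliest and latest offsets. Below is a minimal sketch of both steps. `kafka_offset_option` is a hypothetical helper isolating the `case`, and a plain Hash stands in for `settings`.

```ruby
# Hypothetical helper isolating the offset choice in kafka_spout_options;
# a plain Hash stands in for `settings`.
def kafka_offset_option(settings)
  case
  when settings[:from_beginning] then ["wukong.input.kafka.offset", "-2"] # earliest
  when settings[:from_end]       then ["wukong.input.kafka.offset", "-1"] # latest
  when settings[:offset]         then ["wukong.input.kafka.offset", settings[:offset]]
  end # nil otherwise, leaving the spout to resume
end

pairs = [["wukong.input.type", "kafka"], kafka_offset_option({})]
p pairs # => [["wukong.input.type", "kafka"], nil]

# The same filter storm_topology_options applies: a nil pair
# destructures to key = nil, value = nil and is dropped.
kept = pairs.reject do |pair|
  _key, value = pair
  value.nil? || value.to_s.strip.empty?
end
p kept # => [["wukong.input.type", "kafka"]]
```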

#kafka_state_options ⇒ Array<Array>

Return Java -D option key-value pairs related to the final state used by the topology when it is writing to Kafka.

Returns:

  • (Array<Array>)

    an Array of key-value pairs



# File 'lib/wukong-storm/storm_invocation.rb', line 337

def kafka_state_options
  [
   ["wukong.output.kafka.topic", settings[:output]],
  ]
end

#native_storm_options ⇒ Array<String>

Return Java -D options constructed by mapping the passed-in "friendly" options (e.g. --timeout) to native Storm options (e.g. topology.message.timeout.secs). The native name of each option tagged :storm is read from that option's description.

Returns:

  • (Array<String>)

    an array of each -D option



# File 'lib/wukong-storm/storm_invocation.rb', line 162

def native_storm_options
  settings.params_with(:storm).map do |option, value|
    defn = settings.definition_of(option, :description)
    [defn, settings[option.to_sym]]
  end.map { |option, value| java_option(option, value) }
end
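The mapping works because each :storm-tagged setting stores its native Storm key as its description. A minimal sketch of the same two-step map follows. `STORM_SETTINGS` is a hypothetical stand-in for the real settings definitions. `java_option` is called but not defined in this module, so its "-Dkey=value" output format here is an assumption.

```ruby
# Hypothetical stand-ins for illustration only:
# - STORM_SETTINGS plays the role of the :storm-tagged settings, with
#   each native Storm key stored as the setting's description;
# - java_option's "-Dkey=value" output format is an assumption.
STORM_SETTINGS = {
  timeout: { description: "topology.message.timeout.secs" },
}

def java_option(key, value)
  "-D#{key}=#{value}"
end

def native_storm_options(values)
  STORM_SETTINGS.map do |option, definition|
    [definition[:description], values[option]]
  end.map { |native, value| java_option(native, value) }
end

p native_storm_options(timeout: 30)
# => ["-Dtopology.message.timeout.secs=30"]
```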

#output_uri ⇒ URI

The output URI for the topology, parsed once from the output setting and memoized. It determines which Trident state is used.

Returns:

  • (URI)


# File 'lib/wukong-storm/storm_invocation.rb', line 72

def output_uri
  @output_uri ||= URI.parse(settings[:output])
end

#s3_endpoint ⇒ String, nil

The endpoint used to communicate with AWS for S3 access.

Determined by the AWS region (the aws_region setting) that the S3 bucket was declared to be in. Returns nil if the region is not recognized.



# File 'lib/wukong-storm/storm_invocation.rb', line 256

def s3_endpoint
  case settings[:aws_region]
  when 'us-east-1'       then 's3.amazonaws.com'
  when 'us-west-1'       then 's3-us-west-1.amazonaws.com'
  when 'us-west-2'       then 's3-us-west-2.amazonaws.com'
  when /EU/, 'eu-west-1' then 's3-eu-west-1.amazonaws.com'
  when 'ap-southeast-1'  then 's3-ap-southeast-1.amazonaws.com'
  when 'ap-southeast-2'  then 's3-ap-southeast-2.amazonaws.com'
  when 'ap-northeast-1'  then 's3-ap-northeast-1.amazonaws.com'
  when 'sa-east-1'       then 's3-sa-east-1.amazonaws.com'
  end
end
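Two details of the region table are easy to miss. First, `when /EU/` relies on `Regexp#===`, so the legacy "EU" location name also maps to the eu-west-1 endpoint. Second, any region not listed falls through to `nil`. The standalone copy below takes the region as an argument instead of reading `settings[:aws_region]`; the method name `s3_endpoint_for` is hypothetical.

```ruby
# Standalone copy of s3_endpoint's region table, taking the region as
# an argument instead of reading settings[:aws_region].
def s3_endpoint_for(region)
  case region
  when 'us-east-1'       then 's3.amazonaws.com'
  when 'us-west-1'       then 's3-us-west-1.amazonaws.com'
  when 'us-west-2'       then 's3-us-west-2.amazonaws.com'
  when /EU/, 'eu-west-1' then 's3-eu-west-1.amazonaws.com' # Regexp#=== matches "EU"
  when 'ap-southeast-1'  then 's3-ap-southeast-1.amazonaws.com'
  when 'ap-southeast-2'  then 's3-ap-southeast-2.amazonaws.com'
  when 'ap-northeast-1'  then 's3-ap-northeast-1.amazonaws.com'
  when 'sa-east-1'       then 's3-sa-east-1.amazonaws.com'
  end
end

p s3_endpoint_for('us-west-2')    # => "s3-us-west-2.amazonaws.com"
p s3_endpoint_for('EU')           # => "s3-eu-west-1.amazonaws.com"
p s3_endpoint_for('eu-central-1') # => nil (region not in the table)
```

A nil endpoint does not produce an empty -D option. `storm_topology_options` drops pairs whose value is nil.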

#s3_input? ⇒ true, false

Does this topology read from Amazon's S3?

Returns:

  • (true, false)


# File 'lib/wukong-storm/storm_invocation.rb', line 57

def s3_input?
  input_uri.scheme == 's3'
end

#s3_spout_options ⇒ Array<Array>

Return Java -D option key-value pairs related to the topology's spout if it is reading from S3.

Returns:

  • (Array<Array>)

    an Array of key-value pairs



# File 'lib/wukong-storm/storm_invocation.rb', line 239

def s3_spout_options
  [
   ["wukong.input.blob.type",         "s3"],
   ["wukong.input.blob.path",         input_uri.path.gsub(%r{^/},'')],
   ["wukong.input.blob.s3_bucket",    input_uri.host],
   ["wukong.input.blob.aws_key",      settings[:aws_key]],
   ["wukong.input.blob.aws_secret",   settings[:aws_secret]],
   ["wukong.input.blob.s3_endpoint",  s3_endpoint]
  ]
end
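For S3 input, the bucket comes from the URI's host and the object key prefix from its path, with the leading slash removed. `file_spout_options`, by contrast, passes the absolute path through unchanged. The short sketch below shows both using Ruby's standard `URI` library; the URIs are illustrative.

```ruby
require 'uri'

# How s3_spout_options splits an S3 input URI (example URI is illustrative).
uri    = URI.parse('s3://my-bucket/logs/2013/')
bucket = uri.host                   # wukong.input.blob.s3_bucket
key    = uri.path.gsub(%r{^/}, '')  # wukong.input.blob.path, leading slash stripped

p bucket # => "my-bucket"
p key    # => "logs/2013/"

# file_spout_options, by contrast, keeps the absolute path as-is.
p URI.parse('file:///var/log/events').path # => "/var/log/events"
```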

#services_options ⇒ Array<Array>

Return Java -D option key-value pairs related to services used by the topology.

Returns:

  • (Array<Array>)

    an Array of key-value pairs



# File 'lib/wukong-storm/storm_invocation.rb', line 183

def services_options
  [
   ["wukong.kafka.hosts",       settings[:kafka_hosts]],
   ["wukong.zookeeper.hosts",   settings[:zookeeper_hosts]],
  ]
end

#spout_options ⇒ Array<Array>

Return Java -D option key-value pairs related to the topology's spout.

Returns:

  • (Array<Array>)

    an Array of key-value pairs



# File 'lib/wukong-storm/storm_invocation.rb', line 204

def spout_options
  case
  when blob_input?
    blob_spout_options + (s3_input? ? s3_spout_options : file_spout_options)
  else
    kafka_spout_options
  end
end

#state_options ⇒ Array<Array>

Return Java -D option key-value pairs related to the final state used by the topology.

Returns:

  • (Array<Array>)

    an Array of key-value pairs



# File 'lib/wukong-storm/storm_invocation.rb', line 326

def state_options
  case
  when kafka_output?
    kafka_state_options
  end
end

#storm_kill_commandline ⇒ String

Generates a commandline that can be used to kill a running Storm topology based on the given topology name.

Returns:

  • (String)


# File 'lib/wukong-storm/storm_invocation.rb', line 106

def storm_kill_commandline
  "#{storm_runner} kill #{topology_name} #{storm_kill_options} > /dev/null 2>&1"
end

#storm_launch_commandline ⇒ String

Generates a commandline that can be used to launch a new Storm topology based on the given dataflow, input and output topics, and settings.

Returns:

  • (String)


# File 'lib/wukong-storm/storm_invocation.rb', line 92

def storm_launch_commandline
  [
   storm_runner,
   "jar #{wukong_topology_submitter_jar}",
   fully_qualified_class_name,
   native_storm_options,
   storm_topology_options,
  ].flatten.compact.join("\ \t\\\n ")
end
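The separator passed to `join` is easy to misread. In a double-quoted Ruby string `"\ "` is just a space, so each piece is followed by a space, a tab, a backslash, and a newline. This puts every piece on its own line with a shell line continuation. Below is a sketch with placeholder pieces. The jar path and -D strings are illustrative and were not captured from a real run.

```ruby
# Illustrative pieces; the jar path and -D strings are placeholders,
# not output captured from wukong-storm.
parts = [
  "storm",
  "jar /path/to/wukong-storm.jar",
  "com.infochimps.wukong.storm.TopologySubmitter",
  ["-Dtopology.message.timeout.secs=30"],  # stands in for native_storm_options
  ["-Dwukong.topology=my_topology"],       # stands in for storm_topology_options
  nil,                                     # any nil piece is dropped by compact
]

# "\ " is just a space in a double-quoted string, so the separator is
# space, tab, backslash, newline, space: a shell line continuation.
command = parts.flatten.compact.join("\ \t\\\n ")
puts command
```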

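#storm_runner ⇒ String

Return the path to the storm program.

Honors the --storm_runner and --storm_home options. An explicit --storm_runner takes precedence. Otherwise bin/storm under --storm_home is used if that file exists. Failing both, the bare storm command is used and looked up on the PATH.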

Returns:

  • (String)


# File 'lib/wukong-storm/storm_invocation.rb', line 125

def storm_runner
  explicit_runner = settings[:storm_runner]
  home_runner     = File.join(settings[:storm_home], 'bin/storm')
  default_runner  = 'storm'
  case
  when explicit_runner then explicit_runner
  when File.exist?(home_runner) then home_runner
  else default_runner
  end
end
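The precedence can be checked in isolation with a temporary directory standing in for the Storm home. In the sketch below, the two settings are passed in as arguments; the method name `resolve_storm_runner` is hypothetical. As in the original, a nil `storm_home` would make `File.join` raise a `TypeError`, so the sketch assumes that setting always has a value (presumably a default).

```ruby
require 'tmpdir'
require 'fileutils'

# Standalone version of storm_runner's precedence, with the two settings
# passed in as arguments (the method name is hypothetical).
def resolve_storm_runner(explicit_runner, storm_home)
  home_runner = File.join(storm_home, 'bin/storm')
  case
  when explicit_runner          then explicit_runner  # --storm_runner wins
  when File.exist?(home_runner) then home_runner      # then <storm_home>/bin/storm
  else 'storm'                                        # else rely on PATH lookup
  end
end

p resolve_storm_runner('/opt/storm/bin/storm', '/nonexistent') # => "/opt/storm/bin/storm"
p resolve_storm_runner(nil, '/nonexistent')                     # => "storm"

Dir.mktmpdir do |home|
  FileUtils.mkdir_p(File.join(home, 'bin'))
  FileUtils.touch(File.join(home, 'bin', 'storm'))
  p resolve_storm_runner(nil, home) # prints the path to the touched bin/storm
end
```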

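#storm_topology_options ⇒ Array<String>

Return the Java -D options for Wukong-specific settings (services, topology, spout, dataflow, and state), sorted, with any pair whose value is nil or blank omitted.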

Returns:

  • (Array<String>)


# File 'lib/wukong-storm/storm_invocation.rb', line 172

def storm_topology_options
  (services_options + topology_options + spout_options + dataflow_options + state_options).reject do |pair|
    key, value = pair
    value.nil? || value.to_s.strip.empty?
  end.map { |pair|  java_option(*pair) }.sort
end
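The filter-map-sort pipeline can be traced on a few hand-written pairs. Unset settings, blank strings, and whole `nil` entries (such as an unset Kafka offset) are all dropped before formatting. The module calls a `java_option` helper defined elsewhere. The version below and its "-Dkey=value" format are assumptions, and the pairs are illustrative.

```ruby
# Hypothetical java_option: the module calls a helper of this name that
# is defined elsewhere; the "-Dkey=value" format is an assumption.
def java_option(key, value)
  "-D#{key}=#{value}"
end

pairs = [
  ["wukong.topology",    "my_topology"],
  ["wukong.environment", nil],   # unset setting: dropped
  ["wukong.kafka.hosts", "  "],  # blank value: dropped
  ["wukong.dataflow",    "my_flow"],
  nil,                           # e.g. an unset Kafka offset: dropped
]

options = pairs.reject do |pair|
  _key, value = pair
  value.nil? || value.to_s.strip.empty?
end.map { |pair| java_option(*pair) }.sort

p options # => ["-Dwukong.dataflow=my_flow", "-Dwukong.topology=my_topology"]
```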

#topology_name ⇒ String

Return the name of the Storm topology from the given settings and/or commandline args.

Returns:

  • (String)

    the name of the Storm topology



# File 'lib/wukong-storm/storm_invocation.rb', line 18

def topology_name
  settings[:name] || dataflow
end

#topology_options ⇒ Array<Array>

Return Java -D option key-value pairs related to the overall topology.

Returns:

  • (Array<Array>)

    an Array of key-value pairs



# File 'lib/wukong-storm/storm_invocation.rb', line 194

def topology_options
  [
   ["wukong.topology",          topology_name],
  ]
end

#wu_bolt_commandline ⇒ String

Generates the commandline that will be used to launch wu-bolt within each bolt of the Storm topology. If the bolt_command setting is present, it is returned verbatim instead.

Returns:

  • (String)


# File 'lib/wukong-storm/storm_invocation.rb', line 114

def wu_bolt_commandline
  return settings[:bolt_command] if settings[:bolt_command]
  [settings[:command_prefix], 'wu-bolt', dataflow_name, non_wukong_storm_params_string].compact.map(&:to_s).reject(&:empty?).join(' ')
end
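The optional pieces of the wu-bolt command are assembled with `compact.map(&:to_s).reject(&:empty?)`, so a missing prefix or an empty parameter string leaves no stray spaces. A standalone sketch follows. It assumes a plain Hash for `settings` and uses an `extra_params` argument in place of `non_wukong_storm_params_string`, which is defined elsewhere. The `--verbose` flag and `my_bolt.sh` are hypothetical examples.

```ruby
# Standalone sketch: `settings` is a plain Hash and `extra_params` stands
# in for non_wukong_storm_params_string (defined elsewhere in wukong-storm).
def wu_bolt_commandline(settings, dataflow_name, extra_params)
  return settings[:bolt_command] if settings[:bolt_command]
  [settings[:command_prefix], 'wu-bolt', dataflow_name, extra_params]
    .compact.map(&:to_s).reject(&:empty?).join(' ')
end

p wu_bolt_commandline({}, 'my_flow', '')
# => "wu-bolt my_flow"
p wu_bolt_commandline({ command_prefix: 'bundle exec' }, 'my_flow', '--verbose')
# => "bundle exec wu-bolt my_flow --verbose"
p wu_bolt_commandline({ bolt_command: 'my_bolt.sh' }, 'my_flow', '')
# => "my_bolt.sh"
```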

#wukong_topology_submitter_jar ⇒ String

Path to the Java jar file containing the submitter class.

Returns:

  • (String)

# File 'lib/wukong-storm/storm_invocation.rb', line 141

def wukong_topology_submitter_jar
  File.expand_path("wukong-storm.jar", File.dirname(__FILE__))
end