Module: Wukong::Storm::StormInvocation
- Included in: StormRunner
- Defined in: lib/wukong-storm/storm_invocation.rb
Overview
This module defines several methods that generate command lines that interact with Storm using the storm program.
Constant Summary
- TOPOLOGY_SUBMITTER_CLASS = "com.infochimps.wukong.storm.TopologySubmitter"
  The default Java submitter class.
Instance Method Summary
- #blob_input? ⇒ true, false
  Does this topology read from a filesystem?
- #blob_spout_options ⇒ Array<Array>
  Return Java -D option key-value pairs related to the topology's spout if it is reading from a generic filesystem.
- #dataflow_name ⇒ String
  Name of the Wukong dataflow to be launched.
- #dataflow_options ⇒ Array<Array>
  Return Java -D option key-value pairs related to the Wukong dataflow run by the topology.
- #file_input? ⇒ true, false
  Does this topology read from a local filesystem?
- #file_spout_options ⇒ Array<Array>
  Return Java -D option key-value pairs related to the topology's spout if it is reading from a local file.
- #fully_qualified_class_name ⇒ Object
  Returns the fully qualified name of the Java submitter class.
- #input_uri ⇒ URI
  The input URI for the topology.
- #kafka_input? ⇒ true, false
  Does this topology read from Kafka?
- #kafka_output? ⇒ true, false
  Does this topology write to Kafka?
- #kafka_spout_options ⇒ Array<Array>
  Return Java -D option key-value pairs related to the topology's spout if it is reading from Kafka.
- #kafka_state_options ⇒ Array<Array>
  Return Java -D option key-value pairs related to the final state used by the topology when it is writing to Kafka.
- #native_storm_options ⇒ Array<String>
  Return Java -D options constructed by mapping the passed-in "friendly" options (such as --timeout) to native Storm options (such as topology.message.timeout.secs).
- #output_uri ⇒ URI
  The output URI for the topology.
- #s3_endpoint ⇒ Object
  The AWS endpoint used to communicate with AWS for S3 access.
- #s3_input? ⇒ true, false
  Does this topology read from Amazon's S3?
- #s3_spout_options ⇒ Array<Array>
  Return Java -D option key-value pairs related to the topology's spout if it is reading from S3.
- #services_options ⇒ Array<Array>
  Return Java -D option key-value pairs related to services used by the topology.
- #spout_options ⇒ Array<Array>
  Return Java -D option key-value pairs related to the topology's spout.
- #state_options ⇒ Array<Array>
  Return Java -D option key-value pairs related to the final state used by the topology.
- #storm_kill_commandline ⇒ String
  Generates a command line that can be used to kill a running Storm topology based on the given topology name.
- #storm_launch_commandline ⇒ String
  Generates a command line that can be used to launch a new Storm topology based on the given dataflow, input and output topics, and settings.
- #storm_runner ⇒ String
  Return the path to the storm program.
- #storm_topology_options ⇒ Array<String>
  Return Java -D options for Wukong-specific options.
- #topology_name ⇒ String
  Return the name of the Storm topology from the given settings and/or command-line args.
- #topology_options ⇒ Array<Array>
  Return Java -D option key-value pairs related to the overall topology.
- #wu_bolt_commandline ⇒ String
  Generates the command line that will be used to launch wu-bolt within each bolt of the Storm topology.
- #wukong_topology_submitter_jar ⇒ String
  Path to the Java jar file containing the submitter class.
Instance Method Details
#blob_input? ⇒ true, false
Does this topology read from a filesystem?
# File 'lib/wukong-storm/storm_invocation.rb', line 50
def blob_input?
  s3_input? || file_input?
end
#blob_spout_options ⇒ Array<Array>
Return Java -D option key-value pairs related to the topology's spout if it is reading from a generic filesystem.
# File 'lib/wukong-storm/storm_invocation.rb', line 217
def blob_spout_options
  [
    ["wukong.input.type", "blob"],
  ].tap do |so|
    so << ["wukong.input.blob.marker", settings[:offset]] if settings[:offset]
    so << case
          when settings[:from_beginning]
            ["wukong.input.blob.start", "EARLIEST"]
          when settings[:from_end]
            ["wukong.input.blob.start", "LATEST"]
          when settings[:offset]
            ["wukong.input.blob.start", "EXPLICIT"]
          else
            ["wukong.input.blob.start", "RESUME"]
          end
  end
end
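The start-position logic of #blob_spout_options can be exercised on its own. This is a sketch, not the gem's code: the function name blob_spout_pairs is hypothetical, and a plain Hash stands in for the real settings object. The branch order matches the method above, so --from_beginning wins over --from_end, which wins over an explicit --offset.

```ruby
# Hypothetical standalone version of #blob_spout_options.
# A plain Hash replaces the real settings object (assumption).
def blob_spout_pairs(settings)
  pairs = [["wukong.input.type", "blob"]]
  # An explicit offset is also passed along as the blob marker.
  pairs << ["wukong.input.blob.marker", settings[:offset]] if settings[:offset]
  start =
    if settings[:from_beginning] then "EARLIEST"
    elsif settings[:from_end]    then "LATEST"
    elsif settings[:offset]      then "EXPLICIT"
    else                              "RESUME"
    end
  pairs << ["wukong.input.blob.start", start]
end

p blob_spout_pairs({})
p blob_spout_pairs({ offset: "part-00003" })  # sample marker value
```

With no flags at all the spout is asked to RESUME, which mirrors the Kafka spout's default of trying to resume.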
#dataflow_name ⇒ String
Name of the Wukong dataflow to be launched.
Obtained from either the first non-option argument passed to wu-storm or the --run option.
# File 'lib/wukong-storm/storm_invocation.rb', line 28
def dataflow_name
  args.first || settings[:run]
end
#dataflow_options ⇒ Array<Array>
Return Java -D option key-value pairs related to the Wukong dataflow run by the topology.
# File 'lib/wukong-storm/storm_invocation.rb', line 311
def dataflow_options
  [
    ["wukong.directory", Dir.pwd],
    ["wukong.dataflow", dataflow_name],
    ["wukong.command", wu_bolt_commandline],
    ["wukong.parallelism", settings[:parallelism]],
  ].tap do |opts|
    opts << ["wukong.environment", settings[:environment]] if settings[:environment]
  end
end
#file_input? ⇒ true, false
Does this topology read from a local filesystem?
# File 'lib/wukong-storm/storm_invocation.rb', line 64
def file_input?
  input_uri.scheme == 'file'
end
#file_spout_options ⇒ Array<Array>
Return Java -D option key-value pairs related to the topology's spout if it is reading from a local file.
# File 'lib/wukong-storm/storm_invocation.rb', line 273
def file_spout_options
  [
    ["wukong.input.blob.type", "file"],
    ["wukong.input.blob.path", input_uri.path],
  ]
end
#fully_qualified_class_name ⇒ Object
Returns the fully qualified name of the Java submitter class.
# File 'lib/wukong-storm/storm_invocation.rb', line 153
def fully_qualified_class_name
  TOPOLOGY_SUBMITTER_CLASS
end
#input_uri ⇒ URI
The input URI for the topology. Its scheme determines which Trident spout is used.
# File 'lib/wukong-storm/storm_invocation.rb', line 36
def input_uri
  @input_uri ||= URI.parse(settings[:input])
end
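How the parsed input URI drives spout selection can be sketched with Ruby's standard URI library. The helper input_kind is hypothetical; it simply restates #s3_input?, #file_input?, and #kafka_input? in one place. The bucket name and paths are placeholders.

```ruby
require 'uri'

# Hypothetical helper restating the scheme checks above.
def input_kind(input)
  case URI.parse(input).scheme
  when 's3'   then :s3     # S3 object(s): a blob input
  when 'file' then :file   # local file: a blob input
  else             :kafka  # no recognized scheme: treated as a Kafka topic
  end
end

uri = URI.parse("s3://example-bucket/logs/2014")
# The host becomes the S3 bucket and the path (minus its leading "/")
# becomes the object path in #s3_spout_options.
p [input_kind("s3://example-bucket/logs/2014"), uri.host, uri.path]
# => [:s3, "example-bucket", "/logs/2014"]
```

A bare topic name such as my_topic parses to a URI with no scheme, which is why every non-blob input falls through to Kafka.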
#kafka_input? ⇒ true, false
Does this topology read from Kafka?
# File 'lib/wukong-storm/storm_invocation.rb', line 43
def kafka_input?
  !blob_input?
end
#kafka_output? ⇒ true, false
Does this topology write to Kafka?
# File 'lib/wukong-storm/storm_invocation.rb', line 79
def kafka_output?
  true # only option right now
end
#kafka_spout_options ⇒ Array<Array>
Return Java -D option key-value pairs related to the topology's spout if it is reading from Kafka.
# File 'lib/wukong-storm/storm_invocation.rb', line 284
def kafka_spout_options
  [
    ["wukong.input.type", 'kafka'],
    ["wukong.input.kafka.topic", settings[:input]],
    ["wukong.input.kafka.partitions", settings[:kafka_partitions]],
    ["wukong.input.kafka.batch", settings[:kafka_batch]],
    ["wukong.input.parallelism", settings[:input_parallelism]],
    case
    when settings[:from_beginning]
      ["wukong.input.kafka.offset", "-2"]
    when settings[:from_end]
      ["wukong.input.kafka.offset", "-1"]
    when settings[:offset]
      ["wukong.input.kafka.offset", settings[:offset]]
    else
      # Do *not* set anything and the spout will attempt to
      # resume and, finding no prior offset, will start from the
      # end, as though we'd passed "-1"
    end
  ]
end
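The offset branch of #kafka_spout_options is worth isolating, because its default case yields nil rather than a pair. A minimal sketch, assuming a plain Hash for settings; the function name kafka_offset_pair is hypothetical.

```ruby
# Hypothetical standalone version of the offset case in #kafka_spout_options.
def kafka_offset_pair(settings)
  if settings[:from_beginning]
    ["wukong.input.kafka.offset", "-2"]
  elsif settings[:from_end]
    ["wukong.input.kafka.offset", "-1"]
  elsif settings[:offset]
    ["wukong.input.kafka.offset", settings[:offset]]
  end
  # With no flag set, the if-expression evaluates to nil, so no offset
  # is sent and the spout tries to resume.
end

p kafka_offset_pair({ from_end: true })
p kafka_offset_pair({})
```

The nil element left in the array is harmless: #storm_topology_options rejects any pair whose value is nil, and destructuring nil yields a nil value.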
#kafka_state_options ⇒ Array<Array>
Return Java -D option key-value pairs related to the final state used by the topology when it is writing to Kafka.
# File 'lib/wukong-storm/storm_invocation.rb', line 337
def kafka_state_options
  [
    ["wukong.output.kafka.topic", settings[:output]],
  ]
end
#native_storm_options ⇒ Array<String>
Return Java -D options constructed by mapping the passed-in "friendly" options (such as --timeout) to native Storm options (such as topology.message.timeout.secs).
# File 'lib/wukong-storm/storm_invocation.rb', line 162
def native_storm_options
  settings.params_with(:storm).map do |option, value|
    defn = settings.definition_of(option, :description)
    [defn, settings[option.to_sym]]
  end.map { |option, value| java_option(option, value) }
end
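The mapping idea behind #native_storm_options can be sketched without the settings library. Two assumptions here: a hypothetical STORM_PARAM_DESCRIPTIONS Hash plays the role of each Storm-scoped param's description (which, in the method above, holds the native Storm option name), and a hypothetical d_option helper stands in for java_option, whose real output format is not shown in this file.

```ruby
# Hypothetical table: friendly option name => native Storm option name.
STORM_PARAM_DESCRIPTIONS = {
  timeout: "topology.message.timeout.secs",
}.freeze

# Hypothetical stand-in for java_option; assumes a plain -Dkey=value form.
def d_option(key, value)
  "-D#{key}=#{value}"
end

# Map each friendly option's value onto its native Storm name.
def native_options(values)
  values.map do |option, value|
    d_option(STORM_PARAM_DESCRIPTIONS.fetch(option), value)
  end
end

p native_options({ timeout: 30 })
```

Keying the native name off the param's description keeps the friendly-to-native table in the option definitions themselves rather than in a separate mapping.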
#output_uri ⇒ URI
The output URI for the topology. Determines which Trident state is used.
# File 'lib/wukong-storm/storm_invocation.rb', line 72
def output_uri
  @output_uri ||= URI.parse(settings[:output])
end
#s3_endpoint ⇒ Object
The AWS endpoint used to communicate with AWS for S3 access.
Determined by the AWS region the S3 bucket was declared to be in.
# File 'lib/wukong-storm/storm_invocation.rb', line 256
def s3_endpoint
  case settings[:aws_region]
  when 'us-east-1'        then 's3.amazonaws.com'
  when 'us-west-1'        then 's3-us-west-1.amazonaws.com'
  when 'us-west-2'        then 's3-us-west-2.amazonaws.com'
  when /EU/, 'eu-west-1'  then 's3-eu-west-1.amazonaws.com'
  when 'ap-southeast-1'   then 's3-ap-southeast-1.amazonaws.com'
  when 'ap-southeast-2'   then 's3-ap-southeast-2.amazonaws.com'
  when 'ap-northeast-1'   then 's3-ap-northeast-1.amazonaws.com'
  when 'sa-east-1'        then 's3-sa-east-1.amazonaws.com'
  end
end
#s3_input? ⇒ true, false
Does this topology read from Amazon's S3?
# File 'lib/wukong-storm/storm_invocation.rb', line 57
def s3_input?
  input_uri.scheme == 's3'
end
#s3_spout_options ⇒ Array<Array>
Return Java -D option key-value pairs related to the topology's spout if it is reading from S3.
# File 'lib/wukong-storm/storm_invocation.rb', line 239
def s3_spout_options
  [
    ["wukong.input.blob.type", "s3"],
    ["wukong.input.blob.path", input_uri.path.gsub(%r{^/}, '')],
    ["wukong.input.blob.s3_bucket", input_uri.host],
    ["wukong.input.blob.aws_key", settings[:aws_key]],
    ["wukong.input.blob.aws_secret", settings[:aws_secret]],
    ["wukong.input.blob.s3_endpoint", s3_endpoint]
  ]
end
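A sketch of what #s3_spout_options produces for a sample input, assuming explicit arguments instead of settings. The function s3_pairs is hypothetical, S3_ENDPOINTS is only a subset of the regions handled by #s3_endpoint, and the bucket and credentials are placeholders.

```ruby
require 'uri'

# Subset of the region table in #s3_endpoint (unknown regions give nil,
# as the case expression with no else does).
S3_ENDPOINTS = {
  'us-east-1' => 's3.amazonaws.com',
  'us-west-2' => 's3-us-west-2.amazonaws.com',
  'eu-west-1' => 's3-eu-west-1.amazonaws.com',
}.freeze

# Hypothetical standalone version of #s3_spout_options.
def s3_pairs(input, region, aws_key, aws_secret)
  uri = URI.parse(input)
  [
    ["wukong.input.blob.type", "s3"],
    ["wukong.input.blob.path", uri.path.gsub(%r{^/}, '')], # drop leading "/"
    ["wukong.input.blob.s3_bucket", uri.host],              # bucket = URI host
    ["wukong.input.blob.aws_key", aws_key],
    ["wukong.input.blob.aws_secret", aws_secret],
    ["wukong.input.blob.s3_endpoint", S3_ENDPOINTS[region]],
  ]
end

s3_pairs("s3://example-bucket/logs/2014", "us-west-2", "KEY", "SECRET")
  .each { |key, value| puts "#{key}=#{value}" }
```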
#services_options ⇒ Array<Array>
Return Java -D option key-value pairs related to services used by the topology.
# File 'lib/wukong-storm/storm_invocation.rb', line 183
def services_options
  [
    ["wukong.kafka.hosts", settings[:kafka_hosts]],
    ["wukong.zookeeper.hosts", settings[:zookeeper_hosts]],
  ]
end
#spout_options ⇒ Array<Array>
Return Java -D option key-value pairs related to the topology's spout.
# File 'lib/wukong-storm/storm_invocation.rb', line 204
def spout_options
  case
  when blob_input?
    blob_spout_options + (s3_input? ? s3_spout_options : file_spout_options)
  else
    kafka_spout_options
  end
end
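The dispatch in #spout_options composes pair lists rather than choosing a single one: blob inputs get the shared blob pairs plus either the S3 or the file pairs. A minimal sketch in which spout_pairs is hypothetical and the pair lists are abbreviated stand-ins for the real *_spout_options methods.

```ruby
require 'uri'

# Hypothetical, abbreviated version of #spout_options.
def spout_pairs(input)
  uri = URI.parse(input)
  if %w[s3 file].include?(uri.scheme)            # blob_input?
    shared   = [["wukong.input.type", "blob"]]   # stand-in for blob_spout_options
    specific = if uri.scheme == 's3'
                 [["wukong.input.blob.type", "s3"]]   # stand-in for s3_spout_options
               else
                 [["wukong.input.blob.type", "file"]] # stand-in for file_spout_options
               end
    shared + specific
  else
    # stand-in for kafka_spout_options
    [["wukong.input.type", "kafka"], ["wukong.input.kafka.topic", input]]
  end
end

p spout_pairs("file:///tmp/in.tsv")
p spout_pairs("my_topic")
```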
#state_options ⇒ Array<Array>
Return Java -D option key-value pairs related to the final state used by the topology.
# File 'lib/wukong-storm/storm_invocation.rb', line 326
def state_options
  case
  when kafka_output?
    kafka_state_options
  end
end
#storm_kill_commandline ⇒ String
Generates a command line that can be used to kill a running Storm topology based on the given topology name.
# File 'lib/wukong-storm/storm_invocation.rb', line 106
def storm_kill_commandline
  "#{storm_runner} kill #{topology_name} #{} > /dev/null 2>&1"
end
#storm_launch_commandline ⇒ String
Generates a command line that can be used to launch a new Storm topology based on the given dataflow, input and output topics, and settings.
# File 'lib/wukong-storm/storm_invocation.rb', line 92
def storm_launch_commandline
  [
    storm_runner,
    "jar #{wukong_topology_submitter_jar}",
    fully_qualified_class_name,
    native_storm_options,
    storm_topology_options,
  ].flatten.compact.join("\ \t\\\n ")
end
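The joining step of #storm_launch_commandline is easy to misread. In a double-quoted Ruby string, "\ " is just a space, so the separator "\ \t\\\n " is space, tab, backslash, newline, space: each argument lands on its own shell-continued line. A sketch with placeholder parts (the jar path and -D values are illustrative, not real output):

```ruby
# Placeholder pieces shaped like the real ones: strings plus nested
# arrays of -D options, possibly containing nils.
parts = [
  "storm",
  "jar /path/to/wukong-storm.jar",
  "com.infochimps.wukong.storm.TopologySubmitter",
  ["-Dtopology.message.timeout.secs=30"], # like native_storm_options
  ["-Dwukong.topology=my_flow", nil],     # like storm_topology_options
]

# Flatten nested option arrays, drop nils, then join with a
# backslash-newline continuation between arguments.
command = parts.flatten.compact.join("\ \t\\\n ")
puts command
```

Because the continuation is a trailing backslash, the printed command can be pasted into a shell as-is.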
#storm_runner ⇒ String
Return the path to the storm program. Honors the --storm_runner and --storm_home options: an explicit --storm_runner wins, then bin/storm under --storm_home if that file exists, and otherwise plain storm.
# File 'lib/wukong-storm/storm_invocation.rb', line 125
def storm_runner
  explicit_runner = settings[:storm_runner]
  home_runner     = File.join(settings[:storm_home], 'bin/storm')
  default_runner  = 'storm'
  case
  when explicit_runner          then explicit_runner
  when File.exist?(home_runner) then home_runner
  else default_runner
  end
end
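The lookup order of #storm_runner can be checked against a throwaway directory. This sketch assumes a plain Hash for settings and uses the hypothetical name resolve_storm_runner; as in the original, storm_home must be set, since File.join raises on nil.

```ruby
require 'tmpdir'
require 'fileutils'

# Hypothetical standalone version of #storm_runner.
def resolve_storm_runner(settings)
  explicit_runner = settings[:storm_runner]
  home_runner     = File.join(settings[:storm_home], 'bin/storm')
  if explicit_runner
    explicit_runner              # 1. explicit --storm_runner
  elsif File.exist?(home_runner)
    home_runner                  # 2. bin/storm under --storm_home
  else
    'storm'                      # 3. fall back to storm on the PATH
  end
end

Dir.mktmpdir do |home|
  puts resolve_storm_runner({ storm_home: home })  # no bin/storm yet: "storm"
  FileUtils.mkdir_p(File.join(home, 'bin'))
  FileUtils.touch(File.join(home, 'bin', 'storm'))
  puts resolve_storm_runner({ storm_home: home })  # now the file under home
end
```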
#storm_topology_options ⇒ Array<String>
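Return Java -D options for Wukong-specific options. Pairs with a nil or blank value are dropped and the resulting options are sorted.
# File 'lib/wukong-storm/storm_invocation.rb', line 172
def storm_topology_options
  (topology_options + dataflow_options + services_options + spout_options + state_options).reject do |pair|
    key, value = pair
    value.nil? || value.to_s.strip.empty?
  end.map { |pair| java_option(*pair) }.sort
end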
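The clean-up performed by #storm_topology_options can be sketched over literal pair lists. The names topology_d_options and d_option are hypothetical; d_option stands in for java_option and assumes a plain -Dkey=value form. Flattening one level is used here in place of the original's chain of + concatenations, which gives the same list of pairs.

```ruby
# Hypothetical stand-in for java_option (format assumed).
def d_option(key, value)
  "-D#{key}=#{value}"
end

# Drop nil/blank values (including whole nil entries), render, sort.
def topology_d_options(*pair_lists)
  pair_lists.flatten(1).reject do |pair|
    _key, value = pair   # destructuring nil gives value = nil
    value.nil? || value.to_s.strip.empty?
  end.map { |pair| d_option(*pair) }.sort
end

p topology_d_options(
  [["wukong.topology", "my_flow"]],
  [["wukong.environment", "  "], ["wukong.parallelism", 2]],
  [["wukong.input.type", "kafka"], nil]
)
```

Sorting makes the generated command line deterministic regardless of which option group a pair came from.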
#topology_name ⇒ String
Return the name of the Storm topology from the given settings and/or command-line args.
# File 'lib/wukong-storm/storm_invocation.rb', line 18
def topology_name
  settings[:name] || dataflow
end
#topology_options ⇒ Array<Array>
Return Java -D option key-value pairs related to the overall topology.
# File 'lib/wukong-storm/storm_invocation.rb', line 194
def topology_options
  [
    ["wukong.topology", topology_name],
  ]
end
#wu_bolt_commandline ⇒ String
Generates the command line that will be used to launch wu-bolt within each bolt of the Storm topology.
# File 'lib/wukong-storm/storm_invocation.rb', line 114
def wu_bolt_commandline
  return settings[:bolt_command] if settings[:bolt_command]
  [settings[:command_prefix], 'wu-bolt', dataflow_name, non_wukong_storm_params_string]
    .compact.map(&:to_s).reject(&:empty?).join(' ')
end
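A sketch of how #wu_bolt_commandline assembles the per-bolt command, with keyword arguments in place of settings. The name wu_bolt_line is hypothetical, extra_params stands in for non_wukong_storm_params_string (not shown in this file), and the sample values are placeholders.

```ruby
# Hypothetical standalone version of #wu_bolt_commandline.
def wu_bolt_line(dataflow, bolt_command: nil, command_prefix: nil, extra_params: nil)
  return bolt_command if bolt_command   # an explicit command wins verbatim
  [command_prefix, 'wu-bolt', dataflow, extra_params]
    .compact.map(&:to_s).reject(&:empty?)  # skip nil and empty parts
    .join(' ')
end

puts wu_bolt_line('my_flow', command_prefix: 'bundle exec', extra_params: '--foo=bar')
```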
#wukong_topology_submitter_jar ⇒ String
Path to the Java jar file containing the submitter class.
# File 'lib/wukong-storm/storm_invocation.rb', line 141
def wukong_topology_submitter_jar
  File.expand_path("wukong-storm.jar", File.dirname(__FILE__))
end
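What #wukong_topology_submitter_jar computes can be shown with File.expand_path directly: the jar name is resolved against the directory of the source file, so the jar is expected to sit beside storm_invocation.rb. The gem path below is illustrative only, and the result assumes a Unix-style filesystem.

```ruby
# Illustrative location of storm_invocation.rb (placeholder path).
source_file = "/gems/wukong-storm/lib/wukong-storm/storm_invocation.rb"

# Resolve the jar name relative to that file's directory.
jar = File.expand_path("wukong-storm.jar", File.dirname(source_file))
puts jar
```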