Class: RedStorm::SimpleTopology
- Inherits:
-
Object
- Object
- RedStorm::SimpleTopology
show all
- Defined in:
- lib/red_storm/simple_topology.rb
Defined Under Namespace
Classes: BoltDefinition, ComponentDefinition, SpoutDefinition
Constant Summary
collapse
- DEFAULT_SPOUT_PARALLELISM =
1
- DEFAULT_BOLT_PARALLELISM =
1
Instance Attribute Summary collapse
-
#cluster ⇒ Object
readonly
LocalCluster reference usable in on_submit block, for example.
Class Method Summary
collapse
Instance Method Summary
collapse
Instance Attribute Details
#cluster ⇒ Object
LocalCluster reference usable in on_submit block, for example
10
11
12
|
# File 'lib/red_storm/simple_topology.rb', line 10
# Reader for the LocalCluster that #start stores in @cluster when it runs
# with env == :local (handy inside an on_submit block). Returns nil when
# nothing has been stored.
def cluster; @cluster; end
|
Class Method Details
.bolt(bolt_class, options = {}, &bolt_block) ⇒ Object
90
91
92
93
94
95
96
|
# File 'lib/red_storm/simple_topology.rb', line 90
# Registers a bolt component on this topology class.
#
# bolt_class - the bolt class to register.
# options    - optional Hash; :id overrides the default id computed by
#              underscore(bolt_class) and :parallelism overrides
#              DEFAULT_BOLT_PARALLELISM.
# bolt_block - required definition body, evaluated with the new
#              BoltDefinition as self.
#
# Raises TopologyDefinitionError when no block is supplied.
# Returns the result of appending the definition to components.
def self.bolt(bolt_class, options = {}, &bolt_block)
  defaults = {:id => self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}
  settings = defaults.merge(options)
  definition = BoltDefinition.new(bolt_class, settings[:id], settings[:parallelism])

  # The definition is built first because the error message reports its class and id.
  unless block_given?
    raise TopologyDefinitionError, "#{definition.clazz.name}, #{definition.id}, bolt definition body required"
  end

  definition.instance_exec(&bolt_block)
  self.components << definition
end
|
.configure(name = nil, &configure_block) ⇒ Object
98
99
100
101
102
|
# File 'lib/red_storm/simple_topology.rb', line 98
# Declares this class as the topology to run and optionally records its
# name and configuration block.
#
# name            - optional topology name; the stored name is left
#                   untouched when this is nil or false.
# configure_block - optional block; stored in @configure_block only when
#                   one is actually given.
#
# Returns the stored block when a block was given, otherwise nil.
def self.configure(name = nil, &configure_block)
  Configuration.topology_class = self

  if name
    @topology_name = name
  end

  if block_given?
    @configure_block = configure_block
  end
end
|
.log ⇒ Object
79
80
81
|
# File 'lib/red_storm/simple_topology.rb', line 79
# Returns the log4j Logger for this class, named after self.name. The
# logger is looked up on first use and cached in @log for later calls.
def self.log
  return @log if @log

  @log = org.apache.log4j.Logger.getLogger(self.name)
end
|
.on_submit(method_name = nil, &submit_block) ⇒ Object
104
105
106
|
# File 'lib/red_storm/simple_topology.rb', line 104
# Registers the hook that #start runs (via instance_exec, passing env)
# right after the topology has been submitted.
#
# method_name  - optional name of a method to call with env; used only
#                when no block is given.
# submit_block - optional block taking env; takes precedence over
#                method_name.
#
# Raises ArgumentError when neither a method name nor a block is given.
# Previously that case stored a lambda calling send(nil, env), which only
# failed once the hook ran, i.e. after the topology was already submitted.
# Returns the stored callable.
def self.on_submit(method_name = nil, &submit_block)
  if submit_block.nil? && method_name.nil?
    raise ArgumentError, "on_submit requires a method name or a block"
  end

  @submit_block = submit_block || lambda { |env| self.send(method_name, env) }
end
|
.spout(spout_class, options = {}, &spout_block) ⇒ Object
83
84
85
86
87
88
|
# File 'lib/red_storm/simple_topology.rb', line 83
# Registers a spout component on this topology class.
#
# spout_class - the spout class to register.
# options     - optional Hash; :id overrides the default id computed by
#               underscore(spout_class) and :parallelism overrides
#               DEFAULT_SPOUT_PARALLELISM.
# spout_block - optional definition body, evaluated with the new
#               SpoutDefinition as self when given.
#
# Returns the result of appending the definition to components.
def self.spout(spout_class, options = {}, &spout_block)
  defaults = {:id => self.underscore(spout_class), :parallelism => DEFAULT_SPOUT_PARALLELISM}
  settings = defaults.merge(options)
  definition = SpoutDefinition.new(spout_class, settings[:id], settings[:parallelism])

  # Unlike bolts, a spout body is optional.
  if block_given?
    definition.instance_exec(&spout_block)
  end

  self.components << definition
end
|
Instance Method Details
#start(base_class_path, env) ⇒ Object
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
|
# File 'lib/red_storm/simple_topology.rb', line 110
# Builds the Storm topology declared on this class and submits it.
#
# base_class_path - passed to each component definition's new_instance.
# env             - environment marker; :local submits to a freshly created
#                   LocalCluster (kept in @cluster), anything else submits
#                   through StormSubmitter.
#
# Returns the result of running the class's submit block with env.
def start(base_class_path, env)
  topology = self.class
  topology.resolve_ids!(topology.components)

  builder = TopologyBuilder.new

  # Declare every spout and apply its per-component configuration.
  topology.spouts.each do |spout_def|
    spout_declarer = builder.setSpout(spout_def.id, spout_def.new_instance(base_class_path), spout_def.parallelism.to_java)
    spout_declarer.addConfigurations(spout_def.config)
  end

  # Bolts additionally declare their groupings on the declarer.
  topology.bolts.each do |bolt_def|
    bolt_declarer = builder.setBolt(bolt_def.id, bolt_def.new_instance(base_class_path), bolt_def.parallelism.to_java)
    bolt_declarer.addConfigurations(bolt_def.config)
    bolt_def.define_grouping(bolt_declarer)
  end

  # Seed the configuration with the JRuby compat-version worker option,
  # then evaluate the class's configure block against the configurator.
  worker_defaults = {"topology.worker.childopts" => "-Djruby.compat.version=#{RedStorm.jruby_mode_token}"}
  configurator = Configurator.new(worker_defaults)
  configurator.instance_exec(env, &topology.configure_block)

  submitter =
    if env == :local
      @cluster = LocalCluster.new
    else
      StormSubmitter
    end
  submitter.submitTopology(topology.topology_name, configurator.config, builder.createTopology)

  # The submit hook runs with this topology instance as self.
  instance_exec(env, &topology.submit_block)
end
|