Class: RedStorm::SimpleTopology

Inherits:
Object
  • Object
show all
Defined in:
lib/red_storm/simple_topology.rb

Defined Under Namespace

Classes: BoltDefinition, ComponentDefinition, SpoutDefinition

Constant Summary collapse

DEFAULT_SPOUT_PARALLELISM =
1
DEFAULT_BOLT_PARALLELISM =
1

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#cluster ⇒ Object (readonly)

Reference to the LocalCluster, usable for example inside an on_submit block



10
11
12
# File 'lib/red_storm/simple_topology.rb', line 10

# Read-only accessor for the @cluster instance variable.
#
# @return [Object, nil] the stored cluster reference, or nil when @cluster
#   has not been assigned
def cluster
  instance_variable_get(:@cluster)
end

Class Method Details

.bolt(bolt_class, options = {}, &bolt_block) ⇒ Object



90
91
92
93
94
95
96
# File 'lib/red_storm/simple_topology.rb', line 90

# Declares a bolt for this topology and appends it to the component list.
#
# @param bolt_class [Class] class implementing the bolt
# @param options [Hash] optional overrides; :id defaults to the underscored
#   class name and :parallelism to DEFAULT_BOLT_PARALLELISM
# @yield definition body, evaluated with the new BoltDefinition as self
# @return [Object] the result of appending to self.components
# @raise [TopologyDefinitionError] when no definition body block is given
def self.bolt(bolt_class, options = {}, &bolt_block)
  defaults = {:id => self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}
  settings = defaults.merge(options)
  definition = BoltDefinition.new(bolt_class, settings[:id], settings[:parallelism])

  # The definition is built first so the error message can report its class and id.
  if bolt_block.nil?
    raise(TopologyDefinitionError, "#{definition.clazz.name}, #{definition.id}, bolt definition body required")
  end

  definition.instance_exec(&bolt_block)
  self.components << definition
end

.configure(name = nil, &configure_block) ⇒ Object



98
99
100
101
102
# File 'lib/red_storm/simple_topology.rb', line 98

# Registers this class as the topology class on Configuration and records
# the optional topology name and configuration block.
#
# @param name [Object, nil] topology name; left untouched when nil/false
# @yield configuration body, stored in @configure_block when given
# @return [Proc, nil] the stored block, or nil when no block was given
def self.configure(name = nil, &configure_block)
  Configuration.topology_class = self

  if name
    @topology_name = name
  end

  if configure_block
    @configure_block = configure_block
  end
end

.log ⇒ Object



79
80
81
# File 'lib/red_storm/simple_topology.rb', line 79

# Memoized log4j logger, looked up by this class's name on first use.
#
# @return [Object] the cached logger stored in @log
def self.log
  return @log if @log

  @log = org.apache.log4j.Logger.getLogger(self.name)
end

.on_submit(method_name = nil, &submit_block) ⇒ Object



104
105
106
# File 'lib/red_storm/simple_topology.rb', line 104

# Registers the submit hook, stored in @submit_block.
#
# Either a block or a method name must be supplied. When a block is given it
# is stored as-is and method_name is ignored; otherwise a lambda is stored
# that sends method_name, with the environment as its only argument, to
# whatever object is self when the hook is executed.
#
# @param method_name [Symbol, String, nil] name of the method to invoke
# @yield [env] hook body; takes precedence over method_name
# @return [Proc] the stored hook
# @raise [ArgumentError] when neither a method name nor a block is given
def self.on_submit(method_name = nil, &submit_block)
  # Fail at definition time: without this guard the stored lambda would call
  # send(nil, env) and only blow up with a TypeError when the hook is run.
  if submit_block.nil? && method_name.nil?
    raise ArgumentError, "on_submit requires a method name or a block"
  end

  @submit_block = submit_block || lambda { |env| self.send(method_name, env) }
end

.spout(spout_class, options = {}, &spout_block) ⇒ Object



83
84
85
86
87
88
# File 'lib/red_storm/simple_topology.rb', line 83

# Declares a spout for this topology and appends it to the component list.
#
# @param spout_class [Class] class implementing the spout
# @param options [Hash] optional overrides; :id defaults to the underscored
#   class name and :parallelism to DEFAULT_SPOUT_PARALLELISM
# @yield optional definition body, evaluated with the new SpoutDefinition as self
# @return [Object] the result of appending to self.components
def self.spout(spout_class, options = {}, &spout_block)
  defaults = {:id => self.underscore(spout_class), :parallelism => DEFAULT_SPOUT_PARALLELISM}
  settings = defaults.merge(options)
  definition = SpoutDefinition.new(spout_class, settings[:id], settings[:parallelism])

  # Unlike bolts, a spout definition body is optional.
  definition.instance_exec(&spout_block) if spout_block

  self.components << definition
end

Instance Method Details

#start(base_class_path, env) ⇒ Object

topology proxy interface



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/red_storm/simple_topology.rb', line 110

# Topology proxy interface: builds the Storm topology from the class-level
# spout and bolt definitions, applies the configuration block, submits the
# topology, and finally runs the submit hook.
#
# @param base_class_path [Object] forwarded unchanged to each component's new_instance
# @param env [Object] when equal to :local the topology is submitted to a new
#   LocalCluster (kept in @cluster); otherwise it goes through StormSubmitter
# @return [Object] the result of executing the class's submit block
def start(base_class_path, env)
  topology = self.class
  topology.resolve_ids!(topology.components)

  builder = TopologyBuilder.new

  topology.spouts.each do |spout_def|
    spout_declarer = builder.setSpout(spout_def.id, spout_def.new_instance(base_class_path), spout_def.parallelism.to_java)
    spout_declarer.addConfigurations(spout_def.config)
  end

  topology.bolts.each do |bolt_def|
    bolt_declarer = builder.setBolt(bolt_def.id, bolt_def.new_instance(base_class_path), bolt_def.parallelism.to_java)
    bolt_declarer.addConfigurations(bolt_def.config)
    bolt_def.define_grouping(bolt_declarer)
  end

  # Set the JRuby compatibility mode option for Storm workers; defaults to the current JRuby mode.
  worker_opts = "-Djruby.compat.version=#{RedStorm.jruby_mode_token}"
  configurator = Configurator.new({"topology.worker.childopts" => worker_opts})
  # The configure block runs with the configurator as self and receives env.
  configurator.instance_exec(env, &topology.configure_block)

  submitter =
    if env == :local
      @cluster = LocalCluster.new
    else
      StormSubmitter
    end
  submitter.submitTopology(topology.topology_name, configurator.config, builder.createTopology)

  # The submit hook runs with this topology instance as self, after submission.
  instance_exec(env, &topology.submit_block)
end