Class: RedStorm::DSL::DRPCTopology

Inherits:
Topology
  • Object
show all
Defined in:
lib/red_storm/dsl/drpc_topology.rb

Constant Summary

Constants inherited from Topology

Topology::DEFAULT_BOLT_PARALLELISM, Topology::DEFAULT_SPOUT_PARALLELISM

Instance Attribute Summary

Attributes inherited from Topology

#cluster

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Topology

bolt, build_topology, configure, log, on_submit

Class Method Details

.input_bolt(bolt_class, *args, &bolt_block) ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
# File 'lib/red_storm/dsl/drpc_topology.rb', line 75

# Declares the input bolt of the DRPC topology and registers it in the
# class-level component list.
#
# @param bolt_class [Class] bolt class; its underscored name is the default id
# @param args [Array] optional trailing options Hash (+:id+ and +:parallelism+
#   are read here); the last remaining element, if any, is used as the
#   constructor arguments for the bolt definition
# @yield bolt definition body, evaluated in the context of the new
#   InputBoltDefinition
# @raise [TopologyDefinitionError] when no definition block is given
# @return [Object] the result of appending the definition to +components+
def self.input_bolt(bolt_class, *args, &bolt_block)
  set_topology_class!

  # A trailing Hash is taken as options; whatever is last afterwards is the
  # constructor arguments (defaulting to none).
  overrides = args.last.is_a?(Hash) ? args.pop : {}
  constructor_args = args.empty? ? [] : args.pop

  settings = {:id => self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}.merge(overrides)
  definition = InputBoltDefinition.new(bolt_class, constructor_args, settings[:id], settings[:parallelism])

  # The definition is built first because the error message reports its class name and id.
  unless block_given?
    raise(TopologyDefinitionError, "#{definition.clazz.name}, #{definition.id}, bolt definition body required")
  end

  definition.instance_exec(&bolt_block)
  self.components << definition
end

.spout ⇒ Object



44
45
46
# File 'lib/red_storm/dsl/drpc_topology.rb', line 44

# Always rejects a spout declaration for this topology class.
#
# @raise [TopologyDefinitionError] unconditionally
def self.spout
  raise(TopologyDefinitionError, "DRPC spout is already defined")
end

Instance Method Details

#start(base_class_path, env) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/red_storm/dsl/drpc_topology.rb', line 48

# Assembles a linear DRPC topology from the bolts declared on the class,
# applies the configure block, submits the topology, and finally runs the
# class-level submit block.
#
# @param base_class_path passed through to each bolt definition's +new_instance+
# @param env environment marker; +:local+ submits to a new LocalCluster
#   (stored in +@cluster+) together with a LocalDRPC, any other value submits
#   through StormSubmitter
# @return [Object] the result of evaluating the submit block on this instance
def start(base_class_path, env)
  topology = self.class
  drpc_builder = Java::BacktypeStormDrpc::LinearDRPCTopologyBuilder.new(topology.topology_name)

  topology.bolts.each do |definition|
    bolt_declarer = drpc_builder.addBolt(definition.new_instance(base_class_path), definition.parallelism.to_java)
    bolt_declarer.addConfigurations(definition.config)
    definition.define_grouping(bolt_declarer)
  end

  # Storm workers get the JRuby compatibility mode option, defaulting to the current JRuby mode.
  worker_defaults = {"topology.worker.childopts" => "-Djruby.compat.version=#{RedStorm.jruby_mode_token}"}
  configurator = Configurator.new(worker_defaults)
  configurator.instance_exec(env, &topology.configure_block)

  if env == :local
    local_drpc = LocalDRPC.new
    @cluster = LocalCluster.new
    @cluster.submitTopology(topology.topology_name, configurator.config, drpc_builder.createLocalTopology(local_drpc))
  else
    # No in-process DRPC server outside local mode; the submit block receives nil.
    local_drpc = nil
    StormSubmitter.submitTopology(topology.topology_name, configurator.config, drpc_builder.createRemoteTopology)
  end

  instance_exec(env, local_drpc, &topology.submit_block)
end