Class: RedStorm::SimpleBolt

Inherits:
Object
  • Object
show all
Defined in:
lib/red_storm/simple_bolt.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#collector ⇒ Object (readonly)

Returns the value of attribute collector.



6
7
8
# File 'lib/red_storm/simple_bolt.rb', line 6

# Reader for @collector (assigned from the +collector+ argument of #prepare).
def collector; @collector; end

#config ⇒ Object (readonly)

Returns the value of attribute config.



6
7
8
# File 'lib/red_storm/simple_bolt.rb', line 6

# Reader for @config (assigned from the +config+ argument of #prepare).
def config; @config; end

#context ⇒ Object (readonly)

Returns the value of attribute context.



6
7
8
# File 'lib/red_storm/simple_bolt.rb', line 6

# Reader for @context (assigned from the +context+ argument of #prepare).
def context; @context; end

Class Method Details

.configure(&configure_block) ⇒ Object



18
19
20
# File 'lib/red_storm/simple_bolt.rb', line 18

# DSL: registers the block later run by #get_component_configuration.
# When no block is supplied, a no-op lambda is stored instead.
#
# @return [Proc] the block that was stored in @configure_block
def self.configure(&configure_block)
  @configure_block = configure_block || lambda {}
end

.log ⇒ Object

DSL class methods



10
11
12
# File 'lib/red_storm/simple_bolt.rb', line 10

# Class-level logger, created on first use via Logger.getLogger with this
# class's name and memoized in @log for subsequent calls.
def self.log
  return @log if @log

  @log = Logger.getLogger(self.name)
end

.on_close(method_name = nil, &close_block) ⇒ Object



34
35
36
# File 'lib/red_storm/simple_bolt.rb', line 34

# DSL: registers the close callback that #cleanup runs.
#
# With a block, the block itself is stored. Without one, a lambda is stored
# that sends +method_name+ (or :on_close when no name is given) to whatever
# object it is evaluated against.
#
# @return [Proc] the callback stored in @close_block
def self.on_close(method_name = nil, &close_block)
  handler = method_name || :on_close
  @close_block = close_block || lambda { send(handler) }
end

.on_init(method_name = nil, &on_init_block) ⇒ Object



30
31
32
# File 'lib/red_storm/simple_bolt.rb', line 30

# DSL: registers the init callback that #prepare runs.
#
# With a block, the block itself is stored. Without one, a lambda is stored
# that sends +method_name+ (or :on_init when no name is given) to whatever
# object it is evaluated against.
#
# @return [Proc] the callback stored in @on_init_block
def self.on_init(method_name = nil, &on_init_block)
  handler = method_name || :on_init
  @on_init_block = on_init_block || lambda { send(handler) }
end

.on_receive(*args, &on_receive_block) ⇒ Object



22
23
24
25
26
27
28
# File 'lib/red_storm/simple_bolt.rb', line 22

# DSL: registers the tuple handler that #execute runs.
#
# Accepts an optional method name followed by an optional trailing options
# Hash. The options are merged into receive_options. With a block, the block
# is stored; otherwise a lambda is stored that sends the given method name
# (or :on_receive when none is given) with the tuple.
#
# @return [Proc] the handler stored in @on_receive_block
def self.on_receive(*args, &on_receive_block)
  options = {}
  options = args.pop if args.last.is_a?(Hash)
  handler = args.first || :on_receive

  receive_options.merge!(options)
  @on_receive_block = on_receive_block || lambda { |tuple| send(handler, tuple) }
end

.output_fields(*fields) ⇒ Object



14
15
16
# File 'lib/red_storm/simple_bolt.rb', line 14

# DSL: records the output field names, each converted with #to_s, in @fields.
#
# @return [Array<String>] the stored field names
def self.output_fields(*fields)
  @fields = fields.map { |field| field.to_s }
end

Instance Method Details

#ack(tuple) ⇒ Object



52
53
54
# File 'lib/red_storm/simple_bolt.rb', line 52

# Forwards +tuple+ to @collector.ack and returns that call's result.
def ack(tuple); @collector.ack(tuple); end

#anchored_emit(tuple, *values) ⇒ Object



48
49
50
# File 'lib/red_storm/simple_bolt.rb', line 48

# Wraps +values+ in a Values instance and emits it through @collector with
# +tuple+ passed as the first argument to emit.
def anchored_emit(tuple, *values)
  payload = Values.new(*values)
  @collector.emit(tuple, payload)
end

#cleanup ⇒ Object



73
74
75
# File 'lib/red_storm/simple_bolt.rb', line 73

# Runs the class-level close block in the context of this instance and
# returns its result.
def cleanup
  close_handler = self.class.close_block
  instance_exec(&close_handler)
end

#declare_output_fields(declarer) ⇒ Object



77
78
79
# File 'lib/red_storm/simple_bolt.rb', line 77

# Declares the class-level field list on +declarer+, wrapped in a Fields
# instance.
def declare_output_fields(declarer)
  field_names = self.class.fields
  declarer.declare(Fields.new(field_names))
end

#execute(tuple) ⇒ Object

Bolt proxy interface



58
59
60
61
62
63
64
# File 'lib/red_storm/simple_bolt.rb', line 58

# Bolt proxy entry point: runs the class-level on_receive block against
# +tuple+ in the context of this instance, optionally emits its result and
# optionally acks the tuple.
#
# Emission happens only when the block returns a truthy value and the class
# reports emit?. The result is normalized to a list of value lists:
# * a non-Array result becomes one emission with a single value;
# * a flat Array becomes one emission with those values;
# * an Array whose first element is an Array is treated as several emissions.
# Each emission is anchored to +tuple+ when the class reports anchor?.
#
# @param tuple the incoming tuple handed to the on_receive block
def execute(tuple)
  output = instance_exec(tuple, &self.class.on_receive_block)

  if output && self.class.emit?
    values_list =
      if !output.is_a?(Array)
        [[output]]
      elsif !output.first.is_a?(Array)
        [output]
      else
        output
      end

    values_list.each do |values|
      self.class.anchor? ? anchored_emit(tuple, *values) : unanchored_emit(*values)
    end
  end

  # Ack regardless of whether anything was emitted: a handler that returns
  # nil/false (or a class with emit disabled) has still processed the tuple,
  # and with ack enabled it must not be left un-acked.
  @collector.ack(tuple) if self.class.ack?
end

#get_component_configuration ⇒ Object



81
82
83
84
85
# File 'lib/red_storm/simple_bolt.rb', line 81

# Builds a fresh Configurator, evaluates the class-level configure block in
# the configurator's context, and returns the configurator's config.
def get_component_configuration
  Configurator.new.tap { |configurator| configurator.instance_exec(&self.class.configure_block) }.config
end

#log ⇒ Object

DSL instance methods



40
41
42
# File 'lib/red_storm/simple_bolt.rb', line 40

# Instance-level shortcut that delegates to the class-level logger.
def log; self.class.log; end

#prepare(config, context, collector) ⇒ Object



66
67
68
69
70
71
# File 'lib/red_storm/simple_bolt.rb', line 66

# Stores the three arguments in @config, @context and @collector, then runs
# the class-level on_init block in the context of this instance.
#
# @return the result of the on_init block
def prepare(config, context, collector)
  @config, @context, @collector = config, context, collector

  init_handler = self.class.on_init_block
  instance_exec(&init_handler)
end

#unanchored_emit(*values) ⇒ Object



44
45
46
# File 'lib/red_storm/simple_bolt.rb', line 44

# Wraps +values+ in a Values instance and emits it through @collector
# without passing any input tuple.
def unanchored_emit(*values)
  payload = Values.new(*values)
  @collector.emit(payload)
end