Module: Dummer::Worker

Defined in:
lib/dummer/worker.rb

Constant Summary collapse

BIN_NUM =
10

Instance Method Summary collapse

Instance Method Details

#initialize ⇒ Object



7
8
9
# File 'lib/dummer/worker.rb', line 7

# Sets up a new worker by delegating to #reload, which reads
# config[:setting] and builds the generator, rate and output sink.
def initialize
  reload
end

#reload ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/dummer/worker.rb', line 11

# Rebuilds the generator, the rate and the output sink from config[:setting].
#
# The sink is chosen in this order:
# 1. `host` and `port` both set: records are posted through fluent-logger.
# 2. `output` set: messages are written to it. An object responding to both
#    `write` and `close` is used as is; anything else is treated as a path
#    and opened for appending (created if missing) with sync enabled.
#
# Both @client and @file are reassigned on every successful call (the unused
# one becomes nil), so a sink left over from an earlier call is never picked
# up again after the configuration changes. A previously opened sink is not
# closed here.
#
# @return [Proc] the newly installed write proc (takes the number of writes)
# @raise [ConfigError] when neither `output` nor both `host` and `port` are set
def reload
  setting = config[:setting]
  @generator = Generator.new(setting)
  @rate = setting.rate

  host = setting.host
  port = host && setting.port # only consult port when host is set
  client = nil
  file = nil

  if host && port
    require 'fluent-logger' # loaded lazily: only needed for the network sink
    client = Fluent::Logger::FluentLogger.new(nil, :host => host, :port => port)
  elsif (output = setting.output)
    if output.respond_to?(:write) && output.respond_to?(:close)
      file = output
    else
      # File.open rather than Kernel#open: the latter spawns a subprocess
      # when handed a string that starts with "|".
      file = File.open(output, File::WRONLY | File::APPEND | File::CREAT)
      file.sync = true
    end
  else
    raise ConfigError, "Config parameter `output`, or `host` and `port` do not exist"
  end

  # The procs close over the sink built above, so each proc keeps writing to
  # the sink it was created for regardless of later instance-variable changes.
  write_proc =
    if client
      Proc.new { |num| num.times { client.post(@generator.tag, @generator.record) } }
    else
      Proc.new { |num| num.times { file.write @generator.message } }
    end

  @client = client
  @file = file
  @write_proc = write_proc
end

#run ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/dummer/worker.rb', line 38

# Main loop: emits @rate messages per wall-clock second until @stop is set.
#
# Each second is split into BIN_NUM bins. Every bin writes
# (@rate / BIN_NUM) messages through `wait(0.1)`, and the remainder
# (@rate % BIN_NUM) is written once after the bins. The loop then idles in
# 10 ms steps until the second rolls over. @file, when set, is closed on the
# way out, including when an exception propagates.
#
# NOTE(review): `wait` is defined outside this listing; presumably it runs
# the block and pads the elapsed time up to the given duration. Confirm.
def run
  per_bin  = (@rate / BIN_NUM).to_i
  leftover = @rate % BIN_NUM

  until @stop
    second = Time.now.to_i

    BIN_NUM.times do
      # Give up on the remaining bins once stopped or the second has passed.
      break if @stop || Time.now.to_i > second
      wait(0.1) { @write_proc.call(per_bin) }
    end

    @write_proc.call(leftover)

    # Idle until the next second starts (or a stop is requested).
    sleep 0.01 until @stop || Time.now.to_i > second
  end
ensure
  @file.close if @file
end

#stop ⇒ Object



57
58
59
# File 'lib/dummer/worker.rb', line 57

# Requests shutdown by setting the @stop flag, which #run checks in its
# loops before writing and while waiting for the next second.
#
# @return [true]
def stop
  @stop = true
end