Class: WorkerScoreboard

Inherits:
Object
  • Object
show all
Defined in:
lib/worker_scoreboard.rb,
lib/worker_scoreboard/version.rb

Defined Under Namespace

Classes: Remover

Constant Summary collapse

VERSION =
"0.0.1"

Instance Method Summary collapse

Constructor Details

#initialize(base_dir) ⇒ WorkerScoreboard

Returns a new instance of WorkerScoreboard.



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/worker_scoreboard.rb', line 5

# Sets up a scoreboard rooted at +base_dir+.
#
# Registers a finalizer (a Remover built from the status-file path), makes
# sure +base_dir+ exists, and removes any status file already present at the
# path returned by +build_filename+.
#
# @param base_dir [String] directory that holds the status files
# @raise [RuntimeError] if +base_dir+ is nil
# @raise [SystemCallError] if the directory cannot be created
def initialize(base_dir)
  raise 'mandatory parameter:base_dir is missing' if base_dir.nil?
  @base_dir = base_dir

  @fh = nil
  @id_for_fh = nil

  # [path, filehandler]
  # NOTE(review): @fh is still nil here, so the array captures nil in its
  # second slot; whether Remover depends on that slot is not visible here.
  @data = [build_filename, @fh]
  @clean_proc = Remover.new(@data)
  ObjectSpace.define_finalizer(self, @clean_proc)

  # Dir.mkdir raises SystemCallError on failure (its return value is always
  # truthy), so no explicit result check is needed. Another process may
  # create the directory between the exist? check and mkdir; tolerate that
  # case, but re-raise if the path exists and is not a directory.
  begin
    Dir.mkdir(@base_dir) unless Dir.exist?(@base_dir)
  rescue Errno::EEXIST
    raise unless Dir.exist?(@base_dir)
  end

  # Best-effort removal of a leftover status file; a missing file (or any
  # other filesystem error) is deliberately ignored.
  begin
    File.unlink(build_filename)
  rescue SystemCallError
  end
end

Instance Method Details

#cleanup ⇒ Object



66
67
68
# File 'lib/worker_scoreboard.rb', line 66

# Runs +for_all+ without a block and returns whatever it returns.
#
# NOTE(review): presumably +for_all+ discards stale status files as a side
# effect of walking them - confirm against its definition.
#
# @return [Object] the result of +for_all+
def cleanup
  # Explicit empty argument list: no block is handed to for_all.
  for_all()
end

#read_all ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/worker_scoreboard.rb', line 47

# Collects the current status of every entry yielded by +for_all+.
#
# Each status file holds a binary record:
#   16-byte MD5 digest of the status | 4-byte big-endian length | status
# The record is parsed strictly by bytes, so the result does not depend on
# the encoding the file handle was opened with.
#
# A record whose checksum does not match is re-read, up to 10 attempts in
# total. Entries that never yield a valid record, or whose file is shorter
# than the 20-byte header, are silently left out.
#
# @return [Hash] id => status (status is a binary-encoded String)
def read_all
  ret = {}
  for_all do |id, fh|
    10.times do
      # IO#seek raises on failure, so its return value needs no check.
      fh.seek(0)
      data = fh.read.b
      break if data.bytesize < 16 + 4

      md5 = data.byteslice(0, 16)
      size = data.byteslice(16, 4).unpack('N').first
      status = data.byteslice(20, size)
      # Checksum mismatch: the record may be mid-update, so try again.
      next if Digest::MD5.digest(status) != md5

      ret[id] = status
      break
    end
  end
  ret
end

#update(status) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/worker_scoreboard.rb', line 26

# Writes +status+ to this worker's status file.
#
# The record layout is:
#   16-byte MD5 digest of the status | 4-byte big-endian byte length | status
# and it is always written from offset 0 of the open handle.
#
# The status file is (re)created when no handle is open yet, or when the
# open handle was created under a different +worker_id+. Creation goes
# through "<filename>.tmp": the temp file is opened, exclusively locked,
# then renamed onto the final name.
#
# @param status [String] payload to store; handled as raw bytes, so
#   non-ASCII content is supported
# @return [File] the status file handle (result of the final flush)
# @raise [SystemCallError] if opening, locking, renaming or writing fails
def update(status)
  # A handle created under another worker id must not be reused.
  # NOTE(review): presumably this happens after a fork - confirm.
  if @fh && @id_for_fh != worker_id
    @fh.close
    @fh = nil
  end

  if @fh.nil?
    filename = build_filename
    tmp_filename = "#{filename}.tmp"
    f = File.open(tmp_filename, 'wb')
    begin
      # flock (without LOCK_NB) and rename raise on failure; their return
      # values are always truthy, so no result check is needed.
      f.flock(File::LOCK_EX)
      File.rename(tmp_filename, filename)
    rescue StandardError
      # Do not leak the descriptor when setup fails.
      f.close
      raise
    end
    @fh = f
    @id_for_fh = worker_id
  end

  # Work on a binary copy: the length field must be a byte count, and mixing
  # the binary digest with a non-ASCII UTF-8 string would raise
  # Encoding::CompatibilityError.
  payload = status.b
  @fh.seek(0)
  @fh.write(Digest::MD5.digest(payload) + [payload.bytesize].pack('N') + payload)
  @fh.flush
end