Class: Qwe::DB::Worker

Inherits:
Object
  • Object
show all
Includes:
DRb::DRbUndumped, Mixins::Process
Defined in:
lib/qwe/db/worker.rb

Constant Summary collapse

POLL_INTERVAL =
20

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Mixins::Process

#trap_stop_signals

Constructor Details

#initialize(server_url, server_dir, gc_interval = 0, detach_timeout = 300, requirements = nil) ⇒ Worker

Returns a new instance of Worker.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/qwe/db/worker.rb', line 12

# Sets up a worker that talks to the server reachable at +server_url+.
#
# @param server_url [String] DRb URI of the server; wrapped in a DRbObject proxy
# @param server_dir [Object] stored unchanged and later handed to Qwe::DB::Record.new
# @param gc_interval [#to_i] coerced with +to_i+
# @param detach_timeout [#to_i] coerced with +to_i+
# @param requirements [String, nil] optional feature to +require+ before
#   Qwe::Function.compile_all is called
def initialize(server_url, server_dir, gc_interval = 0, detach_timeout = 300, requirements = nil)
  @server_dir = server_dir
  @server = DRbObject.new_with_uri(server_url)
  @gc_interval = gc_interval.to_i
  @detach_timeout = detach_timeout.to_i
  @requirements = requirements

  if requirements
    begin
      require requirements
    rescue ScriptError, StandardError => load_error
      # Tell the server why this worker is going away, then terminate the process.
      server.crash_worker(Process.pid, "Error requiring '#{requirements}': #{load_error.full_message}")
      exit
    end
  end

  Qwe::Function.compile_all

  @object_refs = {}
  @records = {}
end

Instance Attribute Details

#detach_timeout ⇒ Object (readonly)

Returns the value of attribute detach_timeout.



8
9
10
# File 'lib/qwe/db/worker.rb', line 8

# @return [Integer] the +detach_timeout+ constructor argument after +to_i+ coercion
def detach_timeout = @detach_timeout

#gc_interval ⇒ Object (readonly)

Returns the value of attribute gc_interval.



8
9
10
# File 'lib/qwe/db/worker.rb', line 8

# @return [Integer] the +gc_interval+ constructor argument after +to_i+ coercion
def gc_interval = @gc_interval

#records ⇒ Object (readonly)

Returns the value of attribute records.



8
9
10
# File 'lib/qwe/db/worker.rb', line 8

# @return [Hash] the records currently held by this worker, keyed by record id
def records = @records

#requirements ⇒ Object (readonly)

Returns the value of attribute requirements.



8
9
10
# File 'lib/qwe/db/worker.rb', line 8

# @return [Object, nil] the +requirements+ constructor argument, unchanged
def requirements = @requirements

#server ⇒ Object (readonly)

Returns the value of attribute server.



8
9
10
# File 'lib/qwe/db/worker.rb', line 8

# @return [DRbObject] the proxy built from the +server_url+ constructor argument
def server = @server

#server_dir ⇒ Object (readonly)

Returns the value of attribute server_dir.



8
9
10
# File 'lib/qwe/db/worker.rb', line 8

# @return [Object] the +server_dir+ constructor argument, unchanged
def server_dir = @server_dir

Class Method Details

.instance ⇒ Object



108
109
110
# File 'lib/qwe/db/worker.rb', line 108

# Returns the worker stored in the +@@instance+ class variable.
#
# NOTE(review): the only assignment to +@@instance+ visible here is in .spawn;
# presumably calling this before .spawn raises NameError — confirm.
def self.instance
  @@instance
end

.spawn(*) ⇒ Object



112
113
114
115
# File 'lib/qwe/db/worker.rb', line 112

# Builds a worker from the forwarded arguments, publishes it through the
# +@@instance+ class variable, and runs it by calling #work on it.
#
# @return [Object] whatever #work returns
def self.spawn(*)
  worker = Qwe::DB::Worker.new(*)
  @@instance = worker
  worker.work
end

Instance Method Details

#[](i) ⇒ Object



90
91
92
# File 'lib/qwe/db/worker.rb', line 90

# Returns the DRb reference for the object held by record +i+.
# The reference is built on first access and memoised in +@object_refs+.
#
# @param i [Object] record id, used as the key into +@records+ and +@object_refs+
# @return [DRbObject] reference wrapping +@records[i].object+
def [](i)
  cached = @object_refs[i]
  return cached if cached

  @object_refs[i] = DRbObject.new(@records[i].object)
end

#allocate_record(zero) ⇒ Object



102
103
104
105
106
# File 'lib/qwe/db/worker.rb', line 102

# Asks the server for a new record id on behalf of this process and creates
# the matching record locally.
#
# @param zero [Object] passed to Qwe::DB::Record.new as the +create:+ option
# @return [Qwe::DB::Record] the record that was stored in #records
def allocate_record(zero)
  log "Allocate record for #{zero.class}"
  fresh_id = server.allocate_record(Process.pid)
  record = Qwe::DB::Record.new(fresh_id, server_dir, create: zero)
  records[fresh_id] = record
end

#create(id, klass) ⇒ Object



81
82
83
# File 'lib/qwe/db/worker.rb', line 81

# Creates a new record under +id+ and registers it with this worker.
#
# @param id [Object] record id, used as the key into +@records+
# @param klass [Object] passed to Qwe::DB::Record.new as the +create:+ option
# @return [Qwe::DB::Record] the newly created record
def create(id, klass)
  record = Qwe::DB::Record.new(id, server_dir, create: klass)
  # #[] memoises a DRb reference per id; a reference memoised for a previous
  # record under the same id would keep pointing at that record's object.
  @object_refs.delete(id)
  @records[id] = record
end

#detach(id, record = nil) ⇒ Object



44
45
46
47
48
49
50
51
52
# File 'lib/qwe/db/worker.rb', line 44

# Releases record +id+ from this worker: notifies the server, forgets the
# record and its memoised DRb reference, saves the record, and logs the event.
#
# @param id [Object] id of the record to release
# @param record [Object, nil] the record to save; looked up in +@records+ when omitted
# @return [nil]
def detach(id, record = nil)
  target = record || @records[id]
  @server.detach_record(id)
  [@object_refs, @records].each { |table| table.delete(id) }
  target.save
  log "Detach record #{id} from worker #{uri}"
  nil
end

#evaluate(str) ⇒ Object



117
118
119
# File 'lib/qwe/db/worker.rb', line 117

# Evaluates +str+ as Ruby source with this object as +self+ (via +instance_eval+).
#
# NOTE(review): this runs arbitrary code inside the worker process. The worker
# is passed to DRb.start_service as the front object in #work, so presumably
# remote peers can reach this method — confirm only trusted callers can connect.
#
# @param str [String] Ruby source to evaluate
# @return [Object] the value of the evaluated source
def evaluate(str) = instance_eval(str)

#load(id) ⇒ Object



85
86
87
88
# File 'lib/qwe/db/worker.rb', line 85

# Loads the existing record +id+ into this worker and returns a DRb reference
# to the object it holds.
#
# @param id [Object] record id, used as the key into +@records+
# @return [DRbObject] reference wrapping the loaded record's object
def load(id)
  record = Qwe::DB::Record.new(id, server_dir)
  @records[id] = record
  # Store the reference where #[] memoises it, so a reference memoised for a
  # previous record under the same id cannot outlive the reload.
  @object_refs[id] = DRbObject.new(record.object)
end

#start_gc ⇒ Object



54
55
56
57
58
59
60
61
62
# File 'lib/qwe/db/worker.rb', line 54

# Starts a background thread that sleeps for +@gc_interval+ and then runs
# +GC.start+, skipping the collection while no records are held.
#
# @return [Thread] the thread, also stored in +@gc_thread+
def start_gc
  @gc_thread = Thread.new do
    loop do
      sleep @gc_interval
      GC.start unless @records.empty?
    end
  end
end

#start_poll ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
# File 'lib/qwe/db/worker.rb', line 32

# Starts a background thread that, every +POLL_INTERVAL+, detaches each held
# record whose +should_detach?+ returns true.
#
# @return [Thread] the thread, also stored in +@poll_thread+
def start_poll
  @poll_thread = Thread.new do
    loop do
      sleep POLL_INTERVAL
      # Walk a snapshot of the ids: #detach deletes from @records while we iterate.
      @records.keys.each do |id|
        record = @records[id]
        # The entry can be gone by now (removed after the snapshot was taken,
        # e.g. by another #detach call). Skip it; calling should_detach? on nil
        # would raise NoMethodError and end this thread for good.
        detach(id, record) if record&.should_detach?
      end
    end
  end
end

#stop ⇒ Object



94
95
96
97
98
99
100
# File 'lib/qwe/db/worker.rb', line 94

# Saves every record still held by this worker and terminates the process.
# Does nothing when a stop is already in progress.
#
# @return [nil] only when a stop was already in progress; otherwise the call
#   ends in Kernel#exit and does not return
def stop
  return if @stopping
  @stopping = true
  log "Stopping"
  # Walk a snapshot of the ids. An entry can be gone by the time it is looked
  # up (removed after the snapshot was taken, e.g. by #detach). Skip it: a
  # NoMethodError here would abort the shutdown with @stopping already set,
  # leaving the remaining records unsaved and later #stop calls as no-ops.
  @records.keys.each { |id| @records[id]&.save }
  exit
end

#uri ⇒ Object



64
65
66
# File 'lib/qwe/db/worker.rb', line 64

# @return [Object, nil] the +uri+ reported by the DRb service stored in +@drb+,
#   or nil while +@drb+ has not been set
def uri = @drb&.uri

#work ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/qwe/db/worker.rb', line 68

# Runs the worker: exposes this object over DRb, registers the service URI
# with the server, starts the background threads, installs the stop-signal
# handlers, and then blocks on the DRb service thread.
#
# @return [Object] the result of joining the DRb service thread
def work
  # Port 0 in the URI — presumably lets the system pick a free port (confirm);
  # the actual URI is read back from the service afterwards.
  @drb = DRb.start_service("druby://localhost:0", self)
  log "Started worker at #{@drb.uri}, pid = #{Process.pid}, requirements = #{@requirements}"
  # Tell the server where this process (identified by pid) can be reached.
  @server.set_worker_uri(uri, Process.pid)

  start_poll
  # The GC thread is only started for a positive interval.
  start_gc if @gc_interval > 0

  # Provided by Mixins::Process (not visible here).
  trap_stop_signals

  # Block the calling thread until the DRb service thread finishes.
  @drb.thread.join
end