Class: Qwe::DB::Worker
- Inherits:
-
Object
- Object
- Qwe::DB::Worker
- Includes:
- DRb::DRbUndumped, Mixins::Process
- Defined in:
- lib/qwe/db/worker.rb
Constant Summary collapse
- POLL_INTERVAL =
20
Instance Attribute Summary collapse
-
#detach_timeout ⇒ Object
readonly
Returns the value of attribute detach_timeout.
-
#gc_interval ⇒ Object
readonly
Returns the value of attribute gc_interval.
-
#records ⇒ Object
readonly
Returns the value of attribute records.
-
#requirements ⇒ Object
readonly
Returns the value of attribute requirements.
-
#server ⇒ Object
readonly
Returns the value of attribute server.
-
#server_dir ⇒ Object
readonly
Returns the value of attribute server_dir.
Class Method Summary collapse
Instance Method Summary collapse
- #[](i) ⇒ Object
- #allocate_record(zero) ⇒ Object
- #create(id, klass) ⇒ Object
- #detach(id, record = nil) ⇒ Object
- #evaluate(str) ⇒ Object
-
#initialize(server_url, server_dir, gc_interval = 0, detach_timeout = 300, requirements = nil) ⇒ Worker
constructor
A new instance of Worker.
- #load(id) ⇒ Object
- #start_gc ⇒ Object
- #start_poll ⇒ Object
- #stop ⇒ Object
- #uri ⇒ Object
- #work ⇒ Object
Methods included from Mixins::Process
Constructor Details
#initialize(server_url, server_dir, gc_interval = 0, detach_timeout = 300, requirements = nil) ⇒ Worker
Returns a new instance of Worker.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/qwe/db/worker.rb', line 12 def initialize(server_url, server_dir, gc_interval = 0, detach_timeout = 300, requirements = nil) @server_dir = server_dir @server = DRbObject.new_with_uri(server_url) @gc_interval = gc_interval.to_i @detach_timeout = detach_timeout.to_i @requirements = requirements begin require requirements if requirements rescue ScriptError, StandardError => e server.crash_worker(Process.pid, "Error requiring '#{requirements}': #{e.full_message}") exit end Qwe::Function.compile_all @records = {} @object_refs = {} end |
Instance Attribute Details
#detach_timeout ⇒ Object (readonly)
Returns the value of attribute detach_timeout.
8 9 10 |
# File 'lib/qwe/db/worker.rb', line 8 def detach_timeout @detach_timeout end |
#gc_interval ⇒ Object (readonly)
Returns the value of attribute gc_interval.
8 9 10 |
# File 'lib/qwe/db/worker.rb', line 8 def gc_interval @gc_interval end |
#records ⇒ Object (readonly)
Returns the value of attribute records.
8 9 10 |
# File 'lib/qwe/db/worker.rb', line 8 def records @records end |
#requirements ⇒ Object (readonly)
Returns the value of attribute requirements.
8 9 10 |
# File 'lib/qwe/db/worker.rb', line 8 def requirements @requirements end |
#server ⇒ Object (readonly)
Returns the value of attribute server.
8 9 10 |
# File 'lib/qwe/db/worker.rb', line 8 def server @server end |
#server_dir ⇒ Object (readonly)
Returns the value of attribute server_dir.
8 9 10 |
# File 'lib/qwe/db/worker.rb', line 8 def server_dir @server_dir end |
Class Method Details
.instance ⇒ Object
108 109 110 |
# File 'lib/qwe/db/worker.rb', line 108 def self.instance @@instance end |
.spawn ⇒ Object
112 113 114 115 |
# File 'lib/qwe/db/worker.rb', line 112 def self.spawn(*) @@instance = Qwe::DB::Worker.new(*) @@instance.work end |
Instance Method Details
#[](i) ⇒ Object
90 91 92 |
# File 'lib/qwe/db/worker.rb', line 90 def [](i) @object_refs[i] ||= DRbObject.new(@records[i].object) end |
#allocate_record(zero) ⇒ Object
102 103 104 105 106 |
# File 'lib/qwe/db/worker.rb', line 102 def allocate_record(zero) log "Allocate record for #{zero.class}" id = server.allocate_record(Process.pid) records[id] = Qwe::DB::Record.new(id, server_dir, create: zero) end |
#create(id, klass) ⇒ Object
81 82 83 |
# File 'lib/qwe/db/worker.rb', line 81 def create(id, klass) @records[id] = Qwe::DB::Record.new(id, server_dir, create: klass) end |
#detach(id, record = nil) ⇒ Object
44 45 46 47 48 49 50 51 52 |
# File 'lib/qwe/db/worker.rb', line 44 def detach(id, record = nil) record ||= @records[id] @server.detach_record(id) @object_refs.delete(id) @records.delete(id) record.save log "Detach record #{id} from worker #{uri}" nil end |
#evaluate(str) ⇒ Object
117 118 119 |
# File 'lib/qwe/db/worker.rb', line 117 def evaluate(str) instance_eval str end |
#load(id) ⇒ Object
85 86 87 88 |
# File 'lib/qwe/db/worker.rb', line 85 def load(id) @records[id] = Qwe::DB::Record.new(id, server_dir) DRbObject.new(@records[id].object) end |
#start_gc ⇒ Object
54 55 56 57 58 59 60 61 62 |
# File 'lib/qwe/db/worker.rb', line 54 def start_gc @gc_thread = Thread.new do loop do sleep @gc_interval next if @records.empty? GC.start end end end |
#start_poll ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/qwe/db/worker.rb', line 32 def start_poll @poll_thread = Thread.new do loop do sleep POLL_INTERVAL @records.keys.each do |id| r = @records[id] detach(id, r) if r.should_detach? end end end end |
#stop ⇒ Object
94 95 96 97 98 99 100 |
# File 'lib/qwe/db/worker.rb', line 94 def stop return if @stopping @stopping = true log "Stopping" @records.keys.each { |id| @records[id].save } exit end |
#uri ⇒ Object
64 65 66 |
# File 'lib/qwe/db/worker.rb', line 64 def uri @drb&.uri end |
#work ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/qwe/db/worker.rb', line 68 def work @drb = DRb.start_service("druby://localhost:0", self) log "Started worker at #{@drb.uri}, pid = #{Process.pid}, requirements = #{@requirements}" @server.set_worker_uri(uri, Process.pid) start_poll start_gc if @gc_interval > 0 trap_stop_signals @drb.thread.join end |