Class: Qwe::DB::Server
- Inherits:
-
Object
- Object
- Qwe::DB::Server
- Includes:
- DRb::DRbUndumped, Mixins::Process
- Defined in:
- lib/qwe/db/server.rb
Constant Summary collapse
- DEFAULT_PORT =
3228
- DEFAULT_HOST =
"druby://localhost"
- SPAWN_WORKER_TIMEOUT =
50
Instance Attribute Summary collapse
-
#dir ⇒ Object
readonly
Returns the value of attribute dir.
-
#host ⇒ Object
readonly
Returns the value of attribute host.
-
#port ⇒ Object
readonly
Returns the value of attribute port.
-
#records_length ⇒ Object
Returns the value of attribute records_length.
-
#threads_count ⇒ Object
readonly
Returns the value of attribute threads_count.
-
#uri ⇒ Object
readonly
Returns the value of attribute uri.
Class Method Summary collapse
- .default_uri ⇒ Object
- .mkdir(d) ⇒ Object
Instance Method Summary collapse
- #[](id) ⇒ Object
- #allocate_record(pid) ⇒ Object
- #crash_worker(pid, message) ⇒ Object
- #create(klass) ⇒ Object
- #del_worker(pid) ⇒ Object
- #destroy(id) ⇒ Object
- #destroy!(id) ⇒ Object
- #detach(id) ⇒ Object
- #detach_record(id) ⇒ Object
-
#initialize(dir: "#{Dir.pwd}/qwe_db", host: DEFAULT_HOST, port: DEFAULT_PORT, require_file: nil, no_jit: false, threads: Etc.nprocessors, gc_interval: 0, detach_timeout: 300) ⇒ Server
constructor
A new instance of Server.
- #load(id) ⇒ Object
- #pick_worker(attempt = 0) ⇒ Object
- #ready? ⇒ Boolean
- #set_worker_uri(uri, pid) ⇒ Object
- #spawn_worker(wait: true) ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
- #workers_length ⇒ Object
Methods included from Mixins::Process
Constructor Details
#initialize(dir: "#{Dir.pwd}/qwe_db", host: DEFAULT_HOST, port: DEFAULT_PORT, require_file: nil, no_jit: false, threads: Etc.nprocessors, gc_interval: 0, detach_timeout: 300) ⇒ Server
Returns a new instance of Server.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/qwe/db/server.rb', line 19
#
# Stores the server configuration and precomputes the command line used to
# launch worker processes. Nothing is started here.
#
# @param dir [String] data directory; kept in @dir and passed to every worker
# @param host [String] host part of the server URI
# @param port [Integer] port part of the server URI
# @param require_file [String, nil] appended as the last worker argument when given
# @param no_jit [Boolean] when true, "--jit" is left out of the worker command
# @param threads [Integer] stored as @threads_count
# @param gc_interval [#to_s] forwarded to the worker as a string argument
# @param detach_timeout [#to_s] forwarded to the worker as a string argument
def initialize(
  dir: "#{Dir.pwd}/qwe_db",
  host: DEFAULT_HOST,
  port: DEFAULT_PORT,
  require_file: nil,
  no_jit: false,
  threads: Etc.nprocessors,
  gc_interval: 0,
  detach_timeout: 300
)
  @dir = dir
  @host = host
  @port = port
  @threads_count = threads
  @uri = "#{@host}:#{@port}"

  # Optional pieces are expressed as zero-or-one element arrays so the whole
  # command line can be assembled in a single literal.
  interpreter_flags = no_jit ? [] : ["--jit"]
  trailing_args = require_file ? [require_file] : []
  @worker_args = [
    RbConfig.ruby,
    *interpreter_flags,
    "#{__dir__}/../../spawn_worker.rb",
    uri,
    dir,
    gc_interval.to_s,
    detach_timeout.to_s,
    *trailing_args
  ]

  @workers = {}
  @records = {}
  log "Initialized server in #{dir}"
end
Instance Attribute Details
#dir ⇒ Object (readonly)
Returns the value of attribute dir.
13 14 15 |
# File 'lib/qwe/db/server.rb', line 13
#
# @return [Object] the value of @dir, which #initialize sets from its `dir:` keyword
def dir
  @dir
end
#host ⇒ Object (readonly)
Returns the value of attribute host.
13 14 15 |
# File 'lib/qwe/db/server.rb', line 13
#
# @return [Object] the value of @host, which #initialize sets from its `host:` keyword
def host
  @host
end
#port ⇒ Object (readonly)
Returns the value of attribute port.
13 14 15 |
# File 'lib/qwe/db/server.rb', line 13
#
# @return [Object] the value of @port, which #initialize sets from its `port:` keyword
def port
  @port
end
#records_length ⇒ Object
Returns the value of attribute records_length.
13 14 15 |
# File 'lib/qwe/db/server.rb', line 13
#
# @return [Object] the value of @records_length; #start assigns it and
#   #create / #allocate_record use it as the next record id and increment it
def records_length
  @records_length
end
#threads_count ⇒ Object (readonly)
Returns the value of attribute threads_count.
13 14 15 |
# File 'lib/qwe/db/server.rb', line 13
#
# @return [Object] the value of @threads_count, which #initialize sets from
#   its `threads:` keyword and #start uses as the number of workers to spawn
def threads_count
  @threads_count
end
#uri ⇒ Object (readonly)
Returns the value of attribute uri.
13 14 15 |
# File 'lib/qwe/db/server.rb', line 13
#
# @return [Object] the value of @uri, which #initialize builds as "<host>:<port>"
def uri
  @uri
end
Class Method Details
.default_uri ⇒ Object
15 16 17 |
# File 'lib/qwe/db/server.rb', line 15
#
# @return [String] DEFAULT_HOST and DEFAULT_PORT joined with a colon
def self.default_uri
  [DEFAULT_HOST, DEFAULT_PORT].join(":")
end
.mkdir(d) ⇒ Object
88 89 90 |
# File 'lib/qwe/db/server.rb', line 88
#
# Ensures that directory +d+ exists.
#
# Uses FileUtils.mkdir_p rather than an exist?-then-mkdir pair: it does not
# fail when another process creates the directory in between, and it also
# creates missing parent directories.
#
# @param d [String] directory path to create
def self.mkdir(d)
  FileUtils.mkdir_p(d)
end
Instance Method Details
#[](id) ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/qwe/db/server.rb', line 70
#
# Looks up a record by id, loading it when the server does not hold it yet.
#
# @param id [#to_i] record id; converted with to_i before the lookup
# @return [Object] the loader thread's value while a load is in flight,
#   otherwise whatever the owning worker returns for +[id]+, or the result
#   of #load when no entry exists
def [](id)
  key = id.to_i
  entry = @records[key]

  return load(key) unless entry
  # A Thread entry means #load is still running for this id.
  return entry.value if entry.is_a?(Thread)

  entry[key]
end
#allocate_record(pid) ⇒ Object
194 195 196 197 198 199 200 |
# File 'lib/qwe/db/server.rb', line 194
#
# Reserves the next record id and maps it to the worker stored under +pid+.
#
# @param pid [Object] key into @workers
# @return [Object] the id that was reserved (the previous records_length)
def allocate_record(pid)
  records_length.tap do |new_id|
    @records_length += 1
    @records[new_id] = @workers[pid]
    log "Allocate record #{new_id}"
  end
end
#crash_worker(pid, message) ⇒ Object
147 148 149 150 |
# File 'lib/qwe/db/server.rb', line 147
#
# Reports a worker as failed: raises +message+ inside the thread stored for
# +pid+ (so anything joined on that thread sees the error) and removes the
# worker entry.
#
# The exception is only raised when the entry is still a Thread, the same
# check #del_worker makes; an entry that is missing or already replaced by
# #set_worker_uri is simply removed.
#
# @param pid [Object] key into @workers
# @param message [String] error text raised inside the stored thread
# @return [Object, nil] the entry removed from @workers
def crash_worker(pid, message)
  waiter = @workers[pid]
  waiter.raise message if waiter.is_a?(Thread)
  @workers.delete(pid)
end
#create(klass) ⇒ Object
159 160 161 162 163 164 165 166 167 |
# File 'lib/qwe/db/server.rb', line 159
#
# Creates a new record in a worker chosen by #pick_worker.
#
# The id counter is advanced only after the worker call and the bookkeeping
# have gone through, so a failing create does not consume an id.
#
# @param klass [Object] passed through to the worker's create call
# @return [Object] id of the new record
def create(klass)
  records_length.tap do |new_id|
    worker = pick_worker
    worker.create(new_id, klass)
    log "Create record #{new_id} in worker #{worker.uri}"
    @records.store(new_id, worker)
    @records_length += 1
  end
end
#del_worker(pid) ⇒ Object
152 153 154 155 156 157 |
# File 'lib/qwe/db/server.rb', line 152
#
# Removes the worker entry for +pid+. If the entry is still a Thread, that
# thread is terminated first.
#
# @param pid [Object] key into @workers
# @return [Object, nil] the removed entry, or nil when there was none
def del_worker(pid)
  entry = @workers[pid]
  entry.exit if entry.is_a?(Thread)
  @workers.delete(pid)
end
#destroy(id) ⇒ Object
202 203 204 205 206 207 |
# File 'lib/qwe/db/server.rb', line 202
#
# Non-raising variant of #destroy!: any StandardError is logged and
# swallowed.
#
# @param id [Object] record id, passed to #destroy! unchanged
# @return [nil]
def destroy(id)
  begin
    destroy!(id)
  rescue StandardError => error
    log "Can't destroy - #{error.full_message}"
    nil
  end
end
#destroy!(id) ⇒ Object
209 210 211 212 213 214 |
# File 'lib/qwe/db/server.rb', line 209
#
# Deletes the on-disk directory of record +id+.
#
# Refuses to delete a record that currently has an entry in @records. The
# refusal is signalled with `raise` (a RuntimeError); `throw` belongs to
# catch/throw control flow and, given a string with no matching catch, only
# surfaces as an UncaughtThrowError.
#
# NOTE(review): unlike #[], +id+ is not normalised with to_i here, so a
# String id will not match an Integer key in @records — confirm callers
# always pass Integers.
#
# @param id [Object] record id; its to_s is the directory name under "<dir>/records"
# @return [nil]
# @raise [RuntimeError] when the record is in use
def destroy!(id)
  raise "Record #{id} is in use" if @records[id]

  FileUtils.rm_r(File.join(dir, "records", id.to_s))
  log "Destroyed #{id}"
  nil
end
#detach(id) ⇒ Object
169 170 171 172 |
# File 'lib/qwe/db/server.rb', line 169
#
# Asks the object stored for +id+ in @records to detach that record.
#
# @param id [Object] key into @records
# @return [nil] the callee's result is discarded
def detach(id)
  owner = @records[id]
  owner.detach(id)
  nil
end
#detach_record(id) ⇒ Object
174 175 176 |
# File 'lib/qwe/db/server.rb', line 174
#
# Drops the server-side entry for +id+ from @records.
#
# @param id [Object] key into @records
# @return [Object, nil] the removed entry, or nil when none existed
def detach_record(id)
  @records.delete(id)
end
#load(id) ⇒ Object
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/qwe/db/server.rb', line 178
#
# Loads record +id+ into a worker chosen by #pick_worker.
#
# While the load is running, @records[id] holds the loader thread (#[] waits
# on it via Thread#value); once it succeeds the entry is replaced by the
# worker.
#
# Differences from a plain `@records[id] = Thread.new { ... }; @records[id].value`:
# * the thread is kept in a local, so `value` is never sent to the worker
#   that may already have replaced the entry;
# * the thread waits on a queue until it has been registered, so its own
#   update of @records[id] is always the last write;
# * on failure the entry is removed instead of being left behind as nil.
#
# @param id [Object] record id, used as the key in @records
# @return [Object] whatever the worker's load call returned
# @raise [Exception] re-raises whatever the loader thread raised
def load(id)
  gate = Queue.new
  loader = Thread.new do
    begin
      gate.pop # wait until @records[id] points at this thread
      w = pick_worker
      log "Load record #{id} in worker #{w.uri}"
      obj = w.load(id)
    rescue Exception # cleanup only; the original exception is re-raised below
      @records.delete(id)
      raise
    end
    @records[id] = w
    obj
  end
  @records[id] = loader
  gate << true
  loader.value
end
#pick_worker(attempt = 0) ⇒ Object
126 127 128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/qwe/db/server.rb', line 126
#
# Picks a random worker, waiting for it if it is still starting up.
#
# An entry in @workers is a Thread while the worker process is starting;
# joining it returns once #set_worker_uri has replaced the entry, or raises
# if the startup thread failed. Any error removes the picked entry, spawns a
# replacement and retries.
#
# An empty pool, or an entry that disappeared while waiting, is treated as
# an error as well, so those cases go through the same spawn-and-retry path
# instead of returning nil to the caller.
#
# @param attempt [Integer] retry counter; callers normally omit it
# @return [Object] the worker stored in @workers
# @raise [RuntimeError] "Give up" once the retries are exhausted
def pick_worker(attempt = 0)
  pid = @workers.keys.sample
  raise "No workers available" if pid.nil?

  pending = @workers[pid]
  pending.join if pending.is_a?(Thread)
  # Re-read: a successful startup replaces the Thread with the worker.
  @workers[pid] || raise("Worker #{pid} is gone")
rescue => e
  log "Error picking worker: #{e.full_message}, attempt #{attempt}"
  raise "Give up" if attempt > 2
  del_worker(pid)
  spawn_worker
  pick_worker(attempt + 1)
end
#ready? ⇒ Boolean
66 67 68 |
# File 'lib/qwe/db/server.rb', line 66
#
# @return [Boolean] false until #start has set @ready
def ready?
  return false unless @ready

  @ready
end
#set_worker_uri(uri, pid) ⇒ Object
140 141 142 143 144 145 |
# File 'lib/qwe/db/server.rb', line 140
#
# Registers a started worker: replaces the entry for +pid+ with a DRb proxy
# for +uri+ and stops the startup thread that #spawn_worker stored there.
#
# Only a Thread entry is stopped. When the pid has no entry any more, or the
# entry is already a proxy, there is nothing to stop — calling `exit` on it
# would raise on nil or be forwarded to the remote object.
#
# @param uri [String] DRb URI the worker is reachable at
# @param pid [Object] key into @workers
# @return [Thread, nil] the stopped startup thread, or nil if there was none
def set_worker_uri(uri, pid)
  previous = @workers[pid]
  @workers[pid] = DRbObject.new_with_uri(uri)
  log "Worker #{pid} is up"
  previous.exit if previous.is_a?(Thread)
end
#spawn_worker(wait: true) ⇒ Object
114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/qwe/db/server.rb', line 114
#
# Starts a worker process and registers a startup timer thread for it.
#
# The timer sleeps for SPAWN_WORKER_TIMEOUT seconds and then raises;
# #set_worker_uri stops it when the worker reports in, so joining it either
# returns (worker is up) or raises (timeout, or #crash_worker was called).
#
# The timer is joined through a local variable rather than by re-reading
# @workers[pid]: by then the entry may already have been replaced by the
# worker proxy, and `join` must not be sent to that.
#
# @param wait [Boolean] when true, block until the timer thread finishes
# @return [Integer] pid of the spawned process
def spawn_worker(wait: true)
  pid = spawn(*@worker_args)
  Process.detach(pid)
  startup_timer = Thread.new do
    sleep SPAWN_WORKER_TIMEOUT
    raise "Worker did not start after #{SPAWN_WORKER_TIMEOUT}s timeout"
  end
  @workers[pid] = startup_timer
  startup_timer.join if wait
  pid
end
#start ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/qwe/db/server.rb', line 46
#
# Prepares the data directory, starts the DRb service, spawns the workers
# and then blocks on the DRb thread.
#
# The next record id is one more than the highest numeric entry name under
# "<dir>/records" (1 when there is none). The listing uses Dir.children so
# that no directory handle is left open.
#
# @return [Object] result of joining the DRb service thread
def start
  self.class.mkdir(dir)
  records_dir = File.join(dir, "records")
  self.class.mkdir(records_dir)

  highest_id = Dir.children(records_dir).map(&:to_i).max || 0
  @records_length = highest_id + 1

  @drb = DRb.start_service(uri, self)
  trap_stop_signals
  log "Started server at #{@drb.uri}, records_length = #{records_length}, pid = #{Process.pid}"

  # Workers are started without waiting; #pick_worker joins them on demand.
  @threads_count.times { spawn_worker(wait: false) }

  @ready = true
  @drb.thread.join
end
#stop ⇒ Object
97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/qwe/db/server.rb', line 97
#
# Sends TERM to every registered worker pid and exits the process. Repeated
# calls after the first return immediately.
#
# @return [nil] when a stop is already in progress; otherwise does not return
def stop
  return if @stopping

  @stopping = true
  log "Stopping"
  @workers.each_key do |pid|
    begin
      log "Terminate worker #{pid}"
      Process.kill("TERM", pid)
    rescue Errno::ESRCH
      log "Worker #{pid} is already dead"
    rescue => e
      log "Error terminating worker #{pid}: #{e.full_message}"
    end
  end
  exit
end
#workers_length ⇒ Object
84 85 86 |
# File 'lib/qwe/db/server.rb', line 84
#
# @return [Integer] number of entries in @workers (starting or running)
def workers_length
  @workers.size
end