Class: Qwe::DB::Server

Inherits:
Object
  • Object
show all
Includes:
DRb::DRbUndumped, Mixins::Process
Defined in:
lib/qwe/db/server.rb

Constant Summary collapse

DEFAULT_PORT =
3228
DEFAULT_HOST =
"druby://localhost"
SPAWN_WORKER_TIMEOUT =
50

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Mixins::Process

#trap_stop_signals

Constructor Details

#initialize(dir: "#{Dir.pwd}/qwe_db", host: DEFAULT_HOST, port: DEFAULT_PORT, require_file: nil, no_jit: false, threads: Etc.nprocessors, gc_interval: 0, detach_timeout: 300) ⇒ Server

Returns a new instance of Server.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/qwe/db/server.rb', line 19

def initialize(
  dir: "#{Dir.pwd}/qwe_db",
  host: DEFAULT_HOST,
  port: DEFAULT_PORT,
  require_file: nil,
  no_jit: false,
  threads: Etc.nprocessors,
  gc_interval: 0,
  detach_timeout: 300
)
  @port = port
  @host = host
  @dir = dir
  @uri = "#{@host}:#{@port}"
  @threads_count = threads

  @worker_args = [RbConfig.ruby]
  @worker_args.push "--jit" unless no_jit
  @worker_args += ["#{__dir__}/../../spawn_worker.rb", uri, dir, gc_interval.to_s, detach_timeout.to_s]
  @worker_args.push require_file if require_file

  @workers = {}
  @records = {}

  log "Initialized server in #{dir}"
end

Instance Attribute Details

#dirObject (readonly)

Returns the value of attribute dir.



13
14
15
# File 'lib/qwe/db/server.rb', line 13

def dir
  @dir
end

#hostObject (readonly)

Returns the value of attribute host.



13
14
15
# File 'lib/qwe/db/server.rb', line 13

def host
  @host
end

#portObject (readonly)

Returns the value of attribute port.



13
14
15
# File 'lib/qwe/db/server.rb', line 13

def port
  @port
end

#records_lengthObject

Returns the value of attribute records_length.



13
14
15
# File 'lib/qwe/db/server.rb', line 13

def records_length
  @records_length
end

#threads_countObject (readonly)

Returns the value of attribute threads_count.



13
14
15
# File 'lib/qwe/db/server.rb', line 13

def threads_count
  @threads_count
end

#uriObject (readonly)

Returns the value of attribute uri.



13
14
15
# File 'lib/qwe/db/server.rb', line 13

def uri
  @uri
end

Class Method Details

.default_uriObject



15
16
17
# File 'lib/qwe/db/server.rb', line 15

def self.default_uri
  "#{DEFAULT_HOST}:#{DEFAULT_PORT}"
end

.mkdir(d) ⇒ Object



88
89
90
# File 'lib/qwe/db/server.rb', line 88

def self.mkdir(d)
  Dir.mkdir(d) unless Dir.exist?(d)
end

Instance Method Details

#[](id) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/qwe/db/server.rb', line 70

def [](id)
  id = id.to_i
  w = @records[id]
  if w
    if w.is_a?(Thread)
      w.value
    else
      w[id]
    end
  else
    load(id)
  end
end

#allocate_record(pid) ⇒ Object



194
195
196
197
198
199
200
# File 'lib/qwe/db/server.rb', line 194

def allocate_record(pid)
  id = records_length
  @records_length += 1
  @records[id] = @workers[pid]
  log "Allocate record #{id}"
  id
end

#crash_worker(pid, message) ⇒ Object



147
148
149
150
# File 'lib/qwe/db/server.rb', line 147

def crash_worker(pid, message)
  @workers[pid].raise message
  @workers.delete(pid)
end

#create(klass) ⇒ Object



159
160
161
162
163
164
165
166
167
# File 'lib/qwe/db/server.rb', line 159

def create(klass)
  id = records_length
  w = pick_worker
  w.create(id, klass)
  log "Create record #{id} in worker #{w.uri}"
  @records[id] = w
  @records_length += 1
  id
end

#del_worker(pid) ⇒ Object



152
153
154
155
156
157
# File 'lib/qwe/db/server.rb', line 152

def del_worker(pid)
  if @workers[pid].is_a?(Thread)
    @workers[pid].exit
  end
  @workers.delete(pid)
end

#destroy(id) ⇒ Object



202
203
204
205
206
207
# File 'lib/qwe/db/server.rb', line 202

def destroy(id)
  destroy!(id)
rescue => e
  log "Can't destroy - #{e.full_message}"
  nil
end

#destroy!(id) ⇒ Object



209
210
211
212
213
214
# File 'lib/qwe/db/server.rb', line 209

def destroy!(id)
  throw "Record #{id} is in use" if @records[id]
  FileUtils.rm_r(File.join(dir, "records", id.to_s))
  log "Destroyed #{id}"
  nil
end

#detach(id) ⇒ Object



169
170
171
172
# File 'lib/qwe/db/server.rb', line 169

def detach(id)
  @records[id].detach(id)
  nil
end

#detach_record(id) ⇒ Object



174
175
176
# File 'lib/qwe/db/server.rb', line 174

def detach_record(id)
  @records.delete(id)
end

#load(id) ⇒ Object



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/qwe/db/server.rb', line 178

def load(id)
  @records[id] = Thread.new do
    begin
      w = pick_worker
      log "Load record #{id} in worker #{w.uri}"
      obj = w.load(id)
    rescue Exception => e
      @records[id] = nil
      raise e
    end
    @records[id] = w
    obj
  end
  @records[id].value
end

#pick_worker(attempt = 0) ⇒ Object



126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/qwe/db/server.rb', line 126

def pick_worker(attempt = 0)
  pid = @workers.keys.sample
  if @workers[pid].is_a? Thread
    @workers[pid].join
  end
  @workers[pid]
rescue => e
  log "Error picking worker: #{e.full_message}, attempt #{attempt}"
  raise "Give up" if attempt > 2
  del_worker(pid)
  spawn_worker
  pick_worker(attempt + 1)
end

#ready?Boolean

Returns:

  • (Boolean)


66
67
68
# File 'lib/qwe/db/server.rb', line 66

def ready?
  @ready || false
end

#set_worker_uri(uri, pid) ⇒ Object



140
141
142
143
144
145
# File 'lib/qwe/db/server.rb', line 140

def set_worker_uri(uri, pid)
  t = @workers[pid]
  @workers[pid] = DRbObject.new_with_uri(uri)
  log "Worker #{pid} is up"
  t.exit
end

#spawn_worker(wait: true) ⇒ Object



114
115
116
117
118
119
120
121
122
123
124
# File 'lib/qwe/db/server.rb', line 114

def spawn_worker(wait: true)
  pid = spawn(*@worker_args)
  Process.detach(pid)

  @workers[pid] = Thread.new do
    sleep SPAWN_WORKER_TIMEOUT
    raise "Worker did not start after #{SPAWN_WORKER_TIMEOUT}s timeout"
  end
  @workers[pid].join if wait
  pid
end

#startObject



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/qwe/db/server.rb', line 46

def start
  self.class.mkdir(dir)
  d = File.join(dir, "records")
  self.class.mkdir(d)
  @records_length = (Dir.new(d).children.map(&:to_i).max || 0) + 1
  @drb = DRb.start_service(uri, self)

  trap_stop_signals

  log "Started server at #{@drb.uri}, records_length = #{records_length}, pid = #{Process.pid}"

  @threads_count.times do
    spawn_worker(wait: false)
  end

  @ready = true

  @drb.thread.join
end

#stopObject



97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/qwe/db/server.rb', line 97

def stop
  return if @stopping
  @stopping = true
  log "Stopping"
  @workers.each do |pid, worker|
    log "Terminate worker #{pid}"
    Process.kill("TERM", pid)
  rescue Errno::ESRCH
    log "Worker #{pid} is already dead"
  rescue => e
    log "Error terminating worker #{pid}: #{e.full_message}"
  end
  exit
end

#workers_lengthObject



84
85
86
# File 'lib/qwe/db/server.rb', line 84

def workers_length
  @workers.length
end