Class: LS4::DataServerService
Instance Attribute Summary collapse
Instance Method Summary
collapse
-
#initialize ⇒ DataServerService
constructor
A new instance of DataServerService.
-
#on_timer ⇒ Object
-
#rpc_delete_direct(okey) ⇒ Object
-
#rpc_exist_direct(okey) ⇒ Object
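Delegates to StorageBus.exist with okey's vtime and key. (The extracted description for this method appears to be commented-out source from the original file, preserved here verbatim: def rpc_resize_direct(okey, size) # TODO: stat_cmd_resize? # FIXME size field? d = UpdateLogData.new(okey.vtime, okey.key, nil, size) UpdateLogBus.append(d.dump) do StorageBus.resize(okey.vtime, okey.key, size) end nil end)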
-
#rpc_get_direct(okey) ⇒ Object
-
#rpc_read_direct(okey, offset, size) ⇒ Object
-
#rpc_replicate_notify(nid) ⇒ Object
-
#rpc_replicate_pull(pos, limit) ⇒ Object
-
#rpc_set_direct(okey, data) ⇒ Object
-
#stat_db_items ⇒ Object
Methods inherited from Service
init
#ebus_bind!, #ebus_connect, extended
#ebus_all_slots, #ebus_disconnect!
#connect, #ebus_all_slots, #ebus_call_log, #ebus_call_slots, #ebus_signal_error, #ebus_signal_log, #ebus_signal_slots
#call_slot, #signal_slot
Constructor Details
22
23
24
25
26
27
28
|
# File 'lib/ls4/service/data_server.rb', line 22
# Sets up a new DataServerService.
#
# Caches this node's id and its replication-set ids as reported by
# ConfigBus, and starts the read/write/delete command counters at zero.
def initialize
  @self_nid = ConfigBus.self_nid
  @self_rsids = ConfigBus.self_rsids

  # Per-command statistics; the rpc_*_direct handlers increment these.
  @stat_cmd_read = @stat_cmd_write = @stat_cmd_delete = 0
end
|
Instance Attribute Details
#stat_cmd_delete ⇒ Object
Returns the value of attribute stat_cmd_delete.
147
148
149
|
# File 'lib/ls4/service/data_server.rb', line 147
# Reader for the @stat_cmd_delete counter, which initialize sets to 0 and
# rpc_delete_direct increments by one on every call.
def stat_cmd_delete
@stat_cmd_delete
end
|
#stat_cmd_read ⇒ Object
Returns the value of attribute stat_cmd_read.
145
146
147
|
# File 'lib/ls4/service/data_server.rb', line 145
# Reader for the @stat_cmd_read counter, which initialize sets to 0 and
# both rpc_get_direct and rpc_read_direct increment by one on every call.
def stat_cmd_read
@stat_cmd_read
end
|
#stat_cmd_write ⇒ Object
Returns the value of attribute stat_cmd_write.
146
147
148
|
# File 'lib/ls4/service/data_server.rb', line 146
# Reader for the @stat_cmd_write counter, which initialize sets to 0 and
# rpc_set_direct increments by one on every call.
def stat_cmd_write
@stat_cmd_write
end
|
Instance Method Details
#on_timer ⇒ Object
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
|
# File 'lib/ls4/service/data_server.rb', line 127
# Timer callback (presumably fired periodically by the host process).
#
# Gathers the node ids that MasterSelectBus.select_master_static returns for
# each of this node's replication sets, then calls SlaveBus.try_replicate
# once for every such node that is neither this node itself nor reported
# faulty by MembershipBus.
#
# Returns the (possibly duplicate-containing) list of gathered node ids.
def on_timer
  candidates = []
  @self_rsids.each do |rsid|
    begin
      candidates.concat(MasterSelectBus.select_master_static(rsid))
    rescue StandardError
      # Best effort: a replication set whose masters cannot be selected is
      # skipped without aborting the remaining sets.
    end
  end

  # Seeded with our own id so that we never try to replicate from ourselves.
  handled = [@self_nid]
  candidates.each do |nid|
    next if handled.include?(nid)
    next if MembershipBus.is_fault(nid)

    SlaveBus.try_replicate(nid, MembershipBus.get_session_nid(nid))
    handled << nid
  end
end
|
#rpc_delete_direct(okey) ⇒ Object
72
73
74
75
76
77
78
79
80
|
# File 'lib/ls4/service/data_server.rb', line 72
# Handles a direct delete request for the object addressed by okey.
#
# Bumps the delete counter, appends an UpdateLogData record for the
# object's vtime/key to UpdateLogBus, and performs the StorageBus.delete
# inside the block handed to UpdateLogBus.append.
#
# Returns whatever StorageBus.delete returned, or nil if the block was
# never run by UpdateLogBus.append.
def rpc_delete_direct(okey)
  @stat_cmd_delete += 1
  log_entry = UpdateLogData.new(okey.vtime, okey.key)
  result = nil
  UpdateLogBus.append(log_entry.dump) { result = StorageBus.delete(okey.vtime, okey.key) }
  result
end
|
#rpc_exist_direct(okey) ⇒ Object
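Delegates to StorageBus.exist with okey's vtime and key. (The extracted description for this method appears to be commented-out source from the original file, preserved here verbatim: def rpc_resize_direct(okey, size) # TODO: stat_cmd_resize? # FIXME size field? d = UpdateLogData.new(okey.vtime, okey.key, nil, size) UpdateLogBus.append(d.dump) do StorageBus.resize(okey.vtime, okey.key, size) end nil end)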
68
69
70
|
# File 'lib/ls4/service/data_server.rb', line 68
# Handles a direct existence query for the object addressed by okey.
#
# Returns the result of StorageBus.exist for the object's vtime and key.
# No statistics counter is touched.
def rpc_exist_direct(okey)
  vtime = okey.vtime
  key = okey.key
  StorageBus.exist(vtime, key)
end
|
#rpc_get_direct(okey) ⇒ Object
30
31
32
33
|
# File 'lib/ls4/service/data_server.rb', line 30
# Handles a direct whole-object fetch for the object addressed by okey.
#
# Bumps the read counter and returns the result of StorageBus.get for the
# object's vtime and key.
def rpc_get_direct(okey)
  @stat_cmd_read += 1
  vtime = okey.vtime
  key = okey.key
  StorageBus.get(vtime, key)
end
|
#rpc_read_direct(okey, offset, size) ⇒ Object
35
36
37
38
|
# File 'lib/ls4/service/data_server.rb', line 35
# Handles a direct ranged read for the object addressed by okey.
#
# Bumps the read counter and returns the result of StorageBus.read for the
# object's vtime and key, forwarding offset and size unchanged.
def rpc_read_direct(okey, offset, size)
  @stat_cmd_read += 1
  vtime = okey.vtime
  key = okey.key
  StorageBus.read(vtime, key, offset, size)
end
|
#rpc_replicate_notify(nid) ⇒ Object
117
118
119
120
121
|
# File 'lib/ls4/service/data_server.rb', line 117
# Handles a replication notification naming node nid.
#
# Looks up the session for nid via MembershipBus.get_session_nid and hands
# both to SlaveBus.try_replicate. Always returns nil.
def rpc_replicate_notify(nid)
  SlaveBus.try_replicate(nid, MembershipBus.get_session_nid(nid))
  nil
end
|
#rpc_replicate_pull(pos, limit) ⇒ Object
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
# File 'lib/ls4/service/data_server.rb', line 82
# Serves a replication pull: walks the update log starting at pos and
# collects the current data for each logged entry.
#
# pos   - update-log position to start reading from.
# limit - payload budget; the walk stops once the accumulated data.size of
#         the collected payloads exceeds it (so the batch may overshoot by
#         the last payload).
#
# Returns [new_pos, msgs], where new_pos is the position after the last
# consumed log entry and each message is [vtime, key, offset, data], or
# [vtime, key, 0, nil] when storage returned no data for the entry.
def rpc_replicate_pull(pos, limit)
  whole_keys = [] # keys whose complete value is already in this batch
  batch = []
  batch_bytes = 0

  while true
    raw, next_pos = UpdateLogBus.get(pos)
    break unless raw

    entry = UpdateLogData.load(raw)
    pos = next_pos

    # The complete value was already fetched for this key, so later log
    # entries for it add nothing to the batch.
    next if whole_keys.include?(entry.key)

    if entry.offset && entry.size
      payload = StorageBus.read(entry.vtime, entry.key, entry.offset, entry.size)
    else
      payload = StorageBus.get(entry.vtime, entry.key)
      whole_keys << entry.key
    end

    if payload
      batch << [entry.vtime, entry.key, entry.offset, payload]
      batch_bytes += payload.size
    else
      batch << [entry.vtime, entry.key, 0, nil]
    end

    break if batch_bytes > limit
  end

  [pos, batch]
end
|
#rpc_set_direct(okey, data) ⇒ Object
40
41
42
43
44
45
46
47
|
# File 'lib/ls4/service/data_server.rb', line 40
# Handles a direct whole-object write for the object addressed by okey.
#
# Bumps the write counter, appends an UpdateLogData record for the
# object's vtime/key to UpdateLogBus, and performs the StorageBus.set
# inside the block handed to UpdateLogBus.append. Always returns nil.
def rpc_set_direct(okey, data)
  @stat_cmd_write += 1
  log_entry = UpdateLogData.new(okey.vtime, okey.key)
  UpdateLogBus.append(log_entry.dump) { StorageBus.set(okey.vtime, okey.key, data) }
  nil
end
|
#stat_db_items ⇒ Object
123
124
125
|
# File 'lib/ls4/service/data_server.rb', line 123
# Statistics hook that returns whatever StorageBus.get_items reports.
# NOTE(review): judging by the names this is presumably the number of items
# held in storage — confirm against the StorageBus implementation.
def stat_db_items
StorageBus.get_items
end
|