Class: LS4::DataServerService

Inherits:
Service show all
Defined in:
lib/ls4/service/data_server.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Service

init

Methods included from EventBus::SingletonMixin

#ebus_bind!, #ebus_connect, extended

Methods included from EventBus::BusMixin

#ebus_all_slots, #ebus_disconnect!

Methods included from EventBus::DeclarerBase::Methods

#connect, #ebus_all_slots, #ebus_call_log, #ebus_call_slots, #ebus_signal_error, #ebus_signal_log, #ebus_signal_slots

Methods included from EventBus::DeclarerBase

#call_slot, #signal_slot

Constructor Details

#initializeDataServerService



22
23
24
25
26
27
28
# File 'lib/ls4/service/data_server.rb', line 22

def initialize
  @self_nid = ConfigBus.self_nid
  @self_rsids = ConfigBus.self_rsids
  @stat_cmd_read = 0
  @stat_cmd_write = 0
  @stat_cmd_delete = 0
end

Instance Attribute Details

#stat_cmd_deleteObject (readonly)

Returns the value of attribute stat_cmd_delete.



147
148
149
# File 'lib/ls4/service/data_server.rb', line 147

def stat_cmd_delete
  @stat_cmd_delete
end

#stat_cmd_readObject (readonly)

Returns the value of attribute stat_cmd_read.



145
146
147
# File 'lib/ls4/service/data_server.rb', line 145

def stat_cmd_read
  @stat_cmd_read
end

#stat_cmd_writeObject (readonly)

Returns the value of attribute stat_cmd_write.



146
147
148
# File 'lib/ls4/service/data_server.rb', line 146

def stat_cmd_write
  @stat_cmd_write
end

Instance Method Details

#on_timerObject



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/ls4/service/data_server.rb', line 127

def on_timer
  nids = []
  @self_rsids.each {|rsid|
    begin
      nids.concat MasterSelectBus.select_master_static(rsid)
    rescue
    end
  }
  done = [@self_nid]
  nids.each {|nid|
    if !done.include?(nid) && !MembershipBus.is_fault(nid)
      session = MembershipBus.get_session_nid(nid)
      SlaveBus.try_replicate(nid, session)
      done << nid
    end
  }
end

#rpc_delete_direct(okey) ⇒ Object



72
73
74
75
76
77
78
79
80
# File 'lib/ls4/service/data_server.rb', line 72

def rpc_delete_direct(okey)
  @stat_cmd_delete += 1
  d = UpdateLogData.new(okey.vtime, okey.key)
  deleted = nil
  UpdateLogBus.append(d.dump) do
    deleted = StorageBus.delete(okey.vtime, okey.key)
  end
  deleted
end

#rpc_exist_direct(okey) ⇒ Object

def rpc_resize_direct(okey, size) # TODO: stat_cmd_resize? # FIXME size field? d = UpdateLogData.new(okey.vtime, okey.key, nil, size) UpdateLogBus.append(d.dump) do StorageBus.resize(okey.vtime, okey.key, size) end nil end



68
69
70
# File 'lib/ls4/service/data_server.rb', line 68

def rpc_exist_direct(okey)
  StorageBus.exist(okey.vtime, okey.key)
end

#rpc_get_direct(okey) ⇒ Object



30
31
32
33
# File 'lib/ls4/service/data_server.rb', line 30

def rpc_get_direct(okey)
  @stat_cmd_read += 1
  StorageBus.get(okey.vtime, okey.key)
end

#rpc_read_direct(okey, offset, size) ⇒ Object



35
36
37
38
# File 'lib/ls4/service/data_server.rb', line 35

def rpc_read_direct(okey, offset, size)
  @stat_cmd_read += 1
  StorageBus.read(okey.vtime, okey.key, offset, size)
end

#rpc_replicate_notify(nid) ⇒ Object



117
118
119
120
121
# File 'lib/ls4/service/data_server.rb', line 117

def rpc_replicate_notify(nid)
  session = MembershipBus.get_session_nid(nid)
  SlaveBus.try_replicate(nid, session)
  nil
end

#rpc_replicate_pull(pos, limit) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/ls4/service/data_server.rb', line 82

def rpc_replicate_pull(pos, limit)
  mkeys = []
  msgs = []
  msize = 0
  while true
    raw, npos = UpdateLogBus.get(pos)
    unless raw
      break
    end
    d = UpdateLogData.load(raw)
    # set or delete
    if mkeys.include?(d.key)
      pos = npos
    else
      if d.offset && d.size
        data = StorageBus.read(d.vtime, d.key, d.offset, d.size)
      else
        data = StorageBus.get(d.vtime, d.key)
        mkeys << d.key
      end
      # data may be null => deleted
      if data
        msgs << [d.vtime, d.key, d.offset, data]
        msize += data.size
      else
        # data is deleted
        msgs << [d.vtime, d.key, 0, nil]
      end
      pos = npos
      break if msize > limit
    end
  end
  [pos, msgs]
end

#rpc_set_direct(okey, data) ⇒ Object



40
41
42
43
44
45
46
47
# File 'lib/ls4/service/data_server.rb', line 40

def rpc_set_direct(okey, data)
  @stat_cmd_write += 1
  d = UpdateLogData.new(okey.vtime, okey.key)
  UpdateLogBus.append(d.dump) do
    StorageBus.set(okey.vtime, okey.key, data)
  end
  nil
end

#stat_db_itemsObject



123
124
125
# File 'lib/ls4/service/data_server.rb', line 123

def stat_db_items
  StorageBus.get_items
end