Module: Roma::Command::VnodeCommandReceiver

Included in:
BackgroundCommandReceiver
Defined in:
lib/roma/command/vn_command_receiver.rb

Instance Method Summary collapse

Instance Method Details

#ev_reqpushv(s) ⇒ Object

reqpushv <vnode-id> <node-id> <is primary?> src dst

|<-['reqpushv <vn> <nid> <p?>\r\n']         |
|                           ['PUSHED'\r\n]->|


109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/roma/command/vn_command_receiver.rb', line 109

def ev_reqpushv(s)
  if s.length!=4
    send_data("CLIENT_ERROR usage:reqpushv vnode-id node-id primary-flag(true/false)\r\n")
    return
  end
  if(@stats.run_iterate_storage || @stats.run_join || @stats.run_balance)
    @log.warn("reqpushv rejected:#{s}")
    send_data("REJECTED\r\n")
    return
  end
  Roma::AsyncProcess::queue.push(Roma::AsyncMessage.new('reqpushv',[s[1],s[2],s[3]]))
  send_data("PUSHED\r\n")
rescue =>e
  @log.error("#{e}\n#{$@}")
end

#ev_spushv(s) ⇒ Object

spushv <hash-name> <vnode-id> src dst

|  ['spushv' <hname> <vn>\r\n]->|
|<-['READY'\r\n]                |
|                 [<dumpdata>]->|
|                       :       |
|                       :       |
|              [<end of dump>]->|
|<-['STORED'\r\n]               |


30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/roma/command/vn_command_receiver.rb', line 30

def ev_spushv(s)
  if s.length != 3
    @log.error("#{__method__}:wrong number of arguments(#{s})")
    return send_data("CLIENT_ERROR Wrong number of arguments.\r\n")
  end
  if @stats.spushv_protection
    @log.info("#{__method__}:In spushv_protection")
    return send_data("SERVER_ERROR In spushv_protection.\r\n")          
  end
  @stats.run_receive_a_vnode["#{s[1]}_#{s[2]}"] = true

  $roma.stop_clean_up

  send_data("READY\r\n")

  count = rcount = 0
  @log.debug("#{__method__}:#{s.inspect} received.")
  loop {
    context_bin = read_bytes(20, @stats.spushv_read_timeout)
    vn, last, clk, expt, klen = context_bin.unpack('NNNNN')
    break if klen == 0 # end of dump ?
    k = read_bytes(klen)
    vlen_bin = read_bytes(4, @stats.spushv_read_timeout)
    vlen, =  vlen_bin.unpack('N')
    if vlen != 0
      if klen > @stats.spushv_klength_warn
        @log.warn("#{__method__}:Too long key: key = #{k}")
      end
      if vlen > @stats.spushv_vlength_warn
        @log.warn("#{__method__}:Too long value: key = #{k} vlen = #{vlen}")
      end
      v = read_bytes(vlen, @stats.spushv_read_timeout)

      createhash(s[1]) unless @storages[s[1]]
      if @storages[s[1]].load_stream_dump(vn, last, clk, expt, k, v)
        count += 1
#              @log.debug("#{__method__}:[#{vn} #{last} #{clk} #{expt} #{k}] was stored.")
      else
        rcount += 1
#              @log.warn("#{__method__}:[#{vn} #{last} #{clk} #{expt} #{k}] was rejected.")
      end
    else
      createhash(s[1]) unless @storages[s[1]]
      if @storages[s[1]].load_stream_dump(vn, last, clk, expt, k, nil)
#              @log.debug("#{__method__}:[#{vn} #{last} #{clk} #{expt} #{k}] was stored.")
        count += 1
      else
        rcount += 1
#              @log.warn("#{__method__}:[#{vn} #{last} #{clk} #{expt} #{k}] was rejected.")
      end
    end
  }
  if @stats.spushv_protection
    @log.info("#{__method__}:Canceled because of spushv_protection")
    send_data("CANCELED\r\n")
  else
    send_data("STORED\r\n")
  end
  @log.debug("#{__method__}:#{s[1]}_#{s[2]} #{count} keys loaded.")
  @log.debug("#{__method__}:#{s[1]}_#{s[2]} #{rcount} keys rejected.") if rcount > 0
rescue Storage::StorageException => e
  @log.error("#{e.inspect} #{$@}")
  close_connection
  if Config.const_defined?(:STORAGE_EXCEPTION_ACTION) &&
      Config::STORAGE_EXCEPTION_ACTION == :shutdown
    @log.error("#{__method__}:Romad will be stop.")
    @stop_event_loop = true
  end
rescue => e
  @log.error("#{e} #{$@}")
ensure
  @stats.run_receive_a_vnode.delete("#{s[1]}_#{s[2]}") if s.length == 3
  @stats.last_clean_up = Time.now
end

#ev_spushv_protection(s) ⇒ Object

spushv <true/false>



10
11
12
13
14
15
16
17
18
19
# File 'lib/roma/command/vn_command_receiver.rb', line 10

def ev_spushv_protection(s)
  if s.length == 1
    send_data("#{@stats.spushv_protection}\r\n")
  elsif s.length == 2
    @stats.spushv_protection = (s[1] == 'true')
    send_data("#{@stats.spushv_protection}\r\n")
  else
    send_data("COMMAND ERROR\r\n")
  end
end

#req_push_a_vnode(vn, src_nid, is_primary) ⇒ Object



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/roma/command/vn_command_receiver.rb', line 125

def req_push_a_vnode(vn, src_nid, is_primary)
  con = Roma::Messaging::ConPool.instance.get_connection(src_nid)
  con.write("reqpushv #{vn} #{@nid} #{is_primary}\r\n")
  res = con.gets # receive 'PUSHED\r\n' | 'REJECTED\r\n'
  Roma::Messaging::ConPool.instance.return_connection(src_nid,con)
  # waiting for pushv
  count = 0
  while @rttable.search_nodes(vn).include?(@nid)==false && count < 300
    sleep 0.1
    count += 1
  end
rescue =>e
  @log.error("#{e}\n#{$@}")
  @rttable.proc_failed(src_nid)
  false
end