Class: BitBroker::ManagerImpl
- Inherits:
-
Object
- Object
- BitBroker::ManagerImpl
- Defined in:
- lib/bitbroker/manager_impl.rb
Direct Known Subclasses
Defined Under Namespace
Classes: FileActivity
Instance Method Summary collapse
- #do_start_collector ⇒ Object
- #do_start_data_receiver ⇒ Object
- #do_start_metadata_receiver ⇒ Object
- #do_start_observer ⇒ Object
- #do_start_p_data_receiver ⇒ Object
- #do_start_p_metadata_receiver ⇒ Object
- #form_dirpath(path) ⇒ Object
- #handle_add(path) ⇒ Object
- #handle_mod(path) ⇒ Object
- #handle_rem(path) ⇒ Object
- #has_file?(remote) ⇒ Boolean
-
#initialize(opts) ⇒ ManagerImpl
constructor
A new instance of ManagerImpl.
- #receive_advertise(data, from) ⇒ Object
- #receive_request(data, from) ⇒ Object
- #receive_request_all(data, from) ⇒ Object
- #receive_suggestion(data, from) ⇒ Object
- #removed?(remote) ⇒ Boolean
- #updated?(remote) ⇒ Boolean
- #validate(opts) ⇒ Object
Constructor Details
#initialize(opts) ⇒ ManagerImpl
Returns a new instance of ManagerImpl.
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/bitbroker/manager_impl.rb', line 3

# Builds the manager state: validates the options, derives the broker
# configuration from them and creates the metadata store and the publisher.
#
# @param opts [Hash] options; the keys :mqconfig, :name and :path are read
def initialize(opts)
  # validate user created arguments
  validate(opts)

  ### prepare brokers
  @config = {
    :mqconfig => opts[:mqconfig],
    :label => opts[:name],
    :dirpath => form_dirpath(opts[:path]),
  }

  @metadata = Metadata.new(@config[:dirpath])
  @publisher = Publisher.new(@config)

  # Two distinct arrays on purpose: a chained assignment would bind both
  # variables to the same Array object, so an in-place mutation of one list
  # (e.g. Array#delete) would silently show up in the other.
  @deficients = []
  @suggestions = []
  @semaphore = Mutex.new

  # internal variable in this class to know who modified/created file
  @file_activities = []
end
Instance Method Details
#do_start_collector ⇒ Object
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/bitbroker/manager_impl.rb', line 87

# Spawns a thread that loops forever. On each pass it looks at the first
# entry of @deficients, picks one random entry of @suggestions with the same
# 'path', asks @metadata to request it from that suggestion's 'from' value,
# and then (under @semaphore) drops the served deficient entry together with
# every suggestion for that path. Thread.pass is called on every pass.
#
# @return [Thread] the spawned thread
def do_start_collector
  Thread.new do
    loop do
      wanted = @deficients.first

      unless wanted.nil?
        offers = @suggestions.select { |offer| offer['path'] == wanted['path'] }

        unless offers.empty?
          chosen = offers[rand(offers.size)]
          @metadata.request(@publisher, [chosen], chosen['from'])

          # both lists are rewritten together while holding the semaphore
          @semaphore.synchronize do
            @suggestions = @suggestions.reject { |offer| offer['path'] == wanted['path'] }
            @deficients.delete(wanted)
          end
        end
      end

      Thread.pass
    end
  end
end
#do_start_data_receiver ⇒ Object
137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
# File 'lib/bitbroker/manager_impl.rb', line 137

# Spawns a thread that subscribes via Subscriber#recv_data. For every binary
# payload received it unpacks the 'path' field with MessagePack, records a
# FileActivity.create entry for that path in @file_activities, and hands the
# payload to Solvant.load_binary together with the configured directory.
#
# @return [Thread] the spawned thread
def do_start_data_receiver
  Thread.new do
    Subscriber.new(@config).recv_data do |binary, from|
      target = MessagePack.unpack(binary)['path']
      Log.debug("[ManagerImpl] (data_receiver) path: #{target}")

      # the activity entry is recorded before the payload is loaded
      activity = FileActivity.create(target)
      @file_activities.push(activity)

      Solvant.load_binary(@config[:dirpath], binary)
    end
  end
end
#do_start_metadata_receiver ⇒ Object
109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/bitbroker/manager_impl.rb', line 109

# Spawns a thread that subscribes to metadata messages and dispatches each
# one by its 'type' field: advertise messages go to #receive_advertise and
# request-all messages go to #receive_request_all. Other types are ignored.
#
# @return [Thread] the spawned thread
def do_start_metadata_receiver
  Thread.new do
    receiver = Subscriber.new(@config)

    # NOTE(review): the subscriber method name was lost from this listing;
    # `recv_metadata` is reconstructed from the naming of the surrounding
    # receivers - confirm against Subscriber before relying on it.
    receiver.recv_metadata do |msg, from|
      case msg['type']
      when Metadata::TYPE_ADVERTISE then
        receive_advertise(msg['data'], from)
      when Metadata::TYPE_REQUEST_ALL then
        receive_request_all(msg['data'], from)
      end
    end
  end
end
#do_start_observer ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/bitbroker/manager_impl.rb', line 32

# Spawns a thread that creates an Observer on the configured directory and
# forwards every reported modified / added / removed path to #handle_mod,
# #handle_add and #handle_rem respectively (in that order).
#
# The three handlers are defined next to this method instead of inside it:
# a `def` nested in a method body is re-evaluated on every call and the
# handlers would not exist until this method had run once.
#
# @return [Thread] the spawned thread
def do_start_observer
  Thread.new do
    Observer.new(@config[:dirpath]) do |mod, add, rem|
      mod.each { |x| handle_mod(x) }
      add.each { |x| handle_add(x) }
      rem.each { |x| handle_rem(x) }
    end
  end
end

# Reacts to a newly added file. If @file_activities already holds an entry
# for the file's relative path, that entry is consumed and nothing else is
# done; otherwise a metadata entry is created and the file is uploaded.
#
# @param path [String] path reported by the observer
def handle_add(path)
  Log.debug("[ManagerImpl] (handle_add) path:#{path}")

  rpath = @metadata.get_rpath(path)
  activity = @file_activities.find { |x| x.path == rpath }
  if activity
    @file_activities.delete(activity)
  else
    # create metadata info
    @metadata.create(rpath)

    # upload target file
    Solvant.new(@metadata.dir, rpath).upload(@publisher)
  end
end

# Reacts to a modified file. If @file_activities already holds an entry for
# the file's relative path, that entry is consumed and nothing else is done;
# otherwise the file is uploaded, its file info is updated and the metadata
# is advertised.
#
# @param path [String] path reported by the observer
def handle_mod(path)
  Log.debug("[ManagerImpl] (handle_mod) path:#{path}")

  rpath = @metadata.get_rpath(path)
  activity = @file_activities.find { |x| x.path == rpath }
  if activity
    @file_activities.delete(activity)
  else
    # upload target file
    Solvant.new(@metadata.dir, rpath).upload(@publisher)

    # update fileinfo
    @metadata.get_with_path(rpath).update
    @metadata.advertise(@publisher)
  end
end

# Reacts to a removed file. When the metadata knows the relative path, the
# file info's #remove is called and the metadata is advertised; unknown
# paths are ignored.
#
# @param path [String] path reported by the observer
def handle_rem(path)
  rpath = @metadata.get_rpath(path)

  #@metadata.remove_with_path(rpath)
  file = @metadata.get_with_path(rpath)
  unless file.nil?
    Log.debug("[ManagerImpl] (handle_rem) path:#{path}")

    file.remove
    @metadata.advertise(@publisher)
  end
end
#do_start_p_data_receiver ⇒ Object
151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
# File 'lib/bitbroker/manager_impl.rb', line 151

# Spawns a thread that subscribes via Subscriber#recv_p_data. For every
# binary payload received it unpacks the 'path' field with MessagePack,
# records a FileActivity.create entry for that path in @file_activities, and
# hands the payload to Solvant.load_binary together with the configured
# directory.
#
# @return [Thread] the spawned thread
def do_start_p_data_receiver
  Thread.new do
    Subscriber.new(@config).recv_p_data do |binary, from|
      target = MessagePack.unpack(binary)['path']
      Log.debug("[ManagerImpl] (p_data_receiver) path: #{target}")

      # the activity entry is recorded before the payload is loaded
      activity = FileActivity.create(target)
      @file_activities.push(activity)

      Solvant.load_binary(@config[:dirpath], binary)
    end
  end
end
#do_start_p_metadata_receiver ⇒ Object
123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/bitbroker/manager_impl.rb', line 123

# Spawns a thread that subscribes to metadata messages and dispatches each
# one by its 'type' field: suggestion messages go to #receive_suggestion and
# request messages go to #receive_request. Other types are ignored.
#
# @return [Thread] the spawned thread
def do_start_p_metadata_receiver
  Thread.new do
    receiver = Subscriber.new(@config)

    # NOTE(review): the subscriber method name was lost from this listing;
    # `recv_p_metadata` is reconstructed from the naming of the surrounding
    # receivers - confirm against Subscriber before relying on it.
    receiver.recv_p_metadata do |msg, from|
      case msg['type']
      when Metadata::TYPE_SUGGESTION then
        receive_suggestion(msg['data'], from)
      when Metadata::TYPE_REQUEST then
        receive_request(msg['data'], from)
      end
    end
  end
end
#form_dirpath(path) ⇒ Object
25 26 27 |
# File 'lib/bitbroker/manager_impl.rb', line 25

# Strips every trailing '/' from the given path. A path without a trailing
# slash is returned as the very same object; "/" collapses to "".
#
# @param path [String] directory path as supplied by the caller
# @return [String] the path without trailing slashes
def form_dirpath(path)
  trimmed = path
  trimmed = trimmed.chop while trimmed[-1] == '/'
  trimmed
end
#handle_add(path) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/bitbroker/manager_impl.rb', line 33

# Reacts to a newly added file. If @file_activities already holds an entry
# whose path equals the file's relative path, that entry is removed from the
# list and nothing else happens; otherwise a metadata entry is created for
# the relative path and the file is uploaded through the publisher.
#
# @param path [String] path of the added file
def handle_add(path)
  Log.debug("[ManagerImpl] (handle_add) path:#{path}")

  relative = @metadata.get_rpath(path)
  pending = @file_activities.find { |activity| activity.path == relative }

  # a recorded activity is consumed instead of re-uploading the file
  return @file_activities.delete(pending) if pending

  @metadata.create(relative)
  uploader = Solvant.new(@metadata.dir, relative)
  uploader.upload(@publisher)
end
#handle_mod(path) ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/bitbroker/manager_impl.rb', line 48

# Reacts to a modified file. If @file_activities already holds an entry
# whose path equals the file's relative path, that entry is removed from the
# list and nothing else happens; otherwise the file is uploaded, its file
# info is updated and the metadata is advertised through the publisher.
#
# @param path [String] path of the modified file
def handle_mod(path)
  Log.debug("[ManagerImpl] (handle_mod) path:#{path}")

  relative = @metadata.get_rpath(path)
  pending = @file_activities.find { |activity| activity.path == relative }

  # a recorded activity is consumed instead of re-uploading the file
  return @file_activities.delete(pending) if pending

  uploader = Solvant.new(@metadata.dir, relative)
  uploader.upload(@publisher)

  # the upload happens before the file info is refreshed and advertised
  info = @metadata.get_with_path(relative)
  info.update
  @metadata.advertise(@publisher)
end
#handle_rem(path) ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/bitbroker/manager_impl.rb', line 65

# Reacts to a removed file. When the metadata has an entry for the file's
# relative path, the entry's #remove is called and the metadata is
# advertised through the publisher; otherwise nothing is done (not even
# logging).
#
# @param path [String] path of the removed file
def handle_rem(path)
  relative = @metadata.get_rpath(path)

  #@metadata.remove_with_path(rpath)
  info = @metadata.get_with_path(relative)
  return if info.nil?

  Log.debug("[ManagerImpl] (handle_rem) path:#{path}")
  info.remove
  @metadata.advertise(@publisher)
end
#has_file?(remote) ⇒ Boolean
223 224 225 |
# File 'lib/bitbroker/manager_impl.rb', line 223

# Tells whether the metadata has an entry for the path named by a remote
# file description.
#
# @param remote [Hash] remote file description; only 'path' is read
# @return [Boolean] true when @metadata.get_with_path finds an entry
def has_file?(remote)
  local = @metadata.get_with_path(remote['path'])
  !local.nil?
end
#receive_advertise(data, from) ⇒ Object
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 |
# File 'lib/bitbroker/manager_impl.rb', line 166

# Handles an advertise message: a list of remote file descriptions.
#
# For each description:
# * flagged as removed (see #removed?): a remove activity is recorded, the
#   metadata entry is dropped and the local file is removed;
# * otherwise, only when it counts as updated (see #updated?): it is queued
#   as deficient, and an existing local file is truncated to the remote size.
#
# Afterwards all deficient descriptions are requested via
# @metadata.request_all and appended to @deficients under @semaphore.
#
# The predicates are defined next to this method instead of inside it: a
# `def` nested in a method body is re-evaluated on every call.
#
# @param data [Array<Hash>] remote descriptions; 'path', 'size' and 'status' are read
# @param from [Object] sender identifier, used for logging only
def receive_advertise(data, from)
  Log.debug("[ManagerImpl] (receive_advertise) <#{from}> data:#{data}")

  deficients = []
  data.each do |remote|
    if removed?(remote)
      Log.debug("[ManagerImpl] (receive_advertise) remove: #{remote}")

      # set file_activities
      @file_activities.push(FileActivity.remove(remote['path']))

      # remove FileInfo object which metadata has
      @metadata.remove_with_path(remote['path'])

      # remove actual file in local FS
      Solvant.new(@config[:dirpath], remote['path']).remove

    elsif updated?(remote)
      # `elsif`, not `else`: with a bare `else` the predicate result is
      # discarded and every non-removed file would be requested and truncated.
      deficients.push(remote)

      fpath = "#{@config[:dirpath]}/#{remote['path']}"
      if FileTest.exist? fpath
        Log.debug("[ManagerImpl] trancated(#{fpath}, #{remote['size']})")

        # truncate files when target file is cut down
        File.truncate(fpath, remote['size'])
      end
    end
  end

  # request all deficients files
  @metadata.request_all(@publisher, deficients)

  # record deficient files to get it from remote node
  @semaphore.synchronize do
    @deficients += deficients
  end
end

# Tells whether a remote description differs from the local state.
#
# @param remote [Hash] remote description; 'path' and 'size' are read
# @return [Boolean] true when there is no local entry, or when the local
#   size differs from the remote size and the local entry is not removed
def updated?(remote)
  f = @metadata.get_with_path(remote['path'])

  # a nil entry means the target file doesn't exist in local
  return true if f.nil?

  f.size != remote['size'] && !f.removed?
end

# Tells whether a remote description marks a locally known file as removed.
#
# @param remote [Hash] remote description; 'path' and 'status' are read
# @return [Boolean] false when there is no local entry, otherwise whether
#   the STATUS_REMOVED bit is set in the remote 'status'
def removed?(remote)
  return false if @metadata.get_with_path(remote['path']).nil?

  (remote['status'].to_i & Metadata::FileInfo::STATUS_REMOVED) > 0
end
#receive_request(data, from) ⇒ Object
244 245 246 247 248 249 250 251 252 |
# File 'lib/bitbroker/manager_impl.rb', line 244

# Handles a request message: uploads every requested file that the local
# metadata knows to the requesting peer.
#
# @param data [Array<Hash>] requested file descriptions; only 'path' is read
# @param from [Object] requester identifier, passed to Solvant#upload_to
def receive_request(data, from)
  Log.debug("[ManagerImpl] (receive_request) <#{from}> data:#{data}")

  data.each do |remote|
    f = @metadata.get_with_path(remote['path'])

    # A path without a local metadata entry cannot be served; skip it rather
    # than calling #path on nil, which would abort the remaining uploads.
    if f.nil?
      Log.debug("[ManagerImpl] (receive_request) unknown path: #{remote['path']}")
      next
    end

    Solvant.new(@config[:dirpath], f.path).upload_to(@publisher, from)
  end
end
#receive_request_all(data, from) ⇒ Object
222 223 224 225 226 227 228 229 230 231 232 233 |
# File 'lib/bitbroker/manager_impl.rb', line 222

# Handles a request-all message: looks up every requested path in the local
# metadata and, when at least one is known, sends the known entries (as
# hashes) back to the requester as a suggestion.
#
# #has_file? is defined next to this method instead of inside it: a `def`
# nested in a method body is re-evaluated on every call.
#
# @param data [Array<Hash>] requested file descriptions; only 'path' is read
# @param from [Object] requester identifier, passed to @metadata.suggestion
def receive_request_all(data, from)
  Log.debug("[ManagerImpl] (receive_request_all) <#{from}> data:#{data}")

  files = data.map { |f| @metadata.get_with_path(f['path']) }.compact
  return if files.empty?

  Log.debug("[ManagerImpl] (receive_request_all) files:#{files}")
  @metadata.suggestion(@publisher, files.map { |x| x.to_h }, from)
end

# Tells whether the metadata has an entry for the path named by a remote
# file description.
#
# @param remote [Hash] remote file description; only 'path' is read
# @return [Boolean]
def has_file?(remote)
  !@metadata.get_with_path(remote['path']).nil?
end
#receive_suggestion(data, from) ⇒ Object
235 236 237 238 239 240 241 242 |
# File 'lib/bitbroker/manager_impl.rb', line 235

# Handles a suggestion message: stamps every entry with the sender under the
# 'from' key (the entries are modified in place) and appends them to
# @suggestions while holding @semaphore.
#
# @param data [Array<Hash>] suggested file descriptions
# @param from [Object] sender identifier stored under 'from'
def receive_suggestion(data, from)
  Log.debug("[ManagerImpl] (receive_suggestion) <#{from}> data:#{data}")

  data.each do |entry|
    entry['from'] = from
  end

  @semaphore.synchronize { @suggestions += data }
end
#removed?(remote) ⇒ Boolean
177 178 179 180 181 182 183 184 |
# File 'lib/bitbroker/manager_impl.rb', line 177

# Tells whether a remote description marks a locally known file as removed.
#
# @param remote [Hash] remote description; 'path' and 'status' are read
# @return [Boolean] false when the metadata has no entry for the path,
#   otherwise whether the STATUS_REMOVED bit is set in the remote 'status'
def removed?(remote)
  local = @metadata.get_with_path(remote['path'])
  return false if local.nil?

  flags = remote['status'].to_i
  (flags & Metadata::FileInfo::STATUS_REMOVED) > 0
end
#updated?(remote) ⇒ Boolean
167 168 169 170 171 172 173 174 175 |
# File 'lib/bitbroker/manager_impl.rb', line 167

# Tells whether a remote description differs from the local state.
#
# @param remote [Hash] remote description; 'path' and 'size' are read
# @return [Boolean] true when the metadata has no entry for the path, or
#   when the local size differs from the remote size and the local entry is
#   not removed
def updated?(remote)
  local = @metadata.get_with_path(remote['path'])

  # no local entry: the target file doesn't exist in local
  return true if local.nil?

  local.size != remote['size'] && !local.removed?
end
#validate(opts) ⇒ Object
28 29 30 |
# File 'lib/bitbroker/manager_impl.rb', line 28

# Checks the user supplied options before any state is built.
#
# `raise InvalidArgument("...")` is a method call, not an exception class,
# so it would fail with NoMethodError instead of reporting the bad path.
# NOTE(review): ArgumentError is used because no InvalidArgument class is
# visible here - swap it in if the project defines one.
#
# @param opts [Hash] options; only :path is read
# @return [nil] when :path names an existing directory
# @raise [ArgumentError] when :path is missing or is not a directory
def validate(opts)
  path = opts[:path]
  return if path && File.directory?(path)

  raise ArgumentError, "Specified path is not directory"
end