Class: BitBroker::ManagerImpl

Inherits:
Object
  • Object
show all
Defined in:
lib/bitbroker/manager_impl.rb

Direct Known Subclasses

Manager

Defined Under Namespace

Classes: FileActivity

Instance Method Summary collapse

Constructor Details

#initialize(opts) ⇒ ManagerImpl

Returns a new instance of ManagerImpl.



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/bitbroker/manager_impl.rb', line 3

def initialize(opts)
  # validate user created arguments
  validate(opts)

  ### prepare brokers
  @config = {
    :mqconfig => opts[:mqconfig],
    :label => opts[:name],
    :dirpath => form_dirpath(opts[:path]),
  }

  @metadata = Metadata.new(@config[:dirpath])

  @publisher = Publisher.new(@config)

  @deficients = @suggestions = []
  @semaphore = Mutex.new

  # internal variable in this class to know who modified/crated file
  @file_activities = []
end

Instance Method Details

#do_start_collectorObject



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/bitbroker/manager_impl.rb', line 87

def do_start_collector
  Thread.new do
    loop do
      deficient = @deficients.first
      if deficient != nil
        candidates = @suggestions.select { |x| x['path'] == deficient['path'] }
        if candidates.size > 0
          candidate = candidates[rand(candidates.size)]

          @metadata.request(@publisher, [candidate], candidate['from'])

          @semaphore.synchronize do
            @suggestions = @suggestions.reject {|x| x['path'] == deficient['path']}
            @deficients.delete(deficient)
          end
        end
      end
      Thread.pass
    end
  end
end

#do_start_data_receiverObject



137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/bitbroker/manager_impl.rb', line 137

def do_start_data_receiver
  Thread.new do
    receiver = Subscriber.new(@config)
    receiver.recv_data do |binary, from|
      path = MessagePack.unpack(binary)['path']

      Log.debug("[ManagerImpl] (data_receiver) path: #{path}")

      @file_activities.push(FileActivity.create(path))

      Solvant.load_binary(@config[:dirpath], binary)
    end
  end
end

#do_start_metadata_receiverObject



109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/bitbroker/manager_impl.rb', line 109

def 
  Thread.new do
    receiver = Subscriber.new(@config)
    receiver. do |msg, from|
      case msg['type']
      when Metadata::TYPE_ADVERTISE then
        receive_advertise(msg['data'], from)
      when Metadata::TYPE_REQUEST_ALL then
        receive_request_all(msg['data'], from)
      end
    end
  end
end

#do_start_observerObject



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/bitbroker/manager_impl.rb', line 32

def do_start_observer
  def handle_add(path)
    Log.debug("[ManagerImpl] (handle_add) path:#{path}")

    rpath = @metadata.get_rpath(path)
    if obj = @file_activities.find {|x| x.path == rpath}
      @file_activities.delete(obj)
    else
      # create metadata info
      @metadata.create(rpath)

      # upload target file
      Solvant.new(@metadata.dir, rpath).upload(@publisher)
    end
  end

  def handle_mod(path)
    Log.debug("[ManagerImpl] (handle_mod) path:#{path}")

    rpath = @metadata.get_rpath(path)
    if obj = @file_activities.find {|x| x.path == rpath}
      @file_activities.delete(obj)
    else
      # upload target file
      Solvant.new(@metadata.dir, rpath).upload(@publisher)

      # update fileinfo
      @metadata.get_with_path(rpath).update

      @metadata.advertise(@publisher)
    end
  end

  def handle_rem(path)
    rpath = @metadata.get_rpath(path)

    #@metadata.remove_with_path(rpath)
    file = @metadata.get_with_path(rpath)
    if file != nil
      Log.debug("[ManagerImpl] (handle_rem) path:#{path}")

      file.remove
      @metadata.advertise(@publisher)
    end
  end

  Thread.new do
    Observer.new(@config[:dirpath]) do |mod, add, rem|
      mod.each {|x| handle_mod(x)}
      add.each {|x| handle_add(x)}
      rem.each {|x| handle_rem(x)}
    end
  end
end

#do_start_p_data_receiverObject



151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/bitbroker/manager_impl.rb', line 151

def do_start_p_data_receiver
  Thread.new do
    receiver = Subscriber.new(@config)
    receiver.recv_p_data do |binary, from|
      path = MessagePack.unpack(binary)['path']

      Log.debug("[ManagerImpl] (p_data_receiver) path: #{path}")

      @file_activities.push(FileActivity.create(path))

      Solvant.load_binary(@config[:dirpath], binary)
    end
  end
end

#do_start_p_metadata_receiverObject



123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/bitbroker/manager_impl.rb', line 123

def 
  Thread.new do
    receiver = Subscriber.new(@config)
    receiver. do |msg, from|
      case msg['type']
      when Metadata::TYPE_SUGGESTION then
        receive_suggestion(msg['data'], from)
      when Metadata::TYPE_REQUEST then
        receive_request(msg['data'], from)
      end
    end
  end
end

#form_dirpath(path) ⇒ Object



25
26
27
# File 'lib/bitbroker/manager_impl.rb', line 25

def form_dirpath path
    path[-1] == '/' ? form_dirpath(path.chop) : path
end

#handle_add(path) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/bitbroker/manager_impl.rb', line 33

def handle_add(path)
  Log.debug("[ManagerImpl] (handle_add) path:#{path}")

  rpath = @metadata.get_rpath(path)
  if obj = @file_activities.find {|x| x.path == rpath}
    @file_activities.delete(obj)
  else
    # create metadata info
    @metadata.create(rpath)

    # upload target file
    Solvant.new(@metadata.dir, rpath).upload(@publisher)
  end
end

#handle_mod(path) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/bitbroker/manager_impl.rb', line 48

def handle_mod(path)
  Log.debug("[ManagerImpl] (handle_mod) path:#{path}")

  rpath = @metadata.get_rpath(path)
  if obj = @file_activities.find {|x| x.path == rpath}
    @file_activities.delete(obj)
  else
    # upload target file
    Solvant.new(@metadata.dir, rpath).upload(@publisher)

    # update fileinfo
    @metadata.get_with_path(rpath).update

    @metadata.advertise(@publisher)
  end
end

#handle_rem(path) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/bitbroker/manager_impl.rb', line 65

def handle_rem(path)
  rpath = @metadata.get_rpath(path)

  #@metadata.remove_with_path(rpath)
  file = @metadata.get_with_path(rpath)
  if file != nil
    Log.debug("[ManagerImpl] (handle_rem) path:#{path}")

    file.remove
    @metadata.advertise(@publisher)
  end
end

#has_file?(remote) ⇒ Boolean

Returns:

  • (Boolean)


223
224
225
# File 'lib/bitbroker/manager_impl.rb', line 223

def has_file?(remote)
  @metadata.get_with_path(remote['path']) != nil
end

#receive_advertise(data, from) ⇒ Object



166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# File 'lib/bitbroker/manager_impl.rb', line 166

def receive_advertise(data, from)
  def updated?(remote)
    case f = @metadata.get_with_path(remote['path'])
    when nil # this means target file doesn't exist in local.
      true
    else
      f.size != remote['size'] and
      not f.removed?
    end
  end

  def removed?(remote)
    case f = @metadata.get_with_path(remote['path'])
    when nil
      false
    else
      remote['status'].to_i & Metadata::FileInfo::STATUS_REMOVED > 0
    end
  end
  Log.debug("[ManagerImpl] (receive_advertise) <#{from}> data:#{data}")

  deficients = []
  data.each do |remote|
    if removed? remote
      Log.debug("[ManagerImpl] (receive_advertise) remove: #{remote}")

      # set file_activities
      @file_activities.push(FileActivity.remove(remote['path']))

      # remove FileInfo object which metadata has
      @metadata.remove_with_path(remote['path'])

      # remove actual file in local FS
      Solvant.new(@config[:dirpath], remote['path']).remove
    else updated? remote
      deficients.push(remote)

      fpath = "#{@config[:dirpath]}/#{remote['path']}"
      if FileTest.exist? fpath
        Log.debug("[ManagerImpl] trancated(#{fpath}, #{remote['size']})")

        # truncate files when target file is cut down
        File.truncate(fpath, remote['size'])
      end
    end
  end

  # request all deficients files
  @metadata.request_all(@publisher, deficients)

  # record deficient files to get it from remote node
  @semaphore.synchronize do
    @deficients += deficients
  end
end

#receive_request(data, from) ⇒ Object



244
245
246
247
248
249
250
251
252
# File 'lib/bitbroker/manager_impl.rb', line 244

def receive_request(data, from)
  Log.debug("[ManagerImpl] (receive_request) <#{from}> data:#{data}")

  data.each do |remote|
    f = @metadata.get_with_path(remote['path'])

    Solvant.new(@config[:dirpath], f.path).upload_to(@publisher, from)
  end
end

#receive_request_all(data, from) ⇒ Object



222
223
224
225
226
227
228
229
230
231
232
233
# File 'lib/bitbroker/manager_impl.rb', line 222

def receive_request_all(data, from)
  def has_file?(remote)
    @metadata.get_with_path(remote['path']) != nil
  end
  Log.debug("[ManagerImpl] (receive_request_all) <#{from}> data:#{data}")

  files = data.map {|f| @metadata.get_with_path(f['path'])}.select{|x| x != nil}
  if files != []
    Log.debug("[ManagerImpl] (receive_request_all) files:#{files}")
    @metadata.suggestion(@publisher, files.map{|x| x.to_h}, from)
  end
end

#receive_suggestion(data, from) ⇒ Object



235
236
237
238
239
240
241
242
# File 'lib/bitbroker/manager_impl.rb', line 235

def receive_suggestion(data, from)
  Log.debug("[ManagerImpl] (receive_suggestion) <#{from}> data:#{data}")

  data.each {|x| x['from'] = from}
  @semaphore.synchronize do
    @suggestions += data
  end
end

#removed?(remote) ⇒ Boolean

Returns:

  • (Boolean)


177
178
179
180
181
182
183
184
# File 'lib/bitbroker/manager_impl.rb', line 177

def removed?(remote)
  case f = @metadata.get_with_path(remote['path'])
  when nil
    false
  else
    remote['status'].to_i & Metadata::FileInfo::STATUS_REMOVED > 0
  end
end

#updated?(remote) ⇒ Boolean

Returns:

  • (Boolean)


167
168
169
170
171
172
173
174
175
# File 'lib/bitbroker/manager_impl.rb', line 167

def updated?(remote)
  case f = @metadata.get_with_path(remote['path'])
  when nil # this means target file doesn't exist in local.
    true
  else
    f.size != remote['size'] and
    not f.removed?
  end
end

#validate(opts) ⇒ Object



28
29
30
# File 'lib/bitbroker/manager_impl.rb', line 28

def validate(opts)
  raise InvalidArgument("Specified path is not directory") unless File.directory?(opts[:path])
end