Class: MapRedus::Process

Inherits:
Object
Defined in:
lib/mapredus/process.rb

Overview

This class keeps track of our map reduce processes.

We use a Redis key to identify each map reduce process by its id. The value stored under that key is a JSON object that contains:

{
  inputter : inputstreamclass,
  mapper : mapclass,
  reducer : reduceclass,
  finalizer : finalizerclass,
  outputter : outputterclass,
  partitioner : <not supported>,
  combiner : <not supported>,
  ordered : true_or_false,        ## orders the keys of the map output; the map emits [ rank, key, value ]
  synchronous : true_or_false,    ## runs the process synchronously or not (generally used for testing)
  result_timeout : timeout,       ## length of time a result is saved; defaults to 3600 * 24
  key_args : arguments,           ## arguments added to the key location of the saved result (cache location)
  state : state,                  ## the current state of the process (shouldn't be set by the process; starts off as nil)
  type : process_class            ## the original process class (currently needed so result_cache keys can be namespaced)
}

Users can subclass this class to add extra features if needed.
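
As an illustration of the specification above, the following sketch builds such a hash and round-trips it through Ruby's standard json library. The class names are hypothetical placeholders, and JSON.generate/JSON.parse stand in for MapRedus's Helper.encode/Helper.decode, whose implementation is not shown on this page. The decoded hash has string keys rather than symbols, which is why lookups such as json_helper accept either form.

```ruby
require 'json'

# Hypothetical specification; the class names are placeholders, not real MapRedus classes.
spec = {
  inputter: "WordStream",
  mapper: "WordCounter",
  reducer: "Adder",
  finalizer: "ToHash",
  outputter: "MapRedus::Outputter",
  ordered: false,
  synchronous: false,
  result_timeout: 3600 * 24,
  key_args: [],
  state: nil,
  type: "MapRedus::Process"
}

encoded = JSON.generate(spec)   # the string that would be stored under the process's Redis key
decoded = JSON.parse(encoded)   # JSON.parse returns string keys by default

puts decoded["mapper"]          # prints WordCounter
puts decoded.key?(:mapper)      # prints false: symbol keys do not survive the round trip
```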

Constant Summary

READERS =

Public: Keep track of information that may show up in the Redis JSON value, so we know exactly what might appear in the JSON hash.

[:pid]
ATTRS =
[:inputter, :mapper, :reducer, :finalizer, :outputter, :ordered, :synchronous, :result_timeout, :key_args, :state, :type]
DEFAULT_TIME =
3600 * 24

Constructor Details

#initialize(pid, json_info) ⇒ Process

Returns a new instance of Process.



# File 'lib/mapredus/process.rb', line 35

def initialize(pid, json_info)
  @pid = pid
  read(json_info)
end

Class Method Details

.create ⇒ Object

Sets up a process to be run with the specification given by the class's attribute methods (one per entry in ATTRS). It saves the specification in the FileSystem and returns an instance of the process; call run on that instance when you want the process to start.

Example

process = MapRedus::Process.create
process.run(data_object)

Returns an instance of the process.



# File 'lib/mapredus/process.rb', line 309

def self.create
  new_pid = get_available_pid
  specification = ATTRS.inject({}) do |ret, attr|
    ret[attr] = send(attr)
    ret
  end
  specification[:type] = self
  self.new(new_pid, specification).save
end
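
create builds its specification by calling a class-level method for each name in ATTRS, so a subclass configures a process by overriding those methods. The sketch below isolates that pattern; SketchProcess, WordCountProcess, their attribute values, and the in-memory pid counter are hypothetical stand-ins, and the save step is omitted.

```ruby
# Hypothetical, trimmed-down stand-in for MapRedus::Process.
class SketchProcess
  ATTRS = [:mapper, :reducer, :ordered].freeze
  @@next_pid = 0   # in-memory stand-in for the Redis process counter

  attr_reader :pid, :spec

  def initialize(pid, spec)
    @pid = pid
    @spec = spec
  end

  # Class-level "settings" that subclasses override.
  def self.mapper;  nil;   end
  def self.reducer; nil;   end
  def self.ordered; false; end

  def self.get_available_pid
    @@next_pid += 1
  end

  # Mirrors create: gather ATTRS from class methods, record the type, build an instance.
  def self.create
    specification = ATTRS.each_with_object({}) { |attr, h| h[attr] = send(attr) }
    specification[:type] = self
    new(get_available_pid, specification)
  end
end

# Hypothetical subclass configuring a word count.
class WordCountProcess < SketchProcess
  def self.mapper;  "WordCounter"; end
  def self.reducer; "Adder";       end
end

process = WordCountProcess.create
p process.spec
```

Because the lookup uses send on the class, whatever the most-derived class returns for each attribute ends up in the specification, and :type records that subclass.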

.delete_saved_result(*key_args) ⇒ Object

Given the arguments for a result key, deletes the result from the FileSystem.

Examples

Process.delete_saved_result(key)


# File 'lib/mapredus/process.rb', line 413

def self.delete_saved_result(*key_args)
  FileSystem.del( result_key(*key_args) )
end

.get_available_pid ⇒ Object

Reserves a new process id by incrementing the shared process counter by a random amount between 1 and 20.

Examples

Process.get_available_pid

Returns an available pid.



# File 'lib/mapredus/process.rb', line 404

def self.get_available_pid
  FileSystem.incrby(ProcessInfo.processes_count, 1 + rand(20)) 
end
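
get_available_pid advances the shared process counter by a random stride from 1 to 20. Assuming FileSystem.incrby maps to Redis INCRBY, which is atomic, concurrent callers always receive distinct ids. The sketch below imitates that with an in-memory counter whose Mutex stands in for INCRBY's atomicity; FakeCounter is hypothetical, not part of MapRedus.

```ruby
# In-memory stand-in for a Redis counter; the Mutex plays the role of INCRBY's atomicity.
class FakeCounter
  def initialize
    @value = 0
    @lock = Mutex.new
  end

  # Like INCRBY: add amount and return the new value atomically.
  def incrby(amount)
    @lock.synchronize { @value += amount }
  end
end

counter = FakeCounter.new

# Same shape as get_available_pid: advance by a random stride of 1..20.
def get_available_pid(counter)
  counter.incrby(1 + rand(20))
end

threads = 4.times.map { Thread.new { Array.new(25) { get_available_pid(counter) } } }
pids = threads.flat_map(&:value)

puts pids.uniq.size == pids.size   # prints true: every pid is distinct
```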

.info(pid) ⇒ Object

Returns all the Redis keys associated with a process id.

Example

Process.info(17)

Returns an array of keys associated with the process id.



# File 'lib/mapredus/process.rb', line 374

def self.info(pid)
  FileSystem.keys(ProcessInfo.pid(pid) + "*")
end

.kill(pid) ⇒ Object

Removes the Redis keys associated with this process if the Master isn't working. This is potentially very expensive.

Example

Process.kill(pid)
# => num_killed

Returns num_killed, the value returned by Master.emancipate(pid).



# File 'lib/mapredus/process.rb', line 426

def self.kill(pid)
  num_killed = Master.emancipate(pid)
  proc = Process.open(pid)
  proc.delete if proc
  num_killed
end

.kill_all ⇒ Object

Kills every process listed by ps, then deletes the set of processes and the process counter.



# File 'lib/mapredus/process.rb', line 433

def self.kill_all
  ps.each do |pid|
    kill(pid)
  end
  FileSystem.del(ProcessInfo.processes)
  FileSystem.del(ProcessInfo.processes_count)
end

.open(pid) ⇒ Object

Returns an instance of the process class for the given process id. If no such process id exists, returns nil.

Example

process = Process.open(17)


# File 'lib/mapredus/process.rb', line 383

def self.open(pid)
  spec = Helper.decode( FileSystem.get(ProcessInfo.pid(pid)) )
  spec && self.new( pid, spec )
end

.ps ⇒ Object

Finds out what map reduce processes are out there.

Examples

Process.ps

Returns a list of the map reduce process ids.



# File 'lib/mapredus/process.rb', line 394

def self.ps
  FileSystem.smembers(ProcessInfo.processes)
end

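.result_key(*args) ⇒ Object

Builds the result-cache key from args. It uses the key template registered for this class through set_result_key and falls back to the MapRedus::Process template when this class has none.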



# File 'lib/mapredus/process.rb', line 289

def self.result_key(*args)
  key_maker = "#{self.to_s.gsub(/\W/,"_")}_result_cache"
  key_maker = ProcessInfo.respond_to?(key_maker) ? key_maker : "#{MapRedus::Process.to_s.gsub(/\W/,"_")}_result_cache"
  ProcessInfo.send( key_maker, *args )
end

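.set_result_key(key_struct) ⇒ Object

Registers key_struct as the result-cache key template for this class. The Redis key is named after the class, with non-word characters replaced by underscores and _result_cache appended.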



# File 'lib/mapredus/process.rb', line 295

def self.set_result_key(key_struct)
  MapRedus.redefine_redis_key( "#{self.to_s.gsub(/\W/,"_")}_result_cache", key_struct )
end
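
Both class methods derive the name of the result-cache key from the class name, replacing every non-word character with an underscore. The first helper below reproduces that derivation exactly. The second, fill_template, is a hypothetical illustration of how a template such as "something:ARG:something:VAR" might be filled, with each all-capital segment consuming the next argument; MapRedus.redefine_redis_key's actual substitution rules are not shown on this page, so treat it as an assumption.

```ruby
# Exactly the transformation used by result_key/set_result_key.
def result_cache_key_name(klass_name)
  "#{klass_name.gsub(/\W/, '_')}_result_cache"
end

puts result_cache_key_name("MapRedus::Process")   # prints MapRedus__Process_result_cache

# Hypothetical template filling: each all-caps segment consumes one argument, in order.
def fill_template(template, *args)
  remaining = args.dup
  parts = template.split(":").map do |segment|
    if segment.match?(/\A[A-Z_]+\z/)
      raise ArgumentError, "missing value for #{segment}" if remaining.empty?
      remaining.shift.to_s
    else
      segment
    end
  end
  parts.join(":")
end

puts fill_template("something:ARG:something:VAR", "a", 42)  # prints something:a:something:42
```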

Instance Method Details

#delete(safe = true) ⇒ Object

This will not delete anything while the master is working, because it can't get hold of the files to shred while the master is working.

If safe is set to false, this deletes all the Redis stores associated with this process even if the master is working, but it does not remove the process from the queue if it is on the queue. The process's queued operations will then fail, because its data has been deleted.

Examples

process.delete(safe)
# => true or false

Returns false if safe is true and the master is working; otherwise returns true.



# File 'lib/mapredus/process.rb', line 102

def delete(safe = true)
  return false if (safe && Master.working?(@pid))
  FileSystem.keys("mapredus:process:#{@pid}*").each do |k|
    FileSystem.del(k)
  end        
  FileSystem.srem(ProcessInfo.processes, @pid)
  FileSystem.set(ProcessInfo.processes_count, 0) if( 0 == FileSystem.scard(ProcessInfo.processes) )
  true
end
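
To make the bookkeeping in delete concrete, the sketch below runs the same steps against plain Ruby collections standing in for Redis, using File.fnmatch to emulate the glob passed to FileSystem.keys. The store contents, the delete_process name, and the master_working flag (standing in for Master.working?) are hypothetical. It also shows one consequence of the trailing * in the pattern, assuming keys follow the mapredus:process:PID layout: deleting pid 17 also sweeps up keys that belong to pid 170.

```ruby
require 'set'

# Hypothetical in-memory stand-ins for the Redis structures that delete touches.
store = {
  "mapredus:process:17"       => "spec-json",
  "mapredus:process:17:keys"  => "key-set",
  "mapredus:process:170:keys" => "key-set",   # belongs to pid 170, but also matches "17*"
  "mapredus:process:23"       => "spec-json"
}
processes = Set[17, 23, 170]
counter = { "processes_count" => 180 }

# Mirrors delete(safe); master_working stands in for Master.working?(pid).
def delete_process(pid, store, processes, counter, master_working:, safe: true)
  return false if safe && master_working
  pattern = "mapredus:process:#{pid}*"            # same glob as the original code
  store.keys.select { |k| File.fnmatch(pattern, k) }.each { |k| store.delete(k) }
  processes.delete(pid)
  counter["processes_count"] = 0 if processes.empty?
  true
end

puts delete_process(17, store, processes, counter, master_working: true)   # prints false
puts delete_process(17, store, processes, counter, master_working: false)  # prints true
p store.keys   # only pid 23's key is left; pid 170's key was caught by the glob
```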

#each_key_nonreduced_value ⇒ Object

Iterates through the map keys, yielding each key together with each of its non-reduced (map) values.

Example

process.each_key_nonreduced_value { |key, value| ... }

Returns nothing.



# File 'lib/mapredus/process.rb', line 132

def each_key_nonreduced_value
  map_keys.each do |key|
    map_values(key).each do |value|
      yield key, value
    end
  end
end

#each_key_reduced_value ⇒ Object

Iterates through the map keys, yielding each key together with each of its reduced values.

Example

process.each_key_reduced_value { |key, value| ... }

Returns nothing.



# File 'lib/mapredus/process.rb', line 118

def each_key_reduced_value
  map_keys.each do |key|
    reduce_values(key).each do |value|
      yield key, value
    end
  end
end
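
Both iterators walk map_keys and yield each key with every value in that key's list. The sketch below reproduces the nested loop with an in-memory hash in place of Redis; IterationSketch and its data are hypothetical, and the enum_for guard is an added refinement (not in the original) that returns an Enumerator when no block is given.

```ruby
# Hypothetical in-memory stand-ins for map_keys and reduce_values(key).
class IterationSketch
  def initialize(values_by_key)
    @values_by_key = values_by_key
  end

  def map_keys
    @values_by_key.keys
  end

  def reduce_values(key)
    @values_by_key.fetch(key, [])
  end

  # Same nested loop as each_key_reduced_value; the enum_for guard is an added refinement.
  def each_key_reduced_value
    return enum_for(__method__) unless block_given?
    map_keys.each do |key|
      reduce_values(key).each { |value| yield key, value }
    end
  end
end

sketch = IterationSketch.new({ "apple" => ["3"], "pear" => ["1", "2"] })
sketch.each_key_reduced_value { |key, value| puts "#{key} => #{value}" }
pairs = sketch.each_key_reduced_value.to_a
```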

#emit(key, reduce_val) ⇒ Object

The emission associated with a reduce. Currently all reduced values are pushed onto a Redis list. We may want to use a different Redis type directly, depending on the kind of reduce we are doing: a reduce often returns only one value, so instead of an rpush we should do a set.

Examples

emit(key, reduced_value)

Returns the result of FileSystem.rpush (for Redis RPUSH, the length of the list after the push).



# File 'lib/mapredus/process.rb', line 214

def emit(key, reduce_val)
  hashed_key = Helper.hash(key)
  FileSystem.rpush( ProcessInfo.reduce(@pid, hashed_key), reduce_val )
end

#emit_intermediate(*key_value) ⇒ Object

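Emissions: when map results come back, we emit them to be stored in our file system (Redis).

key_value - The key and value, or the rank, key, and value when the process is ordered

Examples

emit_intermediate(key, value)
# if an ordering is required:
emit_intermediate(rank, key, value)

Returns true on success. Raises an error if a different key has already been stored under the same hashed key.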



# File 'lib/mapredus/process.rb', line 185

def emit_intermediate(*key_value)
  if( not @ordered )
    key, value = key_value
    FileSystem.sadd( ProcessInfo.keys(@pid), key )
    hashed_key = Helper.hash(key)
    FileSystem.rpush( ProcessInfo.map(@pid, hashed_key), value )
  else
    # if there's an order for the process then we should use a zset above
    # ordered process's map emits [rank, key, value]
    #
    rank, key, value = key_value
    FileSystem.zadd( ProcessInfo.keys(@pid), rank, key )
    hashed_key = Helper.hash(key)
    FileSystem.rpush( ProcessInfo.map(@pid, hashed_key), value )
  end
  raise "Key Collision: key:#{key}, #{key.class} => hashed key:#{hashed_key}" if key_collision?(hashed_key, key)
  true
end
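
Ordered and unordered processes differ only in where the key is recorded: a Redis set (SADD) or a sorted set scored by rank (ZADD), which lets map_keys return keys in rank order through ZRANGE. In both cases the value is appended (RPUSH) to a per-key list. The sketch below models those structures with plain Ruby objects; MapStoreSketch is hypothetical, key hashing and the collision check are omitted, and equal ranks are ordered by key, as Redis orders members with equal scores.

```ruby
require 'set'

# Simplified model of the storage written by emit_intermediate and read by map_keys.
class MapStoreSketch
  def initialize(ordered)
    @ordered = ordered
    @key_set = Set.new                        # stands in for SADD on the keys entry
    @key_ranks = {}                           # stands in for ZADD (member => score)
    @values = Hash.new { |h, k| h[k] = [] }   # stands in for RPUSH onto per-key lists
  end

  def emit_intermediate(*key_value)
    if @ordered
      rank, key, value = key_value
      @key_ranks[key] = rank                  # ZADD updates an existing member's score
    else
      key, value = key_value
      @key_set << key
    end
    @values[key] << value
    true
  end

  def map_keys
    return @key_set.to_a unless @ordered
    # ZRANGE 0 -1: ascending score, ties broken lexicographically by member
    @key_ranks.sort_by { |key, rank| [rank, key] }.map(&:first)
  end

  def map_values(key)
    @values.fetch(key, [])
  end
end

ordered = MapStoreSketch.new(true)
ordered.emit_intermediate(2, "pear", "p1")
ordered.emit_intermediate(1, "apple", "a1")
ordered.emit_intermediate(1, "apple", "a2")
p ordered.map_keys             # prints ["apple", "pear"]
p ordered.map_values("apple")  # prints ["a1", "a2"]
```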

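#json_helper(json_info, key) ⇒ Object

Looks up key in json_info as a string first, then as a symbol, so specifications decoded from JSON (string keys) and built in Ruby (symbol keys) are both accepted.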



# File 'lib/mapredus/process.rb', line 55

def json_helper(json_info, key)
  json_info[key.to_s] || json_info[key.to_sym]
end

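#key_collision?(hashed_key, key) ⇒ Boolean

Records key as the owner of hashed_key if hashed_key is unclaimed, and reports a collision when hashed_key is already claimed by a different key.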

Returns:

  • (Boolean)


# File 'lib/mapredus/process.rb', line 219

def key_collision?(hashed_key, key)
  not ( FileSystem.setnx( ProcessInfo.hash_to_key(@pid, hashed_key), key ) ||
        FileSystem.get( ProcessInfo.hash_to_key(@pid, hashed_key) ) == key.to_s )
end
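
key_collision? records the first key seen for each hashed key with SETNX and reports a collision only when a different key later maps to the same hash. The sketch below uses a Hash-backed FakeRedis for SETNX/GET and a deliberately weak, hypothetical hash function (string length) so a collision is easy to trigger; MapRedus's Helper.hash is not reproduced here.

```ruby
# Hypothetical stand-in for Redis: setnx stores only if absent and reports whether it did.
class FakeRedis
  def initialize
    @data = {}
  end

  def setnx(key, value)
    return false if @data.key?(key)
    @data[key] = value.to_s   # Redis stores strings
    true
  end

  def get(key)
    @data[key]
  end
end

# Deliberately weak hash so that different keys collide (illustration only).
def weak_hash(key)
  key.to_s.length
end

# Same logic as key_collision?: claim the slot, or check that we already own it.
def key_collision?(redis, key)
  slot = "hash_to_key:#{weak_hash(key)}"
  !(redis.setnx(slot, key) || redis.get(slot) == key.to_s)
end

redis = FakeRedis.new
puts key_collision?(redis, "cat")   # prints false: first key for hash 3
puts key_collision?(redis, "cat")   # prints false: same key again
puts key_collision?(redis, "dog")   # prints true: different key, same hash 3
```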

#map_keys ⇒ Object

Keys that the map operation produced (in rank order when the process is ordered).

Examples

process.map_keys

Returns the keys.



# File 'lib/mapredus/process.rb', line 231

def map_keys
  if( not @ordered )
    FileSystem.smembers( ProcessInfo.keys(@pid) )
  else
    FileSystem.zrange( ProcessInfo.keys(@pid), 0, -1 )
  end
end

#map_values(key) ⇒ Object

Values that the map operation produced for a key.

Examples

process.map_values(key)

Returns the values.



# File 'lib/mapredus/process.rb', line 254

def map_values(key)
  hashed_key = Helper.hash(key)
  FileSystem.lrange( ProcessInfo.map(@pid, hashed_key), 0, -1 )
end

#next_state ⇒ Object

Changes the process state if the process is not running and is not synchronous.

Examples

process.next_state

Returns the state that the process switched to, or nil if the process is running or synchronous.



# File 'lib/mapredus/process.rb', line 161

def next_state
  if((not running?) and (not @synchronous))
    new_state = STATE_MACHINE[self.state]
    update(:state => new_state)
    method = "enslave_#{new_state}".to_sym
    Master.send(method, self) if( Master.respond_to?(method) )
    new_state
  end
end
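
next_state advances the process through STATE_MACHINE and then asks the Master to start the matching phase through an enslave_<state> method, when one exists. STATE_MACHINE and the Master methods are not shown on this page, so the states, FakeMaster, and StateSketch below are hypothetical; the sketch only illustrates the table lookup, the respond_to? dispatch, and the nil return when the process is running or synchronous.

```ruby
# Hypothetical state table; the real STATE_MACHINE is defined elsewhere in MapRedus.
STATE_MACHINE = {
  "not_started" => "mappers",
  "mappers"     => "reducers",
  "reducers"    => "finalizer",
  "finalizer"   => "complete"
}.freeze

# Hypothetical master that only knows how to start two phases.
module FakeMaster
  @started = []
  class << self
    attr_reader :started

    def enslave_reducers(process)
      @started << [:reducers, process.state]
    end

    def enslave_finalizer(process)
      @started << [:finalizer, process.state]
    end
  end
end

class StateSketch
  attr_reader :state

  def initialize(state, running: false, synchronous: false)
    @state, @running, @synchronous = state, running, synchronous
  end

  def next_state
    return nil if @running || @synchronous   # the original also yields nil here
    new_state = STATE_MACHINE[@state]
    @state = new_state                       # the original calls update(:state => new_state)
    method = "enslave_#{new_state}".to_sym
    FakeMaster.send(method, self) if FakeMaster.respond_to?(method)
    new_state
  end
end

process = StateSketch.new("mappers")
puts process.next_state   # prints reducers
puts process.next_state   # prints finalizer
p FakeMaster.started      # prints [[:reducers, "reducers"], [:finalizer, "finalizer"]]
```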

#num_keys ⇒ Object

Returns the number of keys the map operation produced.



# File 'lib/mapredus/process.rb', line 239

def num_keys()
  if( not @ordered )
    FileSystem.scard( ProcessInfo.keys(@pid) )
  else
    FileSystem.zcard( ProcessInfo.keys(@pid) )
  end
end

#num_values(key) ⇒ Object

Returns the number of values the map operation produced for key.



# File 'lib/mapredus/process.rb', line 259

def num_values(key)
  hashed_key = Helper.hash(key)
  FileSystem.llen( ProcessInfo.map(@pid, hashed_key) )
end

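#read(json_info) ⇒ Object

Loads the process attributes from a decoded specification, resolving class names to classes and filling in defaults: DEFAULT_TIME for result_timeout, an empty array for key_args, NOT_STARTED for state, MapRedus::Outputter for outputter, and Process for type.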



# File 'lib/mapredus/process.rb', line 40

def read(json_info)
  @inputter = Helper.class_get(json_helper(json_info, :inputter))
  @mapper = Helper.class_get(json_helper(json_info, :mapper))
  @reducer = Helper.class_get(json_helper(json_info, :reducer))
  @finalizer = Helper.class_get(json_helper(json_info, :finalizer))
  @ordered = json_helper(json_info, :ordered)
  @synchronous = json_helper(json_info, :synchronous)
  @result_timeout = json_helper(json_info, :result_timeout) || DEFAULT_TIME
  @key_args = json_helper(json_info, :key_args) || []
  @state = json_helper(json_info, :state) || NOT_STARTED
  @outputter = json_helper(json_info, :outputter)
  @outputter = @outputter ? Helper.class_get(@outputter) : MapRedus::Outputter
  @type = Helper.class_get(json_helper(json_info, :type) || Process)
end
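
read accepts a decoded specification whose keys may be strings (fresh from JSON) or symbols (built in Ruby), thanks to json_helper, and fills in defaults for missing fields. The pared-down sketch below keeps only a few fields, uses a hypothetical class_get built on Object.const_get as a substitute for Helper.class_get, and uses "not_started" as a placeholder for the NOT_STARTED constant, whose real value is not shown here.

```ruby
DEFAULT_TIME = 3600 * 24
NOT_STARTED = "not_started"   # placeholder value; the real constant is defined elsewhere

# Hypothetical substitute for Helper.class_get: resolve a name like "Foo::Bar" to a class.
def class_get(name)
  return nil if name.nil?
  return name if name.is_a?(Module)
  Object.const_get(name.to_s)
end

# Same lookup as json_helper: try the string key, then the symbol key.
def json_helper(json_info, key)
  json_info[key.to_s] || json_info[key.to_sym]
end

def read_spec(json_info)
  {
    mapper:         class_get(json_helper(json_info, :mapper)),
    ordered:        json_helper(json_info, :ordered),
    result_timeout: json_helper(json_info, :result_timeout) || DEFAULT_TIME,
    key_args:       json_helper(json_info, :key_args) || [],
    state:          json_helper(json_info, :state) || NOT_STARTED
  }
end

from_json = read_spec({ "mapper" => "String", "ordered" => true })
from_ruby = read_spec({ mapper: Integer, result_timeout: 60, state: "mappers" })
```

Because json_helper chains its two lookups with ||, a stored false comes back as nil. Both are falsy, so checks such as `if( not @ordered )` behave the same either way.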

#reduce_values(key) ⇒ Object

Values that the reduce operation produced for a key.

Examples

process.reduce_values(key)

Returns the values.



# File 'lib/mapredus/process.rb', line 271

def reduce_values(key)
  hashed_key = Helper.hash(key)
  FileSystem.lrange( ProcessInfo.reduce(@pid, hashed_key), 0, -1 )
end

#reload ⇒ Object

Re-reads the process specification from the FileSystem and returns self.



# File 'lib/mapredus/process.rb', line 85

def reload
  read(Helper.decode(FileSystem.get(ProcessInfo.pid(@pid))))
  self
end

#result_key(*args) ⇒ Object

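Functions to manage the location of the result in the FileSystem. The instance method prepends the process's key_args to args before delegating to the result_key class method of the process's type.

Examples

process.result_key(extra, arguments)
Process.result_key(all, needed, arguments)
# => "something:that:uses:the:extra:arguments"

SomeProcessSubclass.set_result_key("something:ARG:something:VAR")
# sets the result key for SomeProcessSubclass (CAPITALIZED segments are required arguments that fill in the values)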


# File 'lib/mapredus/process.rb', line 285

def result_key(*args)
  Helper.class_get(@type).result_key(*[@key_args, args].flatten)
end

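#run(data_object, synchronous = false) ⇒ Object

Records the synchronous flag (which saves the process), hands the process and data_object to Master.mapreduce, and returns true.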



# File 'lib/mapredus/process.rb', line 140

def run( data_object, synchronous = false )
  update(:synchronous => synchronous)
  Master.mapreduce( self, data_object )
  true
end

#running? ⇒ Boolean

Returns whether the Master is working on this process.

TODO: This should also have some notion of whether the process is completed, since the master might not be working while the process is not yet complete, in which case it is still running.

Returns:

  • (Boolean)


# File 'lib/mapredus/process.rb', line 150

def running?
  Master.working?(@pid)
end

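#save ⇒ Object

Adds the pid to the set of processes, stores the JSON specification under the process's key, and returns self.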



# File 'lib/mapredus/process.rb', line 72

def save
  FileSystem.sadd( ProcessInfo.processes, @pid ) 
  FileSystem.save( ProcessInfo.pid(@pid), to_json )
  self
end

#to_hash ⇒ Object

Returns a hash mapping each name in ATTRS and READERS to its current value.



# File 'lib/mapredus/process.rb', line 61

def to_hash
  (ATTRS + READERS).inject({}) do |h, attr|
    h[attr] = send(attr)
    h 
  end
end

#to_json ⇒ Object



# File 'lib/mapredus/process.rb', line 68

def to_json
  Helper.encode(to_hash)
end

#to_s ⇒ Object



# File 'lib/mapredus/process.rb', line 59

def to_s; to_json; end

#update(attrs = {}) ⇒ Object

Calls the writer method for each attribute in attrs, then saves the process.



# File 'lib/mapredus/process.rb', line 78

def update(attrs = {})
  attrs.each do |attr, val|
    send("#{attr}=", val)
  end
  save
end
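
update relies on a writer method (attr=) existing for every attribute it is given, then persists the whole process with save. The sketch below shows that dynamic-dispatch pattern with attr_accessor; UpdateSketch is hypothetical, save is replaced by a counter, and an unknown attribute raises NoMethodError just as send would in the original.

```ruby
class UpdateSketch
  attr_accessor :state, :synchronous
  attr_reader :saves

  def initialize
    @saves = 0
  end

  # Stand-in for save: the real method writes the JSON spec to the FileSystem.
  def save
    @saves += 1
    self
  end

  # Same shape as update: call each writer, then save once.
  def update(attrs = {})
    attrs.each { |attr, val| send("#{attr}=", val) }
    save
  end
end

process = UpdateSketch.new
process.update({ state: "mappers", synchronous: true })
puts process.state   # prints mappers
puts process.saves   # prints 1
```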