Class: SimpleMapReduce::Server::Job

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
AASM
Defined in:
lib/simple_map_reduce/server/job.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(map_script:, map_class_name:, reduce_script:, reduce_class_name:, job_input_bucket_name:, job_input_directory_path:, job_output_bucket_name:, job_output_directory_path:, id: nil, map_worker_url: nil, map_worker: nil, data_store_type: 'default') ⇒ Job

Returns a new instance of Job.



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/simple_map_reduce/server/job.rb', line 42

def initialize(map_script:, map_class_name:, reduce_script:, reduce_class_name:, job_input_bucket_name:,
               job_input_directory_path:, job_output_bucket_name:, job_output_directory_path:,
               id: nil,
               map_worker_url: nil,
               map_worker: nil,
               data_store_type: 'default')

  @id = id
  @map_script = map_script&.strip
  @map_class_name = map_class_name&.strip
  @reduce_script = reduce_script&.strip
  @reduce_class_name = reduce_class_name&.strip
  @job_input_bucket_name = job_input_bucket_name&.strip
  @job_input_directory_path = job_input_directory_path&.strip
  @job_output_bucket_name = job_output_bucket_name&.strip
  @job_output_directory_path = job_output_directory_path&.strip
  @map_worker = map_worker
  if @map_worker.nil? && map_worker_url
    @map_worker = SimpleMapReduce::Server::Worker.new(url: map_worker_url)
  end
  @data_store = SimpleMapReduce::DataStoreFactory.create(data_store_type,
                                                         server_url: SimpleMapReduce.job_tracker_url,
                                                         resource_name: 'jobs',
                                                         resource_id: @id)

  unless valid?
    raise ArgumentError, 'invalid Job parameters are detected'
  end
end

Instance Attribute Details

#job_input_bucket_nameObject (readonly)

Returns the value of attribute job_input_bucket_name.



13
14
15
# File 'lib/simple_map_reduce/server/job.rb', line 13

def job_input_bucket_name
  @job_input_bucket_name
end

#job_input_directory_pathObject (readonly)

Returns the value of attribute job_input_directory_path.



13
14
15
# File 'lib/simple_map_reduce/server/job.rb', line 13

def job_input_directory_path
  @job_input_directory_path
end

#job_output_bucket_nameObject (readonly)

Returns the value of attribute job_output_bucket_name.



13
14
15
# File 'lib/simple_map_reduce/server/job.rb', line 13

def job_output_bucket_name
  @job_output_bucket_name
end

#job_output_directory_pathObject (readonly)

Returns the value of attribute job_output_directory_path.



13
14
15
# File 'lib/simple_map_reduce/server/job.rb', line 13

def job_output_directory_path
  @job_output_directory_path
end

#map_class_nameObject (readonly)

Returns the value of attribute map_class_name.



13
14
15
# File 'lib/simple_map_reduce/server/job.rb', line 13

def map_class_name
  @map_class_name
end

#map_scriptObject (readonly)

Returns the value of attribute map_script.



13
14
15
# File 'lib/simple_map_reduce/server/job.rb', line 13

def map_script
  @map_script
end

#map_workerObject (readonly)

Returns the value of attribute map_worker.



13
14
15
# File 'lib/simple_map_reduce/server/job.rb', line 13

def map_worker
  @map_worker
end

#reduce_class_nameObject (readonly)

Returns the value of attribute reduce_class_name.



13
14
15
# File 'lib/simple_map_reduce/server/job.rb', line 13

def reduce_class_name
  @reduce_class_name
end

#reduce_scriptObject (readonly)

Returns the value of attribute reduce_script.



13
14
15
# File 'lib/simple_map_reduce/server/job.rb', line 13

def reduce_script
  @reduce_script
end

Class Method Details

.deserialize(data) ⇒ Object



124
125
126
127
128
# File 'lib/simple_map_reduce/server/job.rb', line 124

def deserialize(data)
  params = MessagePack.unpack(data).transform_keys(&:to_sym)
  params[:data_store_type] = 'remote'
  new(params)
end

Instance Method Details

#dumpObject



95
96
97
# File 'lib/simple_map_reduce/server/job.rb', line 95

def dump
  to_h.merge(state: state)
end

#idObject



72
73
74
# File 'lib/simple_map_reduce/server/job.rb', line 72

def id
  @id ||= SecureRandom.uuid
end

#map_worker_urlObject



99
100
101
# File 'lib/simple_map_reduce/server/job.rb', line 99

def map_worker_url
  @map_worker&.url
end

#serializeObject



91
92
93
# File 'lib/simple_map_reduce/server/job.rb', line 91

def serialize
  to_h.to_msgpack
end

#to_hObject



76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/simple_map_reduce/server/job.rb', line 76

def to_h
  {
    id: id,
    map_script: @map_script,
    map_class_name: @map_class_name,
    reduce_script: @reduce_script,
    reduce_class_name: @reduce_class_name,
    job_input_bucket_name: @job_input_bucket_name,
    job_input_directory_path: @job_input_directory_path,
    job_output_bucket_name: @job_output_bucket_name,
    job_output_directory_path: @job_output_directory_path,
    map_worker_url: @map_worker&.url
  }
end

#update!(event: nil) ⇒ Object

update Job



117
118
119
120
121
# File 'lib/simple_map_reduce/server/job.rb', line 117

def update!(event: nil)
  if event
    public_send(event.to_sym)
  end
end

#valid?Boolean

Returns:

  • (Boolean)


103
104
105
106
107
108
109
110
111
112
# File 'lib/simple_map_reduce/server/job.rb', line 103

def valid?
  !@map_script.to_s.empty? &&
    !@map_class_name.to_s.empty? &&
    !@reduce_script.to_s.empty? &&
    !@reduce_class_name.to_s.empty? &&
    !@job_input_bucket_name.to_s.empty? &&
    !@job_input_directory_path.to_s.empty? &&
    !@job_output_bucket_name.to_s.empty? &&
    !@job_output_directory_path.to_s.empty?
end