Class: SimpleMapReduce::Server::Job

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
AASM
Defined in:
lib/simple_map_reduce/server/job.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(id: nil, map_script:, map_class_name:, reduce_script:, reduce_class_name:, job_input_bucket_name:, job_input_directory_path:, job_output_bucket_name:, job_output_directory_path:, map_worker_url: nil, map_worker: nil, data_store_type: 'default') ⇒ Job

Returns a new instance of Job.



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/simple_map_reduce/server/job.rb', line 42

# Builds a Job from its map/reduce scripts, class names and I/O locations.
#
# The script, class-name, bucket-name and directory-path arguments are stored
# with surrounding whitespace stripped (a nil argument stays nil). A worker is
# built from +map_worker_url+ only when +map_worker+ is nil and a URL is given.
#
# NOTE(review): the data store receives the raw +@id+, which may still be nil
# here even though #id lazily generates one later — confirm this is intended.
#
# @param id [Object, nil] identifier, stored as given
# @param map_script [#strip, nil]
# @param map_class_name [#strip, nil]
# @param reduce_script [#strip, nil]
# @param reduce_class_name [#strip, nil]
# @param job_input_bucket_name [#strip, nil]
# @param job_input_directory_path [#strip, nil]
# @param job_output_bucket_name [#strip, nil]
# @param job_output_directory_path [#strip, nil]
# @param map_worker_url [Object, nil] passed as +url:+ to Worker.new when no worker is supplied
# @param map_worker [Object, nil] pre-built worker; takes precedence over +map_worker_url+
# @param data_store_type [Object] forwarded to DataStoreFactory.create
# @raise [ArgumentError] when #valid? returns false
def initialize(id: nil,
               map_script:,
               map_class_name:,
               reduce_script:,
               reduce_class_name:,
               job_input_bucket_name:,
               job_input_directory_path:,
               job_output_bucket_name:,
               job_output_directory_path:,
               map_worker_url: nil,
               map_worker: nil,
               data_store_type: 'default')

  @id = id

  # Assigned in declaration order; each value is stripped unless it is nil.
  {
    map_script: map_script,
    map_class_name: map_class_name,
    reduce_script: reduce_script,
    reduce_class_name: reduce_class_name,
    job_input_bucket_name: job_input_bucket_name,
    job_input_directory_path: job_input_directory_path,
    job_output_bucket_name: job_output_bucket_name,
    job_output_directory_path: job_output_directory_path
  }.each do |attribute, raw_value|
    instance_variable_set("@#{attribute}", raw_value&.strip)
  end

  @map_worker =
    if map_worker.nil? && map_worker_url
      SimpleMapReduce::Server::Worker.new(url: map_worker_url)
    else
      map_worker
    end

  @data_store = SimpleMapReduce::DataStoreFactory.create(
    data_store_type,
    server_url: SimpleMapReduce.job_tracker_url,
    resource_name: 'jobs',
    resource_id: @id
  )

  raise ArgumentError, 'invalid Job parameters are detected' unless valid?
end

Instance Attribute Details

#job_input_bucket_name ⇒ Object (readonly)

Returns the value of attribute job_input_bucket_name.



13
14
15
# File 'lib/simple_map_reduce/server/job.rb', line 13

# Reader for the input bucket name.
#
# @return [Object] the +job_input_bucket_name+ given to the constructor,
#   with surrounding whitespace stripped
def job_input_bucket_name
  @job_input_bucket_name
end

#job_input_directory_path ⇒ Object (readonly)

Returns the value of attribute job_input_directory_path.



13
14
15
# File 'lib/simple_map_reduce/server/job.rb', line 13

# Reader for the input directory path.
#
# @return [Object] the +job_input_directory_path+ given to the constructor,
#   with surrounding whitespace stripped
def job_input_directory_path
  @job_input_directory_path
end

#job_output_bucket_name ⇒ Object (readonly)

Returns the value of attribute job_output_bucket_name.



13
14
15
# File 'lib/simple_map_reduce/server/job.rb', line 13

# Reader for the output bucket name.
#
# @return [Object] the +job_output_bucket_name+ given to the constructor,
#   with surrounding whitespace stripped
def job_output_bucket_name
  @job_output_bucket_name
end

#job_output_directory_path ⇒ Object (readonly)

Returns the value of attribute job_output_directory_path.



13
14
15
# File 'lib/simple_map_reduce/server/job.rb', line 13

# Reader for the output directory path.
#
# @return [Object] the +job_output_directory_path+ given to the constructor,
#   with surrounding whitespace stripped
def job_output_directory_path
  @job_output_directory_path
end

#map_class_name ⇒ Object (readonly)

Returns the value of attribute map_class_name.



13
14
15
# File 'lib/simple_map_reduce/server/job.rb', line 13

# Reader for the map class name.
#
# @return [Object] the +map_class_name+ given to the constructor,
#   with surrounding whitespace stripped
def map_class_name
  @map_class_name
end

#map_script ⇒ Object (readonly)

Returns the value of attribute map_script.



13
14
15
# File 'lib/simple_map_reduce/server/job.rb', line 13

# Reader for the map script.
#
# @return [Object] the +map_script+ given to the constructor,
#   with surrounding whitespace stripped
def map_script
  @map_script
end

#map_worker ⇒ Object (readonly)

Returns the value of attribute map_worker.



13
14
15
# File 'lib/simple_map_reduce/server/job.rb', line 13

# Reader for the worker attached to this job.
#
# @return [Object, nil] the +map_worker+ given to the constructor, or the
#   Worker the constructor built from +map_worker_url+; nil when neither
#   was supplied
def map_worker
  @map_worker
end

#reduce_class_name ⇒ Object (readonly)

Returns the value of attribute reduce_class_name.



13
14
15
# File 'lib/simple_map_reduce/server/job.rb', line 13

# Reader for the reduce class name.
#
# @return [Object] the +reduce_class_name+ given to the constructor,
#   with surrounding whitespace stripped
def reduce_class_name
  @reduce_class_name
end

#reduce_script ⇒ Object (readonly)

Returns the value of attribute reduce_script.



13
14
15
# File 'lib/simple_map_reduce/server/job.rb', line 13

# Reader for the reduce script.
#
# @return [Object] the +reduce_script+ given to the constructor,
#   with surrounding whitespace stripped
def reduce_script
  @reduce_script
end

Class Method Details

.deserialize(data) ⇒ Object



130
131
132
133
134
# File 'lib/simple_map_reduce/server/job.rb', line 130

# Rebuilds a Job from a MessagePack payload.
#
# The unpacked keys are symbolized and the data store type is always forced
# to 'remote' before the constructor is called.
#
# The parameters are splatted (+**params+) because the constructor accepts
# keyword arguments only: since Ruby 3.0 a positional Hash is no longer
# converted to keywords implicitly, so +new(params)+ raises ArgumentError.
#
# NOTE(review): keys come straight from the payload and are converted with
# +to_sym+; keys the constructor does not declare will raise ArgumentError.
#
# @param data [String] MessagePack-encoded map of constructor arguments
# @return [Job] a new instance built from the payload
# @raise [ArgumentError] when the payload is missing required arguments or
#   the resulting job is invalid
def deserialize(data)
  params = MessagePack.unpack(data).each_with_object({}) do |(key, value), symbolized|
    symbolized[key.to_sym] = value
  end
  params[:data_store_type] = 'remote'
  new(**params)
end

Instance Method Details

#dump ⇒ Object



101
102
103
# File 'lib/simple_map_reduce/server/job.rb', line 101

# Returns the job's attribute hash (#to_h) with the current +state+ added
# under the +:state+ key.
#
# @return [Hash] a new hash; the hash returned by #to_h is not mutated
def dump
  { **to_h, state: state }
end

#id ⇒ Object



78
79
80
# File 'lib/simple_map_reduce/server/job.rb', line 78

# Returns the job identifier, generating and memoizing a random UUID the
# first time it is requested when none has been set.
#
# @return [Object] the stored id, or a newly generated UUID string
def id
  return @id if @id

  @id = SecureRandom.uuid
end

#map_worker_url ⇒ Object



105
106
107
# File 'lib/simple_map_reduce/server/job.rb', line 105

# Returns the URL of the attached map worker.
#
# @return [Object, nil] +@map_worker.url+, or nil when no worker is attached
def map_worker_url
  return if @map_worker.nil?

  @map_worker.url
end

#serialize ⇒ Object



97
98
99
# File 'lib/simple_map_reduce/server/job.rb', line 97

# Serializes the job's attribute hash (#to_h) by calling +to_msgpack+ on it.
# The job's state is not part of #to_h and is therefore not included.
#
# @return [Object] whatever +to_msgpack+ returns for the attribute hash
def serialize
  attributes = to_h
  attributes.to_msgpack
end

#to_h ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/simple_map_reduce/server/job.rb', line 82

# Returns the job's attributes as a Hash with symbol keys.
#
# Key order is: +:id+, the eight script/class/bucket/path attributes, then
# +:map_worker_url+ (nil when no worker is attached). +:id+ goes through #id,
# so an id is generated if none exists yet.
#
# @return [Hash{Symbol => Object}]
def to_h
  stored_attributes = %i[
    map_script
    map_class_name
    reduce_script
    reduce_class_name
    job_input_bucket_name
    job_input_directory_path
    job_output_bucket_name
    job_output_directory_path
  ]

  attributes = { id: id }
  stored_attributes.each do |attribute|
    attributes[attribute] = instance_variable_get("@#{attribute}")
  end
  attributes[:map_worker_url] = @map_worker&.url
  attributes
end

#update!(event: nil) ⇒ Object

Updates the Job by firing the given event, if one is supplied.



123
124
125
126
127
# File 'lib/simple_map_reduce/server/job.rb', line 123

# Fires +event+ on this job by calling the public method of that name.
# Does nothing when +event+ is nil or false.
#
# NOTE(review): +public_send+ accepts any public method name, not only state
# events; if +event+ can come from an untrusted caller, consider restricting
# it to a known set of event names.
#
# @param event [#to_sym, nil] name of the public method to invoke
# @return [Object, nil] the invoked method's result, or nil when no event is given
def update!(event: nil)
  return unless event

  public_send(event.to_sym)
end

#valid? ⇒ Boolean

Returns:

  • (Boolean)


109
110
111
112
113
114
115
116
117
118
# File 'lib/simple_map_reduce/server/job.rb', line 109

# Reports whether every required attribute is present.
#
# An attribute counts as missing when its +to_s+ is empty, which covers both
# nil and the empty string.
#
# @return [Boolean] true only when all eight script, class-name, bucket and
#   directory attributes are non-empty
def valid?
  required_values = [
    @map_script,
    @map_class_name,
    @reduce_script,
    @reduce_class_name,
    @job_input_bucket_name,
    @job_input_directory_path,
    @job_output_bucket_name,
    @job_output_directory_path
  ]

  required_values.none? { |value| value.to_s.empty? }
end