Class: VCAP::Services::Base::AsyncJob::Serialization::SerializationJob

Inherits:
Object
  • Object
show all
Includes:
Resque::Plugins::Status, VCAP::Services::Base::AsyncJob::Serialization, VCAP::Services::Base::AsyncJob::Snapshot, Error
Defined in:
lib/base/job/serialization.rb

Constant Summary

Constants included from VCAP::Services::Base::AsyncJob::Snapshot

VCAP::Services::Base::AsyncJob::Snapshot::FILTER_KEYS, VCAP::Services::Base::AsyncJob::Snapshot::MAX_NAME_LENGTH, VCAP::Services::Base::AsyncJob::Snapshot::SNAPSHOT_ID, VCAP::Services::Base::AsyncJob::Snapshot::SNAPSHOT_KEY_PREFIX

Constants included from VCAP::Services::Base::AsyncJob::Serialization

SERIALIZATION_KEY_PREFIX

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Error

#failure, #internal_fail, #parse_msg, #success, #timeout_fail

Methods included from VCAP::Services::Base::AsyncJob::Snapshot

#client, #delete_snapshot, #filter_keys, #fmt_time, #new_snapshot_id, redis_connect, redis_init, #save_snapshot, #service_snapshots, #service_snapshots_count, #snapshot_details, #snapshot_filepath, #update_name

Methods included from VCAP::Services::Base::AsyncJob::Serialization

#fmt_error, redis_connect, #redis_key

Constructor Details

#initialize(*args) ⇒ SerializationJob

Returns a new instance of SerializationJob.



58
59
60
61
62
63
64
# File 'lib/base/job/serialization.rb', line 58

# Builds the job and prepares what it needs at runtime.
#
# All arguments are forwarded unchanged to the superclass constructor.
# Afterwards the worker configuration is loaded (parse_config), the logger is
# set up (init_worker_logger), and redis_connect is invoked on both the
# Serialization and the Snapshot modules.
#
# @param args [Array] arguments passed straight through to +super+
def initialize(*args)
  super(*args)
  parse_config
  init_worker_logger
  Serialization.redis_connect
  Snapshot.redis_connect
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



34
35
36
# File 'lib/base/job/serialization.rb', line 34

# Read-only accessor.
#
# @return [Object] the current value of the @name instance variable
def name
  @name
end

Class Method Details

.queue_lookup_key ⇒ Object



43
44
45
# File 'lib/base/job/serialization.rb', line 43

# Key that select_queue looks for in Hash arguments to obtain the queue.
#
# @return [Symbol] always :node_id
def queue_lookup_key
  :node_id
end

.select_queue(*args) ⇒ Object



47
48
49
50
51
52
53
54
55
# File 'lib/base/job/serialization.rb', line 47

# Determines the queue for a job from the job's arguments.
#
# Every Hash argument that contains the key returned by queue_lookup_key is a
# candidate; when several qualify, the LAST one in +args+ wins. The decision is
# logged through Config.logger when a logger is available.
#
# @param args [Array] job arguments; non-Hash entries are ignored
# @return [Object, nil] the value stored under the lookup key, or nil when no
#   Hash argument carries that key
def select_queue(*args)
  key = queue_lookup_key
  # Scan from the end so the last matching Hash takes precedence.
  carrier = args.reverse_each.find { |arg| arg.is_a?(Hash) && arg.key?(key) }
  result = carrier && carrier[key]
  @logger = Config.logger
  # This is a class method, so +self+ is already the job class; +self.class+
  # would always be rendered as "Class" in the log line.
  @logger.info("Select queue #{result} for job #{self} with args:#{args.inspect}") if @logger
  result
end

Instance Method Details

#create_lock ⇒ Object



66
67
68
69
70
71
# File 'lib/base/job/serialization.rb', line 66

# Builds a Lock for this job, named after the value of +name+.
#
# The lock's ttl comes from the "job_ttl" entry of @config and falls back to
# 600 when that entry is missing or nil.
#
# @return [Lock] the newly constructed lock
def create_lock
  Lock.new(
    "lock:lifecycle:#{name}",
    :logger => @logger,
    :ttl => @config["job_ttl"] || 600
  )
end

#delete_download_token(name, snapshot_id) ⇒ Object



96
97
98
99
100
# File 'lib/base/job/serialization.rb', line 96

# Removes the "token" entry from a snapshot's details and, only if an entry
# was actually removed, saves the updated details via save_snapshot.
#
# @param name [Object] first argument handed to snapshot_details / save_snapshot
# @param snapshot_id [Object] identifier handed to snapshot_details
# @return [Object, nil] the result of save_snapshot, or nil when there was no
#   token to remove
def delete_download_token(name, snapshot_id)
  details = snapshot_details(name, snapshot_id)
  removed = details.delete("token")
  return unless removed

  save_snapshot(name, details)
end

#get_dump_path(name, snapshot_id) ⇒ Object



119
120
121
# File 'lib/base/job/serialization.rb', line 119

# Resolves a snapshot's path by delegating to snapshot_filepath with the
# "snapshots_base_dir" and "service_name" entries of @config.
#
# @param name [Object] passed through to snapshot_filepath
# @param snapshot_id [Object] passed through to snapshot_filepath
# @return [Object] whatever snapshot_filepath returns
def get_dump_path(name, snapshot_id)
  base_dir = @config["snapshots_base_dir"]
  service = @config["service_name"]
  snapshot_filepath(base_dir, service, name, snapshot_id)
end

#handle_error(e) ⇒ Object



77
78
79
80
81
82
# File 'lib/base/job/serialization.rb', line 77

# Logs an error and marks the job as failed with a JSON-encoded message.
#
# A ServiceError is reported as-is; any other error is replaced by a
# ServiceError built from ServiceError::INTERNAL_ERROR. The "msg" entry of the
# error's hash form is encoded with Yajl and handed to +failed+.
#
# NOTE(review): instance_of? does not match subclasses of ServiceError, so
# those are reported as internal errors - confirm this is intended.
#
# @param e [Exception] the error to report
# @return [Object] whatever +failed+ returns
def handle_error(e)
  @logger.error("Error in #{self.class} uuid:#{@uuid}: #{fmt_error(e)}")
  reported =
    if e.instance_of?(ServiceError)
      e
    else
      ServiceError.new(ServiceError::INTERNAL_ERROR)
    end
  failed(Yajl::Encoder.encode(reported.to_hash["msg"]))
end

#init_worker_logger ⇒ Object



73
74
75
# File 'lib/base/job/serialization.rb', line 73

# Sets @logger to the logger exposed by Config.
#
# @return [Object] the value returned by Config.logger
def init_worker_logger
  @logger = Config.logger
end

#parse_config ⇒ Object



102
103
104
105
# File 'lib/base/job/serialization.rb', line 102

# Loads the worker configuration from the WORKER_CONFIG environment variable
# (a JSON document) into @config.
#
# The variable's presence is checked BEFORE it is handed to the parser, so a
# missing variable is reported with the explicit message below instead of
# whatever error the parser produces for a nil input.
#
# @return [nil]
# @raise [RuntimeError] when WORKER_CONFIG is unset, or when it parses to a
#   nil/false value
def parse_config
  raw = ENV['WORKER_CONFIG']
  raise "Need environment variable: WORKER_CONFIG" unless raw

  @config = Yajl::Parser.parse(raw)
  # A document such as "null" parses successfully but yields no usable config.
  raise "Need environment variable: WORKER_CONFIG" unless @config
end

#required_options(*args) ⇒ Object

Raises:

  • (ArgumentError)


84
85
86
87
# File 'lib/base/job/serialization.rb', line 84

# Verifies that every given option name is present in +options+.
#
# Names are converted with to_s before the lookup, so symbols and strings are
# treated alike.
#
# @param args [Array<#to_s>] option names that must be present
# @return [nil] when nothing is missing
# @raise [ArgumentError] listing every missing name together with the
#   inspected options
def required_options(*args)
  absent = args.reject { |opt| options.key?(opt.to_s) }
  return if absent.empty?

  raise ArgumentError, "Missing #{absent.join(', ')} in options: #{options.inspect}"
end

#snapshot_filename(name, snapshot_id) ⇒ Object

The name of the saved snapshot file. Subclasses can override this method to customize the file name.



115
116
117
# File 'lib/base/job/serialization.rb', line 115

# Default file name for a saved snapshot: the given name followed by ".gz".
#
# @param name [#to_s] base name of the file
# @param snapshot_id [Object] not used by this implementation
# @return [String] the file name
def snapshot_filename(name, snapshot_id)
  format('%s.gz', name)
end

#update_download_token(name, snapshot_id, token) ⇒ Object

Update the download token for a service snapshot



90
91
92
93
94
# File 'lib/base/job/serialization.rb', line 90

# Stores +token+ under the "token" key of a snapshot's details and saves the
# updated details via save_snapshot.
#
# @param name [Object] first argument handed to snapshot_details / save_snapshot
# @param snapshot_id [Object] identifier handed to snapshot_details
# @param token [Object] value to store under "token"
# @return [Object] whatever save_snapshot returns
def update_download_token(name, snapshot_id, token)
  updated = snapshot_details(name, snapshot_id).tap do |details|
    details["token"] = token
  end
  save_snapshot(name, updated)
end

#validate_input(files, manifest) ⇒ Object

Validates the serialized data files. Subclasses should override this method to supply specific validation.



109
110
111
112
# File 'lib/base/job/serialization.rb', line 109

# Baseline validation of serialized data: only checks that at least one file
# was supplied.
#
# @param files [#empty?] the collection of snapshot files
# @param manifest [Object] not inspected by this implementation
# @return [true] when +files+ is not empty
# @raise [RuntimeError] when +files+ is empty
def validate_input(files, manifest)
  if files.empty?
    raise "Doesn't contains any snapshot file."
  end

  true
end