Class: Skynet::Message
- Inherits:
-
Object
show all
- Includes:
- SkynetDebugger
- Defined in:
- lib/skynet/skynet_message.rb,
lib/skynet/skynet_tuplespace_server.rb
Defined Under Namespace
Classes: BadMessage, Payload
Class Attribute Summary collapse
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
#args_pp, #debug, #debug_header, #error, #fatal, included, #info, #log, #printlog, #stderr, #stdout, #warn
Constructor Details
#initialize(opts) ⇒ Message
Returns a new instance of Message.
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
# File 'lib/skynet/skynet_message.rb', line 52
# Builds a message from either a positional Array or a keyed collection.
#
# @param opts [Array, #[], nil]
#   * Array  - entries are assigned to the class-level fields in order
#              (a false entry is stored as nil).
#   * truthy - looked up per field, first by the field name itself and then
#              by its string form; only truthy values are assigned. A
#              :raw_payload / "raw_payload" entry, when present, is applied
#              afterwards, and retry falls back to 0 if still unset.
#   * nil/false - nothing is assigned.
#
# The payload reader is invoked once at the end in every case.
def initialize(opts)
  if opts.is_a?(Array)
    self.class.fields.each_with_index do |name, position|
      send("#{name}=", opts[position] || nil)
    end
  elsif opts
    self.class.fields.each do |name|
      given = opts[name] || opts[name.to_s]
      next unless given

      send("#{name}=", given)
    end

    raw = opts[:raw_payload] || opts["raw_payload"]
    self.raw_payload = raw if raw
    self.retry ||= 0
  end

  payload
end
|
Class Attribute Details
.fields ⇒ Object
Returns the value of attribute fields.
9
10
11
|
# File 'lib/skynet/skynet_message.rb', line 9
# Reader for the @fields instance variable.
# This page lists it as a class attribute, so it is presumably defined on the
# class's singleton — confirm against the original file.
#
# @return [Object] whatever is stored in @fields (nil if never assigned)
def fields
@fields
end
|
Instance Attribute Details
#payload_type ⇒ Object
Returns the value of attribute payload_type.
33
34
35
|
# File 'lib/skynet/skynet_message.rb', line 33
# Reader for the @payload_type instance variable.
# Elsewhere in this file the value is converted with to_sym and compared
# against :master, and result messages set it to :result or :error.
#
# @return [Object] the stored payload type (nil if never assigned)
def payload_type
@payload_type
end
|
#tasktype ⇒ Object
Returns the value of attribute tasktype.
33
34
35
|
# File 'lib/skynet/skynet_message.rb', line 33
# Reader for the @tasktype instance variable.
# The values assigned by the builders visible in this file are :task and
# :result.
#
# @return [Object] the stored task type (nil if never assigned)
def tasktype
@tasktype
end
|
Class Method Details
.error_message(message, error) ⇒ Object
203
204
205
|
# File 'lib/skynet/skynet_message.rb', line 203
# Builds an error reply for +message+.
# Delegates to result_message with tasktype :result and payload type :error,
# using +error+ as the payload.
#
# @param message [Object] the message being answered
# @param error [Object] the error to carry as the payload
# @return [Object] whatever result_message returns
def self.error_message(message,error)
result_message(message,error,:result,:error)
end
|
.error_template(message) ⇒ Object
211
212
213
214
215
216
217
218
219
220
221
222
|
# File 'lib/skynet/skynet_message.rb', line 211
# Builds a positional template (one slot per entry of +fields+) derived from
# +message+.
#
# Only tasktype, drburi, version, task_id and queue_id are copied from the
# message; every other slot is nil.
# NOTE(review): presumably nil acts as a wildcard when the array is used as a
# tuple-space match pattern — confirm against the caller.
#
# @param message [#tasktype, #drburi, #version, #task_id, #queue_id]
# @return [Array] values ordered like +fields+
def self.error_template(message)
  pinned = {
    :tasktype => message.tasktype,
    :drburi   => message.drburi,
    :version  => message.version,
    :task_id  => message.task_id,
    :queue_id => message.queue_id
  }

  fields.map { |name| pinned[name] }
end
|
.fallback_task_message(message) ⇒ Object
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
|
# File 'lib/skynet/skynet_message.rb', line 228
# Builds the follow-up copy of +message+ with an advanced retry counter.
#
# The new :iteration is -1 when the retry budget is used up, otherwise the
# current iteration plus one. The budget is decided as follows:
#   * message.retry set  -> exhausted once iteration >= retry
#   * otherwise, master payloads use Skynet::CONFIG[:DEFAULT_MASTER_RETRY]
#     (when configured and reached)
#   * otherwise Skynet::CONFIG[:MAX_RETRIES] (when configured and reached)
#
# :expire_time is reset to now + message.expiry; every remaining field is
# copied from +message+ unchanged.
#
# @param message [Object] the message to derive the fallback from
# @return [Skynet::Message]
def self.fallback_task_message(message)
  exhausted =
    if message.retry
      message.iteration >= message.retry
    elsif message.payload_type.to_sym == :master &&
          Skynet::CONFIG[:DEFAULT_MASTER_RETRY] &&
          message.iteration >= Skynet::CONFIG[:DEFAULT_MASTER_RETRY]
      true
    else
      Skynet::CONFIG[:MAX_RETRIES] && message.iteration >= Skynet::CONFIG[:MAX_RETRIES]
    end

  attrs = { :iteration => (exhausted ? -1 : message.iteration + 1) }
  attrs[:expire_time] = Time.now.to_i + message.expiry

  fields.each do |name|
    next if attrs.key?(name)

    attrs[name] = message.send(name)
  end

  Skynet::Message.new(attrs)
end
|
.fallback_template(message) ⇒ Object
258
259
260
261
262
263
264
265
266
267
268
269
270
|
# File 'lib/skynet/skynet_message.rb', line 258
# Builds a positional template (one slot per entry of +fields+) for the
# fallback copies of +message+.
#
# tasktype, drburi, version, task_id and queue_id are copied from the message,
# :iteration is the Range 1..Skynet::CONFIG[:MAX_RETRIES], and every other
# slot is nil.
#
# @param message [#tasktype, #drburi, #version, #task_id, #queue_id]
# @return [Array] values ordered like +fields+
def self.fallback_template(message)
  retry_window = (1..Skynet::CONFIG[:MAX_RETRIES])
  pinned = {
    :tasktype  => message.tasktype,
    :drburi    => message.drburi,
    :version   => message.version,
    :task_id   => message.task_id,
    :queue_id  => message.queue_id,
    :iteration => retry_window
  }

  fields.map { |name| pinned[name] }
end
|
.new_task_message(task, job) ⇒ Object
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
# File 'lib/skynet/skynet_message.rb', line 35
# Creates the initial message for +task+ as part of +job+.
#
# Job-level values (job_id, start_after as :expire_time, version, queue_id)
# come from +job+; task-level values (task_id, task_or_master as
# :payload_type, result_timeout as :expiry, name, retry) come from +task+,
# which is also used as the payload. The message starts at iteration 0 with
# tasktype :task, and queue_id defaults to 0 when the job has none.
#
# @param task [Object] the task to wrap
# @param job [Object] the owning job
# @return [Object] the result of +new+ with the assembled attributes
def self.new_task_message(task,job)
  attrs = {
    :job_id       => job.job_id,
    :expire_time  => job.start_after,
    :version      => job.version,
    :queue_id     => job.queue_id || 0,
    :iteration    => 0,
    :tasktype     => :task,
    :task_id      => task.task_id,
    :payload      => task,
    :payload_type => task.task_or_master,
    :expiry       => task.result_timeout,
    :name         => task.name,
    :retry        => task.retry
  }

  new(attrs)
end
|
.next_task_template(version = nil, payload_type = nil, queue_id = 0) ⇒ Object
140
141
142
143
144
145
146
147
148
149
150
151
152
153
|
# File 'lib/skynet/skynet_message.rb', line 140
# Builds a positional template (one slot per entry of +fields+) describing the
# next runnable task.
#
# :tasktype is :task, :expire_time is the Range 0..now (epoch seconds),
# :iteration is the Range 0..Skynet::CONFIG[:MAX_RETRIES], and :queue_id,
# :version and :payload_type are taken from the arguments. Every other slot
# is nil.
#
# @param version [Object, nil]
# @param payload_type [Object, nil]
# @param queue_id [Object] defaults to 0
# @return [Array] values ordered like +fields+
def self.next_task_template(version=nil, payload_type=nil, queue_id=0)
  due_window = (0..Time.now.to_i)
  pinned = {
    :expire_time  => due_window,
    :tasktype     => :task,
    :queue_id     => queue_id,
    :version      => version,
    :payload_type => payload_type,
    :iteration    => (0..Skynet::CONFIG[:MAX_RETRIES])
  }

  fields.map { |name| pinned[name] }
end
|
.outstanding_results_template(queue_id = 0) ⇒ Object
193
194
195
196
197
198
199
200
201
|
# File 'lib/skynet/skynet_message.rb', line 193
# Builds a positional template (one slot per entry of +fields+) with
# :tasktype set to :result and :queue_id set to the given queue; every other
# slot is nil.
#
# @param queue_id [Object] defaults to 0
# @return [Array] values ordered like +fields+
def self.outstanding_results_template(queue_id=0)
  pinned = { :tasktype => :result, :queue_id => queue_id }
  fields.map { |name| pinned[name] }
end
|
.outstanding_tasks_template(iteration = nil, queue_id = 0) ⇒ Object
182
183
184
185
186
187
188
189
190
191
|
# File 'lib/skynet/skynet_message.rb', line 182
# Builds a positional template (one slot per entry of +fields+) with
# :tasktype set to :task plus the given :queue_id and :iteration; every other
# slot is nil.
#
# @param iteration [Object, nil] defaults to nil
# @param queue_id [Object] defaults to 0
# @return [Array] values ordered like +fields+
def self.outstanding_tasks_template(iteration=nil,queue_id=0)
  pinned = {
    :tasktype  => :task,
    :queue_id  => queue_id,
    :iteration => iteration
  }
  fields.map { |name| pinned[name] }
end
|
.result_message(message, result, tasktype = :result, resulttype = :result) ⇒ Object
165
166
167
168
169
170
171
172
173
174
175
176
|
# File 'lib/skynet/skynet_message.rb', line 165
# Builds a reply to +message+ that carries +result+ as its payload.
#
# :tasktype, :payload and :payload_type come from the arguments; every other
# field is copied from +message+ through its reader.
#
# @param message [Object] the message being answered
# @param result [Object] the payload of the reply
# @param tasktype [Object] defaults to :result
# @param resulttype [Object] stored as :payload_type, defaults to :result
# @return [Object] the result of +new+ with the assembled attributes
def self.result_message(message,result,tasktype=:result, resulttype=:result)
  attrs = {
    :tasktype     => tasktype,
    :payload      => result,
    :payload_type => resulttype
  }

  fields.each do |name|
    next if attrs.key?(name)

    attrs[name] = message.send(name)
  end

  new(attrs)
end
|
.result_template(job_id, tasktype = :result) ⇒ Object
155
156
157
158
159
160
161
162
163
|
# File 'lib/skynet/skynet_message.rb', line 155
# Builds a positional template (one slot per entry of +fields+) with the given
# :job_id and :tasktype; every other slot is nil.
#
# @param job_id [Object]
# @param tasktype [Object] defaults to :result
# @return [Array] values ordered like +fields+
def self.result_template(job_id,tasktype=:result)
  pinned = { :tasktype => tasktype, :job_id => job_id }
  fields.map { |name| pinned[name] }
end
|
Instance Method Details
#[](ii) ⇒ Object
110
111
112
|
# File 'lib/skynet/skynet_message.rb', line 110
# Positional read access: takes the ii-th name from the class-level fields
# list and returns the result of calling the method with that name on self.
#
# @param ii [Integer] index into self.class.fields
# @return [Object] the value of the corresponding field
def [](ii)
send(self.class.fields[ii])
end
|
#error_message(error) ⇒ Object
207
208
209
|
# File 'lib/skynet/skynet_message.rb', line 207
# Instance-level convenience wrapper: forwards to the class method
# error_message, passing self as the message.
#
# @param error [Object] the error to carry as the payload
# @return [Object] whatever the class-level error_message returns
def error_message(error)
self.class.error_message(self,error)
end
|
#error_template ⇒ Object
224
225
226
|
# File 'lib/skynet/skynet_message.rb', line 224
# Instance-level convenience wrapper: forwards to the class method
# error_template, passing self as the message.
#
# @return [Object] whatever the class-level error_template returns
def error_template
self.class.error_template(self)
end
|
#fallback_task_message ⇒ Object
254
255
256
|
# File 'lib/skynet/skynet_message.rb', line 254
# Instance-level convenience wrapper: forwards to the class method
# fallback_task_message, passing self as the message.
#
# @return [Object] whatever the class-level fallback_task_message returns
def fallback_task_message
self.class.fallback_task_message(self)
end
|
#fallback_template ⇒ Object
272
273
274
|
# File 'lib/skynet/skynet_message.rb', line 272
# Instance-level convenience wrapper: forwards to the class method
# fallback_template, passing self as the message.
#
# @return [Object] whatever the class-level fallback_template returns
def fallback_template
self.class.fallback_template(self)
end
|
#fields ⇒ Object
71
72
73
|
# File 'lib/skynet/skynet_message.rb', line 71
# Instance-level shortcut for the class-level fields list.
#
# @return [Object] whatever self.class.fields returns
def fields
self.class.fields
end
|
#payload ⇒ Object
93
94
95
96
97
98
99
|
# File 'lib/skynet/skynet_message.rb', line 93
# Lazily decodes and memoizes the message payload.
#
# When @payload is unset, the YAML text in raw_payload is parsed and cached.
# If there is no raw payload the result is nil (nil is not memoized by ||=, so
# the next call tries again).
#
# Only StandardError is translated into BadMessage. Process-level exceptions
# such as Interrupt or SystemExit are deliberately left to propagate instead
# of being reported as a malformed message.
#
# NOTE(review): YAML.load can instantiate arbitrary objects. If raw_payload can
# originate from an untrusted peer, a restricted loader should be considered.
#
# @return [Object, nil] the decoded payload, or nil when raw_payload is unset
# @raise [BadMessage] when the raw payload cannot be decoded
def payload
  @payload ||= begin
    YAML.load(raw_payload) if raw_payload
  rescue StandardError => e
    raise BadMessage.new("Couldnt marshal payload #{e.inspect} #{e.backtrace.join("\n")}")
  end
end
|
#payload=(data) ⇒ Object
88
89
90
91
|
# File 'lib/skynet/skynet_message.rb', line 88
# Stores +data+ as the payload and, when possible, its YAML form as the raw
# payload.
#
# Procs are kept in memory only (they are never serialized, and raw_payload is
# left untouched). For everything else that responds to to_yaml, raw_payload
# is replaced with the serialized form; the raw_payload writer in this file
# resets the cached @payload, so the value is decoded again from YAML on the
# next read.
#
# The Proc guard inspects +data+ directly. Going through the lazy +payload+
# reader here would, for nil/false data, decode the previous (stale)
# raw_payload and could raise before the new value is stored.
#
# @param data [Object] the new payload
def payload=(data)
  @payload = data
  self.raw_payload = data.to_yaml if data.respond_to?(:to_yaml) && !data.kind_of?(Proc)
end
|
#raw_payload ⇒ Object
106
107
108
|
# File 'lib/skynet/skynet_message.rb', line 106
# Reader for the @raw_payload instance variable, which the payload reader in
# this file parses as YAML.
#
# @return [Object] the stored raw payload (nil if never assigned)
def raw_payload
@raw_payload
end
|
#raw_payload=(data) ⇒ Object
101
102
103
104
|
# File 'lib/skynet/skynet_message.rb', line 101
# Stores the raw payload and clears the cached decoded payload, so the
# next call to the payload reader decodes the new raw value.
#
# @param data [Object] the new raw payload
def raw_payload=(data)
@raw_payload = data
# Drop the memoized payload so it cannot go stale.
@payload=nil
end
|
#result_message(result, tasktype = :result, resulttype = :result) ⇒ Object
178
179
180
|
# File 'lib/skynet/skynet_message.rb', line 178
# Instance-level convenience wrapper: forwards to the class method
# result_message, passing self as the message being answered.
#
# @param result [Object] the payload of the reply
# @param tasktype [Object] defaults to :result
# @param resulttype [Object] defaults to :result
# @return [Object] whatever the class-level result_message returns
def result_message(result,tasktype=:result, resulttype=:result)
self.class.result_message(self,result,tasktype,resulttype)
end
|
#task ⇒ Object
84
85
86
|
# File 'lib/skynet/skynet_message.rb', line 84
# Alias-style accessor: returns the same value as the payload reader.
#
# @return [Object, nil] the decoded payload
def task
payload
end
|
#timeout ⇒ Object
136
137
138
|
# File 'lib/skynet/skynet_message.rb', line 136
# Returns expire_time multiplied by two.
#
# NOTE(review): fallback_task_message in this file sets expire_time to an
# absolute epoch value (Time.now.to_i + expiry), so doubling it looks
# suspicious — confirm whether a duration such as expiry was intended.
#
# @return [Numeric]
def timeout
expire_time * 2
end
|
#to_a ⇒ Object
114
115
116
117
118
|
# File 'lib/skynet/skynet_message.rb', line 114
# Returns the message as a positional Array: the value of every class-level
# field, in the order given by self.class.fields.
#
# @return [Array]
def to_a
  self.class.fields.map { |name| send(name) }
end
|
#to_h ⇒ Object
128
129
130
|
# File 'lib/skynet/skynet_message.rb', line 128
# Short alias for to_hash.
#
# @return [Hash] whatever to_hash returns
def to_h
to_hash
end
|
#to_hash ⇒ Object
120
121
122
123
124
125
126
|
# File 'lib/skynet/skynet_message.rb', line 120
# Returns the message as a Hash that maps every class-level field name to the
# value of the same-named reader, in the order given by self.class.fields.
#
# @return [Hash]
def to_hash
  self.class.fields.inject({}) do |collected, name|
    collected[name] = send(name)
    collected
  end
end
|
#to_s ⇒ Object
132
133
134
|
# File 'lib/skynet/skynet_message.rb', line 132
# Returns a printable representation of the message: the inspected form of
# the positional field array produced by to_a.
#
# to_s must return a String. Returning the Array itself makes string
# interpolation ignore it and fall back to the default object representation.
#
# @return [String]
def to_s
  to_a.inspect
end
|