Class: Delayed::Backend::SimpleQueueService::Job

Inherits:
Object
  • Object
show all
Includes:
Base
Defined in:
lib/delayed/backend/simple_queue_service.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(data = {}) ⇒ Job

Returns a new instance of Job.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/delayed/backend/simple_queue_service.rb', line 8

# Builds a job from either a plain attributes hash or a received SQS message.
#
# When +data+ is an AWS::SQS::ReceivedMessage it is remembered in
# @sqs_message and its body is decoded with MultiJson to obtain the
# attributes hash. The hash keys are symbolized in place, the payload is
# taken from :payload_object (or, failing that, :handler), and whatever
# remains becomes @attributes.
#
# data - a Hash of job attributes or an AWS::SQS::ReceivedMessage
#        (default: empty Hash).
def initialize(data = {})
  if data.is_a?(AWS::SQS::ReceivedMessage)
    @sqs_message = data
    data = MultiJson.load(@sqs_message.body)
  end

  data.symbolize_keys!

  # :handler is only consulted (and removed) when :payload_object is absent.
  serialized = data.delete(:payload_object)
  serialized = data.delete(:handler) unless serialized

  @attributes = data
  self.payload_object = serialized
end

Instance Attribute Details

#handler ⇒ Object

Returns the value of attribute handler.



6
7
8
# File 'lib/delayed/backend/simple_queue_service.rb', line 6

# Reader for the @handler instance variable.
#
# Within this file the value is assigned through the handler= writer that
# payload_object= calls, which passes it a String (the given String, or
# the result of to_yaml on the given object).
def handler
  @handler
end

Class Method Details

.clear_locks!(*args) ⇒ Object

No need to check locks



100
101
102
# File 'lib/delayed/backend/simple_queue_service.rb', line 100

# No-op lock cleanup: accepts and ignores any arguments and always
# returns true.
def self.clear_locks!(*args)
  true
end

.db_time_now ⇒ Object



104
105
106
# File 'lib/delayed/backend/simple_queue_service.rb', line 104

# Returns the current wall-clock time as a Time expressed in UTC.
def self.db_time_now
  current = Time.now
  current.utc
end

.find_available(worker_name, limit = 5, max_run_time = nil) ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/delayed/backend/simple_queue_service.rb', line 108

# Fetches messages from the worker's SQS queue and wraps each one in a Job.
#
# worker_name  - accepted but not used by this implementation.
# limit        - passed to receive_message as :limit (default: 5).
# max_run_time - accepted but not used by this implementation.
#
# The receive call is made with :wait_time_seconds => 20. Its result may be
# nil/false, a single message, or an Array of messages.
#
# Returns an Array of Job instances (empty when nothing was received).
def self.find_available(worker_name, limit = 5, max_run_time = nil)
  received = Delayed::Worker.sqs.receive_message(:limit => limit, :wait_time_seconds => 20)
  return [] unless received

  # A single message comes back bare; normalize to a list.
  batch = received.is_a?(Array) ? received : [received]
  batch.map { |message| Delayed::Backend::SimpleQueueService::Job.new(message) }
end

Instance Method Details

#attempts ⇒ Object



81
82
83
# File 'lib/delayed/backend/simple_queue_service.rb', line 81

# Returns the :attempts entry of @attributes, or 0 when that entry is
# missing or falsy.
def attempts
  recorded = @attributes[:attempts]
  recorded ? recorded : 0
end

#attempts=(num) ⇒ Object



84
85
86
# File 'lib/delayed/backend/simple_queue_service.rb', line 84

# Stores +num+ under the :attempts key of @attributes.
def attempts=(num)
  @attributes.store(:attempts, num)
end

#destroy ⇒ Object



92
93
94
# File 'lib/delayed/backend/simple_queue_service.rb', line 92

# Deletes the SQS message this job was built from, if there is one.
#
# Returns the result of the message's delete call, or nil when the job has
# no associated message.
def destroy
  return unless @sqs_message

  @sqs_message.delete
end

#id ⇒ Object



25
26
27
# File 'lib/delayed/backend/simple_queue_service.rb', line 25

# Returns the id of the SQS message this job was built from, or nil when
# the job has no associated message (e.g. it was constructed from a plain
# Hash). The nil guard mirrors the @sqs_message checks in destroy and save,
# so asking an unsent job for its id no longer raises NoMethodError.
def id
  @sqs_message.id if @sqs_message
end

#last_error=(error) ⇒ Object



77
78
79
# File 'lib/delayed/backend/simple_queue_service.rb', line 77

# Stores +error+ under the :last_error key of @attributes.
def last_error=(error)
  @attributes.store(:last_error, error)
end

#lock_exclusively!(*args) ⇒ Object

No need to check locks



63
64
65
# File 'lib/delayed/backend/simple_queue_service.rb', line 63

# No-op lock acquisition: accepts and ignores any arguments and always
# returns true.
def lock_exclusively!(*args)
  true
end

#payload_object=(object) ⇒ Object



29
30
31
32
33
34
35
36
37
# File 'lib/delayed/backend/simple_queue_service.rb', line 29

# Sets the job payload and keeps the serialized handler in sync with it.
#
# object - either a String, which is treated as YAML (parsed into
#          @payload_object and stored verbatim as the handler), or any
#          other object (stored as @payload_object, with its to_yaml
#          output stored as the handler).
#
# NOTE(review): YAML.load is applied to the given String; when that String
# originates from a queue message it is externally supplied data — confirm
# this is acceptable or restrict the permitted classes.
def payload_object=(object)
  already_serialized = object.is_a?(String)

  @payload_object = already_serialized ? YAML.load(object) : object
  self.handler = already_serialized ? object : object.to_yaml
end

#reload(*args) ⇒ Object



72
73
74
75
# File 'lib/delayed/backend/simple_queue_service.rb', line 72

# Delegates to the inherited reload, forwarding the received arguments
# unchanged (bare super). No state local to this class is touched here.
def reload(*args)
  # reset
  super
end

#run_at=(ts) ⇒ Object



88
89
90
# File 'lib/delayed/backend/simple_queue_service.rb', line 88

# Stores +ts+ under the :run_at key of @attributes.
def run_at=(ts)
  @attributes.store(:run_at, ts)
end

#save ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/delayed/backend/simple_queue_service.rb', line 39

# Persists the job by publishing its current state as a new SQS message.
#
# The attributes hash, plus the YAML dump of the payload object under
# :payload_object, is serialized with MultiJson and sent with a delay of
# attempts**3 seconds, capped at 900.
#
# If this job was built from a received message, that message is deleted
# only after the replacement has been sent, so an error raised while
# sending no longer drops the job from the queue.
#
# Returns true. Errors raised by the serializers or the queue client
# propagate to the caller.
def save
  payload = MultiJson.dump(@attributes.merge(:payload_object => YAML.dump(payload_object)))

  Delayed::Worker.sqs.send_message payload, :delay_seconds => [900, attempts**3].min

  # Remove the original only once the replacement is enqueued.
  @sqs_message.delete if @sqs_message

  true
end

#save! ⇒ Object



52
53
54
# File 'lib/delayed/backend/simple_queue_service.rb', line 52

# Bang-named alias for save: calls save and returns its result. It adds no
# extra validation or raising behaviour of its own.
def save!
  save
end

#unlock(*args) ⇒ Object

No need to check locks



68
69
70
# File 'lib/delayed/backend/simple_queue_service.rb', line 68

# No-op lock release: accepts and ignores any arguments and always
# returns true.
def unlock(*args)
  true
end

#update_attributes(attributes) ⇒ Object



56
57
58
59
60
# File 'lib/delayed/backend/simple_queue_service.rb', line 56

# Merges +attributes+ into the job's attribute hash and saves the job.
#
# attributes - a Hash of attribute names to values; its keys are
#              symbolized in place before merging.
#
# The merge is done in place (merge!) so the new values are actually
# present in @attributes when save serializes them; a non-mutating merge
# would discard them.
#
# Returns the result of save.
def update_attributes(attributes)
  attributes.symbolize_keys!
  @attributes.merge!(attributes)
  save
end