53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
|
# File 'lib/hakuban/manager.rb', line 53
def initialize(contract, block, retry_backoff, terminate_on_deactivation, on_exception_policy)
@contract = contract
object_class = @contract.class.const_get(:ManagedObject)
@objects_mutex = Mutex.new
@existing_objects = {} @active_objects = {}
@event_queue = Queue.new
@ffi_callback = proc { |descriptor, action|
@event_queue << Event.new(action: action, descriptor: descriptor)
}
@ffi_events = @contract.new_callback_event_queue
@ffi_events.callback_register(&@ffi_callback)
@async = async_run {
while @event_queue and event = @event_queue.shift
@objects_mutex.synchronize {
case event.action
when :insert, :change, :remove, :stopped, :timer
object = @existing_objects[event.descriptor] ||= object_class.new(@contract, event.descriptor)
if !object.running and object.async
error = async_join(object.async)
object.async = nil
if object.last_exception_at
interval_since_last_exception = Time.new - object.last_exception_at
heal_multiplier = 10.0
object.current_delay_index -= (interval_since_last_exception / (retry_backoff[-1]*heal_multiplier)).floor
object.current_delay_index = 0 if object.current_delay_index < 0
end
if error.kind_of? HandlerException
case on_exception_policy
when :raise
raise error.exception
when :retry
object.last_exception_at = Time.new
object.earliest_next_run = Time.new + retry_backoff[object.current_delay_index]
lambda_sleep_time = object.earliest_next_run - Time.new
lambda_descriptor = event.descriptor
async_run {
sleep lambda_sleep_time
@event_queue << Event.new(action: :timer, descriptor: lambda_descriptor) if @event_queue
} object.current_delay_index += 1 if object.current_delay_index < retry_backoff.size - 1
end
end
end
if object.check_active
if block and !object.running and (!object.earliest_next_run or Time.new >= object.earliest_next_run)
descriptor_for_lambda = event.descriptor
object.running = true
object.async = async_run do
async_filter_out_stop_exception {
object.run(block)
}
rescue Exception => e
$stderr.puts "Exception in hakuban manager lambda: #{e}"
$stderr.puts "Contract: "+@contract.inspect
$stderr.puts "Descriptor: "+descriptor_for_lambda.inspect
$stderr.puts "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
HandlerException.new(exception: e) ensure
object.running = false
@event_queue << Event.new(action: :stopped, descriptor: descriptor_for_lambda) if @event_queue
end
end
@active_objects[event.descriptor] ||= object
object.change
else
@active_objects.delete(event.descriptor)
if object.running
object.stop
async_stop(object.async) if terminate_on_deactivation
else
@existing_objects.delete(event.descriptor)
end
end
when :drop
@ffi_events.callback_unregister
@active_objects.clear
@existing_objects.dup.each { |descriptor, object|
if object.running
object.stop
async_stop(object.async) if terminate_on_deactivation
else
@existing_objects.delete(descriptor)
end
}
while @existing_objects.size > 0
event = @event_queue.shift
@existing_objects.delete(event.descriptor) if event.action == :stopped
end
@event_queue.clear
@contract.drop
@contract, @event_queue = nil, nil
break
end
}
end
}
end
|