Class: Hakuban::ObjectManager

Inherits:
Object
  • Object
show all
Defined in:
lib/hakuban/manager.rb

Direct Known Subclasses

AsyncObjectManager, ThreadObjectManager

Defined Under Namespace

Classes: Event, HandlerException

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(contract, block, retry_backoff, terminate_on_deactivation, on_exception_policy) ⇒ ObjectManager

Returns a new instance of ObjectManager.



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/hakuban/manager.rb', line 53

# Sets up the manager state, registers an FFI callback on the contract and
# starts the event loop (via async_run) that creates, runs, restarts and
# stops one managed object per descriptor.
#
# NOTE(review): async_run, async_join, async_stop and
# async_filter_out_stop_exception are not defined in this method; presumably
# they come from the subclasses (AsyncObjectManager / ThreadObjectManager) — confirm.
#
# @param contract [Object] contract to manage; its class must define a
#   ManagedObject constant, and it must respond to new_callback_event_queue and drop
# @param block [Proc, nil] handler passed to ManagedObject#run; when nil no handler is started
# @param retry_backoff [Array<Numeric>] delays (added to a Time, hence seconds)
#   indexed by the object's current_delay_index
# @param terminate_on_deactivation [Boolean] when truthy, a running handler is
#   also stopped with async_stop once its object is no longer active
# @param on_exception_policy [Symbol] :raise or :retry; any other value leaves
#   a failed handler without rescheduling
def initialize(contract, block, retry_backoff, terminate_on_deactivation, on_exception_policy)
	@contract = contract
	# The managed-object class is looked up on the contract's own class.
	object_class = @contract.class.const_get(:ManagedObject)
	# Guards @existing_objects / @active_objects (also taken by #object and #objects).
	@objects_mutex = Mutex.new
	@existing_objects = {}  #TODO: turn this into a WeakMap, to keep object if external code still holds reference to it
	@active_objects = {}
	@event_queue = Queue.new

	# This callback gets called from a separate thread, with no async reactor running.
	# So, we only use it to forward actions to the main thread.
	@ffi_callback = proc { |descriptor, action|
		@event_queue << Event.new(action: action, descriptor: descriptor)
	}
	@ffi_events = @contract.new_callback_event_queue
	@ffi_events.callback_register(&@ffi_callback)

	# Event loop: runs until @event_queue is set to nil (by :drop) or shift returns a falsy value.
	@async = async_run {
		while @event_queue and event = @event_queue.shift
			@objects_mutex.synchronize {
				case event.action
				when :insert, :change, :remove, :stopped, :timer
					# Fetch the managed object for this descriptor, creating it on first sight.
					object = @existing_objects[event.descriptor] ||= object_class.new(@contract, event.descriptor)
					# The handler has finished but has not been joined yet: collect its result.
					if !object.running and object.async
						error = async_join(object.async)
						object.async = nil
						if object.last_exception_at
							# "Healing": step the backoff index back down according to the time
							# elapsed since the last exception, in units of 10x the longest delay.
							interval_since_last_exception = Time.new - object.last_exception_at
							heal_multiplier = 10.0
							object.current_delay_index -= (interval_since_last_exception / (retry_backoff[-1]*heal_multiplier)).floor
							object.current_delay_index = 0  if object.current_delay_index < 0
						end
						# The handler task returns a HandlerException (instead of raising) when the block failed.
						if error.kind_of? HandlerException
							case on_exception_policy
							when :raise
								raise error.exception
							when :retry
								# Schedule a :timer event after the current backoff delay, then
								# advance the delay index (capped at the last entry of retry_backoff).
								object.last_exception_at = Time.new
								object.earliest_next_run = Time.new + retry_backoff[object.current_delay_index]
								lambda_sleep_time = object.earliest_next_run - Time.new
								lambda_descriptor = event.descriptor
								async_run { 
									sleep lambda_sleep_time
									@event_queue << Event.new(action: :timer, descriptor: lambda_descriptor)  if @event_queue
								}  #TODO: do we have to join this?
								object.current_delay_index += 1  if object.current_delay_index < retry_backoff.size - 1
							#when :ignore_failing_descriptor
							#when :drop_contract
							end
						end
					end
					if object.check_active
						# Start the handler only if one was given, none is running, and any backoff deadline has passed.
						if block and !object.running and (!object.earliest_next_run or Time.new >= object.earliest_next_run)
							descriptor_for_lambda = event.descriptor
							object.running = true
					 		object.async = async_run do
								async_filter_out_stop_exception {
					 				object.run(block)
								}
							rescue Exception => e
								$stderr.puts "Exception in hakuban manager lambda: #{e}"
								$stderr.puts "Contract: "+@contract.inspect
								$stderr.puts "Descriptor: "+descriptor_for_lambda.inspect
								$stderr.puts "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
								HandlerException.new(exception: e)  # exception has to be wrapped here to stop async from throwing on join
							ensure
								# Always mark the handler as finished and notify the event loop,
								# unless the manager has already been dropped (@event_queue is nil).
								object.running = false
					 			@event_queue << Event.new(action: :stopped, descriptor: descriptor_for_lambda)  if @event_queue
							end
						end
						@active_objects[event.descriptor] ||= object
						object.change
					else
						# Object is no longer active: hide it from #object/#objects, then either
						# ask the running handler to stop or forget the object entirely.
						@active_objects.delete(event.descriptor)
						if object.running
							object.stop
							async_stop(object.async)  if terminate_on_deactivation
						else
							@existing_objects.delete(event.descriptor)
						end
					end
				when :drop
					# Shutdown: stop receiving FFI events, stop every running handler,
					# wait for their :stopped events, then release the contract.
					@ffi_events.callback_unregister
					@active_objects.clear
					# Iterate over a copy because entries are deleted from @existing_objects in the block.
					@existing_objects.dup.each { |descriptor, object| 
						if object.running
							object.stop
							async_stop(object.async)  if terminate_on_deactivation
						else
							@existing_objects.delete(descriptor)
						end
					}
					# Blocks until every remaining object reported :stopped; other events are discarded.
					while @existing_objects.size > 0
						event = @event_queue.shift
						@existing_objects.delete(event.descriptor)  if event.action == :stopped
					end
					@event_queue.clear
					@contract.drop
					@contract, @event_queue = nil, nil
					# break leaves the synchronize block; the while loop then ends because @event_queue is nil.
					break
				end
			}
		end
	}
end

Instance Attribute Details

#contract ⇒ Object (readonly)

Returns the value of attribute contract.



47
48
49
# File 'lib/hakuban/manager.rb', line 47

# Reader for the contract handed to the constructor.
#
# @return [Object, nil] the current value of @contract; the event loop sets it
#   to nil once a :drop event has been processed
def contract
  return @contract
end

Instance Method Details

#drop ⇒ Object



173
174
175
176
# File 'lib/hakuban/manager.rb', line 173

# Requests a drop (see drop_nonblock) and then joins the task stored in @async.
#
# @return [Object] whatever async_join returns for the event-loop task
def drop
	drop_nonblock
	event_loop_task = @async
	async_join(event_loop_task)
end

#drop_nonblock ⇒ Object



179
180
181
182
183
# File 'lib/hakuban/manager.rb', line 179

# Enqueues a :drop event for the event loop without waiting for it to be processed.
# Does nothing when @contract is already nil (the manager has been dropped).
#
# @return [Object, nil] the event queue after the push, or nil when nothing was enqueued
def drop_nonblock
	return  unless @contract
	drop_request = Event.new(action: :drop)
	@event_queue << drop_request
end

#object ⇒ Object



166
167
168
169
170
# File 'lib/hakuban/manager.rb', line 166

# Returns the first currently active managed object, read under @objects_mutex.
#
# @return [Object, nil] first value of @active_objects, or nil when it is empty
def object
	@objects_mutex.synchronize do
		active = @active_objects.values
		active.first
	end
end

#objects ⇒ Object



159
160
161
162
163
# File 'lib/hakuban/manager.rb', line 159

# Returns a shallow copy of the active-objects hash, taken under @objects_mutex,
# so callers can iterate it without holding the lock.
#
# @return [Hash] copy of @active_objects (descriptor => managed object)
def objects
	@objects_mutex.synchronize do
		snapshot = @active_objects.dup
		snapshot
	end
end