Module: BusinessFlow::ClusterLock::ClassMethods
- Defined in:
- lib/business_flow/cluster_lock.rb
Overview
Defined Under Namespace
Classes: LockFailure, LockInfo
Constant Summary
collapse
- RESULT_FINALIZE =
proc do |cluster_lock_info|
@cluster_lock_info = cluster_lock_info
self
end
Class Method Summary
collapse
Instance Method Summary
collapse
Class Method Details
.acquire_lock(flow, lock_info, payload) ⇒ Object
152
153
154
155
156
157
158
159
|
# File 'lib/business_flow/cluster_lock.rb', line 152
def self.acquire_lock(flow, lock_info, payload)
zk_connection = ZK::Client::Threaded.new(lock_info.zookeeper_servers)
lock = flow.instance_variable_set(
:@_business_flow_cluster_lock,
ZK::Locker::ExclusiveLocker.new(zk_connection, lock_info.lock_name)
)
inner_acquire_lock(zk_connection, lock, payload)
end
|
.cleanup(lock, zk_connection) ⇒ Object
171
172
173
174
|
# File 'lib/business_flow/cluster_lock.rb', line 171
def self.cleanup(lock, zk_connection)
lock.unlock if lock
zk_connection.close! if zk_connection
end
|
.inner_acquire_lock(zk_connection, lock, payload) ⇒ Object
161
162
163
164
165
166
167
168
169
|
# File 'lib/business_flow/cluster_lock.rb', line 161
def self.inner_acquire_lock(zk_connection, lock, payload)
lock_held = lock.lock(wait: false)
payload[:lock_acquired] = lock_held if payload
if !lock_held
zk_connection.close!
raise LockFailure.new(:lock_unavailable, 'the lock was not available')
end
[zk_connection, lock]
end
|
.instrumented_acquire_lock(flow, lock_info) ⇒ Object
145
146
147
148
149
150
|
# File 'lib/business_flow/cluster_lock.rb', line 145
def self.instrumented_acquire_lock(flow, lock_info)
flow.class.instrument(:cluster_lock_setup, flow) do |payload|
payload[:lock_name] = lock_info.lock_name if payload
acquire_lock(flow, lock_info, payload)
end
end
|
.lock_name(flow) ⇒ Object
137
138
139
140
141
142
143
|
# File 'lib/business_flow/cluster_lock.rb', line 137
def self.lock_name(flow)
lock_name = catch(:halt_step) { flow.class.with_cluster_lock.call(flow)&.merge_into(flow)&.to_s }
if lock_name.nil? || lock_name.length == 0
raise LockFailure.new(:no_lock_name, 'no lock name provided')
end
lock_name
end
|
.with_lock(flow, lock_info, &blk) ⇒ Object
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
|
# File 'lib/business_flow/cluster_lock.rb', line 176
def self.with_lock(flow, lock_info, &blk)
zk_connection, lock =
if !BusinessFlow::ClusterLock.disabled?
instrumented_acquire_lock(flow, lock_info)
end
yield lock_info
rescue ZK::Exceptions::LockAssertionFailedError => exc
raise LockFailure.new(:assert_failed, exc.message)
rescue ZK::Exceptions::OperationTimeOut
ensure
cleanup(lock, zk_connection)
end
|
.zookeeper_server_list(flow) ⇒ Object
128
129
130
131
132
133
134
|
# File 'lib/business_flow/cluster_lock.rb', line 128
def self.zookeeper_server_list(flow)
servers = catch(:halt_step) { flow.class.with_zookeeper_servers.call(flow)&.merge_into(flow)&.to_s }
if servers.nil? || servers.length == 0
raise LockFailure.new(:no_servers, 'no zookeeper servers provided')
end
servers
end
|
Instance Method Details
#add_cluster_luck_info_to_result_class ⇒ Object
118
119
120
121
122
123
124
125
|
# File 'lib/business_flow/cluster_lock.rb', line 118
def add_cluster_luck_info_to_result_class
return if @cluster_lock_info_added
result_class = const_get(:Result)
DSL::PublicField.new(:cluster_lock_info).add_to(result_class)
result_class.send(:define_method, :_business_flow_cluster_lock_finalize,
RESULT_FINALIZE)
@cluster_lock_info_added = true
end
|
#build(parameter_object) ⇒ Object
96
97
98
99
|
# File 'lib/business_flow/cluster_lock.rb', line 96
def build(parameter_object)
add_cluster_luck_info_to_result_class
super(parameter_object)
end
|
#default_lock_name ⇒ Object
92
93
94
|
# File 'lib/business_flow/cluster_lock.rb', line 92
def default_lock_name
proc { self.class.name }
end
|
#execute(flow) ⇒ Object
101
102
103
104
105
106
107
108
109
110
111
|
# File 'lib/business_flow/cluster_lock.rb', line 101
def execute(flow)
lock_info = LockInfo.new(
ClassMethods.lock_name(flow),
ClassMethods.zookeeper_server_list(flow)
)
ClassMethods.with_lock(flow, lock_info) do
super(flow)._business_flow_cluster_lock_finalize(lock_info)
end
rescue LockFailure => exc
return result_from(exc.add_to(flow))._business_flow_cluster_lock_finalize(lock_info)
end
|
#with_cluster_lock(lock_name = nil, opts = {}, &blk) ⇒ Object
70
71
72
73
74
75
76
77
78
79
|
# File 'lib/business_flow/cluster_lock.rb', line 70
def with_cluster_lock(lock_name = nil, opts = {}, &blk)
if lock_name.is_a?(String)
@lock_name = Step.new(Callable.new(proc { lock_name }), {})
elsif lock_name || blk
@lock_name = Step.new(Callable.new(lock_name || blk),
{ default_output: :lock_name }.merge(opts))
else
@lock_name ||= Step.new(Callable.new(default_lock_name), opts)
end
end
|
#with_zookeeper_servers(servers = nil, opts = {}, &blk) ⇒ Object
81
82
83
84
85
86
87
88
89
90
|
# File 'lib/business_flow/cluster_lock.rb', line 81
def with_zookeeper_servers(servers = nil, opts = {}, &blk)
if servers.is_a?(String)
@zookeeper_servers = Step.new(Callable.new(proc { servers }), {})
elsif servers || blk
@zookeeper_servers = Step.new(Callable.new(servers || blk),
{ default_output: :zookeeper_servers }.merge(opts))
else
@zookeeper_servers || Step.new(BusinessFlow::ClusterLock.default_servers, opts)
end
end
|