Module: BusinessFlow::ClusterLock::ClassMethods

Defined in:
lib/business_flow/cluster_lock.rb

Overview

DSL Methods

Defined Under Namespace

Classes: LockFailure, LockInfo

Constant Summary collapse

RESULT_FINALIZE =
proc do |cluster_lock_info|
  @cluster_lock_info = cluster_lock_info
  self
end

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.acquire_lock(flow, lock_info, payload) ⇒ Object



152
153
154
155
156
157
158
159
# File 'lib/business_flow/cluster_lock.rb', line 152

def self.acquire_lock(flow, lock_info, payload)
  zk_connection = ZK::Client::Threaded.new(lock_info.zookeeper_servers)
  lock = flow.instance_variable_set(
    :@_business_flow_cluster_lock,
    ZK::Locker::ExclusiveLocker.new(zk_connection, lock_info.lock_name)
  )
  inner_acquire_lock(zk_connection, lock, payload)
end

.cleanup(lock, zk_connection) ⇒ Object



171
172
173
174
# File 'lib/business_flow/cluster_lock.rb', line 171

def self.cleanup(lock, zk_connection)
  lock.unlock if lock
  zk_connection.close! if zk_connection
end

.inner_acquire_lock(zk_connection, lock, payload) ⇒ Object



161
162
163
164
165
166
167
168
169
# File 'lib/business_flow/cluster_lock.rb', line 161

def self.inner_acquire_lock(zk_connection, lock, payload)
  lock_held = lock.lock(wait: false)
  payload[:lock_acquired] = lock_held if payload
  if !lock_held
    zk_connection.close!
    raise LockFailure.new(:lock_unavailable, 'the lock was not available')
  end
  [zk_connection, lock]
end

.instrumented_acquire_lock(flow, lock_info) ⇒ Object



145
146
147
148
149
150
# File 'lib/business_flow/cluster_lock.rb', line 145

def self.instrumented_acquire_lock(flow, lock_info)
  flow.class.instrument(:cluster_lock_setup, flow) do |payload|
    payload[:lock_name] = lock_info.lock_name if payload
    acquire_lock(flow, lock_info, payload)
  end
end

.lock_name(flow) ⇒ Object

:reek:NilCheck



137
138
139
140
141
142
143
# File 'lib/business_flow/cluster_lock.rb', line 137

def self.lock_name(flow)
  lock_name = catch(:halt_step) { flow.class.with_cluster_lock.call(flow)&.merge_into(flow)&.to_s }
  if lock_name.nil? || lock_name.length == 0
    raise LockFailure.new(:no_lock_name, 'no lock name provided')
  end
  lock_name
end

.with_lock(flow, lock_info, &blk) ⇒ Object



176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/business_flow/cluster_lock.rb', line 176

def self.with_lock(flow, lock_info, &blk)
  zk_connection, lock =
    if !BusinessFlow::ClusterLock.disabled?
      instrumented_acquire_lock(flow, lock_info)
    end
  yield lock_info
rescue ZK::Exceptions::LockAssertionFailedError => exc
  # This would occur if we asserted a cluster lock while executing the flow.
  # This will have set an error on the flow, so we can carry on.
  raise LockFailure.new(:assert_failed, exc.message)
rescue ZK::Exceptions::OperationTimeOut
  # Sometimes this happens. Just let the ensure block take care of everything
ensure
  cleanup(lock, zk_connection)
end

.zookeeper_server_list(flow) ⇒ Object

:reek:NilCheck



128
129
130
131
132
133
134
# File 'lib/business_flow/cluster_lock.rb', line 128

def self.zookeeper_server_list(flow)
  servers = catch(:halt_step) { flow.class.with_zookeeper_servers.call(flow)&.merge_into(flow)&.to_s }
  if servers.nil? || servers.length == 0
    raise LockFailure.new(:no_servers, 'no zookeeper servers provided')
  end
  servers
end

Instance Method Details

#add_cluster_luck_info_to_result_classObject



118
119
120
121
122
123
124
125
# File 'lib/business_flow/cluster_lock.rb', line 118

def add_cluster_luck_info_to_result_class
  return if @cluster_lock_info_added
  result_class = const_get(:Result)
  DSL::PublicField.new(:cluster_lock_info).add_to(result_class)
  result_class.send(:define_method, :_business_flow_cluster_lock_finalize,
                    RESULT_FINALIZE)
  @cluster_lock_info_added = true
end

#build(parameter_object) ⇒ Object



96
97
98
99
# File 'lib/business_flow/cluster_lock.rb', line 96

def build(parameter_object)
  add_cluster_luck_info_to_result_class
  super(parameter_object)
end

#default_lock_nameObject



92
93
94
# File 'lib/business_flow/cluster_lock.rb', line 92

def default_lock_name
  proc { self.class.name }
end

#execute(flow) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
# File 'lib/business_flow/cluster_lock.rb', line 101

def execute(flow)
  lock_info = LockInfo.new(
    ClassMethods.lock_name(flow),
    ClassMethods.zookeeper_server_list(flow)
  )
  ClassMethods.with_lock(flow, lock_info) do
    super(flow)._business_flow_cluster_lock_finalize(lock_info)
  end
rescue LockFailure => exc
  return result_from(exc.add_to(flow))._business_flow_cluster_lock_finalize(lock_info)
end

#with_cluster_lock(lock_name = nil, opts = {}, &blk) ⇒ Object



70
71
72
73
74
75
76
77
78
79
# File 'lib/business_flow/cluster_lock.rb', line 70

def with_cluster_lock(lock_name = nil, opts = {}, &blk)
  if lock_name.is_a?(String)
    @lock_name = Step.new(Callable.new(proc { lock_name }), {})
  elsif lock_name || blk
    @lock_name = Step.new(Callable.new(lock_name || blk),
                          { default_output: :lock_name }.merge(opts))
  else
    @lock_name ||= Step.new(Callable.new(default_lock_name), opts)
  end
end

#with_zookeeper_servers(servers = nil, opts = {}, &blk) ⇒ Object



81
82
83
84
85
86
87
88
89
90
# File 'lib/business_flow/cluster_lock.rb', line 81

def with_zookeeper_servers(servers = nil, opts = {}, &blk)
  if servers.is_a?(String)
    @zookeeper_servers = Step.new(Callable.new(proc { servers }), {})
  elsif servers || blk
    @zookeeper_servers = Step.new(Callable.new(servers || blk),
                                  { default_output: :zookeeper_servers }.merge(opts))
  else
    @zookeeper_servers || Step.new(BusinessFlow::ClusterLock.default_servers, opts)
  end
end