Class: Tupelo::Client::Transaction

Inherits:
Object
  • Object
show all
Defined in:
lib/tupelo/util/boolean.rb,
lib/tupelo/client/transaction.rb

Defined Under Namespace

Classes: TransactionThread

Constant Summary collapse

STATES =
[
  OPEN      = :open,    # initial state
  CLOSED    = :closed,  # client thread changes open -> closed
                        # after closed, client cannot touch any state
  PENDING   = :pending, # worker thread changes closed -> pending | failed
  DONE      = :done,    # worker thread changes pending -> done (terminal)
  FAILED    = :failed   # worker thread changes pending -> failed (terminal)
]

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client, deadline: nil) ⇒ Transaction

Returns a new instance of Transaction.



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/tupelo/client/transaction.rb', line 129

def initialize client, deadline: nil
  @client = client
  @worker = client.worker
  @log = client.log
  @deadline = deadline
  @global_tick = nil
  @exception = nil
  @local_tick = nil
  @queue = client.make_queue
  @mutex = Mutex.new
  @writes = []
  @pulses = []
  @take_templates = []
  @read_templates = []
  @take_tuples_for_remote = []
  @take_tuples_for_local = []
  @read_tuples_for_remote = []
  @read_tuples_for_local = []
  @granted_tuples = nil
  @missing = nil
  @tags = nil
  @_take_nowait = nil
  @_read_nowait = nil
  
  if deadline
    worker.at deadline do
      cancel(TimeoutError) if open?
    end
  end
  
  open!
end

Instance Attribute Details

#clientObject (readonly)

Returns the value of attribute client.



93
94
95
# File 'lib/tupelo/client/transaction.rb', line 93

def client
  @client
end

#deadlineObject (readonly)

Returns the value of attribute deadline.



95
96
97
# File 'lib/tupelo/client/transaction.rb', line 95

def deadline
  @deadline
end

#exceptionObject (readonly)

Returns the value of attribute exception.



99
100
101
# File 'lib/tupelo/client/transaction.rb', line 99

def exception
  @exception
end

#global_tickObject (readonly)

Returns the value of attribute global_tick.



97
98
99
# File 'lib/tupelo/client/transaction.rb', line 97

def global_tick
  @global_tick
end

#granted_tuplesObject (readonly)

Returns the value of attribute granted_tuples.



108
109
110
# File 'lib/tupelo/client/transaction.rb', line 108

def granted_tuples
  @granted_tuples
end

#local_tickObject (readonly)

Returns the value of attribute local_tick.



98
99
100
# File 'lib/tupelo/client/transaction.rb', line 98

def local_tick
  @local_tick
end

#missingObject (readonly)

Returns the value of attribute missing.



109
110
111
# File 'lib/tupelo/client/transaction.rb', line 109

def missing
  @missing
end

#pulsesObject (readonly)

Returns the value of attribute pulses.



101
102
103
# File 'lib/tupelo/client/transaction.rb', line 101

def pulses
  @pulses
end

#read_templatesObject (readonly)

Returns the value of attribute read_templates.



103
104
105
# File 'lib/tupelo/client/transaction.rb', line 103

def read_templates
  @read_templates
end

#read_tuples_for_localObject (readonly)

Returns the value of attribute read_tuples_for_local.



107
108
109
# File 'lib/tupelo/client/transaction.rb', line 107

def read_tuples_for_local
  @read_tuples_for_local
end

#read_tuples_for_remoteObject (readonly)

Returns the value of attribute read_tuples_for_remote.



106
107
108
# File 'lib/tupelo/client/transaction.rb', line 106

def read_tuples_for_remote
  @read_tuples_for_remote
end

#statusObject (readonly)

Returns the value of attribute status.



96
97
98
# File 'lib/tupelo/client/transaction.rb', line 96

def status
  @status
end

#tagsObject (readonly)

Returns the value of attribute tags.



110
111
112
# File 'lib/tupelo/client/transaction.rb', line 110

def tags
  @tags
end

#take_templatesObject (readonly)

Returns the value of attribute take_templates.



102
103
104
# File 'lib/tupelo/client/transaction.rb', line 102

def take_templates
  @take_templates
end

#take_tuples_for_localObject (readonly)

Returns the value of attribute take_tuples_for_local.



105
106
107
# File 'lib/tupelo/client/transaction.rb', line 105

def take_tuples_for_local
  @take_tuples_for_local
end

#take_tuples_for_remoteObject (readonly)

Returns the value of attribute take_tuples_for_remote.



104
105
106
# File 'lib/tupelo/client/transaction.rb', line 104

def take_tuples_for_remote
  @take_tuples_for_remote
end

#workerObject (readonly)

Returns the value of attribute worker.



94
95
96
# File 'lib/tupelo/client/transaction.rb', line 94

def worker
  @worker
end

#writesObject (readonly)

Returns the value of attribute writes.



100
101
102
# File 'lib/tupelo/client/transaction.rb', line 100

def writes
  @writes
end

Instance Method Details

#abortObject



291
292
293
# File 'lib/tupelo/client/transaction.rb', line 291

def abort
  client.abort
end

#async(&block) ⇒ Object

Raises:

  • (ArgumentError)


362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
# File 'lib/tupelo/client/transaction.rb', line 362

def async &block
  raise ArgumentError, "must provide block" unless block
  TransactionThread.new(self) do ## Fiber?
    begin
      val =
        if block.arity == 0
          instance_eval &block
        else
          yield self
        end
      commit.wait
      val
    rescue TransactionFailure => ex
      log.info {"retrying #{t.inspect}: #{ex}"}
      retry
    rescue TransactionAbort
      log.info {"aborting #{t.inspect}"}
    end
  end
end

#cancel(err = TransactionAbort) ⇒ Object

Called by another thread to cancel a waiting transaction.



554
555
556
557
558
559
560
561
562
563
564
565
566
# File 'lib/tupelo/client/transaction.rb', line 554

def cancel err = TransactionAbort
  worker_push do
    raise unless in_worker_thread?
    if not open? or @global_tick or @exception
      log.info {"cancel was applied too late: #{inspect}"}
    else
      @exception = err.new
      failed!
      @queue << false
    end
  end
  nil
end

#check_openObject



214
215
216
217
218
219
220
221
222
223
# File 'lib/tupelo/client/transaction.rb', line 214

def check_open
  if failed?
    # checking this here is mostly a courtesy to client code; it is possible
    # (a benign race condition) for the failure flag to be set later,
    # even while a #write or #take method still has not returned.
    raise exception
  elsif not open?
    raise TransactionStateError, "not open: #{inspect}"
  end
end

#check_tuples(tuples) ⇒ Object

:section: Client methods



207
208
209
210
211
212
# File 'lib/tupelo/client/transaction.rb', line 207

def check_tuples tuples
  tuples.each do |tuple|
    tuple.respond_to?(:size) and tuple.respond_to?(:fetch) or
      raise ArgumentError, "Not a tuple: #{tuple.inspect}"
  end
end

#client_idObject



162
163
164
# File 'lib/tupelo/client/transaction.rb', line 162

def client_id
  client.client_id
end

#commitObject

idempotent



305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
# File 'lib/tupelo/client/transaction.rb', line 305

def commit
  if open?
    if @writes.empty? and @pulses.empty? and
       @take_tuples_for_remote.all? {|t| t.nil?} and
       @read_tuples_for_remote.all? {|t| t.nil?}
      @global_tick = worker.global_tick ## ok?
      done!
      log.info {"not committing empty transaction"}
    else
      closed!
      log.info {"committing #{inspect}"}
      worker_push self
    end
  else
    raise exception if failed?
  end
  return self
end

#done(global_tick, granted_tuples) ⇒ Object



522
523
524
525
526
527
528
529
530
531
532
# File 'lib/tupelo/client/transaction.rb', line 522

def done global_tick, granted_tuples
  raise TransactionStateError, "must be pending" unless pending?
  raise unless in_worker_thread?
  raise if @global_tick or @exception

  @global_tick = global_tick
  done!
  @granted_tuples = granted_tuples
  log.info {"done with #{inspect}"}
  @queue << true
end

#error(ex) ⇒ Object



544
545
546
547
548
549
550
551
# File 'lib/tupelo/client/transaction.rb', line 544

def error ex
  raise unless in_worker_thread?
  raise if @global_tick or @exception
  
  @exception = ex
  failed!
  @queue << false
end

#fail(missing) ⇒ Object



534
535
536
537
538
539
540
541
542
# File 'lib/tupelo/client/transaction.rb', line 534

def fail missing
  raise unless in_worker_thread?
  raise if @global_tick or @exception
  
  @missing = missing
  @exception = TransactionFailure
  failed!
  @queue << false
end

#fail!Object

Client may call this before commit. In transaction do…end block, this causes transaction to be re-executed.

Raises:



297
298
299
300
301
302
# File 'lib/tupelo/client/transaction.rb', line 297

def fail!
  raise if in_worker_thread?
  raise unless open?
  failed!
  raise TransactionFailure
end

#in_worker_thread?Boolean

:section: Worker methods

Returns:

  • (Boolean)


385
386
387
# File 'lib/tupelo/client/transaction.rb', line 385

def in_worker_thread?
  worker.in_thread?
end

#inspectObject



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# File 'lib/tupelo/client/transaction.rb', line 178

def inspect
  stat_extra =
    case
    when pending?
      "at local_tick: #{local_tick}"
    when done?
      "at global_tick: #{global_tick}"
    end
  
  stat = [status, stat_extra].compact.join(" ")
  
  ops = [ ["write", writes], ["pulse", pulses],
        ["take", take_templates], ["read", read_templates] ]
        ## exclude templates that were satisfied locally by writes
  ops.map! do |label, tuples|
    ["#{label} #{tuples.map(&:inspect).join(", ")}"] unless tuples.empty?
  end
  ops.compact!
  ops << "missing: #{missing}" if missing

  ## show take/read tuples too?
  ## show current tick, if open or closed
  ## show nowait
  
  "<#{self.class} #{stat} #{ops.join('; ')}>"
end

#log(*args) ⇒ Object



170
171
172
173
174
175
176
# File 'lib/tupelo/client/transaction.rb', line 170

def log *args
  if args.empty?
    @log
  else
    @log.unknown *args
  end
end

#or(*templates) ⇒ Object Also known as: match_any



21
22
23
# File 'lib/tupelo/util/boolean.rb', line 21

def or *templates
  client.or *templates
end

#prepare(new_tuple = nil) ⇒ Object



389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
# File 'lib/tupelo/client/transaction.rb', line 389

def prepare new_tuple = nil
  return false if closed? or failed? # might change during this method
  raise unless in_worker_thread?

  if new_tuple
    return true if take_tuples_for_local.all? and read_tuples_for_local.all?

    take_tuples_for_local.each_with_index do |tuple, i|
      if not tuple and take_templates[i] === new_tuple
        take_tuples_for_local[i] = new_tuple
        take_tuples_for_remote[i] = new_tuple
        log.debug {"prepared #{inspect} with #{new_tuple}"}
        break
      end
    end

    read_tuples_for_local.each_with_index do |tuple, i|
      if not tuple and read_templates[i] === new_tuple
        read_tuples_for_local[i] = new_tuple
        read_tuples_for_remote[i] = new_tuple
        log.debug {"prepared #{inspect} with #{new_tuple}"}
      end
    end

  else
    ## optimization: use tuple cache
    skip = nil
    (take_tuples_for_local.size...take_templates.size).each do |i|
      template = take_templates[i]
      
      if wt = @writes.find {|tuple| template === tuple}
        take_tuples_for_remote[i] = nil
        take_tuples_for_local[i] = wt.dup
        @writes.delete wt
        next
      end

      take_tuples_for_local[i] = take_tuples_for_remote[i] =
        worker.tuplespace.find_match_for(template,
          distinct_from: take_tuples_for_local)
      
      if take_tuples_for_local[i]
        log.debug {"prepared #{inspect} with #{take_tuples_for_local[i]}"}
      else
        if @_take_nowait and @_take_nowait[i]
          (skip ||= []) << i
        end
      end
    end

    skip and skip.reverse_each do |i|
      take_tuples_for_local.delete_at i
      take_tuples_for_remote.delete_at i
      take_templates.delete_at i
      @_take_nowait.delete i
    end
    
    skip = nil
    (read_tuples_for_local.size...read_templates.size).each do |i|
      template = read_templates[i]

      if wt = @writes.find {|tuple| template === tuple}
        read_tuples_for_remote[i] = nil
        read_tuples_for_local[i] = wt.dup
        next
      end

      read_tuples_for_local[i] = read_tuples_for_remote[i] =
        worker.tuplespace.find_match_for(template,
          distinct_from: take_tuples_for_local)
      
      if read_tuples_for_local[i]
        log.debug {"prepared #{inspect} with #{read_tuples_for_local[i]}"}
      else
        if @_read_nowait and @_read_nowait[i]
          (skip ||= []) << i
        end
      end
    end

    skip and skip.reverse_each do |i|
      read_tuples_for_local.delete_at i
      read_tuples_for_remote.delete_at i
      read_templates.delete_at i
      @_read_nowait.delete i
    end
  end
  
  ## convert cancelling write/take to pulse
  ## convert cancelling take/write to read
  
  if take_tuples_for_local.all? and read_tuples_for_local.all?
    @queue << true
    log.debug {
      "prepared #{inspect}, " +
      "take tuples: #{take_tuples_for_local}, " +
      "read tuples: #{read_tuples_for_local}"}
  end
  
  return true
end

#pulse(*tuples) ⇒ Object



235
236
237
238
239
240
241
# File 'lib/tupelo/client/transaction.rb', line 235

def pulse *tuples
  check_open
  check_tuples tuples
  blobber = worker.blobber
  @pulses.concat tuples.map {|t| blobber.load(blobber.dump(t))}
  nil
end

#read(template_spec) ⇒ Object

transaction applies only if template has a match



268
269
270
271
272
273
274
275
276
# File 'lib/tupelo/client/transaction.rb', line 268

def read template_spec
  check_open
  template = worker.make_template(template_spec)
  @read_templates << template
  log.debug {"asking worker to read #{template_spec.inspect}"}
  worker_push self
  wait
  return read_tuples_for_local.last
end

#read_nowait(template_spec) ⇒ Object



278
279
280
281
282
283
284
285
286
287
288
289
# File 'lib/tupelo/client/transaction.rb', line 278

def read_nowait template_spec
  check_open
  template = worker.make_template(template_spec)
  @_read_nowait ||= {}
  i = @read_templates.size
  @_read_nowait[i] = true
  @read_templates << template
  log.debug {"asking worker to read #{template_spec.inspect}"}
  worker_push self
  wait
  return read_tuples_for_local[i]
end

#submitObject



514
515
516
517
518
519
520
# File 'lib/tupelo/client/transaction.rb', line 514

def submit
  raise TransactionStateError, "must be closed" unless closed?
  raise unless in_worker_thread?

  @local_tick = worker.send_transaction self
  pending!
end

#subspace(tag) ⇒ Object



166
167
168
# File 'lib/tupelo/client/transaction.rb', line 166

def subspace tag
  client.subspace tag
end

#take(template_spec) ⇒ Object

raises TransactionFailure



244
245
246
247
248
249
250
251
252
# File 'lib/tupelo/client/transaction.rb', line 244

def take template_spec
  check_open
  template = worker.make_template(template_spec)
  @take_templates << template
  log.debug {"asking worker to take #{template_spec.inspect}"}
  worker_push self
  wait
  return take_tuples_for_local.last
end

#take_nowait(template_spec) ⇒ Object



254
255
256
257
258
259
260
261
262
263
264
265
# File 'lib/tupelo/client/transaction.rb', line 254

def take_nowait template_spec
  check_open
  template = worker.make_template(template_spec)
  @_take_nowait ||= {}
  i = @take_templates.size
  @_take_nowait[i] = true
  @take_templates << template
  log.debug {"asking worker to take_nowait #{template_spec.inspect}"}
  worker_push self
  wait
  return take_tuples_for_local[i]
end

#unprepare(missing_tuple) ⇒ Object



491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
# File 'lib/tupelo/client/transaction.rb', line 491

def unprepare missing_tuple
  return false if closed? or failed? # might change during this method
  raise unless in_worker_thread?

  @take_tuples_for_remote.each do |tuple|
    if tuple == missing_tuple ## might be false positive, but ok
      fail [missing_tuple]
        ## optimization: manage tuple cache
      return false
    end
  end

  @read_tuples_for_remote.each do |tuple|
    if tuple == missing_tuple ## might be false positive, but ok
      fail [missing_tuple]
      return false
    end
  end

  ## redo the conversions etc
  return true
end

#valueObject



347
348
349
350
# File 'lib/tupelo/client/transaction.rb', line 347

def value
  wait
  granted_tuples
end

#waitObject



328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
# File 'lib/tupelo/client/transaction.rb', line 328

def wait
  return self if done?
  raise exception if failed?
  
  log.debug {"waiting for #{inspect}"}
  @queue.pop
  log.debug {"finished waiting for #{inspect}"}

  return self if done? or open?
  raise exception if failed?
  log.error inspect
  raise "bug: #{inspect}"

rescue TransactionAbort, Interrupt, TimeoutError => ex ## others?
  worker_push Unwaiter.new(self)
  cstr = "client #{client_id} (#{log.progname})"
  raise ex.class, "#{ex.message}: #{cstr} waiting for #{inspect}"
end

#worker_push(event = Proc.new) ⇒ Object



324
325
326
# File 'lib/tupelo/client/transaction.rb', line 324

def worker_push event=Proc.new
  worker << event
end

#write(*tuples) ⇒ Object



225
226
227
228
229
230
231
232
233
# File 'lib/tupelo/client/transaction.rb', line 225

def write *tuples
  check_open
  check_tuples tuples
  blobber = worker.blobber
  @writes.concat tuples.map {|t| blobber.load(blobber.dump(t))}
    # this is both to de-alias (esp. in case of marshal or yaml) and
    # to convert symbols to strings (in case of msgpack or json)
  nil
end