Class: Tupelo::Client::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/tupelo/client/worker.rb

Direct Known Subclasses

Archiver::Worker, PersistentArchiver::Worker

Defined Under Namespace

Classes: Operation, Subspace

Constant Summary collapse

GET_TUPLESPACE =
"get tuplespace"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client) ⇒ Worker

Returns a new instance of Worker.



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/tupelo/client/worker.rb', line 68

def initialize client
  @client = client
  @seq = nil
  @arc = nil
  @log = client.log

  @client_id = nil
  @global_tick = nil
  @start_tick = nil
  @local_tick = 0
  @delta = 0

  @cmd_queue = client.make_queue
  @tuplespace = nil
  @message_class = client.message_class
  @blobber = nil

  @read_waiters = []
  @prep_waiters = []
  @trans_waiters = []
  @notify_waiters = []
  @stopping = false
  @subspaces = []
end

Instance Attribute Details

#arcObject (readonly)

Returns the value of attribute arc.



11
12
13
# File 'lib/tupelo/client/worker.rb', line 11

def arc
  @arc
end

#blobberObject (readonly)

Returns the value of attribute blobber.



22
23
24
# File 'lib/tupelo/client/worker.rb', line 22

def blobber
  @blobber
end

#clientObject (readonly)

Returns the value of attribute client.



9
10
11
# File 'lib/tupelo/client/worker.rb', line 9

def client
  @client
end

#client_idObject (readonly)

Returns the value of attribute client_id.



12
13
14
# File 'lib/tupelo/client/worker.rb', line 12

def client_id
  @client_id
end

#cmd_queueObject (readonly)

Returns the value of attribute cmd_queue.



19
20
21
# File 'lib/tupelo/client/worker.rb', line 19

def cmd_queue
  @cmd_queue
end

#deltaObject (readonly)

Returns the value of attribute delta.



16
17
18
# File 'lib/tupelo/client/worker.rb', line 16

def delta
  @delta
end

#global_tickObject (readonly)

Returns the value of attribute global_tick.



14
15
16
# File 'lib/tupelo/client/worker.rb', line 14

def global_tick
  @global_tick
end

#local_tickObject (readonly)

Returns the value of attribute local_tick.



13
14
15
# File 'lib/tupelo/client/worker.rb', line 13

def local_tick
  @local_tick
end

#message_classObject (readonly)

Returns the value of attribute message_class.



21
22
23
# File 'lib/tupelo/client/worker.rb', line 21

def message_class
  @message_class
end

#msg_reader_threadObject (readonly)

Returns the value of attribute msg_reader_thread.



17
18
19
# File 'lib/tupelo/client/worker.rb', line 17

def msg_reader_thread
  @msg_reader_thread
end

#notify_waitersObject (readonly)

Returns the value of attribute notify_waiters.



26
27
28
# File 'lib/tupelo/client/worker.rb', line 26

def notify_waiters
  @notify_waiters
end

#prep_waitersObject (readonly)

Returns the value of attribute prep_waiters.



24
25
26
# File 'lib/tupelo/client/worker.rb', line 24

def prep_waiters
  @prep_waiters
end

#read_waitersObject (readonly)

Returns the value of attribute read_waiters.



23
24
25
# File 'lib/tupelo/client/worker.rb', line 23

def read_waiters
  @read_waiters
end

#seqObject (readonly)

Returns the value of attribute seq.



10
11
12
# File 'lib/tupelo/client/worker.rb', line 10

def seq
  @seq
end

#start_tickObject (readonly)

Returns the value of attribute start_tick.



15
16
17
# File 'lib/tupelo/client/worker.rb', line 15

def start_tick
  @start_tick
end

#subspacesObject (readonly)

Returns the value of attribute subspaces.



27
28
29
# File 'lib/tupelo/client/worker.rb', line 27

def subspaces
  @subspaces
end

#trans_waitersObject (readonly)

Returns the value of attribute trans_waiters.



25
26
27
# File 'lib/tupelo/client/worker.rb', line 25

def trans_waiters
  @trans_waiters
end

#tuplespaceObject (readonly)

Returns the value of attribute tuplespace.



20
21
22
# File 'lib/tupelo/client/worker.rb', line 20

def tuplespace
  @tuplespace
end

#worker_threadObject (readonly)

Returns the value of attribute worker_thread.



18
19
20
# File 'lib/tupelo/client/worker.rb', line 18

def worker_thread
  @worker_thread
end

Instance Method Details

#<<(cmd) ⇒ Object



162
163
164
# File 'lib/tupelo/client/worker.rb', line 162

def << cmd
  cmd_queue << cmd
end

#at(time, &action) ⇒ Object



155
156
157
158
159
160
# File 'lib/tupelo/client/worker.rb', line 155

def at time, &action
  @atdo ||= AtDo.new
  @atdo.at time do
    cmd_queue << action
  end
end

#collect_tags(tuple) ⇒ Object



537
538
539
# File 'lib/tupelo/client/worker.rb', line 537

def collect_tags tuple
  subspaces.select {|subspace| subspace === tuple}.map(&:tag)
end

#handle_client_request(req) ⇒ Object



451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
# File 'lib/tupelo/client/worker.rb', line 451

def handle_client_request req
  log.debug {"client requested #{req.inspect}"}

  case req
  when message_class
    raise "only seq can send messages"

  when Waiter
    handle_waiter req

  when Matcher
    handle_matcher req

  when Unwaiter
    handle_unwaiter req

  when Transaction
    handle_transaction req

  when NotifyWaiter
    notify_waiters.delete req or notify_waiters.push req

  when Proc
    req.call

  else
    raise "unknown request from client: #{req}"
  end
rescue => ex
  log.error "error while handling #{req.inspect}: #{ex}"
  ## Raise an error in the waiter? Need to generalize the mechanism in
  ## Transaction.
end

#handle_matcher(matcher) ⇒ Object



521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
# File 'lib/tupelo/client/worker.rb', line 521

def handle_matcher matcher
  if matcher.all
    tuplespace.each {|tuple| matcher.gloms tuple}
      ## maybe should have tuplespace.find_all_matches_for ...
      ## in case there is an optimization
    matcher.fails
  else
    tuple = tuplespace.find_match_for matcher.template
    if tuple
      matcher.peek tuple
    else
      matcher.fails
    end
  end
end

#handle_message(msg) ⇒ Object



285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
# File 'lib/tupelo/client/worker.rb', line 285

def handle_message msg
  log.debug {"seq sent #{msg.inspect}"}

  if msg.control?
    client.handle_ack msg
    op_type, tags = msg.control_op
    case op_type
    when Funl::SUBSCRIBE_ALL
      update_to_tick tick: msg.global_tick, all: true
    when Funl::SUBSCRIBE
      update_to_tick tick: msg.global_tick,
        tags: (client.subscribed_tags | tags)
    when Funl::UNSUBSCRIBE_ALL
      update_to_tick tick: msg.global_tick, all: false
    when Funl::UNSUBSCRIBE
      update_to_tick tick: msg.global_tick,
        tags: (client.subscribed_tags - tags)
    else
      raise "Unimplemented: #{msg.inspect}"
    end
    return
  end

  if !global_tick
    raise "bug: should have subscribed and received ack before data"
  end

  if msg.global_tick < global_tick + 1
    log.debug {"discarding redundant message at #{msg.global_tick}"}
      # due to archiver timing, for example
    return
  end

  @global_tick = msg.global_tick
  @delta = 0

  record_history msg
  op = msg.blob ? Operation.new(*blobber.load(msg.blob)) : Operation::NOOP
    ## op.freeze_deeply
  log.debug {"applying #{op} from client #{msg.client_id}"}

  notify_waiters.each do |waiter|
    waiter << [:attempt, msg.global_tick, msg.client_id, op, msg.tags]
  end

  take_tuples = tuplespace.find_distinct_matches_for(op.takes)
  read_tuples = op.reads.map {|t| tuplespace.find_match_for(t)}
  succeeded = take_tuples.all? && read_tuples.all?

  if client.subscribed_all
    write_tuples = op.writes
  else
    write_tuples = op.writes.select do |tuple|
      subspaces.any? {|subspace| subspace === tuple}
    end
  end
  ## This is duplicated effort: the sender has already done this.
  ## So maybe the result could be transmitted in the msg protocol?

  if succeeded
    log.debug {"inserting #{op.writes}; deleting #{take_tuples}"}
    tuplespace.transaction inserts: write_tuples, deletes: take_tuples,
      tick: @global_tick
  
    op.writes.each do |tuple|
      sniff_meta_tuple tuple
    end

    take_tuples.each do |tuple|
      if is_meta_tuple? tuple
        ## do some error checking
        subspaces.delete_if {|sp| sp.tag == tuple["tag"]}
      end
    end
  end

  notify_waiters.each do |waiter|
    waiter << [
      succeeded ?  :success : :failure,
      msg.global_tick, msg.client_id, op, msg.tags]
  end

  trans = nil
  if msg.client_id == client_id
    trans = trans_waiters.first
    unless trans and trans.local_tick == msg.local_tick
      log.error "transaction #{op} out of order in sequence " +
        trans_waiters.inspect
      ## exit? wait?
    end
    trans_waiters.shift
    log.debug {"operation belongs to this client: #{trans.inspect}"}
  end

  if not take_tuples.empty?
    if succeeded
      take_tuples.each do |tuple|
        prep_waiters.keep_if do |waiter|
          waiter.unprepare tuple
          ## optimization: track number of instances of tuple, to avoid
          ## false positive in #unprepare
        end
      end

      log.debug {trans ? "taking #{take_tuples}" :
        "client #{msg.client_id} takes #{take_tuples}"}

    else
      log.debug {
        missing = []
        take_tuples.each_with_index do |tuple, i|
          missing << op.takes[i] unless tuple
        end
        trans ? "failed to take #{missing}" :
        "client #{msg.client_id} failed to take #{missing}"}
    end
  end

  if succeeded
    op.writes.each do |tuple|
      read_waiters.delete_if do |waiter|
        waiter.gloms tuple
      end
    end

    op.pulses.each do |tuple|
      log.debug {"pulsing #{tuple}"}
      read_waiters.delete_if do |waiter|
        waiter.gloms tuple
      end
    end

    op.writes.each do |tuple|
      prep_waiters.keep_if do |waiter|
        waiter.prepare tuple
      end
    end
  end

  if trans
    trans_waiters.delete trans

    if succeeded
      trans.done msg.global_tick, take_tuples # note: tuples not frozen
    else
      trans.fail (op.takes - take_tuples) + (op.reads - read_tuples)
    end
  end
end

#handle_one_requestObject



201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/tupelo/client/worker.rb', line 201

def handle_one_request
  case cmd = cmd_queue.pop
  when :stop
    @stopping = true
    if trans_waiters.empty?
      throw :done
    else
      log.info {"stopping; waiting for #{trans_waiters}"}
    end
  when message_class
    handle_message cmd
    if @stopping
      if trans_waiters.empty?
        throw :done
      else
        log.info {"stopping; waiting for #{trans_waiters}"}
      end
    end
  else
    handle_client_request cmd unless @stopping
  end
end

#handle_seq_closedObject



175
176
177
178
# File 'lib/tupelo/client/worker.rb', line 175

def handle_seq_closed
  ## what to do here in general?
  ## for each waiter, push :stop into queue ?
end

#handle_transaction(t) ⇒ Object



485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
# File 'lib/tupelo/client/worker.rb', line 485

def handle_transaction t
  case
  when t.open?
    t.prepare
    prep_waiters << t unless prep_waiters.include? t
  when t.closed?
    t.submit
    prep_waiters.delete t
  when t.failed?
  else
    log.warn "not open or closed or failed: #{t.inspect}"
  end
rescue => ex
  log.error "error while handling #{t.inspect}: #{ex}"
  t.error ex
end

#handle_unwaiter(unwaiter) ⇒ Object



502
503
504
505
# File 'lib/tupelo/client/worker.rb', line 502

def handle_unwaiter unwaiter
  waiter = unwaiter.waiter
  read_waiters.delete waiter or prep_waiters.delete waiter
end

#handle_waiter(waiter) ⇒ Object



507
508
509
510
511
512
513
514
515
516
517
518
519
# File 'lib/tupelo/client/worker.rb', line 507

def handle_waiter waiter
  if waiter.once
    tuple = tuplespace.find_match_for waiter.template
    if tuple
      waiter.peek tuple
    else
      read_waiters << waiter
    end
  else
    tuplespace.each {|tuple| waiter.gloms tuple}
    read_waiters << waiter
  end
end

#in_thread?Boolean

Returns:

  • (Boolean)


129
130
131
# File 'lib/tupelo/client/worker.rb', line 129

def in_thread?
  Thread.current == worker_thread
end

#is_meta_tuple?(tuple) ⇒ Boolean

Returns true if tuple is subspace metadata.

Returns:

  • (Boolean)


436
437
438
439
# File 'lib/tupelo/client/worker.rb', line 436

def is_meta_tuple? tuple
  tuple.kind_of? Hash and tuple.key? TUPELO_META_KEY and
    tuple[TUPELO_META_KEY] == "subspace"
end

#log(*args) ⇒ Object



93
94
95
96
97
98
99
# File 'lib/tupelo/client/worker.rb', line 93

def log *args
  if args.empty?
    @log
  else
    @log.unknown *args
  end
end

#make_template(obj) ⇒ Object

Used by api to protect worker’s copy from client changes. Also, for serialization types that don’t represent symbols, this converts a template so that it works correctly regardless. So keyword args are very natural: read(k1: val, k2: val)



624
625
626
627
628
# File 'lib/tupelo/client/worker.rb', line 624

def make_template obj
  return obj unless obj.respond_to? :to_ary or obj.respond_to? :to_hash
  spec = Marshal.load(Marshal.dump(obj))
  rot_for(spec).optimize!
end

#observe_started_clientObject



133
134
135
136
137
138
139
# File 'lib/tupelo/client/worker.rb', line 133

def observe_started_client
  @client_id = client.client_id
  @blobber = client.blobber
  @seq = client.seq
  @arc = client.arc
  @start_tick = client.start_tick
end

#pot_for(spec) ⇒ Object



634
635
636
# File 'lib/tupelo/client/worker.rb', line 634

def pot_for spec
  PortableObjectTemplate.new(spec, proc {|k| blobber.load(blobber.dump(k))})
end

#read_messages_from_seqObject



180
181
182
183
184
# File 'lib/tupelo/client/worker.rb', line 180

def read_messages_from_seq
  seq.each do |msg|
    self << msg
  end
end

#record_history(msg) ⇒ Object



449
# File 'lib/tupelo/client/worker.rb', line 449

def record_history msg; end

#rot_for(spec) ⇒ Object



630
631
632
# File 'lib/tupelo/client/worker.rb', line 630

def rot_for spec
  RubyObjectTemplate.new(spec, proc {|k| blobber.load(blobber.dump(k))})
end

#run_msg_reader_threadObject



166
167
168
169
170
171
172
173
# File 'lib/tupelo/client/worker.rb', line 166

def run_msg_reader_thread
  read_messages_from_seq
  log.warn "connection to seq closed"
  handle_seq_closed
rescue => ex
  log.error ex
  raise
end

#run_request_loopObject



193
194
195
196
197
198
199
# File 'lib/tupelo/client/worker.rb', line 193

def run_request_loop
  catch :done do
    loop do
      handle_one_request
    end
  end
end

#run_worker_threadObject



186
187
188
189
190
191
# File 'lib/tupelo/client/worker.rb', line 186

def run_worker_thread
  run_request_loop
rescue => ex
  log.error ex
  raise
end

#send_transaction(transaction) ⇒ Object



541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
# File 'lib/tupelo/client/worker.rb', line 541

def send_transaction transaction
  msg = message_class.new
  msg.client_id = client_id
  msg.local_tick = local_tick + 1
  msg.global_tick = global_tick
  msg.delta = delta + 1 # pipelined write/take
  msg.tags = transaction.tags

  writes = transaction.writes
  pulses = transaction.pulses
  takes = transaction.take_tuples_for_remote.compact
  reads = transaction.read_tuples_for_remote.compact
  
  unless msg.tags
    tags = nil
    [takes, reads].compact.flatten(1).each do |tuple|
      if tags
        tuple_tags = collect_tags(tuple)
        unless tuple_tags == tags
          d = (tuple_tags - tags) + (tags - tuple_tags)
          raise TransactionSubspaceError,
            "tuples crossing subspaces: #{d} in #{transaction.inspect}"
        end
      else
        tags = collect_tags(tuple)
      end
    end
    tags ||= []

    write_tags = []
    [writes, pulses].compact.flatten(1).each do |tuple|
      write_tags |= collect_tags(tuple)
    end

    if takes.empty? and reads.empty?
      tags = write_tags
    else
      d = write_tags - tags
      unless d.empty?
        raise TransactionSubspaceError,
          "writes crossing subspaces: #{d} in #{transaction.inspect}"
      end
    end

    will_get_this_msg = client.subscribed_all ||
      tags.any? {|tag| client.subscribed_tags.include? tag} ## optimize

    unless will_get_this_msg
      tags << true # reflect
    end

    if not tags.empty?
      msg.tags = tags
      log.debug {"tagged transaction: #{tags}"}
    end
  end

  begin
    msg.blob = blobber.dump([writes, pulses, takes, reads])
    ## optimization: use bitfields to identify which ops are present
    ## (instead of nils), in one int
  rescue => ex
    raise ex, "cannot serialize #{transaction.inspect}: #{ex}"
  end

  begin
    seq << msg
  rescue => ex
    raise ex, "cannot send request for #{transaction.inspect}: #{ex}"
  end

  @local_tick += 1
  @delta += 1

  trans_waiters << transaction

  return msg.local_tick
end

#sniff_meta_tuple(tuple) ⇒ Object



441
442
443
444
445
446
447
# File 'lib/tupelo/client/worker.rb', line 441

def sniff_meta_tuple tuple
  if is_meta_tuple? tuple
    ## do some error checking
    ## what if subspace already exists?
    subspaces << Subspace.new(tuple, self)
  end
end

#startObject



114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/tupelo/client/worker.rb', line 114

def start
  return if @worker_thread

  log.info "worker starting"
  observe_started_client

  @msg_reader_thread = Thread.new do
    run_msg_reader_thread
  end

  @worker_thread = Thread.new do
    run_worker_thread
  end
end

#stopObject



141
142
143
144
145
146
# File 'lib/tupelo/client/worker.rb', line 141

def stop
  cmd_queue << :stop
  worker_thread.join if worker_thread ## join(limit)?
  msg_reader_thread.kill if msg_reader_thread
  @atdo.stop if @atdo
end

#stop!Object

stop without any remote handshaking



149
150
151
152
153
# File 'lib/tupelo/client/worker.rb', line 149

def stop!
  @msg_reader_thread.kill if msg_reader_thread
  @worker_thread.kill if worker_thread
  @atdo.stop if @atdo
end

#update_to_tick(tick: nil, tags: nil, all: false) ⇒ Object



224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
# File 'lib/tupelo/client/worker.rb', line 224

def update_to_tick tick: nil, tags: nil, all: false
  # At this point we know that the seq messages now accumulating in
  # cmd_queue are tick+1, tick+2, ....
  # (or a subset of this sequence if not subscribed_all).
  # Some of these might get discarded later if archiver is more current.
  log.debug {"update_to_tick #{tick}"}

  unless arc
    if tick > 0
      log.warn "no archiver provided; assuming pubsub mode; " +
        "some client ops (take and local read) will not work"
    end
    @global_tick = tick
    log.info "global_tick = #{global_tick}"
    return
  end

  log.info "requesting tuplespace from arc"
  subscription_delta = {
    request_all: all,
    request_tags: tags,
    subscribed_all: client.subscribed_all,
    subscribed_tags: client.subscribed_tags
  }
  arc << [GET_TUPLESPACE, subscription_delta, tick]

  begin
    tuplespace.clear
      ## In some cases, we can keep some of it, but the current
      ## archiver is not smart enough to send exactly the delta.
      ## Also, might need to abort some current transactions.

    arc_tick = arc.read[0]
    log.info "arc says global_tick = #{arc_tick}"

    done = false
    count = 0
    arc.each do |tuple|
      if tuple.nil?
        done = true
      else
        raise "bad object stream from archiver" if done
        sniff_meta_tuple tuple
        tuplespace.insert tuple
        count += 1
      end
    end
    unless done
      raise "did not get all of tuplespace from archiver" ## roll back?
    end

    log.info "received tuplespace from arc: #{count} tuples"

    @global_tick = arc_tick
    log.info "global_tick = #{global_tick}"
  end

ensure
  arc.close if arc and not arc.closed?
end