Class: Tupelo::Client::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/tupelo/client/worker.rb

Direct Known Subclasses

Archiver::Worker, PersistentArchiver::Worker

Defined Under Namespace

Classes: Operation, Subspace

Constant Summary collapse

# Request tag sent as the first element of the message that #update_to_tick
# pushes to the archiver (arc).
GET_TUPLESPACE =
"get tuplespace"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client) ⇒ Worker

Returns a new instance of Worker.



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/tupelo/client/worker.rb', line 72

# Builds a worker bound to +client+.
#
# The logger, command queue and message class are taken from the client
# right away. Connection-related state (seq, arc, blobber, client id and
# ticks) starts as nil and is filled in later, e.g. by
# #observe_started_client and #update_to_tick.
def initialize client
  @client = client
  @seq, @arc = nil, nil
  @log = client.log

  # Identity and clock state; the counters start at zero.
  @client_id, @global_tick, @start_tick = nil, nil, nil
  @local_tick, @delta = 0, 0

  @cmd_queue = client.make_queue
  @tuplespace = nil
  @message_class = client.message_class
  @blobber = nil

  # Each waiter list is its own, initially empty, array.
  @read_waiters, @prep_waiters, @trans_waiters, @notify_waiters = [], [], [], []
  @stopping = false
  @subspaces = []
end

Instance Attribute Details

#arc ⇒ Object (readonly)

Returns the value of attribute arc.



11
12
13
# File 'lib/tupelo/client/worker.rb', line 11

# Reader for +@arc+, which starts as nil and is copied from +client.arc+ by
# #observe_started_client.
def arc; @arc; end

#blobber ⇒ Object (readonly)

Returns the value of attribute blobber.



22
23
24
# File 'lib/tupelo/client/worker.rb', line 22

# Reader for +@blobber+, which starts as nil and is copied from
# +client.blobber+ by #observe_started_client.
def blobber; @blobber; end

#client ⇒ Object (readonly)

Returns the value of attribute client.



9
10
11
# File 'lib/tupelo/client/worker.rb', line 9

# Reader for +@client+, the client object handed to the constructor.
def client; @client; end

#client_id ⇒ Object (readonly)

Returns the value of attribute client_id.



12
13
14
# File 'lib/tupelo/client/worker.rb', line 12

# Reader for +@client_id+, which starts as nil and is copied from
# +client.client_id+ by #observe_started_client.
def client_id; @client_id; end

#cmd_queue ⇒ Object (readonly)

Returns the value of attribute cmd_queue.



19
20
21
# File 'lib/tupelo/client/worker.rb', line 19

# Reader for +@cmd_queue+, the queue obtained from +client.make_queue+ in the
# constructor and drained by #handle_one_request.
def cmd_queue; @cmd_queue; end

#delta ⇒ Object (readonly)

Returns the value of attribute delta.



16
17
18
# File 'lib/tupelo/client/worker.rb', line 16

# Reader for +@delta+: starts at 0, is incremented by #send_transaction and
# reset to 0 by #handle_message when a data message is applied.
def delta; @delta; end

#global_tick ⇒ Object (readonly)

Returns the value of attribute global_tick.



14
15
16
# File 'lib/tupelo/client/worker.rb', line 14

# Reader for +@global_tick+: nil until #update_to_tick sets it, then advanced
# by #handle_message for each applied data message.
def global_tick; @global_tick; end

#local_tick ⇒ Object (readonly)

Returns the value of attribute local_tick.



13
14
15
# File 'lib/tupelo/client/worker.rb', line 13

# Reader for +@local_tick+: starts at 0 and is incremented by
# #send_transaction after each message is sent.
def local_tick; @local_tick; end

#message_class ⇒ Object (readonly)

Returns the value of attribute message_class.



21
22
23
# File 'lib/tupelo/client/worker.rb', line 21

# Reader for +@message_class+, taken from +client.message_class+ in the
# constructor; used to recognise and build sequencer messages.
def message_class; @message_class; end

#msg_reader_thread ⇒ Object (readonly)

Returns the value of attribute msg_reader_thread.



17
18
19
# File 'lib/tupelo/client/worker.rb', line 17

# Reader for +@msg_reader_thread+, the thread created by #start to run
# #run_msg_reader_thread; nil before #start.
def msg_reader_thread; @msg_reader_thread; end

#notify_waiters ⇒ Object (readonly)

Returns the value of attribute notify_waiters.



26
27
28
# File 'lib/tupelo/client/worker.rb', line 26

# Reader for +@notify_waiters+, the list of waiters that #handle_message
# sends attempt/success/failure events to.
def notify_waiters; @notify_waiters; end

#prep_waiters ⇒ Object (readonly)

Returns the value of attribute prep_waiters.



24
25
26
# File 'lib/tupelo/client/worker.rb', line 24

# Reader for +@prep_waiters+, the list of open transactions registered by
# #handle_transaction.
def prep_waiters; @prep_waiters; end

#read_waiters ⇒ Object (readonly)

Returns the value of attribute read_waiters.



23
24
25
# File 'lib/tupelo/client/worker.rb', line 23

# Reader for +@read_waiters+, the list of waiters registered by
# #handle_waiter that are still waiting for a matching tuple.
def read_waiters; @read_waiters; end

#seq ⇒ Object (readonly)

Returns the value of attribute seq.



10
11
12
# File 'lib/tupelo/client/worker.rb', line 10

# Reader for +@seq+, which starts as nil and is copied from +client.seq+ by
# #observe_started_client.
def seq; @seq; end

#start_tick ⇒ Object (readonly)

Returns the value of attribute start_tick.



15
16
17
# File 'lib/tupelo/client/worker.rb', line 15

# Reader for +@start_tick+, which starts as nil and is copied from
# +client.start_tick+ by #observe_started_client.
def start_tick; @start_tick; end

#subspaces ⇒ Object (readonly)

Returns the value of attribute subspaces.



27
28
29
# File 'lib/tupelo/client/worker.rb', line 27

# Reader for +@subspaces+, the list that #sniff_meta_tuple appends Subspace
# objects to.
def subspaces; @subspaces; end

#trans_waiters ⇒ Object (readonly)

Returns the value of attribute trans_waiters.



25
26
27
# File 'lib/tupelo/client/worker.rb', line 25

# Reader for +@trans_waiters+, the list of transactions that
# #send_transaction has sent and that have not yet been completed.
def trans_waiters; @trans_waiters; end

#tuplespace ⇒ Object (readonly)

Returns the value of attribute tuplespace.



20
21
22
# File 'lib/tupelo/client/worker.rb', line 20

# Reader for +@tuplespace+. The constructor sets it to nil; the code that
# installs the actual tuplespace is not part of this listing.
def tuplespace; @tuplespace; end

#worker_thread ⇒ Object (readonly)

Returns the value of attribute worker_thread.



18
19
20
# File 'lib/tupelo/client/worker.rb', line 18

# Reader for +@worker_thread+, the thread created by #start to run
# #run_worker_thread; nil before #start.
def worker_thread; @worker_thread; end

Instance Method Details

#<<(cmd) ⇒ Object



166
167
168
# File 'lib/tupelo/client/worker.rb', line 166

# Enqueues +cmd+ on the worker's command queue.
# Returns whatever the queue's own +<<+ returns.
def << cmd
  queue = cmd_queue
  queue << cmd
end

#at(time, &action) ⇒ Object



159
160
161
162
163
164
# File 'lib/tupelo/client/worker.rb', line 159

# Schedules +action+ for +time+ using a lazily created AtDo instance.
# When the scheduler fires, the action block itself is pushed onto the
# command queue rather than being called directly.
def at time, &action
  scheduler = (@atdo ||= AtDo.new)
  scheduler.at(time) { cmd_queue << action }
end

#handle_client_request(req) ⇒ Object



453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
# File 'lib/tupelo/client/worker.rb', line 453

# Dispatches one request taken from the command queue according to its
# class. Any StandardError raised while dispatching (including the
# deliberate raises below) is logged and swallowed, so this method does not
# propagate it.
def handle_client_request req
  log.debug {"client requested #{req.inspect}"}

  case req
  when message_class then raise "only seq can send messages"
  when Waiter        then handle_waiter req
  when Matcher       then handle_matcher req
  when Unwaiter      then handle_unwaiter req
  when Transaction   then handle_transaction req
  when NotifyWaiter
    # Toggle membership: remove the waiter if present, otherwise add it.
    notify_waiters.delete(req) || notify_waiters.push(req)
  when Proc          then req.call
  else
    raise "unknown request from client: #{req}"
  end
rescue => ex
  log.error "error while handling #{req.inspect}: #{ex}"
  ## Raise an error in the waiter? Need to generalize the mechanism in
  ## Transaction.
end

#handle_matcher(matcher) ⇒ Object



523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
# File 'lib/tupelo/client/worker.rb', line 523

# Answers a one-shot match request against the current tuplespace.
#
# If +matcher.all+ is set, every tuple in the tuplespace is offered to the
# matcher via +gloms+, and +fails+ is then called after the scan.
# Otherwise the first tuple matching +matcher.template+ is passed to
# +matcher.peek+; if there is no such tuple, +matcher.fails+ is called.
#
# The single-match branch previously referred to an undefined +waiter+,
# which raised NameError; it now uses +matcher+ throughout.
def handle_matcher matcher
  if matcher.all
    tuplespace.each {|tuple| matcher.gloms tuple}
      ## maybe should have tuplespace.find_all_matches_for ...
      ## in case there is an optimization
    matcher.fails
  else
    tuple = tuplespace.find_match_for matcher.template
    if tuple
      matcher.peek tuple
    else
      matcher.fails
    end
  end
end

#handle_message(msg) ⇒ Object



289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
# File 'lib/tupelo/client/worker.rb', line 289

# Applies one message received from the sequencer ("seq") to local state.
#
# Control messages are acknowledged through the client and trigger a resync
# via #update_to_tick; nothing else is done for them. Data messages are
# decoded into an Operation, checked against the tuplespace, applied if they
# succeed, and then reported to notify waiters, read/prep waiters and, when
# the message originated from this client, to the matching transaction.
#
# Raises RuntimeError for an unrecognised control op, or if a data message
# arrives before global_tick has been set.
def handle_message msg
  log.debug {"seq sent #{msg.inspect}"}

  # Subscription control messages: ack, resync to the message's tick, done.
  if msg.control?
    client.handle_ack msg
    op_type, tags = msg.control_op
    case op_type
    when Funl::SUBSCRIBE_ALL
      update_to_tick tick: msg.global_tick, all: true
    when Funl::SUBSCRIBE
      update_to_tick tick: msg.global_tick,
        tags: (client.subscribed_tags | tags)
    when Funl::UNSUBSCRIBE_ALL
      update_to_tick tick: msg.global_tick, all: false
    when Funl::UNSUBSCRIBE
      update_to_tick tick: msg.global_tick,
        tags: (client.subscribed_tags - tags)
    else
      raise "Unimplemented: #{msg.inspect}"
    end
    return
  end

  if !global_tick
    raise "bug: should have subscribed and received ack before data"
  end

  # Anything at or below the current tick has already been accounted for.
  if msg.global_tick < global_tick + 1
    log.debug {"discarding redundant message at #{msg.global_tick}"}
      # due to archiver timing, for example
    return
  end

  @global_tick = msg.global_tick
  @delta = 0

  record_history msg
  # A message without a blob carries no operation.
  op = msg.blob ? Operation.new(*blobber.load(msg.blob)) : Operation::NOOP
    ## op.freeze_deeply
  log.debug {"applying #{op} from client #{msg.client_id}"}

  # Tell observers about the attempt before its outcome is known.
  notify_waiters.each do |waiter|
    waiter << [:attempt, msg.global_tick, msg.client_id, op, msg.tags]
  end

  granted_tuples = tuplespace.find_distinct_matches_for(op.takes)
  read_tuples = op.reads.map {|t| tuplespace.find_match_for(t)}

  # A non-atomic op always counts as succeeded; an atomic op needs every
  # take and every read to have found a tuple.
  succeeded = !op.atomic || (granted_tuples.all? && read_tuples.all?)
  take_tuples = granted_tuples.compact

  # Unless subscribed to everything, keep only writes that fall in a known
  # subspace.
  if client.subscribed_all
    write_tuples = op.writes
  else
    write_tuples = op.writes.select do |tuple|
      subspaces.any? {|subspace| subspace === tuple}
    end
  end
  ## This is duplicated effort: the sender has already done this.
  ## So maybe the result could be transmitted in the msg protocol?

  if succeeded
    log.debug {"inserting #{op.writes}; deleting #{take_tuples}"}
    tuplespace.transaction inserts: write_tuples, deletes: take_tuples,
      tick: @global_tick
  
    # Note: all of op.writes is sniffed, not just the filtered write_tuples.
    op.writes.each do |tuple|
      sniff_meta_tuple tuple
    end

    # Taking a subspace metadata tuple removes the subspace with that tag.
    take_tuples.each do |tuple|
      ### abstract this out
      if tuple.kind_of? Hash and tuple.key? "__tupelo__"
        if tuple["__tupelo__"] == "subspace" # tuple is subspace metatdata
          ## do some error checking
          subspaces.delete_if {|sp| sp.tag == tuple["tag"]}
        end
      end
    end
  end

  notify_waiters.each do |waiter|
    waiter << [
      succeeded ?  :success : :failure,
      msg.global_tick, msg.client_id, op, msg.tags]
  end

  # If this client sent the message, the head of trans_waiters is taken to
  # be its transaction. A missing or mismatched head is logged, and the head
  # is shifted off either way.
  trans = nil
  if msg.client_id == client_id
    trans = trans_waiters.first
    unless trans and trans.local_tick == msg.local_tick
      log.error "transaction #{op} out of order in sequence " +
        trans_waiters.inspect
      ## exit? wait?
    end
    trans_waiters.shift
    log.debug {"operation belongs to this client: #{trans.inspect}"}
  end

  if not take_tuples.empty?
    if succeeded
      # Each prep waiter stays in the list only if its #unprepare returns a
      # truthy value for the taken tuple.
      take_tuples.each do |tuple|
        prep_waiters.keep_if do |waiter|
          waiter.unprepare tuple
          ## optimization: track number of instances of tuple, to avoid
          ## false positive in #unprepare
        end
      end

      log.debug {trans ? "taking #{granted_tuples}" :
        "client #{msg.client_id} takes #{granted_tuples}"}

    else
      log.debug {
        missing = op.takes - take_tuples
        trans ? "failed to take #{missing}" :
        "client #{msg.client_id} failed to take #{missing}"}
    end
  end

  if succeeded
    # Read waiters are removed once #gloms returns a truthy value.
    op.writes.each do |tuple|
      read_waiters.delete_if do |waiter|
        waiter.gloms tuple
      end
    end

    # Pulses are offered to read waiters but were not part of the
    # tuplespace.transaction call above.
    op.pulses.each do |tuple|
      log.debug {"pulsing #{tuple}"}
      read_waiters.delete_if do |waiter|
        waiter.gloms tuple
      end
    end

    op.writes.each do |tuple|
      prep_waiters.keep_if do |waiter|
        waiter.prepare tuple
      end
    end
  end

  # Finally report the outcome to the originating transaction, if any.
  if trans
    trans_waiters.delete trans

    if succeeded
      trans.done msg.global_tick, granted_tuples # note: tuples not frozen
    else
      trans.fail (op.takes - take_tuples) + (op.reads - read_tuples)
    end
  end
end

#handle_one_request ⇒ Object



205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
# File 'lib/tupelo/client/worker.rb', line 205

# Pops one command from the queue and processes it.
#
# * +:stop+ marks the worker as stopping.
# * A sequencer message (instance of message_class) is applied via
#   #handle_message.
# * Anything else is treated as a client request and is ignored once the
#   worker is stopping.
#
# While stopping, after a +:stop+ or a sequencer message, +:done+ is thrown
# as soon as no transactions are outstanding; otherwise the remaining
# transactions are logged.
def handle_one_request
  cmd = cmd_queue.pop

  if :stop === cmd
    @stopping = true
  elsif message_class === cmd
    handle_message cmd
  else
    return @stopping ? nil : handle_client_request(cmd)
  end

  return unless @stopping
  throw :done if trans_waiters.empty?
  log.info {"stopping; waiting for #{trans_waiters}"}
end

#handle_seq_closed ⇒ Object



179
180
181
182
# File 'lib/tupelo/client/worker.rb', line 179

# Hook called by #run_msg_reader_thread after the sequencer stream ends.
# Currently a no-op that returns nil.
def handle_seq_closed
  ## what to do here in general?
  ## for each waiter, push :stop into queue ?
  nil
end

#handle_transaction(t) ⇒ Object



487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
# File 'lib/tupelo/client/worker.rb', line 487

# Advances transaction +t+ according to its state:
# an open transaction is prepared and registered in prep_waiters (once);
# a closed one is submitted and removed from prep_waiters; a failed one is
# left alone; any other state is logged as a warning.
# A StandardError raised along the way is logged and passed to +t.error+.
def handle_transaction t
  if t.open?
    t.prepare
    prep_waiters << t unless prep_waiters.include? t
  elsif t.closed?
    t.submit
    prep_waiters.delete t
  elsif !t.failed?
    log.warn "not open or closed or failed: #{t.inspect}"
  end
rescue => ex
  log.error "error while handling #{t.inspect}: #{ex}"
  t.error ex
end

#handle_unwaiter(unwaiter) ⇒ Object



504
505
506
507
# File 'lib/tupelo/client/worker.rb', line 504

# Cancels the waiter referenced by +unwaiter+: it is removed from
# read_waiters, or, if it was not found there, from prep_waiters.
# Returns the removed waiter, or nil if it was in neither list.
def handle_unwaiter unwaiter
  target = unwaiter.waiter
  read_waiters.delete(target) || prep_waiters.delete(target)
end

#handle_waiter(waiter) ⇒ Object



509
510
511
512
513
514
515
516
517
518
519
520
521
# File 'lib/tupelo/client/worker.rb', line 509

# Registers a read waiter.
#
# A waiter without +once+ is offered every tuple currently in the
# tuplespace via +gloms+ and is then always queued in read_waiters.
# A +once+ waiter is given the first tuple matching its template via
# +peek+; it is queued in read_waiters only when no such tuple exists yet.
def handle_waiter waiter
  unless waiter.once
    tuplespace.each {|tuple| waiter.gloms tuple}
    return read_waiters << waiter
  end

  match = tuplespace.find_match_for waiter.template
  return waiter.peek(match) if match

  read_waiters << waiter
end

#in_thread? ⇒ Boolean

Returns:

  • (Boolean)


133
134
135
# File 'lib/tupelo/client/worker.rb', line 133

# True when the calling thread is the worker thread created by #start.
def in_thread?
  Thread.current.equal?(worker_thread)
end

#log(*args) ⇒ Object



97
98
99
100
101
102
103
# File 'lib/tupelo/client/worker.rb', line 97

# With no arguments, returns the logger taken from the client.
# With arguments, forwards them to the logger's +unknown+ method and
# returns its result.
def log *args
  return @log if args.empty?

  @log.unknown(*args)
end

#make_template(obj) ⇒ Object

Used by api to protect worker’s copy from client changes. Also, for serialization types that don’t represent symbols, this converts a template so that it works correctly regardless. So keyword args are very natural: read(k1: val, k2: val)



606
607
608
609
610
# File 'lib/tupelo/client/worker.rb', line 606

# Turns +obj+ into an optimized template.
#
# Objects that respond to neither +to_ary+ nor +to_hash+ are returned
# untouched. Anything else is deep-copied through a Marshal round trip, so
# later changes made by the caller cannot affect the worker's copy, and is
# then wrapped by #rot_for and optimized.
def make_template obj
  templatable = obj.respond_to?(:to_ary) || obj.respond_to?(:to_hash)
  return obj unless templatable

  deep_copy = Marshal.load(Marshal.dump(obj))
  rot_for(deep_copy).optimize!
end

#observe_started_client ⇒ Object



137
138
139
140
141
142
143
# File 'lib/tupelo/client/worker.rb', line 137

# Copies the client's connection state (id, blobber, seq, arc and start
# tick) into the worker. Called from #start.
def observe_started_client
  source = client
  @client_id  = source.client_id
  @blobber    = source.blobber
  @seq        = source.seq
  @arc        = source.arc
  @start_tick = source.start_tick
end

#pot_for(spec) ⇒ Object



616
617
618
# File 'lib/tupelo/client/worker.rb', line 616

# Builds a PortableObjectTemplate for +spec+. The second argument is a proc
# that passes a value through the blobber (dump, then load).
def pot_for spec
  roundtrip = proc {|key| blobber.load(blobber.dump(key))}
  PortableObjectTemplate.new(spec, roundtrip)
end

#read_messages_from_seq ⇒ Object



184
185
186
187
188
# File 'lib/tupelo/client/worker.rb', line 184

# Iterates over the sequencer stream and pushes every message onto the
# worker's own command queue (via #<<). Returns once +seq.each+ returns.
def read_messages_from_seq
  seq.each {|incoming| self << incoming}
end

#record_history(msg) ⇒ Object



451
# File 'lib/tupelo/client/worker.rb', line 451

# No-op hook that #handle_message calls with each data message before
# applying it. Returns nil.
def record_history msg
end

#rot_for(spec) ⇒ Object



612
613
614
# File 'lib/tupelo/client/worker.rb', line 612

# Builds a RubyObjectTemplate for +spec+. The second argument is a proc
# that passes a value through the blobber (dump, then load).
def rot_for spec
  roundtrip = proc {|key| blobber.load(blobber.dump(key))}
  RubyObjectTemplate.new(spec, roundtrip)
end

#run_msg_reader_thread ⇒ Object



170
171
172
173
174
175
176
177
# File 'lib/tupelo/client/worker.rb', line 170

# Body of the message reader thread: forwards sequencer messages until the
# stream ends, then warns and calls #handle_seq_closed.
# Any StandardError is logged and re-raised.
def run_msg_reader_thread
  begin
    read_messages_from_seq
    log.warn "connection to seq closed"
    handle_seq_closed
  rescue StandardError => ex
    log.error ex
    raise
  end
end

#run_request_loop ⇒ Object



197
198
199
200
201
202
203
# File 'lib/tupelo/client/worker.rb', line 197

# Processes requests one at a time until +:done+ is thrown
# (see #handle_one_request).
def run_request_loop
  catch(:done) { loop { handle_one_request } }
end

#run_worker_thread ⇒ Object



190
191
192
193
194
195
# File 'lib/tupelo/client/worker.rb', line 190

# Body of the worker thread: runs the request loop.
# Any StandardError is logged and re-raised.
def run_worker_thread
  begin
    run_request_loop
  rescue StandardError => ex
    log.error ex
    raise
  end
end

#send_transaction(transaction) ⇒ Object



539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
# File 'lib/tupelo/client/worker.rb', line 539

# Serializes +transaction+ into a new message and sends it to the sequencer.
#
# If the transaction carries no tags of its own, tags are computed from the
# known subspaces that match any of its tuples. A +true+ tag is appended
# when this client would not otherwise receive the message back.
#
# After a successful send, local_tick and delta are incremented and the
# transaction is appended to trans_waiters.
#
# Returns the local tick assigned to the message. Serialization and send
# failures are re-raised with the same exception class and a message that
# names the transaction.
def send_transaction transaction
  msg = message_class.new
  msg.client_id = client_id
  msg.local_tick = local_tick + 1
  msg.global_tick = global_tick
  msg.delta = delta + 1 # pipelined write/take
  msg.tags = transaction.tags

  writes = transaction.writes
  pulses = transaction.pulses
  takes = transaction.take_tuples_for_remote.compact
  reads = transaction.read_tuples_for_remote.compact

  if !msg.tags
    touched = [writes, pulses, takes, reads].compact.flatten(1)

    # One tag per subspace that matches at least one tuple.
    tags = subspaces.each_with_object([]) do |subspace, found|
      found << subspace.tag if touched.any? {|tuple| subspace === tuple}
    end

    echoed_back = client.subscribed_all ||
      tags.any? {|tag| client.subscribed_tags.include? tag} ## optimize
    tags << true unless echoed_back # reflect

    unless tags.empty?
      msg.tags = tags
      log.debug {"tagged transaction: #{tags}"}
    end
  end

  begin
    payload = [transaction.atomic, writes, pulses, takes, reads]
    msg.blob = blobber.dump(payload)
    ## optimization: use bitfields to identify which ops are present
    ## (instead of nils), and combine this with atomic flag in one int
  rescue => ex
    raise ex, "cannot serialize #{transaction.inspect}: #{ex}"
  end

  begin
    seq << msg
  rescue => ex
    raise ex, "cannot send request for #{transaction.inspect}: #{ex}"
  end

  @local_tick += 1
  @delta += 1
  trans_waiters << transaction

  msg.local_tick
end

#sniff_meta_tuple(tuple) ⇒ Object



441
442
443
444
445
446
447
448
449
# File 'lib/tupelo/client/worker.rb', line 441

# Inspects +tuple+ for tupelo metadata. A Hash whose "__tupelo__" key maps
# to "subspace" is subspace metadata; a Subspace built from it is appended
# to subspaces. Every other tuple is ignored and nil is returned.
def sniff_meta_tuple tuple
  return unless tuple.kind_of?(Hash) && tuple.key?("__tupelo__")
  return unless tuple["__tupelo__"] == "subspace"

  ## do some error checking
  ## what if subspace already exists?
  subspaces << Subspace.new(tuple, self)
end

#start ⇒ Object



118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/tupelo/client/worker.rb', line 118

# Starts the worker: copies the client's connection state, then spawns the
# message reader thread followed by the worker thread.
# Does nothing (and returns nil) if the worker thread already exists.
def start
  unless @worker_thread
    log.info "worker starting"
    observe_started_client

    @msg_reader_thread = Thread.new { run_msg_reader_thread }
    @worker_thread = Thread.new { run_worker_thread }
  end
end

#stop ⇒ Object



145
146
147
148
149
150
# File 'lib/tupelo/client/worker.rb', line 145

# Stops the worker in order: queues +:stop+, waits for the worker thread to
# finish, kills the message reader thread, and stops the AtDo scheduler if
# one was created by #at.
def stop
  cmd_queue << :stop

  worker = worker_thread
  worker.join if worker ## join(limit)?

  reader = msg_reader_thread
  reader.kill if reader

  @atdo.stop if @atdo
end

#stop! ⇒ Object

stop without any remote handshaking



153
154
155
156
157
# File 'lib/tupelo/client/worker.rb', line 153

# Stops immediately: kills the message reader thread and the worker thread
# if they exist, and stops the AtDo scheduler if one was created. No
# +:stop+ command is queued and nothing is waited for.
def stop!
  if msg_reader_thread
    @msg_reader_thread.kill
  end

  if worker_thread
    @worker_thread.kill
  end

  if @atdo
    @atdo.stop
  end
end

#update_to_tick(tick: nil, tags: nil, all: false) ⇒ Object



228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
# File 'lib/tupelo/client/worker.rb', line 228

# Brings the worker's state up to +tick+ after a subscription change.
#
# Without an archiver, only global_tick is set to +tick+ (with a warning
# when +tick+ is positive). With an archiver, a GET_TUPLESPACE request
# describing the requested and current subscriptions is sent, the local
# tuplespace is cleared and refilled from the archiver's reply, and
# global_tick is set to the tick the archiver reports.
#
# The archiver's tuple stream must end with nil; a missing terminator or
# any object after it raises RuntimeError. The archiver connection is
# closed on the way out if it is still open.
def update_to_tick tick: nil, tags: nil, all: false
  # At this point we know that the seq messages now accumulating in
  # cmd_queue are tick+1, tick+2, ....
  # (or a subset of this sequence if not subscribed_all).
  # Some of these might get discarded later if archiver is more current.
  log.debug {"update_to_tick #{tick}"}

  if !arc
    if tick > 0
      log.warn "no archiver provided; assuming pubsub mode; " +
        "some client ops (take and local read) will not work"
    end
    @global_tick = tick
    log.info "global_tick = #{global_tick}"
    return
  end

  log.info "requesting tuplespace from arc"
  arc << [
    GET_TUPLESPACE,
    {
      request_all: all,
      request_tags: tags,
      subscribed_all: client.subscribed_all,
      subscribed_tags: client.subscribed_tags
    },
    tick
  ]

  tuplespace.clear
    ## in some cases, we can keep some of it, but the current
    ## archiver is not smart enough to send exactly the delta
    ### abort all current transactions???

  arc_tick = arc.read[0]
  log.info "arc says global_tick = #{arc_tick}"

  terminated = false
  received = 0
  arc.each do |tuple|
    if tuple.nil?
      terminated = true
    else
      raise "bad object stream from archiver" if terminated
      sniff_meta_tuple tuple
      tuplespace.insert tuple
      received += 1
    end
  end
  unless terminated
    raise "did not get all of tuplespace from archiver" ## roll back?
  end

  log.info "received tuplespace from arc: #{received} tuples"

  @global_tick = arc_tick
  log.info "global_tick = #{global_tick}"

ensure
  arc.close if arc && !arc.closed?
end