Class: Tupelo::Client::Worker
Direct Known Subclasses
Defined Under Namespace
Constant Summary collapse
- GET_TUPLESPACE =
"get tuplespace"
Instance Attribute Summary collapse
-
#arc ⇒ Object
readonly
Returns the value of attribute arc.
-
#blobber ⇒ Object
readonly
Returns the value of attribute blobber.
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#client_id ⇒ Object
readonly
Returns the value of attribute client_id.
-
#cmd_queue ⇒ Object
readonly
Returns the value of attribute cmd_queue.
-
#delta ⇒ Object
readonly
Returns the value of attribute delta.
-
#global_tick ⇒ Object
readonly
Returns the value of attribute global_tick.
-
#local_tick ⇒ Object
readonly
Returns the value of attribute local_tick.
-
#message_class ⇒ Object
readonly
Returns the value of attribute message_class.
-
#msg_reader_thread ⇒ Object
readonly
Returns the value of attribute msg_reader_thread.
-
#notify_waiters ⇒ Object
readonly
Returns the value of attribute notify_waiters.
-
#prep_waiters ⇒ Object
readonly
Returns the value of attribute prep_waiters.
-
#read_waiters ⇒ Object
readonly
Returns the value of attribute read_waiters.
-
#seq ⇒ Object
readonly
Returns the value of attribute seq.
-
#start_tick ⇒ Object
readonly
Returns the value of attribute start_tick.
-
#subspaces ⇒ Object
readonly
Returns the value of attribute subspaces.
-
#trans_waiters ⇒ Object
readonly
Returns the value of attribute trans_waiters.
-
#tuplespace ⇒ Object
readonly
Returns the value of attribute tuplespace.
-
#worker_thread ⇒ Object
readonly
Returns the value of attribute worker_thread.
Instance Method Summary collapse
- #<<(cmd) ⇒ Object
- #at(time, &action) ⇒ Object
- #collect_tags(tuple) ⇒ Object
- #handle_client_request(req) ⇒ Object
- #handle_matcher(matcher) ⇒ Object
- #handle_message(msg) ⇒ Object
- #handle_one_request ⇒ Object
- #handle_seq_closed ⇒ Object
- #handle_transaction(t) ⇒ Object
- #handle_unwaiter(unwaiter) ⇒ Object
- #handle_waiter(waiter) ⇒ Object
- #in_thread? ⇒ Boolean
-
#initialize(client) ⇒ Worker
constructor
A new instance of Worker.
-
#is_meta_tuple?(tuple) ⇒ Boolean
Returns true if tuple is subspace metadata.
- #log(*args) ⇒ Object
-
#make_template(obj) ⇒ Object
Used by api to protect worker’s copy from client changes.
- #observe_started_client ⇒ Object
- #pot_for(spec) ⇒ Object
- #read_messages_from_seq ⇒ Object
- #record_history(msg) ⇒ Object
- #rot_for(spec) ⇒ Object
- #run_msg_reader_thread ⇒ Object
- #run_request_loop ⇒ Object
- #run_worker_thread ⇒ Object
- #send_transaction(transaction) ⇒ Object
- #sniff_meta_tuple(tuple) ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
-
#stop! ⇒ Object
stop without any remote handshaking.
- #update_to_tick(tick: nil, tags: nil, all: false) ⇒ Object
Constructor Details
#initialize(client) ⇒ Worker
Returns a new instance of Worker.
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/tupelo/client/worker.rb', line 68 def initialize client @client = client @seq = nil @arc = nil @log = client.log @client_id = nil @global_tick = nil @start_tick = nil @local_tick = 0 @delta = 0 @cmd_queue = client.make_queue @tuplespace = nil @message_class = client. @blobber = nil @read_waiters = [] @prep_waiters = [] @trans_waiters = [] @notify_waiters = [] @stopping = false @subspaces = [] end |
Instance Attribute Details
#arc ⇒ Object (readonly)
Returns the value of attribute arc.
11 12 13 |
# File 'lib/tupelo/client/worker.rb', line 11 def arc @arc end |
#blobber ⇒ Object (readonly)
Returns the value of attribute blobber.
22 23 24 |
# File 'lib/tupelo/client/worker.rb', line 22 def blobber @blobber end |
#client ⇒ Object (readonly)
Returns the value of attribute client.
9 10 11 |
# File 'lib/tupelo/client/worker.rb', line 9 def client @client end |
#client_id ⇒ Object (readonly)
Returns the value of attribute client_id.
12 13 14 |
# File 'lib/tupelo/client/worker.rb', line 12 def client_id @client_id end |
#cmd_queue ⇒ Object (readonly)
Returns the value of attribute cmd_queue.
19 20 21 |
# File 'lib/tupelo/client/worker.rb', line 19 def cmd_queue @cmd_queue end |
#delta ⇒ Object (readonly)
Returns the value of attribute delta.
16 17 18 |
# File 'lib/tupelo/client/worker.rb', line 16 def delta @delta end |
#global_tick ⇒ Object (readonly)
Returns the value of attribute global_tick.
14 15 16 |
# File 'lib/tupelo/client/worker.rb', line 14 def global_tick @global_tick end |
#local_tick ⇒ Object (readonly)
Returns the value of attribute local_tick.
13 14 15 |
# File 'lib/tupelo/client/worker.rb', line 13 def local_tick @local_tick end |
#message_class ⇒ Object (readonly)
Returns the value of attribute message_class.
21 22 23 |
# File 'lib/tupelo/client/worker.rb', line 21 def @message_class end |
#msg_reader_thread ⇒ Object (readonly)
Returns the value of attribute msg_reader_thread.
17 18 19 |
# File 'lib/tupelo/client/worker.rb', line 17 def msg_reader_thread @msg_reader_thread end |
#notify_waiters ⇒ Object (readonly)
Returns the value of attribute notify_waiters.
26 27 28 |
# File 'lib/tupelo/client/worker.rb', line 26 def notify_waiters @notify_waiters end |
#prep_waiters ⇒ Object (readonly)
Returns the value of attribute prep_waiters.
24 25 26 |
# File 'lib/tupelo/client/worker.rb', line 24 def prep_waiters @prep_waiters end |
#read_waiters ⇒ Object (readonly)
Returns the value of attribute read_waiters.
23 24 25 |
# File 'lib/tupelo/client/worker.rb', line 23 def read_waiters @read_waiters end |
#seq ⇒ Object (readonly)
Returns the value of attribute seq.
10 11 12 |
# File 'lib/tupelo/client/worker.rb', line 10 def seq @seq end |
#start_tick ⇒ Object (readonly)
Returns the value of attribute start_tick.
15 16 17 |
# File 'lib/tupelo/client/worker.rb', line 15 def start_tick @start_tick end |
#subspaces ⇒ Object (readonly)
Returns the value of attribute subspaces.
27 28 29 |
# File 'lib/tupelo/client/worker.rb', line 27 def subspaces @subspaces end |
#trans_waiters ⇒ Object (readonly)
Returns the value of attribute trans_waiters.
25 26 27 |
# File 'lib/tupelo/client/worker.rb', line 25 def trans_waiters @trans_waiters end |
#tuplespace ⇒ Object (readonly)
Returns the value of attribute tuplespace.
20 21 22 |
# File 'lib/tupelo/client/worker.rb', line 20 def tuplespace @tuplespace end |
#worker_thread ⇒ Object (readonly)
Returns the value of attribute worker_thread.
18 19 20 |
# File 'lib/tupelo/client/worker.rb', line 18 def worker_thread @worker_thread end |
Instance Method Details
#<<(cmd) ⇒ Object
162 163 164 |
# File 'lib/tupelo/client/worker.rb', line 162 def << cmd cmd_queue << cmd end |
#at(time, &action) ⇒ Object
155 156 157 158 159 160 |
# File 'lib/tupelo/client/worker.rb', line 155 def at time, &action @atdo ||= AtDo.new @atdo.at time do cmd_queue << action end end |
#collect_tags(tuple) ⇒ Object
537 538 539 |
# File 'lib/tupelo/client/worker.rb', line 537 def tuple subspaces.select {|subspace| subspace === tuple}.map(&:tag) end |
#handle_client_request(req) ⇒ Object
451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 |
# File 'lib/tupelo/client/worker.rb', line 451 def handle_client_request req log.debug {"client requested #{req.inspect}"} case req when raise "only seq can send messages" when Waiter handle_waiter req when Matcher handle_matcher req when Unwaiter handle_unwaiter req when Transaction handle_transaction req when NotifyWaiter notify_waiters.delete req or notify_waiters.push req when Proc req.call else raise "unknown request from client: #{req}" end rescue => ex log.error "error while handling #{req.inspect}: #{ex}" ## Raise an error in the waiter? Need to generalize the mechanism in ## Transaction. end |
#handle_matcher(matcher) ⇒ Object
521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 |
# File 'lib/tupelo/client/worker.rb', line 521 def handle_matcher matcher if matcher.all tuplespace.each {|tuple| matcher.gloms tuple} ## maybe should have tuplespace.find_all_matches_for ... ## in case there is an optimization matcher.fails else tuple = tuplespace.find_match_for matcher.template if tuple matcher.peek tuple else matcher.fails end end end |
#handle_message(msg) ⇒ Object
285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 |
# File 'lib/tupelo/client/worker.rb', line 285 def msg log.debug {"seq sent #{msg.inspect}"} if msg.control? client.handle_ack msg op_type, = msg.control_op case op_type when Funl::SUBSCRIBE_ALL update_to_tick tick: msg.global_tick, all: true when Funl::SUBSCRIBE update_to_tick tick: msg.global_tick, tags: (client. | ) when Funl::UNSUBSCRIBE_ALL update_to_tick tick: msg.global_tick, all: false when Funl::UNSUBSCRIBE update_to_tick tick: msg.global_tick, tags: (client. - ) else raise "Unimplemented: #{msg.inspect}" end return end if !global_tick raise "bug: should have subscribed and received ack before data" end if msg.global_tick < global_tick + 1 log.debug {"discarding redundant message at #{msg.global_tick}"} # due to archiver timing, for example return end @global_tick = msg.global_tick @delta = 0 record_history msg op = msg.blob ? Operation.new(*blobber.load(msg.blob)) : Operation::NOOP ## op.freeze_deeply log.debug {"applying #{op} from client #{msg.client_id}"} notify_waiters.each do |waiter| waiter << [:attempt, msg.global_tick, msg.client_id, op, msg.] end take_tuples = tuplespace.find_distinct_matches_for(op.takes) read_tuples = op.reads.map {|t| tuplespace.find_match_for(t)} succeeded = take_tuples.all? && read_tuples.all? if client.subscribed_all write_tuples = op.writes else write_tuples = op.writes.select do |tuple| subspaces.any? {|subspace| subspace === tuple} end end ## This is duplicated effort: the sender has already done this. ## So maybe the result could be transmitted in the msg protocol? if succeeded log.debug {"inserting #{op.writes}; deleting #{take_tuples}"} tuplespace.transaction inserts: write_tuples, deletes: take_tuples, tick: @global_tick op.writes.each do |tuple| tuple end take_tuples.each do |tuple| if tuple ## do some error checking subspaces.delete_if {|sp| sp.tag == tuple["tag"]} end end end notify_waiters.each do |waiter| waiter << [ succeeded ? :success : :failure, msg.global_tick, msg.client_id, op, msg.] end trans = nil if msg.client_id == client_id trans = trans_waiters.first unless trans and trans.local_tick == msg.local_tick log.error "transaction #{op} out of order in sequence " + trans_waiters.inspect ## exit? wait? end trans_waiters.shift log.debug {"operation belongs to this client: #{trans.inspect}"} end if not take_tuples.empty? if succeeded take_tuples.each do |tuple| prep_waiters.keep_if do |waiter| waiter.unprepare tuple ## optimization: track number of instances of tuple, to avoid ## false positive in #unprepare end end log.debug {trans ? "taking #{take_tuples}" : "client #{msg.client_id} takes #{take_tuples}"} else log.debug { missing = [] take_tuples.each_with_index do |tuple, i| missing << op.takes[i] unless tuple end trans ? "failed to take #{missing}" : "client #{msg.client_id} failed to take #{missing}"} end end if succeeded op.writes.each do |tuple| read_waiters.delete_if do |waiter| waiter.gloms tuple end end op.pulses.each do |tuple| log.debug {"pulsing #{tuple}"} read_waiters.delete_if do |waiter| waiter.gloms tuple end end op.writes.each do |tuple| prep_waiters.keep_if do |waiter| waiter.prepare tuple end end end if trans trans_waiters.delete trans if succeeded trans.done msg.global_tick, take_tuples # note: tuples not frozen else trans.fail (op.takes - take_tuples) + (op.reads - read_tuples) end end end |
#handle_one_request ⇒ Object
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 |
# File 'lib/tupelo/client/worker.rb', line 201 def handle_one_request case cmd = cmd_queue.pop when :stop @stopping = true if trans_waiters.empty? throw :done else log.info {"stopping; waiting for #{trans_waiters}"} end when cmd if @stopping if trans_waiters.empty? throw :done else log.info {"stopping; waiting for #{trans_waiters}"} end end else handle_client_request cmd unless @stopping end end |
#handle_seq_closed ⇒ Object
175 176 177 178 |
# File 'lib/tupelo/client/worker.rb', line 175 def handle_seq_closed ## what to do here in general? ## for each waiter, push :stop into queue ? end |
#handle_transaction(t) ⇒ Object
485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 |
# File 'lib/tupelo/client/worker.rb', line 485 def handle_transaction t case when t.open? t.prepare prep_waiters << t unless prep_waiters.include? t when t.closed? t.submit prep_waiters.delete t when t.failed? else log.warn "not open or closed or failed: #{t.inspect}" end rescue => ex log.error "error while handling #{t.inspect}: #{ex}" t.error ex end |
#handle_unwaiter(unwaiter) ⇒ Object
502 503 504 505 |
# File 'lib/tupelo/client/worker.rb', line 502 def handle_unwaiter unwaiter waiter = unwaiter.waiter read_waiters.delete waiter or prep_waiters.delete waiter end |
#handle_waiter(waiter) ⇒ Object
507 508 509 510 511 512 513 514 515 516 517 518 519 |
# File 'lib/tupelo/client/worker.rb', line 507 def handle_waiter waiter if waiter.once tuple = tuplespace.find_match_for waiter.template if tuple waiter.peek tuple else read_waiters << waiter end else tuplespace.each {|tuple| waiter.gloms tuple} read_waiters << waiter end end |
#in_thread? ⇒ Boolean
129 130 131 |
# File 'lib/tupelo/client/worker.rb', line 129 def in_thread? Thread.current == worker_thread end |
#is_meta_tuple?(tuple) ⇒ Boolean
Returns true if tuple is subspace metadata.
436 437 438 439 |
# File 'lib/tupelo/client/worker.rb', line 436 def tuple tuple.kind_of? Hash and tuple.key? TUPELO_META_KEY and tuple[TUPELO_META_KEY] == "subspace" end |
#log(*args) ⇒ Object
93 94 95 96 97 98 99 |
# File 'lib/tupelo/client/worker.rb', line 93 def log *args if args.empty? @log else @log.unknown *args end end |
#make_template(obj) ⇒ Object
Used by api to protect worker’s copy from client changes. Also, for serialization types that don’t represent symbols, this converts a template so that it works correctly regardless. So keyword args are very natural: read(k1: val, k2: val)
624 625 626 627 628 |
# File 'lib/tupelo/client/worker.rb', line 624 def make_template obj return obj unless obj.respond_to? :to_ary or obj.respond_to? :to_hash spec = Marshal.load(Marshal.dump(obj)) rot_for(spec).optimize! end |
#observe_started_client ⇒ Object
133 134 135 136 137 138 139 |
# File 'lib/tupelo/client/worker.rb', line 133 def observe_started_client @client_id = client.client_id @blobber = client.blobber @seq = client.seq @arc = client.arc @start_tick = client.start_tick end |
#pot_for(spec) ⇒ Object
634 635 636 |
# File 'lib/tupelo/client/worker.rb', line 634 def pot_for spec PortableObjectTemplate.new(spec, proc {|k| blobber.load(blobber.dump(k))}) end |
#read_messages_from_seq ⇒ Object
180 181 182 183 184 |
# File 'lib/tupelo/client/worker.rb', line 180 def seq.each do |msg| self << msg end end |
#record_history(msg) ⇒ Object
449 |
# File 'lib/tupelo/client/worker.rb', line 449 def record_history msg; end |
#rot_for(spec) ⇒ Object
630 631 632 |
# File 'lib/tupelo/client/worker.rb', line 630 def rot_for spec RubyObjectTemplate.new(spec, proc {|k| blobber.load(blobber.dump(k))}) end |
#run_msg_reader_thread ⇒ Object
166 167 168 169 170 171 172 173 |
# File 'lib/tupelo/client/worker.rb', line 166 def run_msg_reader_thread log.warn "connection to seq closed" handle_seq_closed rescue => ex log.error ex raise end |
#run_request_loop ⇒ Object
193 194 195 196 197 198 199 |
# File 'lib/tupelo/client/worker.rb', line 193 def run_request_loop catch :done do loop do handle_one_request end end end |
#run_worker_thread ⇒ Object
186 187 188 189 190 191 |
# File 'lib/tupelo/client/worker.rb', line 186 def run_worker_thread run_request_loop rescue => ex log.error ex raise end |
#send_transaction(transaction) ⇒ Object
541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 |
# File 'lib/tupelo/client/worker.rb', line 541 def send_transaction transaction msg = .new msg.client_id = client_id msg.local_tick = local_tick + 1 msg.global_tick = global_tick msg.delta = delta + 1 # pipelined write/take msg. = transaction. writes = transaction.writes pulses = transaction.pulses takes = transaction.take_tuples_for_remote.compact reads = transaction.read_tuples_for_remote.compact unless msg. = nil [takes, reads].compact.flatten(1).each do |tuple| if = (tuple) unless == d = ( - ) + ( - ) raise TransactionSubspaceError, "tuples crossing subspaces: #{d} in #{transaction.inspect}" end else = (tuple) end end ||= [] = [] [writes, pulses].compact.flatten(1).each do |tuple| |= (tuple) end if takes.empty? and reads.empty? = else d = - unless d.empty? raise TransactionSubspaceError, "writes crossing subspaces: #{d} in #{transaction.inspect}" end end will_get_this_msg = client.subscribed_all || .any? {|tag| client..include? tag} ## optimize unless will_get_this_msg << true # reflect end if not .empty? msg. = log.debug {"tagged transaction: #{}"} end end begin msg.blob = blobber.dump([writes, pulses, takes, reads]) ## optimization: use bitfields to identify which ops are present ## (instead of nils), in one int rescue => ex raise ex, "cannot serialize #{transaction.inspect}: #{ex}" end begin seq << msg rescue => ex raise ex, "cannot send request for #{transaction.inspect}: #{ex}" end @local_tick += 1 @delta += 1 trans_waiters << transaction return msg.local_tick end |
#sniff_meta_tuple(tuple) ⇒ Object
441 442 443 444 445 446 447 |
# File 'lib/tupelo/client/worker.rb', line 441 def tuple if tuple ## do some error checking ## what if subspace already exists? subspaces << Subspace.new(tuple, self) end end |
#start ⇒ Object
114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/tupelo/client/worker.rb', line 114 def start return if @worker_thread log.info "worker starting" observe_started_client @msg_reader_thread = Thread.new do run_msg_reader_thread end @worker_thread = Thread.new do run_worker_thread end end |
#stop ⇒ Object
141 142 143 144 145 146 |
# File 'lib/tupelo/client/worker.rb', line 141 def stop cmd_queue << :stop worker_thread.join if worker_thread ## join(limit)? msg_reader_thread.kill if msg_reader_thread @atdo.stop if @atdo end |
#stop! ⇒ Object
stop without any remote handshaking
149 150 151 152 153 |
# File 'lib/tupelo/client/worker.rb', line 149 def stop! @msg_reader_thread.kill if msg_reader_thread @worker_thread.kill if worker_thread @atdo.stop if @atdo end |
#update_to_tick(tick: nil, tags: nil, all: false) ⇒ Object
224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 |
# File 'lib/tupelo/client/worker.rb', line 224 def update_to_tick tick: nil, tags: nil, all: false # At this point we know that the seq messages now accumulating in # cmd_queue are tick+1, tick+2, .... # (or a subset of this sequence if not subscribed_all). # Some of these might get discarded later if archiver is more current. log.debug {"update_to_tick #{tick}"} unless arc if tick > 0 log.warn "no archiver provided; assuming pubsub mode; " + "some client ops (take and local read) will not work" end @global_tick = tick log.info "global_tick = #{global_tick}" return end log.info "requesting tuplespace from arc" subscription_delta = { request_all: all, request_tags: , subscribed_all: client.subscribed_all, subscribed_tags: client. } arc << [GET_TUPLESPACE, subscription_delta, tick] begin tuplespace.clear ## In some cases, we can keep some of it, but the current ## archiver is not smart enough to send exactly the delta. ## Also, might need to abort some current transactions. arc_tick = arc.read[0] log.info "arc says global_tick = #{arc_tick}" done = false count = 0 arc.each do |tuple| if tuple.nil? done = true else raise "bad object stream from archiver" if done tuple tuplespace.insert tuple count += 1 end end unless done raise "did not get all of tuplespace from archiver" ## roll back? end log.info "received tuplespace from arc: #{count} tuples" @global_tick = arc_tick log.info "global_tick = #{global_tick}" end ensure arc.close if arc and not arc.closed? end |