Class: Tupelo::Client::Worker
Direct Known Subclasses
Defined Under Namespace
Constant Summary collapse
- GET_TUPLESPACE =
"get tuplespace"
Instance Attribute Summary collapse
-
#arc ⇒ Object
readonly
Returns the value of attribute arc.
-
#blobber ⇒ Object
readonly
Returns the value of attribute blobber.
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#client_id ⇒ Object
readonly
Returns the value of attribute client_id.
-
#cmd_queue ⇒ Object
readonly
Returns the value of attribute cmd_queue.
-
#delta ⇒ Object
readonly
Returns the value of attribute delta.
-
#global_tick ⇒ Object
readonly
Returns the value of attribute global_tick.
-
#local_tick ⇒ Object
readonly
Returns the value of attribute local_tick.
-
#message_class ⇒ Object
readonly
Returns the value of attribute message_class.
-
#msg_reader_thread ⇒ Object
readonly
Returns the value of attribute msg_reader_thread.
-
#notify_waiters ⇒ Object
readonly
Returns the value of attribute notify_waiters.
-
#prep_waiters ⇒ Object
readonly
Returns the value of attribute prep_waiters.
-
#read_waiters ⇒ Object
readonly
Returns the value of attribute read_waiters.
-
#seq ⇒ Object
readonly
Returns the value of attribute seq.
-
#start_tick ⇒ Object
readonly
Returns the value of attribute start_tick.
-
#subspaces ⇒ Object
readonly
Returns the value of attribute subspaces.
-
#trans_waiters ⇒ Object
readonly
Returns the value of attribute trans_waiters.
-
#tuplespace ⇒ Object
readonly
Returns the value of attribute tuplespace.
-
#worker_thread ⇒ Object
readonly
Returns the value of attribute worker_thread.
Instance Method Summary collapse
- #<<(cmd) ⇒ Object
- #at(time, &action) ⇒ Object
- #handle_client_request(req) ⇒ Object
- #handle_matcher(matcher) ⇒ Object
- #handle_message(msg) ⇒ Object
- #handle_one_request ⇒ Object
- #handle_seq_closed ⇒ Object
- #handle_transaction(t) ⇒ Object
- #handle_unwaiter(unwaiter) ⇒ Object
- #handle_waiter(waiter) ⇒ Object
- #in_thread? ⇒ Boolean
-
#initialize(client) ⇒ Worker
constructor
A new instance of Worker.
- #log(*args) ⇒ Object
-
#make_template(obj) ⇒ Object
Used by api to protect worker’s copy from client changes.
- #observe_started_client ⇒ Object
- #pot_for(spec) ⇒ Object
- #read_messages_from_seq ⇒ Object
- #record_history(msg) ⇒ Object
- #rot_for(spec) ⇒ Object
- #run_msg_reader_thread ⇒ Object
- #run_request_loop ⇒ Object
- #run_worker_thread ⇒ Object
- #send_transaction(transaction) ⇒ Object
- #sniff_meta_tuple(tuple) ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
-
#stop! ⇒ Object
stop without any remote handshaking.
- #update_to_tick(tick: nil, tags: nil, all: false) ⇒ Object
Constructor Details
#initialize(client) ⇒ Worker
Returns a new instance of Worker.
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/tupelo/client/worker.rb', line 72
# Builds a worker bound to +client+.
#
# Only values already available on the client (its log, a fresh command
# queue and its message class) are captured here; seq, arc, blobber,
# client_id and the tick counters that start as nil are filled in later.
# NOTE(review): the `@message_class = client.message_class` line was garbled
# in the extracted listing and is restored from the attribute list -- confirm.
def initialize client
  @client = client
  @seq = nil
  @arc = nil
  @log = client.log

  @client_id = nil
  @global_tick = nil
  @start_tick = nil
  @local_tick = 0
  @delta = 0

  @cmd_queue = client.make_queue
  @tuplespace = nil
  @message_class = client.message_class
  @blobber = nil

  @read_waiters = []
  @prep_waiters = []
  @trans_waiters = []
  @notify_waiters = []
  @stopping = false
  @subspaces = []
end
Instance Attribute Details
#arc ⇒ Object (readonly)
Returns the value of attribute arc.
11 12 13 |
# File 'lib/tupelo/client/worker.rb', line 11
# Read-only accessor: returns the current value of @arc.
def arc
  @arc
end
#blobber ⇒ Object (readonly)
Returns the value of attribute blobber.
22 23 24 |
# File 'lib/tupelo/client/worker.rb', line 22
# Read-only accessor: returns the current value of @blobber.
def blobber
  @blobber
end
#client ⇒ Object (readonly)
Returns the value of attribute client.
9 10 11 |
# File 'lib/tupelo/client/worker.rb', line 9
# Read-only accessor: returns the current value of @client.
def client
  @client
end
#client_id ⇒ Object (readonly)
Returns the value of attribute client_id.
12 13 14 |
# File 'lib/tupelo/client/worker.rb', line 12
# Read-only accessor: returns the current value of @client_id.
def client_id
  @client_id
end
#cmd_queue ⇒ Object (readonly)
Returns the value of attribute cmd_queue.
19 20 21 |
# File 'lib/tupelo/client/worker.rb', line 19
# Read-only accessor: returns the current value of @cmd_queue.
def cmd_queue
  @cmd_queue
end
#delta ⇒ Object (readonly)
Returns the value of attribute delta.
16 17 18 |
# File 'lib/tupelo/client/worker.rb', line 16
# Read-only accessor: returns the current value of @delta.
def delta
  @delta
end
#global_tick ⇒ Object (readonly)
Returns the value of attribute global_tick.
14 15 16 |
# File 'lib/tupelo/client/worker.rb', line 14
# Read-only accessor: returns the current value of @global_tick.
def global_tick
  @global_tick
end
#local_tick ⇒ Object (readonly)
Returns the value of attribute local_tick.
13 14 15 |
# File 'lib/tupelo/client/worker.rb', line 13
# Read-only accessor: returns the current value of @local_tick.
def local_tick
  @local_tick
end
#message_class ⇒ Object (readonly)
Returns the value of attribute message_class.
21 22 23 |
# File 'lib/tupelo/client/worker.rb', line 21
# Read-only accessor: returns the current value of @message_class.
# NOTE(review): the extracted listing had lost both the method name and the
# body ("def end"); restored from the attribute heading -- confirm.
def message_class
  @message_class
end
#msg_reader_thread ⇒ Object (readonly)
Returns the value of attribute msg_reader_thread.
17 18 19 |
# File 'lib/tupelo/client/worker.rb', line 17
# Read-only accessor: returns the current value of @msg_reader_thread.
def msg_reader_thread
  @msg_reader_thread
end
#notify_waiters ⇒ Object (readonly)
Returns the value of attribute notify_waiters.
26 27 28 |
# File 'lib/tupelo/client/worker.rb', line 26
# Read-only accessor: returns the current value of @notify_waiters.
def notify_waiters
  @notify_waiters
end
#prep_waiters ⇒ Object (readonly)
Returns the value of attribute prep_waiters.
24 25 26 |
# File 'lib/tupelo/client/worker.rb', line 24
# Read-only accessor: returns the current value of @prep_waiters.
def prep_waiters
  @prep_waiters
end
#read_waiters ⇒ Object (readonly)
Returns the value of attribute read_waiters.
23 24 25 |
# File 'lib/tupelo/client/worker.rb', line 23
# Read-only accessor: returns the current value of @read_waiters.
def read_waiters
  @read_waiters
end
#seq ⇒ Object (readonly)
Returns the value of attribute seq.
10 11 12 |
# File 'lib/tupelo/client/worker.rb', line 10
# Read-only accessor: returns the current value of @seq.
def seq
  @seq
end
#start_tick ⇒ Object (readonly)
Returns the value of attribute start_tick.
15 16 17 |
# File 'lib/tupelo/client/worker.rb', line 15
# Read-only accessor: returns the current value of @start_tick.
def start_tick
  @start_tick
end
#subspaces ⇒ Object (readonly)
Returns the value of attribute subspaces.
27 28 29 |
# File 'lib/tupelo/client/worker.rb', line 27
# Read-only accessor: returns the current value of @subspaces.
def subspaces
  @subspaces
end
#trans_waiters ⇒ Object (readonly)
Returns the value of attribute trans_waiters.
25 26 27 |
# File 'lib/tupelo/client/worker.rb', line 25
# Read-only accessor: returns the current value of @trans_waiters.
def trans_waiters
  @trans_waiters
end
#tuplespace ⇒ Object (readonly)
Returns the value of attribute tuplespace.
20 21 22 |
# File 'lib/tupelo/client/worker.rb', line 20
# Read-only accessor: returns the current value of @tuplespace.
def tuplespace
  @tuplespace
end
#worker_thread ⇒ Object (readonly)
Returns the value of attribute worker_thread.
18 19 20 |
# File 'lib/tupelo/client/worker.rb', line 18
# Read-only accessor: returns the current value of @worker_thread.
def worker_thread
  @worker_thread
end
Instance Method Details
#<<(cmd) ⇒ Object
166 167 168 |
# File 'lib/tupelo/client/worker.rb', line 166
# Appends +cmd+ to the worker's command queue and returns whatever the
# queue's own #<< returns.
def << cmd
  cmd_queue << cmd
end
#at(time, &action) ⇒ Object
159 160 161 162 163 164 |
# File 'lib/tupelo/client/worker.rb', line 159
# Schedules +action+ for +time+ through a lazily created AtDo instance.
# When the scheduler fires, the block itself is pushed onto cmd_queue rather
# than being called directly.
def at time, &action
  @atdo ||= AtDo.new
  @atdo.at time do
    cmd_queue << action
  end
end
#handle_client_request(req) ⇒ Object
453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 |
# File 'lib/tupelo/client/worker.rb', line 453
# Dispatches one request taken from the command queue to the matching
# handler, based on the request's class.
#
# Any StandardError raised while dispatching is logged and swallowed, so a
# bad request does not propagate out of this method.
# NOTE(review): the first `when` operand was missing from the extracted
# listing; `message_class` is restored to match the error text -- confirm.
def handle_client_request req
  log.debug {"client requested #{req.inspect}"}

  case req
  when message_class
    raise "only seq can send messages"

  when Waiter
    handle_waiter req

  when Matcher
    handle_matcher req

  when Unwaiter
    handle_unwaiter req

  when Transaction
    handle_transaction req

  when NotifyWaiter
    # toggle: remove the waiter if already registered, otherwise add it
    notify_waiters.delete req or notify_waiters.push req

  when Proc
    req.call

  else
    raise "unknown request from client: #{req}"
  end
rescue => ex
  log.error "error while handling #{req.inspect}: #{ex}"
  ## Raise an error in the waiter? Need to generalize the mechanism in
  ## Transaction.
end
#handle_matcher(matcher) ⇒ Object
523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 |
# File 'lib/tupelo/client/worker.rb', line 523
# Serves a non-blocking match request against the local tuplespace.
#
# With matcher.all set, every tuple is offered to the matcher via #gloms and
# the scan is then terminated with #fails. Otherwise a single match is looked
# up: it is handed to #peek if found, and #fails is called if not.
#
# Fix: the single-match branch referred to an undefined local `waiter`
# (NameError at runtime); it now uses the +matcher+ argument.
def handle_matcher matcher
  if matcher.all
    tuplespace.each {|tuple| matcher.gloms tuple}
      ## maybe should have tuplespace.find_all_matches_for ...
      ## in case there is an optimization
    matcher.fails
  else
    tuple = tuplespace.find_match_for matcher.template
    if tuple
      matcher.peek tuple
    else
      matcher.fails
    end
  end
end
#handle_message(msg) ⇒ Object
289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 |
# File 'lib/tupelo/client/worker.rb', line 289
# Applies one message received from the sequencer to local state.
#
# Control messages are acked to the client and turned into an
# update_to_tick call. Data messages at or below the current global_tick
# are discarded. Otherwise the operation is decoded, evaluated against the
# tuplespace, applied if it succeeded, and the notify/prep/read/transaction
# waiters are informed.
#
# Raises if a data message arrives before global_tick has been set, or if a
# control op type is not one of the four handled below.
#
# NOTE(review): the method name and every identifier containing "tags" or
# "meta" were missing from the extracted listing; they are restored here
# from the method summary and surrounding usage (`client.subscribed_tags`
# is inferred from the `subscribed_tags:` key in #update_to_tick) -- confirm.
def handle_message msg
  log.debug {"seq sent #{msg.inspect}"}

  if msg.control?
    client.handle_ack msg

    op_type, tags = msg.control_op
    case op_type
    when Funl::SUBSCRIBE_ALL
      update_to_tick tick: msg.global_tick, all: true
    when Funl::SUBSCRIBE
      update_to_tick tick: msg.global_tick,
        tags: (client.subscribed_tags | tags)
    when Funl::UNSUBSCRIBE_ALL
      update_to_tick tick: msg.global_tick, all: false
    when Funl::UNSUBSCRIBE
      update_to_tick tick: msg.global_tick,
        tags: (client.subscribed_tags - tags)
    else
      raise "Unimplemented: #{msg.inspect}"
    end

    return
  end

  if !global_tick
    raise "bug: should have subscribed and received ack before data"
  end

  if msg.global_tick < global_tick + 1
    log.debug {"discarding redundant message at #{msg.global_tick}"}
      # due to archiver timing, for example
    return
  end

  @global_tick = msg.global_tick
  @delta = 0

  record_history msg
  op = msg.blob ? Operation.new(*blobber.load(msg.blob)) : Operation::NOOP
    ## op.freeze_deeply
  log.debug {"applying #{op} from client #{msg.client_id}"}

  notify_waiters.each do |waiter|
    waiter << [:attempt, msg.global_tick, msg.client_id, op, msg.tags]
  end

  granted_tuples = tuplespace.find_distinct_matches_for(op.takes)
  read_tuples = op.reads.map {|t| tuplespace.find_match_for(t)}
  # a non-atomic op always succeeds; an atomic one needs every take and read
  succeeded = !op.atomic || (granted_tuples.all? && read_tuples.all?)
  take_tuples = granted_tuples.compact

  if client.subscribed_all
    write_tuples = op.writes
  else
    # only store the writes that fall into a subspace known to this worker
    write_tuples = op.writes.select do |tuple|
      subspaces.any? {|subspace| subspace === tuple}
    end
  end
  ## This is duplicated effort: the sender has already done this.
  ## So maybe the result could be transmitted in the msg protocol?

  if succeeded
    log.debug {"inserting #{op.writes}; deleting #{take_tuples}"}
    tuplespace.transaction inserts: write_tuples, deletes: take_tuples,
      tick: @global_tick

    op.writes.each do |tuple|
      sniff_meta_tuple tuple
    end

    take_tuples.each do |tuple|
      ### abstract this out
      if tuple.kind_of? Hash and tuple.key? "__tupelo__"
        if tuple["__tupelo__"] == "subspace" # tuple is subspace metadata
          ## do some error checking
          subspaces.delete_if {|sp| sp.tag == tuple["tag"]}
        end
      end
    end
  end

  notify_waiters.each do |waiter|
    waiter << [
      succeeded ? :success : :failure,
      msg.global_tick, msg.client_id, op, msg.tags]
  end

  trans = nil
  if msg.client_id == client_id
    trans = trans_waiters.first
    unless trans and trans.local_tick == msg.local_tick
      log.error "transaction #{op} out of order in sequence " +
        trans_waiters.inspect
      ## exit? wait?
    end
    trans_waiters.shift
    log.debug {"operation belongs to this client: #{trans.inspect}"}
  end

  if not take_tuples.empty?
    if succeeded
      take_tuples.each do |tuple|
        prep_waiters.keep_if do |waiter|
          waiter.unprepare tuple
          ## optimization: track number of instances of tuple, to avoid
          ## false positive in #unprepare
        end
      end

      log.debug {trans ? "taking #{granted_tuples}" :
        "client #{msg.client_id} takes #{granted_tuples}"}

    else
      log.debug {
        missing = op.takes - take_tuples
        trans ? "failed to take #{missing}" :
         "client #{msg.client_id} failed to take #{missing}"}
    end
  end

  if succeeded
    op.writes.each do |tuple|
      read_waiters.delete_if do |waiter|
        waiter.gloms tuple
      end
    end

    op.pulses.each do |tuple|
      log.debug {"pulsing #{tuple}"}
      read_waiters.delete_if do |waiter|
        waiter.gloms tuple
      end
    end

    op.writes.each do |tuple|
      prep_waiters.keep_if do |waiter|
        waiter.prepare tuple
      end
    end
  end

  if trans
    trans_waiters.delete trans

    if succeeded
      trans.done msg.global_tick, granted_tuples # note: tuples not frozen
    else
      trans.fail((op.takes - take_tuples) + (op.reads - read_tuples))
    end
  end
end
#handle_one_request ⇒ Object
205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 |
# File 'lib/tupelo/client/worker.rb', line 205
# Pops one command from cmd_queue and processes it.
#
# * :stop marks the worker as stopping and throws :done once no
#   transactions are pending.
# * a sequencer message is always handled; while stopping, :done is thrown
#   as soon as trans_waiters drains.
# * anything else is treated as a client request and is ignored once the
#   worker is stopping.
#
# NOTE(review): the extracted listing read `when cmd` with no handler call,
# which would match every command; restored to `when message_class` plus
# `handle_message cmd` -- confirm.
def handle_one_request
  case cmd = cmd_queue.pop
  when :stop
    @stopping = true
    if trans_waiters.empty?
      throw :done
    else
      log.info {"stopping; waiting for #{trans_waiters}"}
    end
  when message_class
    handle_message cmd
    if @stopping
      if trans_waiters.empty?
        throw :done
      else
        log.info {"stopping; waiting for #{trans_waiters}"}
      end
    end
  else
    handle_client_request cmd unless @stopping
  end
end
#handle_seq_closed ⇒ Object
179 180 181 182 |
# File 'lib/tupelo/client/worker.rb', line 179
# Hook invoked after the sequencer connection closes. Intentionally a no-op
# for now; returns nil.
def handle_seq_closed
  ## what to do here in general?
  ## for each waiter, push :stop into queue ?
end
#handle_transaction(t) ⇒ Object
487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 |
# File 'lib/tupelo/client/worker.rb', line 487
# Advances transaction +t+ according to its state.
#
# * open   -> prepare it and make sure it is registered in prep_waiters
# * closed -> submit it and drop it from prep_waiters
# * failed -> nothing to do
# * other  -> log a warning
#
# Any StandardError is logged and reported back through t.error.
def handle_transaction t
  case
  when t.open?
    t.prepare
    prep_waiters << t unless prep_waiters.include? t
  when t.closed?
    t.submit
    prep_waiters.delete t
  when t.failed?
  else
    log.warn "not open or closed or failed: #{t.inspect}"
  end
rescue => ex
  log.error "error while handling #{t.inspect}: #{ex}"
  t.error ex
end
#handle_unwaiter(unwaiter) ⇒ Object
504 505 506 507 |
# File 'lib/tupelo/client/worker.rb', line 504
# Cancels the waiter referenced by +unwaiter+: it is removed from
# read_waiters, or from prep_waiters if it was not found there.
def handle_unwaiter unwaiter
  waiter = unwaiter.waiter
  read_waiters.delete waiter or prep_waiters.delete waiter
end
#handle_waiter(waiter) ⇒ Object
509 510 511 512 513 514 515 516 517 518 519 520 521 |
# File 'lib/tupelo/client/worker.rb', line 509
# Registers a read waiter.
#
# A one-shot waiter (waiter.once) is satisfied immediately via #peek when a
# matching tuple already exists, and is queued in read_waiters only when
# nothing matches. A continuous waiter is offered every existing tuple via
# #gloms and is then always queued.
def handle_waiter waiter
  if waiter.once
    tuple = tuplespace.find_match_for waiter.template
    if tuple
      waiter.peek tuple
    else
      read_waiters << waiter
    end
  else
    tuplespace.each {|tuple| waiter.gloms tuple}
    read_waiters << waiter
  end
end
#in_thread? ⇒ Boolean
133 134 135 |
# File 'lib/tupelo/client/worker.rb', line 133
# True when the caller is running on the worker thread itself.
def in_thread?
  Thread.current == worker_thread
end
#log(*args) ⇒ Object
97 98 99 100 101 102 103 |
# File 'lib/tupelo/client/worker.rb', line 97
# With no arguments, returns the logger so callers can chain a level method
# (log.debug, log.info, ...). With arguments, forwards them to the logger's
# #unknown method.
def log *args
  if args.empty?
    @log
  else
    # explicit parentheses avoid Ruby's "`*' interpreted as argument prefix"
    # ambiguity warning
    @log.unknown(*args)
  end
end
#make_template(obj) ⇒ Object
Used by api to protect worker’s copy from client changes. Also, for serialization types that don’t represent symbols, this converts a template so that it works correctly regardless. So keyword args are very natural: read(k1: val, k2: val)
606 607 608 609 610 |
# File 'lib/tupelo/client/worker.rb', line 606
# Turns +obj+ into a template owned by the worker.
#
# Objects that respond to neither #to_ary nor #to_hash are returned as-is.
# Everything else is deep-copied with a Marshal round trip, so later changes
# by the client cannot affect the worker's copy, then wrapped by rot_for and
# optimized in place.
def make_template obj
  return obj unless obj.respond_to? :to_ary or obj.respond_to? :to_hash
  spec = Marshal.load(Marshal.dump(obj))
  rot_for(spec).optimize!
end
#observe_started_client ⇒ Object
137 138 139 140 141 142 143 |
# File 'lib/tupelo/client/worker.rb', line 137
# Copies the connection-related values from the client (client_id, blobber,
# seq, arc, start_tick) into the worker's own instance variables.
def observe_started_client
  @client_id = client.client_id
  @blobber = client.blobber
  @seq = client.seq
  @arc = client.arc
  @start_tick = client.start_tick
end
#pot_for(spec) ⇒ Object
616 617 618 |
# File 'lib/tupelo/client/worker.rb', line 616
# Builds a PortableObjectTemplate for +spec+. The second argument is a proc
# that passes a key through a blobber dump/load round trip.
def pot_for spec
  PortableObjectTemplate.new(spec, proc {|k| blobber.load(blobber.dump(k))})
end
#read_messages_from_seq ⇒ Object
184 185 186 187 188 |
# File 'lib/tupelo/client/worker.rb', line 184
# Feeds every message yielded by seq into this worker via #<<. Returns when
# seq.each returns.
# NOTE(review): the method name was missing from the extracted listing and
# is restored from the method summary -- confirm.
def read_messages_from_seq
  seq.each do |msg|
    self << msg
  end
end
#record_history(msg) ⇒ Object
451 |
# File 'lib/tupelo/client/worker.rb', line 451
# Hook called by the message handler for each applied message. No-op here;
# returns nil.
def record_history msg; end
#rot_for(spec) ⇒ Object
612 613 614 |
# File 'lib/tupelo/client/worker.rb', line 612
# Builds a RubyObjectTemplate for +spec+. The second argument is a proc
# that passes a key through a blobber dump/load round trip.
def rot_for spec
  RubyObjectTemplate.new(spec, proc {|k| blobber.load(blobber.dump(k))})
end
#run_msg_reader_thread ⇒ Object
170 171 172 173 174 175 176 177 |
# File 'lib/tupelo/client/worker.rb', line 170
# Body of the message reader thread: drains messages from seq, then logs
# that the connection closed and runs the close hook. Any StandardError is
# logged and re-raised.
# NOTE(review): the `read_messages_from_seq` call was missing from the
# extracted listing and is restored -- confirm.
def run_msg_reader_thread
  read_messages_from_seq
  log.warn "connection to seq closed"
  handle_seq_closed
rescue => ex
  log.error ex
  raise
end
#run_request_loop ⇒ Object
197 198 199 200 201 202 203 |
# File 'lib/tupelo/client/worker.rb', line 197
# Processes requests one at a time until something throws :done.
def run_request_loop
  catch :done do
    loop do
      handle_one_request
    end
  end
end
#run_worker_thread ⇒ Object
190 191 192 193 194 195 |
# File 'lib/tupelo/client/worker.rb', line 190
# Body of the worker thread: runs the request loop. Any StandardError is
# logged and re-raised.
def run_worker_thread
  run_request_loop
rescue => ex
  log.error ex
  raise
end
#send_transaction(transaction) ⇒ Object
539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 |
# File 'lib/tupelo/client/worker.rb', line 539
# Serializes +transaction+ into a new message and sends it to seq.
#
# If the transaction has no tags of its own, tags are derived from the known
# subspaces: one tag per subspace that matches any tuple in the transaction.
# A `true` tag is appended when this client would not otherwise receive the
# message back. On success the local tick and delta are incremented and the
# transaction is queued in trans_waiters.
#
# Returns the local tick assigned to the message.
# Re-raises serialization and send errors with the transaction in the
# error message.
#
# NOTE(review): `message_class` and every `tags` identifier were missing
# from the extracted listing and are restored from context
# (`client.subscribed_tags` is inferred) -- confirm.
def send_transaction transaction
  msg = message_class.new
  msg.client_id = client_id
  msg.local_tick = local_tick + 1
  msg.global_tick = global_tick
  msg.delta = delta + 1 # pipelined write/take
  msg.tags = transaction.tags

  writes = transaction.writes
  pulses = transaction.pulses
  takes = transaction.take_tuples_for_remote.compact
  reads = transaction.read_tuples_for_remote.compact

  unless msg.tags
    tags = []
    tuples = [writes, pulses, takes, reads].compact.flatten(1)
    subspaces.each do |subspace|
      tuples.each do |tuple|
        if subspace === tuple
          tags << subspace.tag
          break
        end
      end
    end

    will_get_this_msg = client.subscribed_all ||
      tags.any? {|tag| client.subscribed_tags.include? tag} ## optimize

    unless will_get_this_msg
      tags << true # reflect
    end

    if not tags.empty?
      msg.tags = tags
      log.debug {"tagged transaction: #{tags}"}
    end
  end

  begin
    msg.blob = blobber.dump([
      transaction.atomic,
      writes, pulses, takes, reads
    ])
    ## optimization: use bitfields to identify which ops are present
    ## (instead of nils), and combine this with atomic flag in one int
  rescue => ex
    raise ex, "cannot serialize #{transaction.inspect}: #{ex}"
  end

  begin
    seq << msg
  rescue => ex
    raise ex, "cannot send request for #{transaction.inspect}: #{ex}"
  end

  @local_tick += 1
  @delta += 1

  trans_waiters << transaction

  return msg.local_tick
end
#sniff_meta_tuple(tuple) ⇒ Object
441 442 443 444 445 446 447 448 449 |
# File 'lib/tupelo/client/worker.rb', line 441
# Inspects +tuple+ for tupelo metadata. A Hash tuple whose "__tupelo__" key
# equals "subspace" is treated as a subspace definition and a new Subspace
# built from it is appended to subspaces. Every other tuple is ignored.
# NOTE(review): the method name was missing from the extracted listing and
# is restored from the method summary -- confirm.
def sniff_meta_tuple tuple
  if tuple.kind_of? Hash and tuple.key? "__tupelo__"
    if tuple["__tupelo__"] == "subspace" # tuple is subspace metadata
      ## do some error checking
      ## what if subspace already exists?
      subspaces << Subspace.new(tuple, self)
    end
  end
end
#start ⇒ Object
118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/tupelo/client/worker.rb', line 118
# Starts the worker: copies connection state from the client, then spawns
# the message reader thread and the worker thread. Does nothing if a worker
# thread has already been created.
def start
  return if @worker_thread

  log.info "worker starting"
  observe_started_client

  @msg_reader_thread = Thread.new do
    run_msg_reader_thread
  end

  @worker_thread = Thread.new do
    run_worker_thread
  end
end
#stop ⇒ Object
145 146 147 148 149 150 |
# File 'lib/tupelo/client/worker.rb', line 145
# Stops the worker: enqueues :stop, waits for the worker thread to finish,
# then kills the message reader thread and stops the scheduler if one was
# created.
def stop
  cmd_queue << :stop
  worker_thread.join if worker_thread ## join(limit)?
  msg_reader_thread.kill if msg_reader_thread
  @atdo.stop if @atdo
end
#stop! ⇒ Object
stop without any remote handshaking
153 154 155 156 157 |
# File 'lib/tupelo/client/worker.rb', line 153
# Stops without any remote handshaking: kills both threads immediately and
# stops the scheduler if one was created.
def stop!
  @msg_reader_thread.kill if msg_reader_thread
  @worker_thread.kill if worker_thread
  @atdo.stop if @atdo
end
#update_to_tick(tick: nil, tags: nil, all: false) ⇒ Object
228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 |
# File 'lib/tupelo/client/worker.rb', line 228
# Brings the local state up to global tick +tick+ after a subscription
# change.
#
# Without an archiver (arc is nil) only @global_tick is set, with a warning
# when tick > 0. With an archiver, the tuplespace is requested from it: the
# local tuplespace is cleared and refilled from the archiver's stream, which
# must end with a nil marker, and @global_tick is set to the tick the
# archiver reports. The archiver connection is closed afterwards in all
# cases.
#
# tick: global tick of the subscription ack
# tags: requested tag set, passed to the archiver as :request_tags
# all:  whether everything is requested, passed as :request_all
#
# Raises if the archiver stream continues after the nil marker or ends
# without one.
#
# NOTE(review): `request_tags: tags`, `client.subscribed_tags` and the
# `sniff_meta_tuple tuple` call were missing from the extracted listing and
# are restored from context -- confirm.
def update_to_tick tick: nil, tags: nil, all: false
  # At this point we know that the seq messages now accumulating in
  # cmd_queue are tick+1, tick+2, ....
  # (or a subset of this sequence if not subscribed_all).
  # Some of these might get discarded later if archiver is more current.
  log.debug {"update_to_tick #{tick}"}

  unless arc
    if tick > 0
      log.warn "no archiver provided; assuming pubsub mode; " +
        "some client ops (take and local read) will not work"
    end
    @global_tick = tick
    log.info "global_tick = #{global_tick}"
    return
  end

  log.info "requesting tuplespace from arc"
  subscription_delta = {
    request_all: all,
    request_tags: tags,
    subscribed_all: client.subscribed_all,
    subscribed_tags: client.subscribed_tags
  }
  arc << [GET_TUPLESPACE, subscription_delta, tick]

  begin
    tuplespace.clear
      ## in some cases, we can keep some of it, but the current
      ## archiver is not smart enough to send exactly the delta
      ### abort all current transactions???

    arc_tick = arc.read[0]
    log.info "arc says global_tick = #{arc_tick}"

    done = false
    count = 0
    arc.each do |tuple|
      if tuple.nil?
        done = true
      else
        raise "bad object stream from archiver" if done
        sniff_meta_tuple tuple
        tuplespace.insert tuple
        count += 1
      end
    end
    unless done
      raise "did not get all of tuplespace from archiver" ## roll back?
    end

    log.info "received tuplespace from arc: #{count} tuples"

    @global_tick = arc_tick
    log.info "global_tick = #{global_tick}"
  end
ensure
  arc.close if arc and not arc.closed?
end