Class: Funl::Client
Overview
Generic client base class. Manages the setup and handshake on the streams to the client sequencer and the message sequencer.
Instance Attribute Summary
- #arc ⇒ Object (readonly): Returns the value of attribute arc.
- #blob_type ⇒ Object (readonly): Returns the value of attribute blob_type.
- #blobber ⇒ Object (readonly): Returns the value of attribute blobber.
- #client_id ⇒ Object (readonly): Returns the value of attribute client_id.
- #cseq ⇒ Object (readonly): Returns the value of attribute cseq.
- #greeting ⇒ Object (readonly): Returns the value of attribute greeting.
- #log ⇒ Object (readonly): Returns the value of attribute log.
- #message_class ⇒ Object (readonly): Returns the value of attribute message_class.
- #seq ⇒ Object (readonly): Returns the value of attribute seq.
- #start_tick ⇒ Object (readonly): Returns the value of attribute start_tick.
- #stream_type ⇒ Object (readonly): Returns the value of attribute stream_type.
Instance Method Summary
- #arc_server_stream_for(io) ⇒ Object
- #cseq_read_client_id ⇒ Object
- #handle_ack(ack) ⇒ Object: Maintain subscription status.
- #initialize(seq: seq!, cseq: cseq!, arc: nil, log: Logger.new($stderr), stream_type: ObjectStream::MSGPACK_TYPE, message_class: Message) ⇒ Client (constructor): A new instance of Client.
- #seq_read_greeting ⇒ Object
- #start ⇒ Object: Handshake with both cseq and seq.
- #subscribe(tags) ⇒ Object: Send a subscribe message registering interest in tags.
- #subscribe_all ⇒ Object: Send a subscribe message registering interest in all messages.
- #subscribed_all ⇒ Object
- #subscribed_tags ⇒ Object
- #unsubscribe(tags) ⇒ Object: Unsubscribe from tags.
- #unsubscribe_all ⇒ Object: Unsubscribe from all messages.
Methods included from Stream
#client_stream_for, #message_server_stream_for, #server_stream_for
Constructor Details
#initialize(seq: seq!, cseq: cseq!, arc: nil, log: Logger.new($stderr), stream_type: ObjectStream::MSGPACK_TYPE, message_class: Message) ⇒ Client
Returns a new instance of Client.
    # File 'lib/funl/client.rb', line 25
    def initialize(seq: seq!, cseq: cseq!, arc: nil,
          log: Logger.new($stderr),
          stream_type: ObjectStream::MSGPACK_TYPE,
          message_class: Message)
      @log = log
      @stream_type = stream_type ## discover this thru connections
      @message_class = message_class

      @seq = client_stream_for(seq)
      @cseq = client_stream_for(cseq)
      @arcio = arc

      @sub_tracker = SubscriptionTracker.new(self)
    end
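The `seq: seq!` and `cseq: cseq!` defaults look like the idiom used for required keyword arguments before Ruby 2.1 added the bare `seq:` form: the default expression calls a method that does not exist, so omitting the keyword raises an error. The sketch below illustrates that behavior in isolation. `RequiredKeywordSketch` is a hypothetical stand-in and not part of funl, and the symbols passed in replace the real IO objects.

```ruby
# Hypothetical stand-in class; not part of funl.
class RequiredKeywordSketch
  attr_reader :seq, :cseq, :arc

  # `seq!` and `cseq!` are undefined methods, so each default raises
  # NoMethodError only when the caller omits that keyword.
  def initialize(seq: seq!, cseq: cseq!, arc: nil)
    @seq = seq
    @cseq = cseq
    @arc = arc
  end
end

# Both required keywords supplied: the defaults are never evaluated.
ok = RequiredKeywordSketch.new(seq: :seq_io, cseq: :cseq_io)

# cseq omitted: evaluating its default calls the undefined `cseq!`.
missing =
  begin
    RequiredKeywordSketch.new(seq: :seq_io)
    nil
  rescue NameError => e # NoMethodError is a subclass of NameError
    e
  end
```

Here `ok.arc` is nil because `arc` is optional, while `missing` holds the error raised for the omitted `cseq`.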
Instance Attribute Details
#arc ⇒ Object (readonly)
Returns the value of attribute arc.
    # File 'lib/funl/client.rb', line 15
    def arc
      @arc
    end
#blob_type ⇒ Object (readonly)
Returns the value of attribute blob_type.
    # File 'lib/funl/client.rb', line 22
    def blob_type
      @blob_type
    end
#blobber ⇒ Object (readonly)
Returns the value of attribute blobber.
    # File 'lib/funl/client.rb', line 23
    def blobber
      @blobber
    end
#client_id ⇒ Object (readonly)
Returns the value of attribute client_id.
    # File 'lib/funl/client.rb', line 19
    def client_id
      @client_id
    end
#cseq ⇒ Object (readonly)
Returns the value of attribute cseq.
    # File 'lib/funl/client.rb', line 14
    def cseq
      @cseq
    end
#greeting ⇒ Object (readonly)
Returns the value of attribute greeting.
    # File 'lib/funl/client.rb', line 20
    def greeting
      @greeting
    end
#log ⇒ Object (readonly)
Returns the value of attribute log.
    # File 'lib/funl/client.rb', line 16
    def log
      @log
    end
#message_class ⇒ Object (readonly)
Returns the value of attribute message_class.
    # File 'lib/funl/client.rb', line 18
    def message_class
      @message_class
    end
#seq ⇒ Object (readonly)
Returns the value of attribute seq.
    # File 'lib/funl/client.rb', line 13
    def seq
      @seq
    end
#start_tick ⇒ Object (readonly)
Returns the value of attribute start_tick.
    # File 'lib/funl/client.rb', line 21
    def start_tick
      @start_tick
    end
#stream_type ⇒ Object (readonly)
Returns the value of attribute stream_type.
    # File 'lib/funl/client.rb', line 17
    def stream_type
      @stream_type
    end
Instance Method Details
#arc_server_stream_for(io) ⇒ Object
    # File 'lib/funl/client.rb', line 119
    def arc_server_stream_for io
      server_stream_for(io, type: blob_type)
        # note: blob_type, not stream_type, since we are sending bare objects
    end
#cseq_read_client_id ⇒ Object
    # File 'lib/funl/client.rb', line 97
    def cseq_read_client_id
      log.debug "getting client_id from cseq"
      @client_id = cseq.read["client_id"]
      log.info "client_id = #{client_id}"
      cseq.close rescue nil
      @cseq = nil
    end
#handle_ack(ack) ⇒ Object
Maintain subscription status. Must be called by the user (or a subclass) of this class, most likely in the worker thread that a subclass creates around #start (the base #start does not start any threads).
    # File 'lib/funl/client.rb', line 91
    def handle_ack ack
      raise ArgumentError unless ack.control?
      op_type, tags = ack.control_op
      @sub_tracker.update op_type, tags
    end
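Because #handle_ack rejects anything that is not a control message, the caller has to route messages before invoking it. The sketch below shows one possible dispatch step. Everything in it is a hypothetical stand-in: `SketchMsg` assumes only that messages respond to `control?` and `control_op`, the `"subscribe"` op name and the op/tags pair are illustrative, and an array replaces the subscription tracker.

```ruby
# Hypothetical message stand-in; real funl messages are only assumed to
# respond to control? and control_op.
SketchMsg = Struct.new(:control_op, :body) do
  def control?
    !control_op.nil?
  end
end

# Hypothetical client stand-in; not part of funl.
class AckDispatchSketch
  attr_reader :updates, :delivered

  def initialize
    @updates = []   # stands in for the subscription tracker
    @delivered = [] # ordinary messages handed to the application
  end

  # Same guard as the documented handle_ack; the op/tags pair is assumed.
  def handle_ack(ack)
    raise ArgumentError unless ack.control?
    op_type, tags = ack.control_op
    @updates << [op_type, tags]
  end

  # Route each incoming message: acks to handle_ack, the rest to the app.
  def dispatch(msg)
    if msg.control?
      handle_ack(msg)
    else
      @delivered << msg.body
    end
  end
end

client = AckDispatchSketch.new
[
  SketchMsg.new(["subscribe", ["a", "b"]], nil),
  SketchMsg.new(nil, "payload"),
].each { |msg| client.dispatch(msg) }
```

Calling `handle_ack` directly with the second message would raise ArgumentError, which is why the routing happens in `dispatch` first.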
#seq_read_greeting ⇒ Object
    # File 'lib/funl/client.rb', line 105
    def seq_read_greeting
      log.debug "getting greeting from seq"
      @greeting = seq.read
      @start_tick = greeting["tick"]
      log.info "start_tick = #{start_tick}"
      @blob_type = greeting["blob"]
      log.info "blob_type = #{blob_type}"
      @blobber = Blobber.for(blob_type)
      seq.expect message_class

      @arc = @arcio && client_stream_for(@arcio, type: blob_type)
        # note: @arc is nil when client is the archiver itself
    end
#start ⇒ Object
Handshake with both cseq and seq. Does not start any threads; that is left to subclasses. Yields after getting the client id so that the caller can set log.progname, for example.
    # File 'lib/funl/client.rb', line 44
    def start
      cseq_read_client_id
      yield if block_given?
      seq_read_greeting
    end
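The ordering in #start (client id first, then the block, then the greeting) is what makes the log.progname use case work. The sketch below imitates only that ordering with canned data. `FakeStream` and `HandshakeSketch` are hypothetical stand-ins and not part of funl, and the hash contents are made-up values using the "client_id", "tick", and "blob" keys seen in the source.

```ruby
require 'logger'
require 'stringio'

# Hypothetical stand-in for an object stream: hands back canned objects.
FakeStream = Struct.new(:objects) do
  def read
    objects.shift
  end

  def close; end
end

# Hypothetical stand-in that copies only the ordering of Client#start.
class HandshakeSketch
  attr_reader :log, :client_id, :start_tick, :blob_type

  def initialize(seq:, cseq:, log:)
    @seq = seq
    @cseq = cseq
    @log = log
  end

  def start
    @client_id = @cseq.read["client_id"] # step 1: id from cseq
    @cseq.close
    yield if block_given?                # step 2: caller hook
    greeting = @seq.read                 # step 3: greeting from seq
    @start_tick = greeting["tick"]
    @blob_type = greeting["blob"]
    log.info "start_tick = #{start_tick}"
  end
end

out = StringIO.new
client = HandshakeSketch.new(
  cseq: FakeStream.new([{ "client_id" => 7 }]),
  seq: FakeStream.new([{ "tick" => 42, "blob" => "msgpack" }]),
  log: Logger.new(out)
)

client.start do
  # client_id is already known here, so it can label later log lines.
  client.log.progname = "client #{client.client_id}"
end
```

Since the block runs before the greeting is read, the log line written during the greeting step already carries the program name set in the block.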
#subscribe(tags) ⇒ Object
Send a subscribe message registering interest in tags. Seq will respond with an ack message containing the tick on which the subscription took effect. Waits for the specified tags to be subscribed (assuming #handle_ack is called regularly, such as in a worker thread).
    # File 'lib/funl/client.rb', line 62
    def subscribe tags
      @sub_tracker.subscribe tags
    end
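The waiting behavior described for #subscribe implies that some other thread must deliver the ack, otherwise the call would block indefinitely. The sketch below shows one way such a wait could be built with a mutex and a condition variable. It is an assumption for illustration only: `WaitingTrackerSketch` is hypothetical, funl's SubscriptionTracker may be implemented quite differently, and a Queue stands in for the stream to seq.

```ruby
# Hypothetical tracker; funl's SubscriptionTracker may work differently.
class WaitingTrackerSketch
  def initialize(outbox)
    @outbox = outbox # stands in for the stream to seq
    @subscribed = []
    @lock = Mutex.new
    @changed = ConditionVariable.new
  end

  def subscribed_tags
    @lock.synchronize { @subscribed.dup }
  end

  # Send the request, then block until every requested tag has been acked.
  def subscribe(tags)
    @outbox << [:subscribe, tags]
    @lock.synchronize do
      @changed.wait(@lock) until (tags - @subscribed).empty?
    end
  end

  # Called from the worker thread when an ack arrives.
  def update(op_type, tags)
    @lock.synchronize do
      @subscribed |= tags if op_type == :subscribe
      @changed.broadcast
    end
  end
end

requests = Queue.new
tracker = WaitingTrackerSketch.new(requests)

# Worker thread: plays the role of seq plus the handle_ack caller.
worker = Thread.new do
  op_type, tags = requests.pop
  tracker.update(op_type, tags)
end

tracker.subscribe(["a", "b"]) # returns only after the worker's update
worker.join
```

Without the worker thread, the `subscribe` call in this sketch would never return, which mirrors the documented requirement that #handle_ack be called regularly.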
#subscribe_all ⇒ Object
Send a subscribe message registering interest in all messages. Seq will respond with an ack message containing the tick on which subscription took effect. Waits for the subscription to start (assuming #handle_ack is called regularly).
    # File 'lib/funl/client.rb', line 70
    def subscribe_all
      @sub_tracker.subscribe_all
    end
#subscribed_all ⇒ Object
    # File 'lib/funl/client.rb', line 50
    def subscribed_all
      @sub_tracker.subscribed_all
    end
#subscribed_tags ⇒ Object
    # File 'lib/funl/client.rb', line 54
    def subscribed_tags
      @sub_tracker.subscribed_tags
    end
#unsubscribe(tags) ⇒ Object
Unsubscribe from tags. Seq will respond with an ack message containing the tick on which subscription ended. Waits for the subscription to end (assuming #handle_ack is called regularly).
    # File 'lib/funl/client.rb', line 77
    def unsubscribe tags
      @sub_tracker.unsubscribe tags
    end
#unsubscribe_all ⇒ Object
Unsubscribe from all messages. Any tag subscriptions remain in effect. Seq will respond with an ack message containing the tick on which subscription ended. Waits for the subscription to end (assuming #handle_ack is called regularly).
    # File 'lib/funl/client.rb', line 85
    def unsubscribe_all
      @sub_tracker.unsubscribe_all
    end