Class: Funl::Client
Overview
Generic client base class. Manages the setup and handshake on the streams to the client sequencer and the message sequencer.
Instance Attribute Summary collapse
-
#arc ⇒ Object
readonly
Returns the value of attribute arc.
-
#blob_type ⇒ Object
readonly
Returns the value of attribute blob_type.
-
#blobber ⇒ Object
readonly
Returns the value of attribute blobber.
-
#client_id ⇒ Object
readonly
Returns the value of attribute client_id.
-
#cseq ⇒ Object
readonly
Returns the value of attribute cseq.
-
#greeting ⇒ Object
readonly
Returns the value of attribute greeting.
-
#log ⇒ Object
readonly
Returns the value of attribute log.
-
#message_class ⇒ Object
readonly
Returns the value of attribute message_class.
-
#seq ⇒ Object
readonly
Returns the value of attribute seq.
-
#start_tick ⇒ Object
readonly
Returns the value of attribute start_tick.
-
#stream_type ⇒ Object
readonly
Returns the value of attribute stream_type.
-
#symbolize_keys ⇒ Object
readonly
Returns the value of attribute symbolize_keys.
Instance Method Summary collapse
- #arc_server_stream_for(io) ⇒ Object
- #cseq_read_client_id ⇒ Object
-
#handle_ack(ack) ⇒ Object
Maintain subscription status.
-
#initialize(seq: seq!, , cseq: cseq!, , arc: nil, log: Logger.new($stderr), stream_type: ObjectStream::MSGPACK_TYPE, message_class: Message, symbolize_keys: false) ⇒ Client
constructor
A new instance of Client.
- #seq_read_greeting ⇒ Object
-
#start ⇒ Object
Handshake with both cseq and seq.
-
#subscribe(tags) ⇒ Object
Send a subscribe message registering interest in
tags
. -
#subscribe_all ⇒ Object
Send a subscribe message registering interest in all messages.
- #subscribed_all ⇒ Object
- #subscribed_tags ⇒ Object
-
#unsubscribe(tags) ⇒ Object
Unsubscribe from
tags
. -
#unsubscribe_all ⇒ Object
Unsubscribe from all messages.
Methods included from Stream
#client_stream_for, #message_server_stream_for, #server_stream_for
Constructor Details
#initialize(seq: seq!, , cseq: cseq!, , arc: nil, log: Logger.new($stderr), stream_type: ObjectStream::MSGPACK_TYPE, message_class: Message, symbolize_keys: false) ⇒ Client
Returns a new instance of Client.
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/funl/client.rb', line 26 def initialize(seq: seq!, cseq: cseq!, arc: nil, log: Logger.new($stderr), stream_type: ObjectStream::MSGPACK_TYPE, message_class: Message, symbolize_keys: false) @log = log @stream_type = stream_type ## discover this thru connections @message_class = @symbolize_keys = symbolize_keys @seq = client_stream_for(seq) @cseq = client_stream_for(cseq) @arcio = arc @sub_tracker = SubscriptionTracker.new(self) end |
Instance Attribute Details
#arc ⇒ Object (readonly)
Returns the value of attribute arc.
15 16 17 |
# File 'lib/funl/client.rb', line 15 def arc @arc end |
#blob_type ⇒ Object (readonly)
Returns the value of attribute blob_type.
22 23 24 |
# File 'lib/funl/client.rb', line 22 def blob_type @blob_type end |
#blobber ⇒ Object (readonly)
Returns the value of attribute blobber.
23 24 25 |
# File 'lib/funl/client.rb', line 23 def blobber @blobber end |
#client_id ⇒ Object (readonly)
Returns the value of attribute client_id.
19 20 21 |
# File 'lib/funl/client.rb', line 19 def client_id @client_id end |
#cseq ⇒ Object (readonly)
Returns the value of attribute cseq.
14 15 16 |
# File 'lib/funl/client.rb', line 14 def cseq @cseq end |
#greeting ⇒ Object (readonly)
Returns the value of attribute greeting.
20 21 22 |
# File 'lib/funl/client.rb', line 20 def greeting @greeting end |
#log ⇒ Object (readonly)
Returns the value of attribute log.
16 17 18 |
# File 'lib/funl/client.rb', line 16 def log @log end |
#message_class ⇒ Object (readonly)
Returns the value of attribute message_class.
18 19 20 |
# File 'lib/funl/client.rb', line 18 def @message_class end |
#seq ⇒ Object (readonly)
Returns the value of attribute seq.
13 14 15 |
# File 'lib/funl/client.rb', line 13 def seq @seq end |
#start_tick ⇒ Object (readonly)
Returns the value of attribute start_tick.
21 22 23 |
# File 'lib/funl/client.rb', line 21 def start_tick @start_tick end |
#stream_type ⇒ Object (readonly)
Returns the value of attribute stream_type.
17 18 19 |
# File 'lib/funl/client.rb', line 17 def stream_type @stream_type end |
#symbolize_keys ⇒ Object (readonly)
Returns the value of attribute symbolize_keys.
24 25 26 |
# File 'lib/funl/client.rb', line 24 def symbolize_keys @symbolize_keys end |
Instance Method Details
#arc_server_stream_for(io) ⇒ Object
124 125 126 127 |
# File 'lib/funl/client.rb', line 124 def arc_server_stream_for io server_stream_for(io, type: blob_type) # note: blob_type, not stream_type, since we are sending bare objects end |
#cseq_read_client_id ⇒ Object
100 101 102 103 104 105 106 |
# File 'lib/funl/client.rb', line 100 def cseq_read_client_id log.debug "getting client_id from cseq" @client_id = cseq.read["client_id"] log.info "client_id = #{client_id}" cseq.close rescue nil @cseq = nil end |
#handle_ack(ack) ⇒ Object
Maintain subscription status. Must be called by the user (or subclass) of this class, most likely in the thread created by #start.
94 95 96 97 98 |
# File 'lib/funl/client.rb', line 94 def handle_ack ack raise ArgumentError unless ack.control? op_type, = ack.control_op @sub_tracker.update op_type, end |
#seq_read_greeting ⇒ Object
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/funl/client.rb', line 108 def seq_read_greeting log.debug "getting greeting from seq" @greeting = seq.read @start_tick = greeting["tick"] log.info "start_tick = #{start_tick}" @blob_type = greeting["blob"] log.info "blob_type = #{blob_type}" @blobber = Blobber.for(blob_type, symbolize_keys: symbolize_keys) seq.expect @arc = @arcio && client_stream_for(@arcio, type: blob_type, symbolize_keys: symbolize_keys) # note: @arc is nil when client is the archiver itself end |
#start ⇒ Object
Handshake with both cseq and seq. Does not start any threads–that is left to subclasses. Yields after getting client id so that caller can set log.progname, for example.
47 48 49 50 51 |
# File 'lib/funl/client.rb', line 47 def start cseq_read_client_id yield if block_given? seq_read_greeting end |
#subscribe(tags) ⇒ Object
Send a subscribe message registering interest in tags
. Seq will respond with an ack message containing the tick on which subscription took effect. Waits for the specified tags
to be subscribed (assuming #handle_ack is called regularly, such as in worker thread).
65 66 67 |
# File 'lib/funl/client.rb', line 65 def subscribe @sub_tracker.subscribe end |
#subscribe_all ⇒ Object
Send a subscribe message registering interest in all messages. Seq will respond with an ack message containing the tick on which subscription took effect. Waits for the subscription to start (assuming #handle_ack is called regularly).
73 74 75 |
# File 'lib/funl/client.rb', line 73 def subscribe_all @sub_tracker.subscribe_all end |
#subscribed_all ⇒ Object
53 54 55 |
# File 'lib/funl/client.rb', line 53 def subscribed_all @sub_tracker.subscribed_all end |
#subscribed_tags ⇒ Object
57 58 59 |
# File 'lib/funl/client.rb', line 57 def @sub_tracker. end |
#unsubscribe(tags) ⇒ Object
Unsubscribe from tags
. Seq will respond with an ack message containing the tick on which subscription ended. Waits for the subscription to end (assuming #handle_ack is called regularly).
80 81 82 |
# File 'lib/funl/client.rb', line 80 def unsubscribe @sub_tracker.unsubscribe end |
#unsubscribe_all ⇒ Object
Unsubscribe from all messages. Any tag subscriptions remain in effect. Seq will respond with an ack message containing the tick on which subscription ended. Waits for the subscription to end (assuming #handle_ack is called regularly).
88 89 90 |
# File 'lib/funl/client.rb', line 88 def unsubscribe_all @sub_tracker.unsubscribe_all end |