Class: Funl::Client

Inherits:
Object
  • Object
show all
Includes:
Stream
Defined in:
lib/funl/client.rb

Overview

Generic client base class. Manages the setup and handshake on the streams to the client sequencer and the message sequencer.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Stream

#client_stream_for, #message_server_stream_for, #server_stream_for

Constructor Details

#initialize(seq: seq!, , cseq: cseq!, , arc: nil, log: Logger.new($stderr), stream_type: ObjectStream::MSGPACK_TYPE, message_class: Message) ⇒ Client

Returns a new instance of Client.



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/funl/client.rb', line 25

def initialize(seq: seq!, cseq: cseq!, arc: nil,
      log: Logger.new($stderr),
      stream_type: ObjectStream::MSGPACK_TYPE,
      message_class: Message)

  @log = log
  @stream_type = stream_type ## discover this thru connections
  @message_class = message_class

  @seq = client_stream_for(seq)
  @cseq = client_stream_for(cseq)
  @arcio = arc
  
  @sub_tracker = SubscriptionTracker.new(self)
end

Instance Attribute Details

#arcObject (readonly)

Returns the value of attribute arc.



15
16
17
# File 'lib/funl/client.rb', line 15

def arc
  @arc
end

#blob_typeObject (readonly)

Returns the value of attribute blob_type.



22
23
24
# File 'lib/funl/client.rb', line 22

def blob_type
  @blob_type
end

#blobberObject (readonly)

Returns the value of attribute blobber.



23
24
25
# File 'lib/funl/client.rb', line 23

def blobber
  @blobber
end

#client_idObject (readonly)

Returns the value of attribute client_id.



19
20
21
# File 'lib/funl/client.rb', line 19

def client_id
  @client_id
end

#cseqObject (readonly)

Returns the value of attribute cseq.



14
15
16
# File 'lib/funl/client.rb', line 14

def cseq
  @cseq
end

#greetingObject (readonly)

Returns the value of attribute greeting.



20
21
22
# File 'lib/funl/client.rb', line 20

def greeting
  @greeting
end

#logObject (readonly)

Returns the value of attribute log.



16
17
18
# File 'lib/funl/client.rb', line 16

def log
  @log
end

#message_classObject (readonly)

Returns the value of attribute message_class.



18
19
20
# File 'lib/funl/client.rb', line 18

def message_class
  @message_class
end

#seqObject (readonly)

Returns the value of attribute seq.



13
14
15
# File 'lib/funl/client.rb', line 13

def seq
  @seq
end

#start_tickObject (readonly)

Returns the value of attribute start_tick.



21
22
23
# File 'lib/funl/client.rb', line 21

def start_tick
  @start_tick
end

#stream_typeObject (readonly)

Returns the value of attribute stream_type.



17
18
19
# File 'lib/funl/client.rb', line 17

def stream_type
  @stream_type
end

Instance Method Details

#arc_server_stream_for(io) ⇒ Object



119
120
121
122
# File 'lib/funl/client.rb', line 119

def arc_server_stream_for io
  server_stream_for(io, type: blob_type)
    # note: blob_type, not stream_type, since we are sending bare objects
end

#cseq_read_client_idObject



97
98
99
100
101
102
103
# File 'lib/funl/client.rb', line 97

def cseq_read_client_id
  log.debug "getting client_id from cseq"
  @client_id = cseq.read["client_id"]
  log.info "client_id = #{client_id}"
  cseq.close rescue nil
  @cseq = nil
end

#handle_ack(ack) ⇒ Object

Maintain subscription status. Must be called by the user (or subclass) of this class, most likely in the thread created by #start.

Raises:

  • (ArgumentError)


91
92
93
94
95
# File 'lib/funl/client.rb', line 91

def handle_ack ack
  raise ArgumentError unless ack.control?
  op_type, tags = ack.control_op
  @sub_tracker.update op_type, tags
end

#seq_read_greetingObject



105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/funl/client.rb', line 105

def seq_read_greeting
  log.debug "getting greeting from seq"
  @greeting = seq.read
  @start_tick = greeting["tick"]
  log.info "start_tick = #{start_tick}"
  @blob_type = greeting["blob"]
  log.info "blob_type = #{blob_type}"
  @blobber = Blobber.for(blob_type)
  seq.expect message_class

  @arc = @arcio && client_stream_for(@arcio, type: blob_type)
    # note: @arc is nil when client is the archiver itself
end

#startObject

Handshake with both cseq and seq. Does not start any threads–that is left to subclasses. Yields after getting client id so that caller can set log.progname, for example.



44
45
46
47
48
# File 'lib/funl/client.rb', line 44

def start
  cseq_read_client_id
  yield if block_given?
  seq_read_greeting
end

#subscribe(tags) ⇒ Object

Send a subscribe message registering interest in tags. Seq will respond with an ack message containing the tick on which subscription took effect. Waits for the specified tags to be subscribed (assuming #handle_ack is called regularly, such as in worker thread).



62
63
64
# File 'lib/funl/client.rb', line 62

def subscribe tags
  @sub_tracker.subscribe tags
end

#subscribe_allObject

Send a subscribe message registering interest in all messages. Seq will respond with an ack message containing the tick on which subscription took effect. Waits for the subscription to start (assuming #handle_ack is called regularly).



70
71
72
# File 'lib/funl/client.rb', line 70

def subscribe_all
  @sub_tracker.subscribe_all
end

#subscribed_allObject



50
51
52
# File 'lib/funl/client.rb', line 50

def subscribed_all
  @sub_tracker.subscribed_all
end

#subscribed_tagsObject



54
55
56
# File 'lib/funl/client.rb', line 54

def subscribed_tags
  @sub_tracker.subscribed_tags
end

#unsubscribe(tags) ⇒ Object

Unsubscribe from tags. Seq will respond with an ack message containing the tick on which subscription ended. Waits for the subscription to end (assuming #handle_ack is called regularly).



77
78
79
# File 'lib/funl/client.rb', line 77

def unsubscribe tags
  @sub_tracker.unsubscribe tags
end

#unsubscribe_allObject

Unsubscribe from all messages. Any tag subscriptions remain in effect. Seq will respond with an ack message containing the tick on which subscription ended. Waits for the subscription to end (assuming #handle_ack is called regularly).



85
86
87
# File 'lib/funl/client.rb', line 85

def unsubscribe_all
  @sub_tracker.unsubscribe_all
end