Class: Funl::Client

Inherits:
Object
  • Object
show all
Includes:
Stream
Defined in:
lib/funl/client.rb

Overview

Generic client base class. Manages the setup and handshake on the streams to the client sequencer and the message sequencer.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Stream

#client_stream_for, #message_server_stream_for, #server_stream_for

Constructor Details

#initialize(seq: seq!, cseq: cseq!, arc: nil, log: Logger.new($stderr), stream_type: ObjectStream::MSGPACK_TYPE, message_class: Message, symbolize_keys: false) ⇒ Client

Returns a new instance of Client.



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/funl/client.rb', line 26

# Stores the client's configuration and wraps the two sequencer connections
# in client streams.
#
# @param seq connection for the +seq+ stream; wrapped with client_stream_for.
#   The default expression +seq!+ is evaluated only when the keyword is
#   omitted. NOTE(review): +seq!+ is not defined in this view -- presumably
#   the resulting error is how a required keyword is emulated; confirm.
# @param cseq connection for the +cseq+ stream; handled the same way as +seq+.
# @param arc optional io kept unwrapped in @arcio; #seq_read_greeting turns
#   it into a stream later. May be nil.
# @param log logger stored in @log (defaults to a Logger writing to $stderr).
# @param stream_type value stored in @stream_type (defaults to
#   ObjectStream::MSGPACK_TYPE).
# @param message_class class stored in @message_class; #seq_read_greeting
#   passes it to +seq.expect+.
# @param symbolize_keys flag stored in @symbolize_keys; #seq_read_greeting
#   passes it to Blobber.for and to the arc stream.
def initialize(seq: seq!, cseq: cseq!, arc: nil,
      log: Logger.new($stderr),
      stream_type: ObjectStream::MSGPACK_TYPE,
      message_class: Message,
      symbolize_keys: false)

  @log = log
  @stream_type = stream_type ## discover this thru connections
  @message_class = message_class
  @symbolize_keys = symbolize_keys

  # NOTE(review): the settings above are assigned before the streams are
  # built -- presumably client_stream_for reads some of them; confirm before
  # reordering.
  @seq = client_stream_for(seq)
  @cseq = client_stream_for(cseq)
  # The arc io is only remembered here; it is not wrapped yet.
  @arcio = arc

  @sub_tracker = SubscriptionTracker.new(self)
end

Instance Attribute Details

#arc ⇒ Object (readonly)

Returns the value of attribute arc.



15
16
17
# File 'lib/funl/client.rb', line 15

# Reader for @arc. @arc is assigned in #seq_read_greeting: the client stream
# built from the +arc+ io given to the constructor, or that (falsy) value
# itself when no io was given. It is not assigned before then.
def arc
  @arc
end

#blob_type ⇒ Object (readonly)

Returns the value of attribute blob_type.



22
23
24
# File 'lib/funl/client.rb', line 22

# Reader for @blob_type, which #seq_read_greeting takes from the "blob"
# entry of the greeting.
def blob_type
  @blob_type
end

#blobber ⇒ Object (readonly)

Returns the value of attribute blobber.



23
24
25
# File 'lib/funl/client.rb', line 23

# Reader for @blobber, which #seq_read_greeting obtains from
# Blobber.for(blob_type, symbolize_keys: symbolize_keys).
def blobber
  @blobber
end

#client_id ⇒ Object (readonly)

Returns the value of attribute client_id.



19
20
21
# File 'lib/funl/client.rb', line 19

# Reader for @client_id, which #cseq_read_client_id takes from the
# "client_id" entry of the object read from cseq.
def client_id
  @client_id
end

#cseq ⇒ Object (readonly)

Returns the value of attribute cseq.



14
15
16
# File 'lib/funl/client.rb', line 14

# Reader for @cseq. The constructor sets it to client_stream_for(cseq);
# #cseq_read_client_id resets it to nil after reading the client id.
def cseq
  @cseq
end

#greeting ⇒ Object (readonly)

Returns the value of attribute greeting.



20
21
22
# File 'lib/funl/client.rb', line 20

# Reader for @greeting, the object #seq_read_greeting reads from seq.
# Its "tick" and "blob" entries are used there.
def greeting
  @greeting
end

#log ⇒ Object (readonly)

Returns the value of attribute log.



16
17
18
# File 'lib/funl/client.rb', line 16

# Reader for @log, the logger passed to the constructor as +log:+.
def log
  @log
end

#message_class ⇒ Object (readonly)

Returns the value of attribute message_class.



18
19
20
# File 'lib/funl/client.rb', line 18

# Reader for @message_class, the class passed to the constructor as
# +message_class:+; #seq_read_greeting hands it to +seq.expect+.
def message_class
  @message_class
end

#seq ⇒ Object (readonly)

Returns the value of attribute seq.



13
14
15
# File 'lib/funl/client.rb', line 13

# Reader for @seq, which the constructor sets to client_stream_for(seq).
def seq
  @seq
end

#start_tick ⇒ Object (readonly)

Returns the value of attribute start_tick.



21
22
23
# File 'lib/funl/client.rb', line 21

# Reader for @start_tick, which #seq_read_greeting takes from the "tick"
# entry of the greeting.
def start_tick
  @start_tick
end

#stream_type ⇒ Object (readonly)

Returns the value of attribute stream_type.



17
18
19
# File 'lib/funl/client.rb', line 17

# Reader for @stream_type, the value passed to the constructor as
# +stream_type:+ (ObjectStream::MSGPACK_TYPE by default).
def stream_type
  @stream_type
end

#symbolize_keys ⇒ Object (readonly)

Returns the value of attribute symbolize_keys.



24
25
26
# File 'lib/funl/client.rb', line 24

# Reader for @symbolize_keys, the flag passed to the constructor as
# +symbolize_keys:+ (false by default).
def symbolize_keys
  @symbolize_keys
end

Instance Method Details

#arc_server_stream_for(io) ⇒ Object



124
125
126
127
# File 'lib/funl/client.rb', line 124

# Wraps +io+ with server_stream_for, using blob_type as the stream type.
#
# blob_type is used here instead of stream_type because bare objects are
# sent over this stream.
#
# @param io the io to wrap
# @return whatever server_stream_for returns for +io+
def arc_server_stream_for(io)
  wire_type = blob_type
  server_stream_for(io, type: wire_type)
end

#cseq_read_client_id ⇒ Object



100
101
102
103
104
105
106
# File 'lib/funl/client.rb', line 100

# Reads one object from cseq, stores its "client_id" entry in @client_id,
# then closes cseq and forgets it.
#
# Closing is best-effort: a StandardError raised by +close+ is ignored.
#
# @return [nil]
def cseq_read_client_id
  log.debug("getting client_id from cseq")

  reply = cseq.read
  @client_id = reply["client_id"]
  log.info("client_id = #{client_id}")

  begin
    cseq.close
  rescue StandardError
    nil
  end
  @cseq = nil
end

#handle_ack(ack) ⇒ Object

Maintain subscription status. Must be called by the user (or subclass) of this class, most likely in the thread created by #start.

Raises:

  • (ArgumentError)


94
95
96
97
98
# File 'lib/funl/client.rb', line 94

# Feeds a control acknowledgement into the subscription tracker.
#
# The operation type and tags are taken from +ack.control_op+ and passed to
# the tracker's +update+.
#
# @param ack message responding to +control?+ and +control_op+
# @return whatever the tracker's +update+ returns
# @raise [ArgumentError] if +ack.control?+ is false; the message includes
#   the offending object so the failure can be diagnosed
def handle_ack(ack)
  unless ack.control?
    raise ArgumentError, "expected a control message, got #{ack.inspect}"
  end

  op_type, tags = ack.control_op
  @sub_tracker.update(op_type, tags)
end

#seq_read_greeting ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/funl/client.rb', line 108

# Reads the greeting from seq and derives the per-connection state from it:
# @start_tick (greeting "tick"), @blob_type (greeting "blob"), @blobber, and
# finally @arc. Also tells seq to expect message_class from here on.
#
# @return the value assigned to @arc
def seq_read_greeting
  log.debug("getting greeting from seq")
  @greeting = seq.read

  @start_tick = greeting["tick"]
  log.info("start_tick = #{start_tick}")

  @blob_type = greeting["blob"]
  log.info("blob_type = #{blob_type}")

  @blobber = Blobber.for(blob_type, symbolize_keys: symbolize_keys)
  seq.expect(message_class)

  # note: @arc is nil when client is the archiver itself (no arc io given)
  @arc =
    if @arcio
      client_stream_for(@arcio, type: blob_type, symbolize_keys: symbolize_keys)
    else
      @arcio
    end
end

#start ⇒ Object

Handshake with both cseq and seq. Does not start any threads—that is left to subclasses. Yields after getting client id so that caller can set log.progname, for example.



47
48
49
50
51
# File 'lib/funl/client.rb', line 47

# Runs the handshake: first reads the client id from cseq, then reads the
# greeting from seq. No threads are created here.
#
# @yield between the two steps (only if a block is given), i.e. after the
#   client id is known and before the greeting is read
# @return the result of #seq_read_greeting
def start
  cseq_read_client_id
  # Give the caller a hook once client_id is available.
  yield if block_given?
  seq_read_greeting
end

#subscribe(tags) ⇒ Object

Send a subscribe message registering interest in tags. Seq will respond with an ack message containing the tick on which subscription took effect. Waits for the specified tags to be subscribed (assuming #handle_ack is called regularly, such as in worker thread).



65
66
67
# File 'lib/funl/client.rb', line 65

# Registers interest in +tags+ by delegating to the subscription tracker.
#
# @param tags the tags to subscribe to
# @return whatever the tracker's +subscribe+ returns
def subscribe(tags)
  @sub_tracker.subscribe(tags)
end

#subscribe_all ⇒ Object

Send a subscribe message registering interest in all messages. Seq will respond with an ack message containing the tick on which subscription took effect. Waits for the subscription to start (assuming #handle_ack is called regularly).



73
74
75
# File 'lib/funl/client.rb', line 73

# Registers interest in all messages by delegating to the subscription
# tracker's +subscribe_all+.
#
# @return whatever the tracker's +subscribe_all+ returns
def subscribe_all
  @sub_tracker.subscribe_all
end

#subscribed_all ⇒ Object



53
54
55
# File 'lib/funl/client.rb', line 53

# Returns the subscription tracker's +subscribed_all+ value.
# NOTE(review): presumably reports whether the all-messages subscription is
# in effect -- confirm against SubscriptionTracker.
def subscribed_all
  @sub_tracker.subscribed_all
end

#subscribed_tags ⇒ Object



57
58
59
# File 'lib/funl/client.rb', line 57

# Returns the subscription tracker's +subscribed_tags+ value.
# NOTE(review): presumably the tags currently subscribed -- confirm against
# SubscriptionTracker.
def subscribed_tags
  @sub_tracker.subscribed_tags
end

#unsubscribe(tags) ⇒ Object

Unsubscribe from tags. Seq will respond with an ack message containing the tick on which subscription ended. Waits for the subscription to end (assuming #handle_ack is called regularly).



80
81
82
# File 'lib/funl/client.rb', line 80

# Ends the subscription to +tags+ by delegating to the subscription tracker.
#
# @param tags the tags to unsubscribe from
# @return whatever the tracker's +unsubscribe+ returns
def unsubscribe(tags)
  @sub_tracker.unsubscribe(tags)
end

#unsubscribe_all ⇒ Object

Unsubscribe from all messages. Any tag subscriptions remain in effect. Seq will respond with an ack message containing the tick on which subscription ended. Waits for the subscription to end (assuming #handle_ack is called regularly).



88
89
90
# File 'lib/funl/client.rb', line 88

# Ends the all-messages subscription by delegating to the subscription
# tracker's +unsubscribe_all+.
#
# @return whatever the tracker's +unsubscribe_all+ returns
def unsubscribe_all
  @sub_tracker.unsubscribe_all
end