Class: Funl::MessageSequencer

Inherits:
Object
  • Object
show all
Includes:
Stream
Defined in:
lib/funl/message-sequencer.rb

Overview

Assigns a unique sequential id to each message and relays it to its destinations.

Direct Known Subclasses

MessageSequencerNio, MessageSequencerSelect

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Stream

#client_stream_for, #message_server_stream_for, #server_stream_for

Constructor Details

#initialize(server, *conns, log: Logger.new($stderr), stream_type: ObjectStream::MSGPACK_TYPE, message_class: Message, blob_type: Blobber::MSGPACK_TYPE, tick: 0) ⇒ MessageSequencer

Returns a new instance of MessageSequencer.



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/funl/message-sequencer.rb', line 31

# Sets up a sequencer around +server+ and attempts each initial connection.
#
# @param server stored as-is and exposed through #server
#   (presumably the listening socket -- confirm against callers)
# @param conns initial connections; each one is handed to try_conn
# @param log logger exposed through #log (defaults to a Logger on $stderr)
# @param stream_type value exposed through #stream_type
# @param message_class value exposed through #message_class
# @param blob_type value exposed through #blob_type and placed in the greeting
# @param tick initial value exposed through #tick
def initialize(server, *conns,
               log: Logger.new($stderr),
               stream_type: ObjectStream::MSGPACK_TYPE,
               message_class: Message,
               blob_type: Blobber::MSGPACK_TYPE,
               tick: 0)
  @server, @log = server, log
  @stream_type, @message_class, @blob_type = stream_type, message_class, blob_type
  # default_greeting reads blob_type, so @blob_type has to be assigned first.
  @greeting = default_greeting
  @tick = tick

  init_selector

  conns.each { |initial_conn| try_conn(initial_conn) }

  # Subscription bookkeeping. Note that these are assigned only after
  # try_conn has run for the initial connections.
  @subscribers_to_all = []                                   # [conn, ...]
  @subscribers = Hash.new { |by_tag, tag| by_tag[tag] = [] } # tag => [conn, ...]
  @tags = Hash.new { |by_conn, conn| by_conn[conn] = [] }    # conn => [tag, ...]
end

Instance Attribute Details

#blob_type ⇒ Object (readonly)

Returns the value of attribute blob_type.



18
19
20
# File 'lib/funl/message-sequencer.rb', line 18

# Read-only accessor for the blob type stored by the constructor
# (from its +blob_type:+ keyword).
#
# @return the current value of @blob_type
def blob_type; @blob_type; end

#greeting ⇒ Object (readonly)

Returns the value of attribute greeting.



19
20
21
# File 'lib/funl/message-sequencer.rb', line 19

# Read-only accessor for the greeting; the constructor assigns it the
# result of default_greeting.
#
# @return the current value of @greeting
def greeting; @greeting; end

#log ⇒ Object (readonly)

Returns the value of attribute log.



15
16
17
# File 'lib/funl/message-sequencer.rb', line 15

# Read-only accessor for the logger stored by the constructor
# (from its +log:+ keyword).
#
# @return the current value of @log
def log; @log; end

#message_class ⇒ Object (readonly)

Returns the value of attribute message_class.



17
18
19
# File 'lib/funl/message-sequencer.rb', line 17

# Read-only accessor for the message class stored by the constructor
# (from its +message_class:+ keyword).
#
# @return the current value of @message_class
def message_class; @message_class; end

#server ⇒ Object (readonly)

Returns the value of attribute server.



12
13
14
# File 'lib/funl/message-sequencer.rb', line 12

# Read-only accessor for the server object given as the constructor's
# first positional argument.
#
# @return the current value of @server
def server; @server; end

#server_thread ⇒ Object (readonly)

Returns the value of attribute server_thread.



13
14
15
# File 'lib/funl/message-sequencer.rb', line 13

# Read-only accessor for the thread created by #start.
#
# @return the current value of @server_thread (nil until it is assigned)
def server_thread; @server_thread; end

#stream_type ⇒ Object (readonly)

Returns the value of attribute stream_type.



16
17
18
# File 'lib/funl/message-sequencer.rb', line 16

# Read-only accessor for the stream type stored by the constructor
# (from its +stream_type:+ keyword).
#
# @return the current value of @stream_type
def stream_type; @stream_type; end

#subscribers ⇒ Object (readonly)

Returns the value of attribute subscribers.



20
21
22
# File 'lib/funl/message-sequencer.rb', line 20

# Read-only accessor for the per-tag subscriber table that the constructor
# builds as a Hash of tag => [conn, ...].
#
# @return the current value of @subscribers
def subscribers; @subscribers; end

#tick ⇒ Object (readonly)

Returns the value of attribute tick.



14
15
16
# File 'lib/funl/message-sequencer.rb', line 14

# Read-only accessor for the tick; the constructor initializes it from its
# +tick:+ keyword (default 0).
#
# @return the current value of @tick
def tick; @tick; end

Class Method Details

.new(*a) ⇒ Object



22
23
24
25
26
27
28
29
# File 'lib/funl/message-sequencer.rb', line 22

# Factory hook: instantiating MessageSequencer itself yields a
# MessageSequencerSelect (loaded lazily); subclasses are instantiated
# normally.
#
# Keyword arguments and a block are captured and forwarded explicitly.
# With only a +*a+ splat, Ruby 3+ would turn keywords such as +log:+ or
# +tick:+ into a trailing positional Hash, which the constructor's +*conns+
# would then swallow as if it were a connection.
#
# @param a positional arguments for the constructor
# @param kw keyword arguments for the constructor
# @param blk optional block, passed through unchanged
# @return the newly constructed sequencer instance
def self.new(*a, **kw, &blk)
  if self == MessageSequencer
    # Deliberately lazy: the subclass file is only needed on this path.
    require 'funl/message-sequencer-select'
    MessageSequencerSelect.new(*a, **kw, &blk)
  else
    # Bare super re-sends a, kw and blk exactly as received.
    super
  end
end

Instance Method Details

#default_greeting ⇒ Object



56
57
58
59
60
# File 'lib/funl/message-sequencer.rb', line 56

# Builds the greeting hash, whose single "blob" entry holds blob_type.
#
# @return [Hash] a frozen hash of the form { "blob" => blob_type }
def default_greeting
  greeting_fields = { "blob" => blob_type }
  # Frozen because it can't change after initial conns read it.
  greeting_fields.freeze
end

#run ⇒ Object



76
77
78
79
80
81
82
83
# File 'lib/funl/message-sequencer.rb', line 76

# Main loop: calls select_streams repeatedly.
#
# Any StandardError escaping the loop is written to the log and then
# re-raised to the caller.
def run
  loop { select_streams }
rescue => failure
  log.error failure
  raise
end

#start ⇒ Object



62
63
64
65
66
# File 'lib/funl/message-sequencer.rb', line 62

# Launches #run on a new thread and remembers that thread in
# @server_thread.
#
# @return [Thread] the thread that is executing #run
def start
  @server_thread = Thread.new { run }
end

#stop ⇒ Object



68
69
70
# File 'lib/funl/message-sequencer.rb', line 68

# Kills the server thread if there is one; otherwise does nothing.
#
# @return [Thread, nil] the killed thread, or nil when no thread is set
def stop
  return unless server_thread

  server_thread.kill
end

#wait ⇒ Object



72
73
74
# File 'lib/funl/message-sequencer.rb', line 72

# Blocks until the server thread finishes.
#
# Does nothing when no server thread exists yet (for example when called
# before #start), matching the nil guard in #stop rather than failing with
# a NoMethodError on nil. Thread#join re-raises an exception that
# terminated the thread, so a failure propagated out of #run surfaces here.
#
# @return [Thread, nil] the joined thread, or nil when there is no thread
def wait
  thread = server_thread
  thread.join if thread
end