Class: Funl::MessageSequencer

Inherits:
Object
  • Object
show all
Includes:
Stream
Defined in:
lib/funl/message-sequencer.rb

Overview

Assigns a unique sequential ids to each message and relays it to its destinations.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Stream

#client_stream_for, #message_server_stream_for, #server_stream_for

Constructor Details

#initialize(server, *conns, log: Logger.new($stderr), stream_type: ObjectStream::MSGPACK_TYPE, message_class: Message, blob_type: Blobber::MSGPACK_TYPE, tick: 0) ⇒ MessageSequencer

Returns a new instance of MessageSequencer.



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/funl/message-sequencer.rb', line 22

def initialize server, *conns, log: Logger.new($stderr),
    stream_type: ObjectStream::MSGPACK_TYPE,
    message_class: Message,
    blob_type: Blobber::MSGPACK_TYPE,
    tick: 0

  @server = server
  @log = log
  @stream_type = stream_type
  @message_class = message_class
  @blob_type = blob_type
  @greeting = default_greeting
  @tick = tick

  @streams = []
  conns.each do |conn|
    try_conn conn
  end
end

Instance Attribute Details

#blob_typeObject (readonly)

Returns the value of attribute blob_type.



19
20
21
# File 'lib/funl/message-sequencer.rb', line 19

def blob_type
  @blob_type
end

#greetingObject (readonly)

Returns the value of attribute greeting.



20
21
22
# File 'lib/funl/message-sequencer.rb', line 20

def greeting
  @greeting
end

#logObject (readonly)

Returns the value of attribute log.



16
17
18
# File 'lib/funl/message-sequencer.rb', line 16

def log
  @log
end

#message_classObject (readonly)

Returns the value of attribute message_class.



18
19
20
# File 'lib/funl/message-sequencer.rb', line 18

def message_class
  @message_class
end

#serverObject (readonly)

Returns the value of attribute server.



12
13
14
# File 'lib/funl/message-sequencer.rb', line 12

def server
  @server
end

#server_threadObject (readonly)

Returns the value of attribute server_thread.



13
14
15
# File 'lib/funl/message-sequencer.rb', line 13

def server_thread
  @server_thread
end

#stream_typeObject (readonly)

Returns the value of attribute stream_type.



17
18
19
# File 'lib/funl/message-sequencer.rb', line 17

def stream_type
  @stream_type
end

#streamsObject (readonly)

Returns the value of attribute streams.



14
15
16
# File 'lib/funl/message-sequencer.rb', line 14

def streams
  @streams
end

#tickObject (readonly)

Returns the value of attribute tick.



15
16
17
# File 'lib/funl/message-sequencer.rb', line 15

def tick
  @tick
end

Instance Method Details

#default_greetingObject



42
43
44
45
46
# File 'lib/funl/message-sequencer.rb', line 42

def default_greeting
  {
    "blob" => blob_type
  }.freeze # can't change after initial conns read it
end

#runObject



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/funl/message-sequencer.rb', line 72

def run
  loop do
    readables, _ = select [server, *streams]

    readables.each do |readable|
      case readable
      when server
        begin
          conn, addr = readable.accept_nonblock
          log.debug "accepted #{conn.inspect} from #{addr.inspect}"
          try_conn conn
        rescue IO::WaitReadable
          next
        end

      else
        log.debug {"readable = #{readable}"}
        begin
          msgs = []
          readable.read do |msg|
            msgs << msg
          end
        rescue IOError, SystemCallError => ex
          log.debug {"closing #{readable}: #{ex}"}
          @streams.delete readable
          readable.close unless readable.closed?
        else
          log.debug {
            "read #{msgs.size} messages from #{readable.peer_name}"}
        end

        msgs.each do |msg|
          handle_message msg
        end
      end
    end
  end
rescue => ex
  log.error ex
  raise
end

#startObject



58
59
60
61
62
# File 'lib/funl/message-sequencer.rb', line 58

def start
  @server_thread = Thread.new do
    run
  end
end

#stopObject



64
65
66
# File 'lib/funl/message-sequencer.rb', line 64

def stop
  server_thread.kill if server_thread
end

#waitObject



68
69
70
# File 'lib/funl/message-sequencer.rb', line 68

def wait
  server_thread.join
end