Class: Telegraph::Wire

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/telegraph/wire.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Logging

#debug, logger

Constructor Details

#initialize(stream) ⇒ Wire



17
18
19
20
# File 'lib/telegraph/wire.rb', line 17

# Wraps the given stream in a new wire.
#
# @param stream [Object] the underlying stream; the other methods of this
#   class call #closed? and #close on it and hand it to IO.select
def initialize(stream)
  @stream = stream
  # Source of the sequence numbers stamped on outgoing messages.
  @sequence = AckSequence.new
end

Instance Attribute Details

#stream ⇒ Object (readonly)

Returns the value of attribute stream.



5
6
7
# File 'lib/telegraph/wire.rb', line 5

# The underlying stream this wire was constructed with.
#
# @return [Object] the value currently held in @stream
def stream
  @stream
end

Class Method Details

.connect(host, port) ⇒ Object



7
8
9
10
11
12
13
14
15
# File 'lib/telegraph/wire.rb', line 7

# Opens a TCPSocket to host/port and wraps it in a new wire.
#
# Without a block the wire is returned and the caller is responsible for
# closing it. With a block the wire is yielded and then closed, whether the
# block returns normally or raises.
#
# @param host [Object] forwarded to TCPSocket.new
# @param port [Object] forwarded to TCPSocket.new
# @yield [wire] the freshly connected wire
# @return [Object] the wire when no block is given, otherwise the block's result
def self.connect(host, port)
  wire = new(TCPSocket.new(host, port))
  if block_given?
    begin
      yield wire
    ensure
      # Always release the socket once the block is done with it.
      wire.close
    end
  else
    wire
  end
end

Instance Method Details

#close ⇒ Object



22
23
24
25
26
27
28
29
# File 'lib/telegraph/wire.rb', line 22

# Closes the underlying stream unless it is already closed, emitting a debug
# line in either case.
#
# @return [Object] the result of @stream.close, or the result of the debug
#   call when the stream was already closed
def close
  return debug { "stream already closed" } if @stream.closed?

  debug { "closing stream #{@stream.inspect}" }
  @stream.close
end

#closed? ⇒ Boolean



31
32
33
# File 'lib/telegraph/wire.rb', line 31

# Reports whether the underlying stream is closed.
#
# @return [Boolean] whatever @stream.closed? returns
def closed?
  @stream.closed?
end

#next_message(options = {:timeout => 0}) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/telegraph/wire.rb', line 51

# Reads the next message from the stream.
#
# IO.select is used to wait for the stream to become readable, with
# options[:timeout] as its timeout (the default of 0 polls without blocking).
# When the message carries a sequence_ack, the matching entry is removed from
# the table of unacknowledged messages.
#
# @param options [Hash] :timeout is handed to IO.select as its timeout
# @return [Object] the message returned by Message.read
# @raise [NoMessageAvailable] if IO.select reports nothing readable
# @raise [LineDead] if reading raises IOError or Errno::ECONNRESET; the wire
#   is closed on a best-effort basis before the error propagates
def next_message(options = {:timeout => 0})
  begin
    readable = IO.select([@stream], nil, nil, options[:timeout])
    raise NoMessageAvailable unless readable

    incoming = Message.read(@stream)
    ack = incoming.sequence_ack
    unacked_sequences_numbers.delete(ack) if ack
    incoming
  rescue IOError, Errno::ECONNRESET => error
    # Translate low-level stream failures into the wire's own error type.
    raise LineDead, error.message
  end
rescue LineDead
  # Best-effort cleanup: a failure while closing must not mask LineDead.
  begin
    close
  rescue StandardError
    nil
  end
  raise
end

#process_messages(options = {:timeout => 0}) ⇒ Object



45
46
47
48
49
# File 'lib/telegraph/wire.rb', line 45

# Yields every incoming message to the block, forever.
#
# The loop only ends when the block breaks out or an error other than
# NoMessageAvailable is raised. NoMessageAvailable simply triggers another
# attempt, so with the default :timeout of 0 this polls the stream
# continuously rather than blocking.
#
# @param options [Hash] forwarded unchanged to #next_message
# @yield [message] each message returned by #next_message
def process_messages(options = {:timeout => 0})
  while true
    begin
      yield next_message(options)
    rescue NoMessageAvailable
      # Nothing arrived within the timeout; go round and try again.
    end
  end
end

#send_message(body, options = {}) ⇒ Object



35
36
37
38
39
40
41
42
43
# File 'lib/telegraph/wire.rb', line 35

# Wraps body in a Message stamped with the next sequence number and writes it
# to the stream.
#
# @param body [Object] payload handed to Message.new
# @param options [Hash]
#   :ack      - a message being acknowledged; its sequence_number is sent
#               along as the ack
#   :need_ack - when truthy, the sent message is stored in the table of
#               unacknowledged messages under its sequence number
# @return [Object, nil] the sent message when :need_ack is truthy, otherwise nil
# @raise [LineDead] if IOError, Errno::EPIPE or Errno::ECONNRESET is raised
#   while sending; the wire is closed on a best-effort basis first
def send_message(body, options = {})
  acknowledged = options[:ack]
  ack_number = acknowledged ? acknowledged.sequence_number : nil
  outgoing = Message.new(body, @sequence.next, ack_number)
  outgoing.write(stream)
  unacked_sequences_numbers[outgoing.sequence_number] = outgoing if options[:need_ack]
rescue IOError, Errno::EPIPE, Errno::ECONNRESET => error
  # Best-effort cleanup: a failure while closing must not mask the real error.
  begin
    close
  rescue StandardError
    nil
  end
  raise LineDead, error.message
end

#unacked_messages ⇒ Object



69
70
71
# File 'lib/telegraph/wire.rb', line 69

# Lists the messages currently held in the table of unacknowledged messages.
#
# @return [Array] the values of #unacked_sequences_numbers
def unacked_messages
  unacked_sequences_numbers.values
end

#unacked_sequences_numbers ⇒ Object



65
66
67
# File 'lib/telegraph/wire.rb', line 65

# Table of messages still awaiting acknowledgement, keyed by sequence number.
# Created empty on first access and reused afterwards.
#
# @return [Hash] the same hash instance on every call
def unacked_sequences_numbers
  @unacked_sequences_numbers ||= {}
end