Class: Telegraph::Wire
- Inherits:
-
Object
- Object
- Telegraph::Wire
- Includes:
- Logging
- Defined in:
- lib/telegraph/wire.rb
Instance Attribute Summary collapse
-
#stream ⇒ Object
readonly
Returns the value of attribute stream.
Class Method Summary collapse
Instance Method Summary collapse
- #close ⇒ Object
- #closed? ⇒ Boolean
-
#initialize(stream) ⇒ Wire
constructor
A new instance of Wire.
- #next_message(options = {:timeout => 0}) ⇒ Object
- #process_messages(options = {:timeout => 0}) ⇒ Object
- #send_message(body, options = {}) ⇒ Object
- #unacked_messages ⇒ Object
- #unacked_sequences_numbers ⇒ Object
Methods included from Logging
Constructor Details
#initialize(stream) ⇒ Wire
17 18 19 20 |
# File 'lib/telegraph/wire.rb', line 17 def initialize(stream) @sequence = AckSequence.new @stream = stream end |
Instance Attribute Details
#stream ⇒ Object (readonly)
Returns the value of attribute stream.
5 6 7 |
# File 'lib/telegraph/wire.rb', line 5 def stream @stream end |
Class Method Details
.connect(host, port) ⇒ Object
7 8 9 10 11 12 13 14 15 |
# File 'lib/telegraph/wire.rb', line 7 def self.connect(host, port) wire = new TCPSocket.new(host, port) return wire unless block_given? begin yield wire ensure wire.close end end |
Instance Method Details
#close ⇒ Object
22 23 24 25 26 27 28 29 |
# File 'lib/telegraph/wire.rb', line 22 def close if @stream.closed? debug { "stream already closed" } else debug { "closing stream #{@stream.inspect}" } @stream.close end end |
#closed? ⇒ Boolean
31 32 33 |
# File 'lib/telegraph/wire.rb', line 31 def closed? @stream.closed? end |
#next_message(options = {:timeout => 0}) ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/telegraph/wire.rb', line 51 def ( = {:timeout => 0}) begin raise NoMessageAvailable unless IO.select [@stream], nil, nil, [:timeout] = Message.read(@stream) unacked_sequences_numbers.delete .sequence_ack if .sequence_ack return rescue IOError, Errno::ECONNRESET => e raise LineDead, e. end rescue LineDead close rescue nil raise end |
#process_messages(options = {:timeout => 0}) ⇒ Object
45 46 47 48 49 |
# File 'lib/telegraph/wire.rb', line 45 def ( = {:timeout => 0}) yield () while true rescue NoMessageAvailable retry end |
#send_message(body, options = {}) ⇒ Object
35 36 37 38 39 40 41 42 43 |
# File 'lib/telegraph/wire.rb', line 35 def (body, = {}) sequence_ack = [:ack] ? [:ack].sequence_number : nil = Message.new(body, @sequence.next, sequence_ack) .write stream unacked_sequences_numbers[.sequence_number] = if [:need_ack] rescue IOError, Errno::EPIPE, Errno::ECONNRESET => e close rescue nil raise LineDead, e. end |
#unacked_messages ⇒ Object
69 70 71 |
# File 'lib/telegraph/wire.rb', line 69 def unacked_sequences_numbers.values end |
#unacked_sequences_numbers ⇒ Object
65 66 67 |
# File 'lib/telegraph/wire.rb', line 65 def unacked_sequences_numbers @unacked_sequences_numbers ||= {} end |