Class: Cod::TcpClient

Inherits:
Channel show all
Defined in:
lib/cod/tcp_client.rb

Overview

Acts as a channel that connects to a tcp listening socket on the other end.

Connection negotiation has three phases, as follows: 1) Connection is establishing. Sent messages are buffered and really sent

down the wire once the connection stands. Reading from the channel
will block the client forever.

2) Connection is established: Sending and receiving are immediate and

no buffering is done.

3) Connection is down because of an interruption or exception. Sending and

receiving messages no longer works, instead a ConnectionLost error is
raised.

Defined Under Namespace

Classes: Connection, OtherEnd, RobustConnection

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Channel

#interact

Constructor Details

#initialize(destination, serializer) ⇒ TcpClient

Constructs a tcp client channel. destination may either be a socket, in which case phase 1) of connection negotiation is skipped, or a string that contains an ‘address:port’ part.



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/cod/tcp_client.rb', line 24

def initialize(destination, serializer)
  @serializer = serializer
  @destination = destination

  # TODO remove this soon
  fail "Deprecated API" if destination.kind_of?(TCPSocket)
  
  # TcpClient handles two cases: Construction via an url (destination is a
  # string) and construction via a connection that has been
  # preestablished (destination is a socket):
  if destination.respond_to?(:established?)
    # Should be a connection already.
    @connection = destination
  else
    @connection = RobustConnection.new(destination)
  end

  @work_queue = WorkQueue.new

  # The predicate for allowing sends: Is the connection up?
  @work_queue.predicate {
    cached_connection_established?
  }
end

Class Method Details

._load(params) ⇒ Object

:nodoc:



128
129
130
131
132
133
# File 'lib/cod/tcp_client.rb', line 128

def self._load(params) # :nodoc:
  # Instead of a tcp client (no way to construct one at this point), we'll
  # insert a kind of marker in the object stream that will be replaced 
  # with a valid client later on. (hopefully)
  OtherEnd.new(params)
end

Instance Method Details

#_dump(level) ⇒ Object

:nodoc:



125
126
127
# File 'lib/cod/tcp_client.rb', line 125

def _dump(level) # :nodoc:
  @destination
end

#clientObject



104
105
106
107
108
109
110
111
112
113
# File 'lib/cod/tcp_client.rb', line 104

def client
  # NOTE: Normally, it doesn't make sense to ask the client channel for
  # something for a service connection, since the service needs to know
  # where to send requests in addition to knowing where to receive
  # answers. In the case of sockets, this is different: The service will
  # send its answers back the same way it got the requests from, so this
  # is really ok:
  #
  Service::Client.new(self, self)
end

#closeObject

Closes all underlying connections. You should only call this if you don’t want to use the channel again, since it will also stop reconnection attempts.



53
54
55
56
# File 'lib/cod/tcp_client.rb', line 53

def close
  @work_queue.shutdown if @work_queue
  @connection.close
end

#get(opts = {}) ⇒ Object

Receives a message. opts may contain various options, see below. Options include:



86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/cod/tcp_client.rb', line 86

def get(opts={})
  while @work_queue.size > 0
    @work_queue.try_work
  end
            
  check_connection_state

  @connection.read(@serializer)
rescue Errno::ECONNRESET, EOFError
  # Connection reset by peer
  raise ConnectionLost
end

#put(obj) ⇒ Object

Sends an object to the other end of the channel, if it is connected. If it is not connected, objects sent will queue up and once the internal storage reaches the high watermark, they will be dropped silently.

Example:

channel.put :object
# Really, any Ruby object that the current serializer can turn into 
# a string!


67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/cod/tcp_client.rb', line 67

def put(obj)
  # TODO high watermark check
  # NOTE: predicate will call #try_connect
  @work_queue.schedule {
    send(obj)
  }
  
  @work_queue.exclusive {
    # If we're connected, shut down the worker thread, do all the work
    # here.
    check_connection_state
  }
  
  @work_queue.try_work
end

#serviceObject

——————————————————— service/client



101
102
103
# File 'lib/cod/tcp_client.rb', line 101

def service
  fail "A tcp client cannot be a service."
end

#to_read_fdsObject



137
138
139
# File 'lib/cod/tcp_client.rb', line 137

def to_read_fds
  @connection.socket if @connection
end