Class: Rserve::Connection

Inherits:
Engine
  • Object
show all
Includes:
Protocol
Defined in:
lib/rserve/connection.rb

Overview

class providing TCP/IP connection to an Rserve

Defined Under Namespace

Classes: EvalError

Constant Summary collapse

RserveNotStartedError =

:section: Errors

Class.new(StandardError)
ServerNotAvailableError =
Class.new(StandardError)
IncorrectServerError =
Class.new(StandardError)
IncorrectServerVersionError =
Class.new(StandardError)
IncorrectProtocolError =
Class.new(StandardError)
NotConnectedError =
Class.new(StandardError)
AT_plain =

authorization type: plain text

0
AT_crypt =

authorization type: unix crypt

1
@@connected_object =
nil

Constants included from Protocol

Protocol::CMD_RESP, Protocol::CMD_SPECIAL_MASK, Protocol::CMD_assignSEXP, Protocol::CMD_attachSession, Protocol::CMD_closeFile, Protocol::CMD_createFile, Protocol::CMD_ctrl, Protocol::CMD_ctrlEval, Protocol::CMD_ctrlShutdown, Protocol::CMD_ctrlSource, Protocol::CMD_detachSession, Protocol::CMD_detachedVoidEval, Protocol::CMD_eval, Protocol::CMD_login, Protocol::CMD_openFile, Protocol::CMD_readFile, Protocol::CMD_removeFile, Protocol::CMD_serAssign, Protocol::CMD_serEEval, Protocol::CMD_serEval, Protocol::CMD_setBufferSize, Protocol::CMD_setEncoding, Protocol::CMD_setSEXP, Protocol::CMD_shutdown, Protocol::CMD_voidEval, Protocol::CMD_writeFile, Protocol::DT_ARRAY, Protocol::DT_BYTESTREAM, Protocol::DT_CHAR, Protocol::DT_DOUBLE, Protocol::DT_INT, Protocol::DT_LARGE, Protocol::DT_SEXP, Protocol::DT_STRING, Protocol::ERROR_DESCRIPTIONS, Protocol::ERR_IOerror, Protocol::ERR_Rerror, Protocol::ERR_accessDenied, Protocol::ERR_auth_failed, Protocol::ERR_conn_broken, Protocol::ERR_ctrl_closed, Protocol::ERR_data_overflow, Protocol::ERR_detach_failed, Protocol::ERR_inv_cmd, Protocol::ERR_inv_par, Protocol::ERR_notOpen, Protocol::ERR_object_too_big, Protocol::ERR_out_of_mem, Protocol::ERR_session_busy, Protocol::ERR_unknownCmd, Protocol::ERR_unsupportedCmd, Protocol::MAX_LONG_SIGNED, Protocol::MAX_LONG_UNSIGNED, Protocol::RESP_ERR, Protocol::RESP_OK

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Protocol

#doubleToRawLongBits, #get_int, #get_int_original, #get_len, #get_long, #get_long_original, #longBitsToDouble, #longBitsToDouble_old, #new_hdr, #set_hdr, #set_int, #set_long

Methods inherited from Engine

#create_reference, #evaluate, #finalize_reference, #get, #get_parent_enviroment, #lock, #new_enviroment, #parse, #parse_and_eval, #resolve_reference, #supports_REPL?, #supports_enviroments?, #supports_references?, #suuports_locking?, #try_lock, #unlock

Constructor Details

#initialize(opts = Hash.new) ⇒ Connection

Make a new local connection You could provide a hash with options. Options are analog to java client:

:auth_req

If authentification is required (false by default)

:transfer_charset

Transfer charset (“UTF-8” by default)

:auth_type

Type of authentification (AT_plain by default)

:hostname

Hostname of Rserve (“127.0.0.1” by default)

:port_number

Port Number of Rserve (6311 by default)

:max_tries

Maximum number of tries before give up (5 by default)

:cmd_init

Command to init Rserve if not initialized (“R CMD Rserve” by default)

:proc_rserve_ok

Proc testing if Rserve works (uses system by default)



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/rserve/connection.rb', line 50

def initialize(opts=Hash.new)
  @auth_req         = opts.delete(:auth_req)          || false
  @transfer_charset = opts.delete(:transfer_charset)  || "UTF-8"
  @auth_type        = opts.delete(:auth_type)         || AT_plain
  @hostname         = opts.delete(:hostname)          || "127.0.0.1"
  @port_number      = opts.delete(:port_number)       || 6311
  @max_tries        = opts.delete(:max_tries)         || 5
  @cmd_init         = opts.delete(:cmd_init)          || "R CMD Rserve"
  @proc_rserve_ok   = opts.delete(:proc_rserve_ok)    || lambda { system "killall -s 0 Rserve" } 
  @session          = opts.delete(:session)           || nil
  @tries            = 0
  @connected=false
  if (!@session.nil?)
    @hostname=@session.host
    @port_number=@session.port
  end
  begin
    #puts "Tryin to connect..."
    connect
  rescue Errno::ECONNREFUSED
    if @tries<@max_tries
      @tries+=1
      # Rserve is available?
      if @proc_rserve_ok.call
        # Rserve is available. Incorrect host and/or portname
        raise ServerNotAvailableError, "Rserve started, but not available on #{hostname}:#{port_number}"
        # Rserve not available. We should instanciate it first
      else
        if run_server
          # Wait a moment, please
          sleep(0.25)
          retry
        else
          raise RserveNotStartedError, "Can't start Rserve"
        end
      end
      #puts "Init RServe"

    else
      raise
    end
  end
end

Instance Attribute Details

#auth_reqObject (readonly)

Returns the value of attribute auth_req.



26
27
28
# File 'lib/rserve/connection.rb', line 26

def auth_req
  @auth_req
end

#auth_typeObject (readonly)

Returns the value of attribute auth_type.



27
28
29
# File 'lib/rserve/connection.rb', line 27

def auth_type
  @auth_type
end

#connectedObject (readonly)

Returns the value of attribute connected.



25
26
27
# File 'lib/rserve/connection.rb', line 25

def connected
  @connected
end

#hostnameObject (readonly)

Returns the value of attribute hostname.



21
22
23
# File 'lib/rserve/connection.rb', line 21

def hostname
  @hostname
end

#keyObject (readonly)

Returns the value of attribute key.



28
29
30
# File 'lib/rserve/connection.rb', line 28

def key
  @key
end

#last_errorObject (readonly)

Returns the value of attribute last_error.



24
25
26
# File 'lib/rserve/connection.rb', line 24

def last_error
  @last_error
end

#persistent=(value) ⇒ Object (writeonly)

Sets the attribute persistent

Parameters:

  • value

    the value to set the attribute persistent to.



35
36
37
# File 'lib/rserve/connection.rb', line 35

def persistent=(value)
  @persistent = value
end

#portObject (readonly)

Returns the value of attribute port.



31
32
33
# File 'lib/rserve/connection.rb', line 31

def port
  @port
end

#port_numberObject (readonly)

Returns the value of attribute port_number.



22
23
24
# File 'lib/rserve/connection.rb', line 22

def port_number
  @port_number
end

#protocolObject (readonly)

Returns the value of attribute protocol.



23
24
25
# File 'lib/rserve/connection.rb', line 23

def protocol
  @protocol
end

#rsrv_versionObject (readonly)

Returns the value of attribute rsrv_version.



34
35
36
# File 'lib/rserve/connection.rb', line 34

def rsrv_version
  @rsrv_version
end

#rtObject (readonly)

Returns the value of attribute rt.



29
30
31
# File 'lib/rserve/connection.rb', line 29

def rt
  @rt
end

#sObject (readonly)

Returns the value of attribute s.



30
31
32
# File 'lib/rserve/connection.rb', line 30

def s
  @s
end

#sessionObject (readonly)

Returns the value of attribute session.



32
33
34
# File 'lib/rserve/connection.rb', line 32

def session
  @session
end

#transfer_charset=(value) ⇒ Object (writeonly)

Sets the attribute transfer_charset

Parameters:

  • value

    the value to set the attribute transfer_charset to.



33
34
35
# File 'lib/rserve/connection.rb', line 33

def transfer_charset=(value)
  @transfer_charset = value
end

Instance Method Details

#assign(sym, ct) ⇒ Object

assign a string value to a symbol in R. The symbol is created if it doesn’t exist already.

Parameters:

  • sym

    symbol name. Currently assign uses CMD_setSEXP command of Rserve, i.e. the symbol value is NOT parsed. It is the responsibility of the user to make sure that the symbol name is valid in R (recall the difference between a symbol and an expression!). In fact R will always create the symbol, but it may not be accessible (examples: “barnfoo” or “bar$foo”).

  • ct

    contents

Raises:



237
238
239
240
241
242
243
244
245
246
247
# File 'lib/rserve/connection.rb', line 237

def assign(sym, ct)
  raise NotConnectedError if !connected? or rt.nil?
  case ct
  when String
    assign_string(sym,ct)
  when REXP
    assign_rexp(sym,ct)
  else
    assign_rexp(sym, Rserve::REXP::Wrapper.wrap(ct))
  end
end

#assign_rexp(sym, rexp) ⇒ Object



278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
# File 'lib/rserve/connection.rb', line 278

def assign_rexp(sym, rexp)
  r = REXPFactory.new(rexp);
  rl=r.get_binary_length();
  symn=sym.unpack("C*");
  sl=symn.length+1;
  sl=(sl&0xfffffc)+4 if ((sl&3)>0) # make sure the symbol length is divisible by 4
  rq=Array.new(sl+rl+((rl>0xfffff0) ? 12 : 8));
  symn.length.times {|i| rq[i+4]=symn[i]}
  ic=symn.length
  while(ic<sl)
    rq[ic+4]=0;
    ic+=1;
  end # pad with 0

  set_hdr(Rserve::Protocol::DT_STRING,sl,rq,0)
  set_hdr(Rserve::Protocol::DT_SEXP,rl,rq,sl+4);
  r.get_binary_representation(rq, sl+((rl>0xfffff0) ? 12 : 8));
  # puts "ASSIGN RQ: #{rq}" if $DEBUG
  rp=rt.request(:cmd=>Rserve::Protocol::CMD_setSEXP, :cont=>rq)
  if (!rp.nil? and rp.ok?)
    rp
  else
    raise "Assign Failed"
  end
end

#assign_string(sym, ct) ⇒ Object



249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
# File 'lib/rserve/connection.rb', line 249

def assign_string(sym,ct)
  symn = sym.unpack("C*")
  ctn  = ct.unpack("C*")
  sl=symn.length+1
  cl=ctn.length+1
  sl=(sl&0xfffffc)+4 if ((sl&3)>0)  # make sure the symbol length is divisible by 4
  cl=(cl&0xfffffc)+4 if ((cl&3)>0)  # make sure the content length is divisible by 4
  rq=Array.new(sl+4+cl+4)
  symn.length.times {|i| rq[i+4]=symn[i]}
  ic=symn.length
  while (ic<sl)
    rq[ic+4]=0
    ic+=1
  end
  ctn.length.times {|i| rq[i+sl+8]=ctn[i]}
  ic=ctn.length
  while (ic<cl)
    rq[ic+sl+8]=0
    ic+=1
  end
  set_hdr(Rserve::Protocol::DT_STRING,sl,rq,0)
  set_hdr(Rserve::Protocol::DT_STRING,cl,rq,sl+4)
  rp=rt.request(:cmd=>Rserve::Protocol::CMD_setSEXP,:cont=>rq)
  if (!rp.nil? and rp.ok?)
    rp
  else
    raise "Assign Failed"
  end
end

#closeObject

Closes current connection



147
148
149
150
151
152
153
154
155
156
# File 'lib/rserve/connection.rb', line 147

def close
  if !@s.nil? and !@s.closed?
    @s.close_write
    @s.close_read
  end
  raise "Can't close socket" unless @s.closed?
  @connected=false
  @@connected_object=nil
  true
end

#connectObject



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/rserve/connection.rb', line 95

def connect
  # On windows, Rserve doesn't allows concurrent connections. 
  # So, we must close the last open connection first
  if ON_WINDOWS and !@@connected_object.nil?
    @@connected_object.close
  end

  close if @connected
  
  @s = TCPSocket::new(@hostname, @port_number)
  @rt=Rserve::Talk.new(@s)
  if @session.nil?
    #puts "Connected"
    # Accept first input
    input=@s.recv(32).unpack("a4a4a4a4a4a4a4a4")      
    raise IncorrectServerError, "Handshake failed: Rsrv signature expected, but received [#{input[0]}]" unless input[0]=="Rsrv"
    @rsrv_version=input[1].to_i
    raise IncorrectServerVersionError, "Handshake failed: The server uses more recent protocol than this client." if @rsrv_version>103
    @protocol=input[2]
    raise IncorrectProtocolError, "Handshake failed: unsupported transfer protocol #{@protocol}, I talk only QAP1." if @protocol!="QAP1"
    (3..7).each do |i|
      attr=input[i]
      if (attr=="ARpt") 
        if (!auth_req) # this method is only fallback when no other was specified
          auth_req=true
          auth_type=AT_plain
        end
      end
      if (attr=="ARuc") 
        auth_req=true
        authType=AT_crypt
      end
      if (attr[0]=='K') 
        key=attr[1,3]
      end
      
    end
  else # we have a session to take care of
    @s.write(@session.key.pack("C*"))
    @rsrv_version=session.rsrv_version
  end
  @connected=true
  @@connected_object=self
  @last_error="OK"
end

#connected?Boolean

Check connection state. Note that currently this state is not checked on-the-spot, that is if connection went down by an outside event this is not reflected by the flag. return true if this connection is alive

Returns:

  • (Boolean)


142
143
144
# File 'lib/rserve/connection.rb', line 142

def connected?
  @connected
end

#detachObject

Detaches the session and closes the connection (requires Rserve 0.4+). The session can be only resumed by calling RSession.attach

Raises:



319
320
321
322
323
324
325
326
327
328
329
# File 'lib/rserve/connection.rb', line 319

def detach
  raise NotConnectedError if !connected? or rt.nil?
  rp=rt.request(:cmd=>Rserve::Protocol::CMD_detachSession)
  if !rp.nil? and rp.ok? 
    s=Rserve::Session.new(self,rp)
    close
    s
  else
    raise "Cannot detach"
  end
end

#eval(cmd) ⇒ Object

evaluates the given command and retrieves the result

  • @param cmd command/expression string

  • @return R-xpression or null if an error occured */

Raises:



197
198
199
200
201
202
203
204
205
# File 'lib/rserve/connection.rb', line 197

def eval(cmd)
  raise NotConnectedError if !connected? or rt.nil?
  rp=rt.request(:cmd=>Rserve::Protocol::CMD_eval, :cont=>cmd+"\n")
  if !rp.nil? and rp.ok?
    parse_eval_response(rp)
  else
    raise EvalError.new(rp), "eval failed: #{rp.to_s}"
  end
end

#get_server_versionObject

Get server version as reported during the handshake.



158
159
160
# File 'lib/rserve/connection.rb', line 158

def get_server_version
  @rsrv_version
end

#parse_eval_response(rp) ⇒ Object

NOT TESTED



208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/rserve/connection.rb', line 208

def parse_eval_response(rp)
  rxo=0
  pc=rp.cont
  if (rsrv_version>100) # /* since 0101 eval responds correctly by using DT_SEXP type/len header which is 4 bytes long */
    rxo=4
    # we should check parameter type (should be DT_SEXP) and fail if it's not
    if pc.nil?
      raise "Error while processing eval output: SEXP (type #{Rserve::Protocol::DT_SEXP}) expected but nil returned"
    elsif (pc[0]!=Rserve::Protocol::DT_SEXP and pc[0]!=(Rserve::Protocol::DT_SEXP|Rserve::Protocol::DT_LARGE))
      raise "Error while processing eval output: SEXP (type #{Rserve::Protocol::DT_SEXP}) expected but found result type "+pc[0].to_s+"."
    end
    
    if (pc[0]==(Rserve::Protocol::DT_SEXP|Rserve::Protocol::DT_LARGE))
      rxo=8; # large data need skip of 8 bytes
    end
    # warning: we are not checking or using the length - we assume that only the one SEXP is returned. This is true for the current CMD_eval implementation, but may not be in the future. */
  end
  if pc.length>rxo
    rx=REXPFactory.new;
    rx.parse_REXP(pc, rxo);
    return rx.get_REXP();
  else
    return nil
  end
end

#shutdownObject

Shutdown remote Rserve. Note that some Rserves cannot be shut down from the client side

Raises:



307
308
309
310
311
312
313
314
315
# File 'lib/rserve/connection.rb', line 307

def shutdown
  raise NotConnectedError if !connected? or rt.nil?
  rp=rt.request(:cmd=>Rserve::Protocol::CMD_shutdown)
  if !rp.nil? and rp.ok? 
    true
  else
    raise "Shutdown failed"
  end
end

#void_eval(cmd) ⇒ Object

evaluates the given command, but does not fetch the result (useful for assignment operations)

  • @param cmd command/expression string */

Raises:



164
165
166
167
168
169
170
171
172
173
# File 'lib/rserve/connection.rb', line 164

def void_eval(cmd)
  raise NotConnectedError if !connected? or rt.nil?
  rp=rt.request(:cmd=>Rserve::Protocol::CMD_voidEval, :cont=>cmd+"\n")
  if !rp.nil? and rp.ok?
    true
  else
    raise EvalError.new(rp), "voidEval failed: #{rp.to_s}"
  end

end

#void_eval_detach(cmd) ⇒ Object

  • @param cmd command/expression string.

  • @return session object that can be use to attach back to the session once the command completed

Raises:



180
181
182
183
184
185
186
187
188
189
190
# File 'lib/rserve/connection.rb', line 180

def void_eval_detach(cmd)
  raise NotConnectedError if !connected? or rt.nil?
  rp=rt.request(:cmd=>Rserve::Protocol::CMD_detachedVoidEval,:cont=>cmd+"\n")
  if rp.nil? or !rp.ok?
    raise EvalError.new(rp), "detached void eval failed : #{rp.to_s}"
  else
    s=Rserve::Session.new(self,rp)
    close
    s
  end
end