Class: ZooKeeper::Session

Inherits:
Object
  • Object
show all
Includes:
Slf4r::Logger
Defined in:
lib/zkruby/session.rb

Overview

Represents an session that may span connections

Constant Summary collapse

DEFAULT_TIMEOUT =
4
DEFAULT_CONNECT_DELAY =
0.2
DEFAULT_PORT =
2181

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(binding, addresses, options = nil) ⇒ Session

Returns a new instance of Session.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/zkruby/session.rb', line 19

def initialize(binding,addresses,options=nil)

    @binding = binding

    @addresses = parse_addresses(addresses)
    parse_options(options)

    # These are the server states
    # :disconnected, :connected, :auth_failed, :expired
    @keeper_state = nil

    # Client state is
    # :ready, :closing, :closed
    @client_state = :ready

    @xid=0
    @pending_queue = []

    # Create the watch list
    # hash by watch type of hashes by path of set of watchers
    @watches = [ :children, :data, :exists ].inject({}) do |ws,wtype| 
        ws[wtype] = Hash.new() { |h,k| h[k] = Set.new() }
        ws
    end

    @watcher = nil

    @ping_logger = Slf4r::LoggerFacade.new("ZooKeeper::Session::Ping")

end

Instance Attribute Details

#connObject (readonly)

Returns the value of attribute conn.



16
17
18
# File 'lib/zkruby/session.rb', line 16

def conn
  @conn
end

#ping_intervalObject (readonly)

Returns the value of attribute ping_interval.



13
14
15
# File 'lib/zkruby/session.rb', line 13

def ping_interval
  @ping_interval
end

#ping_loggerObject (readonly)

Returns the value of attribute ping_logger.



14
15
16
# File 'lib/zkruby/session.rb', line 14

def ping_logger
  @ping_logger
end

#timeoutObject (readonly)

Returns the value of attribute timeout.



15
16
17
# File 'lib/zkruby/session.rb', line 15

def timeout
  @timeout
end

#watcherObject

Returns the value of attribute watcher.



17
18
19
# File 'lib/zkruby/session.rb', line 17

def watcher
  @watcher
end

Instance Method Details

#chroot(path) ⇒ Object



50
51
52
# File 'lib/zkruby/session.rb', line 50

def chroot(path)
    return @chroot + path
end

#close(&callback) ⇒ Object



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/zkruby/session.rb', line 146

def close(&callback)
    case @client_state
    when :ready
        # we keep the requested block in a close packet
        @close_packet = ClosePacket.new(next_xid(),:close,-11,nil,nil,nil,nil,callback)
        close_packet = @close_packet 
        @client_state = :closing

        # If there are other requests in flight, then we wait for them to finish
        # before sending the close packet since it immediately causes the socket
        # to close.
        queue_close_packet_if_necessary()
        @close_packet
    when :closed, :closing
        raise ProtocolError, "Already closed"
    else
        raise ProtocolError, "Unexpected state #{@client_state}"
    end
end

#closed?Boolean

close won’t run your block if the connection is already closed, so this is how you can check

Returns:

  • (Boolean)


61
62
63
# File 'lib/zkruby/session.rb', line 61

def closed?
    @client_state == :closed
end

#connected?Boolean

Connection API - testing whether to send a ping

Returns:

  • (Boolean)


66
67
68
# File 'lib/zkruby/session.rb', line 66

def connected?()
    @keeper_state == :connected
end

#disconnectedObject

Connection API - called when the connection has dropped from either end



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/zkruby/session.rb', line 103

def disconnected()
    @conn = nil
    logger.info { "Disconnected id=#{@session_id}, keeper=:#{@keeper_state}, client=:#{@client_state}" }

    # We keep trying to reconnect until the session expiration time is reached
    @disconnect_time = Time.now if @keeper_state == :connected
    time_since_first_disconnect = (Time.now - @disconnect_time) 

    if @client_state == :closed || time_since_first_disconnect > timeout
        session_expired()
    else
        # if we are connected then everything in the pending queue has been sent so
        # we must clear
        # if not, then we'll keep them and hope the next reconnect works
        if @keeper_state == :connected
            clear_pending_queue(:disconnected)
            invoke_watch(@watcher,KeeperState::DISCONNECTED,nil,WatchEvent::NONE) if @watcher
        end
        @keeper_state = :disconnected
        reconnect()
    end
end

#pingObject

Connection API - called when no data has been received for #ping_interval



94
95
96
97
98
99
100
# File 'lib/zkruby/session.rb', line 94

def ping()
    if @keeper_state == :connected
        ping_logger.debug { "Ping send" }
        hdr = Proto::RequestHeader.new(:xid => -2, :_type => 11)
        conn.send_records(hdr)
    end
end

#prime_connection(conn) ⇒ Object

Connection API - Injects a new connection that is ready to receive records

Parameters:

  • conn

    that responds to #send_records(record…) and #disconnect()



72
73
74
75
76
77
# File 'lib/zkruby/session.rb', line 72

def prime_connection(conn)
    @conn = conn
    send_session_connect()
    send_auth_data()
    reset_watches()
end

#queue_request(request, op, opcode, response = nil, watch_type = nil, watcher = nil, ptype = Packet, &callback) ⇒ Object

Raises:

  • (Error.SESSION_EXPIRED)


135
136
137
138
139
140
141
142
143
144
# File 'lib/zkruby/session.rb', line 135

def queue_request(request,op,opcode,response=nil,watch_type=nil,watcher=nil,ptype=Packet,&callback)
    raise Error.SESSION_EXPIRED, "Session expired due to client state #{@client_state}"  unless @client_state == :ready
    watch_type, watcher = resolve_watcher(watch_type,watcher)

    xid = next_xid

    packet = ptype.new(xid,op,opcode,request,response,watch_type,watcher, callback)

    queue_packet(packet)
end

#receive_records(io) ⇒ Object

Connection API - called when data is available, reads and processes one packet/event

Parameters:

  • io (IO)


82
83
84
85
86
87
88
89
90
91
# File 'lib/zkruby/session.rb', line 82

def receive_records(io)
    case @keeper_state
    when :disconnected
        complete_connection(io)
    when :connected
        process_reply(io)
    else
        logger.warn { "Receive packet for closed session #{@keeper_state}" }
    end
end

#startObject

Start the session - called by the ProtocolBinding

Raises:



127
128
129
130
131
132
133
# File 'lib/zkruby/session.rb', line 127

def start()
    raise ProtocolError, "Already started!" unless @keeper_state.nil?
    @keeper_state = :disconnected
    @disconnect_time = Time.now
    logger.debug("Starting new zookeeper client session")
    reconnect()
end

#unchroot(path) ⇒ Object



54
55
56
57
# File 'lib/zkruby/session.rb', line 54

def unchroot(path)
    return path unless path
    path.slice(@chroot.length..-1)
end