Class: DRb::DRbArPg

Inherits:
Object
  • Object
show all
Defined in:
lib/drb/ar_pg.rb

Overview

Implements DRb over an ActiveRecord connection to a PostgreSQL server

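DRb PostgreSQL socket URIs look like drbarpg://<channel>?<option>. The option is optional; when a server is opened without a channel, a URI of the form drbarpg://_<connection id>?<option> is generated.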

Constant Summary collapse

DefaultConnectTimeout =
1.0

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(uri, listen_channel, notify_connection_id, act_as, config = {}) ⇒ DRbArPg

Returns a new instance of DRbArPg.



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/drb/ar_pg.rb', line 38

# Sets up one endpoint of a DRb link that is carried over PostgreSQL
# LISTEN/NOTIFY, using a connection checked out of the
# Drbarpg::Message connection pool.
#
# Every endpoint allocates a fresh id from +drbarpg_connections_id_seq+ and
# LISTENs on a channel. What happens next depends on +act_as+:
#
# * +:server+ - registers the channel in +drbarpg_connections+ (removing an
#   orphaned entry first) and stores the resulting URI in +@uri+.
# * +:master+ - notifies the server channel with its own id and waits for the
#   peer to answer with its id.
# * +:slave+  - answers the peer's private channel with its own id.
#
# If anything fails after the connection was checked out, the connection is
# released again (best effort) before the error is re-raised, so a failed
# connect attempt does not drain the pool.
#
# @param uri [String, nil] DRb URI; only read in the +:server+ role
# @param listen_channel [String, nil] channel name suffix to listen on; when
#   nil or empty a private channel named after the allocated id is used
# @param notify_connection_id [Object, nil] +:master+: name suffix of the
#   server channel to notify; +:slave+: id of the peer's private channel;
#   not read in the +:server+ role
# @param act_as [Symbol] +:server+, +:master+ or +:slave+
# @param config [Hash] passed to DRbMessage; +:connect_timeout+ is read in
#   the +:master+ role (falls back to DefaultConnectTimeout)
# @raise [DRbConnError] in the +:master+ role when no acknowledgement arrives
#   within the timeout
def initialize(uri, listen_channel, notify_connection_id, act_as, config={})
  @notify_connection_id = notify_connection_id
  @config = config
  @msg = DRbMessage.new(@config)
  @conn = Drbarpg::Message.connection_pool.checkout
  @pgconn = @conn.raw_connection

  @listen_connection_id = @conn.exec_query("SELECT nextval('drbarpg_connections_id_seq') AS seq").first['seq']
  @listen_channel = if listen_channel.nil? || listen_channel.empty?
    @conn.quote_table_name("drbarpg__#{@listen_connection_id}")
  else
    @conn.quote_table_name("drbarpg_#{listen_channel}")
  end

  @conn.execute("LISTEN #{@listen_channel}")

  case act_as
  when :server
    @conn.transaction do
      already_running = @conn.exec_query('SELECT id FROM drbarpg_connections WHERE listen_channel = $1::text FOR UPDATE', nil,
                        [[nil, @listen_channel]]).first

      if already_running
        # A service with the name we would like to use is already registered.
        # Try to connect to it, to verify that it is really still running.
        begin
          # Close the probe right away: it only exists to test reachability
          # and would otherwise keep a pooled connection checked out.
          self.class.new(nil, nil, listen_channel, :master, config).close
          # NOTE(review): when the probe succeeds the INSERT below is still
          # attempted; presumably a unique constraint on listen_channel
          # rejects it - confirm against the schema.
        rescue DRbConnError
          # Unable to connect to the given server -> the entry
          # in drbarpg_connections seems to be orphaned.
          @conn.exec_delete('DELETE FROM drbarpg_connections WHERE id = $1::INT', nil,
                            [[nil, already_running['id']]])
        end
      end
      # Save the connection to the database to ensure that only one server can listen
      # on a given channel.
      @conn.exec_insert('INSERT INTO drbarpg_connections (id, listen_channel)
                        VALUES ($1::int, $2::text)', nil,
                        [[nil, @listen_connection_id], [nil, @listen_channel]])
    end

    uri_channel, uri_option = self.class.parse_uri(uri) if uri
    uri = "drbarpg://_#{@listen_connection_id}?#{uri_option}" if uri_channel.nil? || uri_channel.empty?
    @uri = uri
  when :master
    # connect to server channel
    @conn.exec_update("NOTIFY #{@conn.quote_table_name("drbarpg_#{@notify_connection_id}")}, #{@conn.quote(@listen_connection_id)}")

    # wait for acknowledgement with peer channel
    timeout = config[:connect_timeout] || DefaultConnectTimeout
    @pgconn.wait_for_notify(timeout) do |_channel, _pid, payload|
      @notify_connection_id = payload
      @notify_channel = @conn.quote_table_name("drbarpg__#{@notify_connection_id}")
    end
    raise(DRbConnError, "timeout while connecting to server") unless @notify_channel
  when :slave
    @notify_channel = @conn.quote_table_name("drbarpg__#{@notify_connection_id}")
    # acknowledge with peer channel
    @conn.exec_update("NOTIFY #{@notify_channel}, #{@conn.quote(@listen_connection_id)}")
  end
rescue StandardError => e
  # Setup failed: give the pooled connection back instead of leaking it.
  # Cleanup is best effort so that it never masks the original error.
  begin
    close
  rescue StandardError
    nil
  end
  raise e
end

Instance Attribute Details

#uri ⇒ Object (readonly)

Returns the value of attribute uri.



12
13
14
# File 'lib/drb/ar_pg.rb', line 12

# Reader for the +@uri+ instance variable.
#
# @return [Object, nil] the stored value, or nil when +@uri+ was never set
def uri
  @uri
end

Class Method Details

.open(uri, config) ⇒ Object



16
17
18
19
20
# File 'lib/drb/ar_pg.rb', line 16

# Builds a +:master+ endpoint for the server channel named in +uri+.
#
# @param uri [String] DRb URI; its channel part is taken from parse_uri
# @param config [Hash] configuration handed on to the new instance
# @return [DRbArPg] the new endpoint
def self.open(uri, config)
  target_channel, _option = parse_uri(uri)
  new(uri, nil, target_channel, :master, config)
end

.open_server(uri, config) ⇒ Object



22
23
24
25
# File 'lib/drb/ar_pg.rb', line 22

# Builds a +:server+ endpoint that listens on the channel named in +uri+.
#
# @param uri [String] DRb URI; its channel part is taken from parse_uri
# @param config [Hash] configuration handed on to the new instance
# @return [DRbArPg] the new endpoint
def self.open_server(uri, config)
  own_channel, _option = parse_uri(uri)
  new(uri, own_channel, nil, :server, config)
end

.uri_option(uri, config) ⇒ Object



33
34
35
36
# File 'lib/drb/ar_pg.rb', line 33

# Splits +uri+ into the option-less URI and the option part.
#
# @param uri [String] DRb URI, decomposed by parse_uri
# @param config [Hash] not read by this method
# @return [Array] two elements: "drbarpg://<channel>" and the option
def self.uri_option(uri, config)
  channel, option = parse_uri(uri)
  ["drbarpg://#{channel}", option]
end

Instance Method Details

#accept ⇒ Object



27
28
29
30
31
# File 'lib/drb/ar_pg.rb', line 27

# Waits (without a timeout) for the next notification on this endpoint's
# connection and answers it with a new +:slave+ endpoint. The notification
# payload is passed on as the peer's connection id.
#
# @return [DRbArPg] the endpoint created for the notifying peer
def accept
  @pgconn.wait_for_notify do |_channel, _pid, peer_connection_id|
    return self.class.new(nil, nil, peer_connection_id, :slave, @config)
  end
end

#alive? ⇒ Boolean

Returns:

  • (Boolean)


112
113
114
115
116
117
118
119
120
121
122
# File 'lib/drb/ar_pg.rb', line 112

# Reports whether this endpoint still holds its database connection.
#
# Pending input is consumed first. A queued notification whose payload is
# 'c' is the peer's close signal and closes this endpoint as well; any other
# notification is a protocol error.
#
# @return [Boolean] true while the connection is held, false once closed
# @raise [DRbConnError] when a notification other than 'c' is pending
def alive?
  # After #close there is no raw connection left to poll; report "not alive"
  # instead of failing with NoMethodError on nil.
  return false unless @pgconn

  @pgconn.consume_input
  notification = @pgconn.notifies
  if notification
    raise DRbConnError, "received unexpected notification" unless notification[:extra] == 'c'

    close
  end
  !!@conn
end

#close ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
# File 'lib/drb/ar_pg.rb', line 100

# Shuts this endpoint down and returns its connection to the pool.
#
# Sends the close signal ('c') to the peer channel when one is known, stops
# listening, and deletes this endpoint's row from +drbarpg_connections+.
# Calling it again after the connection has been released does nothing.
#
# The connection is checked back in and forgotten even when one of the SQL
# statements raises, so a failing shutdown cannot leak a pooled connection;
# the error itself still propagates to the caller.
#
# @return [nil]
def close
  return unless @conn

  begin
    @conn.exec_update("NOTIFY #{@notify_channel}, 'c'") if @notify_channel
    @conn.execute("UNLISTEN #{@listen_channel}")
    @conn.exec_delete('DELETE FROM drbarpg_connections WHERE id=$1::int', nil, [[nil, @listen_connection_id]])
  ensure
    Drbarpg::Message.connection_pool.checkin(@conn)
    @conn = @pgconn = nil
  end
  nil
end

#recv_reply ⇒ Object



130
131
132
133
# File 'lib/drb/ar_pg.rb', line 130

# Waits for the next incoming message and lets the DRbMessage instance
# decode it as a reply.
#
# @return [Object] whatever +@msg.recv_reply+ returns for the message
def recv_reply
  @msg.recv_reply(StringIO.new(wait_for_message))
end

#recv_request ⇒ Object



135
136
137
138
139
140
141
142
143
# File 'lib/drb/ar_pg.rb', line 135

# Waits for the next incoming message and lets the DRbMessage instance
# decode it as a request. On any StandardError the endpoint is closed and
# the error is re-raised.
#
# @return [Object] whatever +@msg.recv_request+ returns for the message
def recv_request
  @msg.recv_request(StringIO.new(wait_for_message))
rescue StandardError
  close
  raise
end

#send_reply(succ, result) ⇒ Object



145
146
147
148
149
150
151
152
153
154
# File 'lib/drb/ar_pg.rb', line 145

# Serializes a reply through the DRbMessage instance and sends the resulting
# bytes to the peer. On any StandardError the endpoint is closed and the
# error is re-raised.
#
# @param succ [Object] success flag handed to +@msg.send_reply+
# @param result [Object] result value handed to +@msg.send_reply+
# @return [Object] whatever +send_message+ returns
def send_reply(succ, result)
  buffer = StringIO.new
  @msg.send_reply(buffer, succ, result)
  send_message(buffer.string)
rescue StandardError
  close
  raise
end

#send_request(ref, msg_id, *arg, &b) ⇒ Object



124
125
126
127
128
# File 'lib/drb/ar_pg.rb', line 124

# Serializes a request through the DRbMessage instance and sends the
# resulting bytes to the peer.
#
# @param ref [Object] handed to +@msg.send_request+
# @param msg_id [Object] handed to +@msg.send_request+
# @param arg [Array] remaining arguments, handed on unchanged
# @param b [Proc, nil] optional block, handed on unchanged
# @return [Object] whatever +send_message+ returns
def send_request(ref, msg_id, *arg, &b)
  payload = StringIO.new.tap { |io| @msg.send_request(io, ref, msg_id, *arg, &b) }.string
  send_message(payload)
end