Class: DRb::DRbArPg

Inherits:
Object
  • Object
show all
Defined in:
lib/drb/ar_pg.rb

Overview

Implements DRb over an ActiveRecord connection to a PostgreSQL server

DRb PostgreSQL socket URIs look like drbarpg:?<option>. The option is optional.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(uri, listen_channel, notify_connection_id, act_as, config = {}) ⇒ DRbArPg

Returns a new instance of DRbArPg.



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/drb/ar_pg.rb', line 36

def initialize(uri, listen_channel, notify_connection_id, act_as, config={})
  @notify_connection_id = notify_connection_id
  @config = config
  @msg = DRbMessage.new(@config)
  @conn = Drbarpg::Message.connection_pool.checkout
  @pgconn = @conn.raw_connection

  @listen_connection_id = @conn.exec_query("SELECT nextval('drbarpg_connections_id_seq') AS seq").first['seq']
  @listen_channel = if listen_channel.nil? || listen_channel.empty?
    @conn.quote_table_name("drbarpg__#{@listen_connection_id}")
  else
    @conn.quote_table_name("drbarpg_#{listen_channel}")
  end

  @conn.execute("LISTEN #{@listen_channel}");

  case act_as
  when :server
    # Save the connection to the database to ensure that only one server can listen
    # on a given channel.
    @conn.exec_insert('INSERT INTO drbarpg_connections (id, listen_channel)
                      VALUES ($1::int, $2::text)', nil,
                      [[nil, @listen_connection_id], [nil, @listen_channel]])

    uri_channel, uri_option = self.class.parse_uri(uri) if uri
    uri = "drbarpg://_#{@listen_connection_id}?#{uri_option}" if uri_channel.nil? || uri_channel.empty?
    @uri = uri
  when :master
    # connect to server channel
    @conn.exec_update("NOTIFY #{@conn.quote_table_name("drbarpg_#{@notify_connection_id}")}, #{@conn.quote(@listen_connection_id)}");
    # wait for acknowledgement with peer channel
    @pgconn.wait_for_notify do |channel, pid, payload|
      @notify_connection_id = payload
      @notify_channel = @conn.quote_table_name("drbarpg__#{@notify_connection_id}")
    end
  when :slave
    @notify_channel = @conn.quote_table_name("drbarpg__#{@notify_connection_id}")
    # acknowledge with peer channel
    @conn.exec_update("NOTIFY #{@notify_channel}, #{@conn.quote(@listen_connection_id)}");
  end
end

Instance Attribute Details

#uriObject (readonly)

Returns the value of attribute uri.



12
13
14
# File 'lib/drb/ar_pg.rb', line 12

def uri
  @uri
end

Class Method Details

.open(uri, config) ⇒ Object



14
15
16
17
18
# File 'lib/drb/ar_pg.rb', line 14

def self.open(uri, config)
  server_channel, = parse_uri(uri)

  self.new(uri, nil, server_channel, :master, config)
end

.open_server(uri, config) ⇒ Object



20
21
22
23
# File 'lib/drb/ar_pg.rb', line 20

def self.open_server(uri, config)
  channel, = parse_uri(uri)
  self.new(uri, channel, nil, :server, config)
end

.uri_option(uri, config) ⇒ Object



31
32
33
34
# File 'lib/drb/ar_pg.rb', line 31

def self.uri_option(uri, config)
  channel, option = parse_uri(uri)
  return "drbarpg://#{channel}", option
end

Instance Method Details

#acceptObject



25
26
27
28
29
# File 'lib/drb/ar_pg.rb', line 25

def accept
  @pgconn.wait_for_notify do |channel, pid, payload|
    return self.class.new(nil, nil, payload, :slave, @config)
  end
end

#alive?Boolean

Returns:

  • (Boolean)


90
91
92
93
94
95
96
97
98
99
100
# File 'lib/drb/ar_pg.rb', line 90

def alive?
  @pgconn.consume_input
  if (n=@pgconn.notifies)
    if n[:extra] == 'c'
      close
    else
      raise DRbConnError, "received unexpected notification"
    end
  end
  !!@conn
end

#closeObject



78
79
80
81
82
83
84
85
86
87
88
# File 'lib/drb/ar_pg.rb', line 78

def close
  if @conn
    if @notify_channel
      @conn.exec_update("NOTIFY #{@notify_channel}, 'c'");
    end
    @conn.execute("UNLISTEN #{@listen_channel}");
    @conn.exec_delete('DELETE FROM drbarpg_connections WHERE id=$1::int', nil, [[nil, @listen_connection_id]])
    Drbarpg::Message.connection_pool.checkin(@conn)
    @conn = @pgconn = nil
  end
end

#recv_replyObject



108
109
110
111
# File 'lib/drb/ar_pg.rb', line 108

def recv_reply
  stream = StringIO.new(wait_for_message)
  return @msg.recv_reply(stream)
end

#recv_requestObject



113
114
115
116
117
118
119
120
121
# File 'lib/drb/ar_pg.rb', line 113

def recv_request
  begin
    stream = StringIO.new(wait_for_message)
    return @msg.recv_request(stream)
  rescue
    close
    raise $!
  end
end

#send_reply(succ, result) ⇒ Object



123
124
125
126
127
128
129
130
131
132
# File 'lib/drb/ar_pg.rb', line 123

def send_reply(succ, result)
  begin
    stream = StringIO.new
    @msg.send_reply(stream, succ, result)
    send_message(stream.string)
  rescue
    close
    raise $!
  end
end

#send_request(ref, msg_id, *arg, &b) ⇒ Object



102
103
104
105
106
# File 'lib/drb/ar_pg.rb', line 102

def send_request(ref, msg_id, *arg, &b)
  stream = StringIO.new
  @msg.send_request(stream, ref, msg_id, *arg, &b)
  send_message(stream.string)
end