Class: DRb::DRbArPg
- Inherits:
-
Object
- Object
- DRb::DRbArPg
- Defined in:
- lib/drb/ar_pg.rb
Overview
Implements DRb over an ActiveRecord connection to a PostgreSQL server.
DRb PostgreSQL socket URIs look like drbarpg://<channel>?<option>. The option is optional; a server opened with an empty channel is assigned a generated one.
Instance Attribute Summary collapse
-
#uri ⇒ Object
readonly
Returns the value of attribute uri.
Class Method Summary collapse
Instance Method Summary collapse
- #accept ⇒ Object
- #alive? ⇒ Boolean
- #close ⇒ Object
-
#initialize(uri, listen_channel, notify_connection_id, act_as, config = {}) ⇒ DRbArPg
constructor
A new instance of DRbArPg.
- #recv_reply ⇒ Object
- #recv_request ⇒ Object
- #send_reply(succ, result) ⇒ Object
- #send_request(ref, msg_id, *arg, &b) ⇒ Object
Constructor Details
#initialize(uri, listen_channel, notify_connection_id, act_as, config = {}) ⇒ DRbArPg
Returns a new instance of DRbArPg.
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/drb/ar_pg.rb', line 36
#
# Checks a connection out of +::Message.connection_pool+, starts LISTENing on
# this endpoint's channel and then performs the handshake for the given role.
#
# @param uri [String, nil] DRb URI; only consulted when +act_as+ is +:server+
# @param listen_channel [String, nil] suffix of the channel to LISTEN on; when
#   nil or empty, a channel derived from the freshly drawn connection id is used
# @param notify_connection_id [String, nil] for +:master+ the suffix of the
#   server channel to contact, for +:slave+ the id of the peer's channel
# @param act_as [Symbol] one of +:server+, +:master+ or +:slave+
# @param config [Hash] handed to +DRbMessage.new+
# @raise [StandardError] any error raised during setup is re-raised after the
#   connection has been handed back to the pool
def initialize(uri, listen_channel, notify_connection_id, act_as, config={})
  @notify_connection_id = notify_connection_id
  @config = config
  @msg = DRbMessage.new(@config)
  @conn = ::Message.connection_pool.checkout

  begin
    @pgconn = @conn.raw_connection
    @listen_connection_id = @conn.exec_query("SELECT nextval('drbarpg_connections_id_seq') AS seq").first['seq']
    @listen_channel = if listen_channel.nil? || listen_channel.empty?
      @conn.quote_table_name("drbarpg__#{@listen_connection_id}")
    else
      @conn.quote_table_name("drbarpg_#{listen_channel}")
    end
    @conn.execute("LISTEN #{@listen_channel}")

    case act_as
    when :server
      # Save the connection to the database to ensure that only one server can listen
      # on a given channel.
      @conn.exec_insert('INSERT INTO drbarpg_connections (id, listen_channel) VALUES ($1::int, $2::text)', nil, [[nil, @listen_connection_id], [nil, @listen_channel]])
      uri_channel, uri_option = self.class.parse_uri(uri) if uri
      # No channel in the URI: advertise the generated "_<id>" channel instead.
      uri = "drbarpg://_#{@listen_connection_id}?#{uri_option}" if uri_channel.nil? || uri_channel.empty?
      @uri = uri
    when :master
      # connect to server channel
      # The id is converted to a String before quoting: NOTIFY only accepts a
      # string literal as payload, and quoting an Integer yields a bare number.
      @conn.exec_update("NOTIFY #{@conn.quote_table_name("drbarpg_#{@notify_connection_id}")}, #{@conn.quote(@listen_connection_id.to_s)}")
      # wait for acknowledgement with peer channel
      # NOTE(review): no timeout is passed, so this blocks until a notification arrives.
      @pgconn.wait_for_notify do |_channel, _pid, payload|
        @notify_connection_id = payload
        @notify_channel = @conn.quote_table_name("drbarpg__#{@notify_connection_id}")
      end
    when :slave
      @notify_channel = @conn.quote_table_name("drbarpg__#{@notify_connection_id}")
      # acknowledge with peer channel (payload stringified for the same reason as above)
      @conn.exec_update("NOTIFY #{@notify_channel}, #{@conn.quote(@listen_connection_id.to_s)}")
    end
  rescue
    # Setup failed part-way (e.g. the INSERT was rejected because another
    # server already owns the channel). Undo what we can and give the
    # connection back so it is not leaked from the pool.
    begin
      @conn.execute("UNLISTEN #{@listen_channel}") if @listen_channel
      @conn.exec_delete('DELETE FROM drbarpg_connections WHERE id=$1::int', nil, [[nil, @listen_connection_id]]) if @listen_connection_id
    rescue
      # Best effort only: the original setup error is the one worth reporting.
    end
    ::Message.connection_pool.checkin(@conn)
    @conn = @pgconn = nil
    raise
  end
end
Instance Attribute Details
#uri ⇒ Object (readonly)
Returns the value of attribute uri.
12 13 14 |
# File 'lib/drb/ar_pg.rb', line 12
#
# Reader for the URI stored in +@uri+.
#
# @return [Object, nil] the current value of +@uri+ (nil when it was never set)
def uri
  @uri
end
Class Method Details
.open(uri, config) ⇒ Object
14 15 16 17 18 |
# File 'lib/drb/ar_pg.rb', line 14
#
# Builds an endpoint in the +:master+ role that contacts the channel named in
# +uri+.
#
# @param uri [String] DRb URI; its first +parse_uri+ component is the server channel
# @param config [Hash] forwarded to the constructor
# @return [DRbArPg] the newly constructed endpoint
def self.open(uri, config)
  target_channel, _option = parse_uri(uri)
  new(uri, nil, target_channel, :master, config)
end
.open_server(uri, config) ⇒ Object
20 21 22 23 |
# File 'lib/drb/ar_pg.rb', line 20
#
# Builds an endpoint in the +:server+ role listening on the channel named in
# +uri+.
#
# @param uri [String] DRb URI; its first +parse_uri+ component is the listen channel
# @param config [Hash] forwarded to the constructor
# @return [DRbArPg] the newly constructed endpoint
def self.open_server(uri, config)
  own_channel, _option = parse_uri(uri)
  new(uri, own_channel, nil, :server, config)
end
.uri_option(uri, config) ⇒ Object
31 32 33 34 |
# File 'lib/drb/ar_pg.rb', line 31
#
# Splits +uri+ into the option-less URI and the option part.
#
# @param uri [String] DRb URI handed to +parse_uri+
# @param config [Hash] not used by this method
# @return [Array] two elements: "drbarpg://<channel>" and the parsed option
def self.uri_option(uri, config)
  channel_part, option_part = parse_uri(uri)
  ["drbarpg://#{channel_part}", option_part]
end
Instance Method Details
#accept ⇒ Object
25 26 27 28 29 |
# File 'lib/drb/ar_pg.rb', line 25
#
# Waits for a notification on this endpoint's raw connection and answers it
# with a new +:slave+ endpoint whose peer id is the notification payload.
#
# @return [DRbArPg] new instance of the receiver's class built with the payload
def accept
  @pgconn.wait_for_notify do |_channel, _pid, peer_id|
    # Returning from inside the block leaves #accept as soon as one
    # notification has been delivered.
    return self.class.new(nil, nil, peer_id, :slave, @config)
  end
end
#alive? ⇒ Boolean
90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/drb/ar_pg.rb', line 90
#
# Reports whether this endpoint still holds its connection, processing one
# pending notification first.
#
# @return [Boolean] true while +@conn+ is set, false once the endpoint is closed
# @raise [DRbConnError] when a pending notification carries a payload other than 'c'
def alive?
  # #close sets both @conn and @pgconn to nil; without this guard a second
  # call on a closed endpoint would fail with NoMethodError on nil.
  return false unless @pgconn

  @pgconn.consume_input
  if (n = @pgconn.notifies)
    if n[:extra] == 'c'
      # 'c' is the payload #close sends to the peer channel.
      close
    else
      raise DRbConnError, "received unexpected notification"
    end
  end
  !!@conn
end
#close ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/drb/ar_pg.rb', line 78
#
# Shuts the endpoint down: tells the peer (if any) that we are closing, stops
# listening, removes this endpoint's row from +drbarpg_connections+ and hands
# the connection back to +::Message.connection_pool+. Does nothing when the
# endpoint is already closed.
#
# @return [nil]
# @raise [StandardError] errors from the SQL statements propagate, but only
#   after the connection has been checked in and the handles cleared
def close
  return unless @conn

  begin
    # 'c' is the payload #alive? on the peer interprets as "connection closed".
    @conn.exec_update("NOTIFY #{@notify_channel}, 'c'") if @notify_channel
    @conn.execute("UNLISTEN #{@listen_channel}")
    @conn.exec_delete('DELETE FROM drbarpg_connections WHERE id=$1::int', nil, [[nil, @listen_connection_id]])
  ensure
    # Always release the pooled connection, even if one of the statements
    # above failed; otherwise it would be leaked and @conn would stay set.
    ::Message.connection_pool.checkin(@conn)
    @conn = @pgconn = nil
  end
  nil
end
#recv_reply ⇒ Object
108 109 110 111 |
# File 'lib/drb/ar_pg.rb', line 108
#
# Asks the message codec to read a reply from a new StringIO.
#
# NOTE(review): the stream handed to the codec is freshly created and empty;
# nothing received over the PostgreSQL connection is written into it here —
# confirm whether the transport wiring is still missing.
#
# @return [Object] whatever +@msg.recv_reply+ returns
def recv_reply
  @msg.recv_reply(StringIO.new)
end
#recv_request ⇒ Object
113 114 115 116 117 118 119 120 121 |
# File 'lib/drb/ar_pg.rb', line 113
#
# Asks the message codec to read a request from a new StringIO. If that
# fails, the endpoint is closed and the error is passed on to the caller.
#
# @return [Object] whatever +@msg.recv_request+ returns
# @raise [StandardError] the error raised while reading, after #close ran
def recv_request
  @msg.recv_request(StringIO.new)
rescue
  close
  raise
end
#send_reply(succ, result) ⇒ Object
123 124 125 126 127 128 129 130 131 132 |
# File 'lib/drb/ar_pg.rb', line 123
#
# Serialises a reply through the message codec into an in-memory buffer. If
# that fails, the endpoint is closed and the error is passed on to the caller.
#
# @param succ [Object] success flag forwarded to +@msg.send_reply+
# @param result [Object] result value forwarded to +@msg.send_reply+
# @return [String] the contents the codec wrote into the buffer
# @raise [StandardError] the error raised while serialising, after #close ran
def send_reply(succ, result)
  buffer = StringIO.new
  @msg.send_reply(buffer, succ, result)
  buffer.string
rescue
  close
  raise
end
#send_request(ref, msg_id, *arg, &b) ⇒ Object
102 103 104 105 106 |
# File 'lib/drb/ar_pg.rb', line 102
#
# Serialises a request through the message codec into an in-memory buffer.
#
# @param ref [Object] forwarded to +@msg.send_request+
# @param msg_id [Object] forwarded to +@msg.send_request+
# @param arg [Array] remaining arguments, forwarded as a splat
# @param b [Proc, nil] optional block, forwarded as a block
# @return [String] the contents the codec wrote into the buffer
def send_request(ref, msg_id, *arg, &b)
  StringIO.new.tap { |buffer| @msg.send_request(buffer, ref, msg_id, *arg, &b) }.string
end