Class: DRb::DRbArPg
- Inherits:
-
Object
- Object
- DRb::DRbArPg
- Defined in:
- lib/drb/ar_pg.rb
Overview
Implements DRb over an ActiveRecord connection to a PostgreSQL server
DRb PostgreSQL socket URIs look like drbarpg:?<option>
. The option is optional.
Constant Summary collapse
- DefaultConnectTimeout =
1.0
Instance Attribute Summary collapse
-
#uri ⇒ Object
readonly
Returns the value of attribute uri.
Class Method Summary collapse
Instance Method Summary collapse
- #accept ⇒ Object
- #alive? ⇒ Boolean
- #close ⇒ Object
-
#initialize(uri, listen_channel, notify_connection_id, act_as, config = {}) ⇒ DRbArPg
constructor
A new instance of DRbArPg.
- #recv_reply ⇒ Object
- #recv_request ⇒ Object
- #send_reply(succ, result) ⇒ Object
- #send_request(ref, msg_id, *arg, &b) ⇒ Object
Constructor Details
#initialize(uri, listen_channel, notify_connection_id, act_as, config = {}) ⇒ DRbArPg
Returns a new instance of DRbArPg.
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/drb/ar_pg.rb', line 38 def initialize(uri, listen_channel, notify_connection_id, act_as, config={}) @notify_connection_id = notify_connection_id @config = config @msg = DRbMessage.new(@config) @conn = Drbarpg::Message.connection_pool.checkout @pgconn = @conn.raw_connection @listen_connection_id = @conn.exec_query("SELECT nextval('drbarpg_connections_id_seq') AS seq").first['seq'] @listen_channel = if listen_channel.nil? || listen_channel.empty? @conn.quote_table_name("drbarpg__#{@listen_connection_id}") else @conn.quote_table_name("drbarpg_#{listen_channel}") end @conn.execute("LISTEN #{@listen_channel}"); case act_as when :server @conn.transaction do already_running = @conn.exec_query('SELECT id FROM drbarpg_connections WHERE listen_channel = $1::text FOR UPDATE', nil, [[nil, @listen_channel]]).first if already_running # There is already a service running with the same name, we would like to use. # Try to connect to it, to verify it is really yet running. begin self.class.new(nil, nil, listen_channel, :master, config) rescue DRbConnError # Unable to connect to the given server -> the entry # in drbarpg_connections seems to be orphanded. @conn.exec_delete('DELETE FROM drbarpg_connections WHERE id = $1::INT', nil, [[nil, already_running['id']]]) end end # Save the connection to the database to ensure that only one server can listen # on a given channel. @conn.exec_insert('INSERT INTO drbarpg_connections (id, listen_channel) VALUES ($1::int, $2::text)', nil, [[nil, @listen_connection_id], [nil, @listen_channel]]) end uri_channel, uri_option = self.class.parse_uri(uri) if uri uri = "drbarpg://_#{@listen_connection_id}?#{uri_option}" if uri_channel.nil? || uri_channel.empty? @uri = uri when :master # connect to server channel @conn.exec_update("NOTIFY #{@conn.quote_table_name("drbarpg_#{@notify_connection_id}")}, #{@conn.quote(@listen_connection_id)}"); # wait for acknowledgement with peer channel timeout = config[:connect_timeout] || DefaultConnectTimeout @pgconn.wait_for_notify(timeout) do |channel, pid, payload| @notify_connection_id = payload @notify_channel = @conn.quote_table_name("drbarpg__#{@notify_connection_id}") end raise(DRbConnError, "timeout while connecting to server") unless @notify_channel when :slave @notify_channel = @conn.quote_table_name("drbarpg__#{@notify_connection_id}") # acknowledge with peer channel @conn.exec_update("NOTIFY #{@notify_channel}, #{@conn.quote(@listen_connection_id)}"); end end |
Instance Attribute Details
#uri ⇒ Object (readonly)
Returns the value of attribute uri.
12 13 14 |
# File 'lib/drb/ar_pg.rb', line 12 def uri @uri end |
Class Method Details
.open(uri, config) ⇒ Object
16 17 18 19 20 |
# File 'lib/drb/ar_pg.rb', line 16 def self.open(uri, config) server_channel, = parse_uri(uri) self.new(uri, nil, server_channel, :master, config) end |
.open_server(uri, config) ⇒ Object
22 23 24 25 |
# File 'lib/drb/ar_pg.rb', line 22 def self.open_server(uri, config) channel, = parse_uri(uri) self.new(uri, channel, nil, :server, config) end |
.uri_option(uri, config) ⇒ Object
33 34 35 36 |
# File 'lib/drb/ar_pg.rb', line 33 def self.uri_option(uri, config) channel, option = parse_uri(uri) return "drbarpg://#{channel}", option end |
Instance Method Details
#accept ⇒ Object
27 28 29 30 31 |
# File 'lib/drb/ar_pg.rb', line 27 def accept @pgconn.wait_for_notify do |channel, pid, payload| return self.class.new(nil, nil, payload, :slave, @config) end end |
#alive? ⇒ Boolean
112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/drb/ar_pg.rb', line 112 def alive? @pgconn.consume_input if (n=@pgconn.notifies) if n[:extra] == 'c' close else raise DRbConnError, "received unexpected notification" end end !!@conn end |
#close ⇒ Object
100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/drb/ar_pg.rb', line 100 def close if @conn if @notify_channel @conn.exec_update("NOTIFY #{@notify_channel}, 'c'"); end @conn.execute("UNLISTEN #{@listen_channel}"); @conn.exec_delete('DELETE FROM drbarpg_connections WHERE id=$1::int', nil, [[nil, @listen_connection_id]]) Drbarpg::Message.connection_pool.checkin(@conn) @conn = @pgconn = nil end end |
#recv_reply ⇒ Object
130 131 132 133 |
# File 'lib/drb/ar_pg.rb', line 130 def recv_reply stream = StringIO.new() return @msg.recv_reply(stream) end |
#recv_request ⇒ Object
135 136 137 138 139 140 141 142 143 |
# File 'lib/drb/ar_pg.rb', line 135 def recv_request begin stream = StringIO.new() return @msg.recv_request(stream) rescue close raise $! end end |
#send_reply(succ, result) ⇒ Object
145 146 147 148 149 150 151 152 153 154 |
# File 'lib/drb/ar_pg.rb', line 145 def send_reply(succ, result) begin stream = StringIO.new @msg.send_reply(stream, succ, result) (stream.string) rescue close raise $! end end |
#send_request(ref, msg_id, *arg, &b) ⇒ Object
124 125 126 127 128 |
# File 'lib/drb/ar_pg.rb', line 124 def send_request(ref, msg_id, *arg, &b) stream = StringIO.new @msg.send_request(stream, ref, msg_id, *arg, &b) (stream.string) end |