Class: Que::Connection

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/que/connection.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection) ⇒ Connection

Returns a new instance of Connection.



44
45
46
47
# File 'lib/que/connection.rb', line 44

def initialize(connection)
  @wrapped_connection = connection
  @prepared_statements = Set.new
end

Instance Attribute Details

#wrapped_connectionObject (readonly)

Returns the value of attribute wrapped_connection.



23
24
25
# File 'lib/que/connection.rb', line 23

def wrapped_connection
  @wrapped_connection
end

Class Method Details

.wrap(conn) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/que/connection.rb', line 28

def wrap(conn)
  case conn
  when self
    conn
  when PG::Connection
    if conn.instance_variable_defined?(:@que_wrapper)
      conn.instance_variable_get(:@que_wrapper)
    else
      conn.instance_variable_set(:@que_wrapper, new(conn))
    end
  else
    raise Error, "Unsupported input for Connection.wrap: #{conn.class}"
  end
end

Instance Method Details

#drain_notificationsObject



118
119
120
# File 'lib/que/connection.rb', line 118

def drain_notifications
  loop { break if next_notification.nil? }
end

#execute(command, params = []) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/que/connection.rb', line 49

def execute(command, params = [])
  sql =
    case command
    when Symbol then SQL[command]
    when String then command
    else raise Error, "Bad command! #{command.inspect}"
    end

  params = convert_params(params)

  result =
    Que.run_sql_middleware(sql, params) do
      # Some versions of the PG gem dislike an empty/nil params argument.
      if params.empty?
        wrapped_connection.async_exec(sql)
      else
        wrapped_connection.async_exec_params(sql, params)
      end
    end

  Que.internal_log :connection_execute, self do
    {
      backend_pid: backend_pid,
      command:     command,
      params:      params,
      ntuples:     result.ntuples,
    }
  end

  convert_result(result)
end

#execute_prepared(command, params = nil) ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/que/connection.rb', line 81

def execute_prepared(command, params = nil)
  Que.assert(Symbol, command)

  if !Que.use_prepared_statements || in_transaction?
    return execute(command, params)
  end

  name = "que_#{command}"

  begin
    unless @prepared_statements.include?(command)
      wrapped_connection.prepare(name, SQL[command])
      @prepared_statements.add(command)
      prepared_just_now = true
    end

    convert_result(
      wrapped_connection.exec_prepared(name, params)
    )
  rescue ::PG::InvalidSqlStatementName => error
    # Reconnections on ActiveRecord can cause the same connection
    # objects to refer to new backends, so recover as well as we can.

    unless prepared_just_now
      Que.log level: :warn, event: :reprepare_statement, command: command
      @prepared_statements.delete(command)
      retry
    end

    raise error
  end
end

#in_transaction?Boolean

Returns:

  • (Boolean)


126
127
128
# File 'lib/que/connection.rb', line 126

def in_transaction?
  wrapped_connection.transaction_status != ::PG::PQTRANS_IDLE
end

#next_notificationObject



114
115
116
# File 'lib/que/connection.rb', line 114

def next_notification
  wrapped_connection.notifies
end

#server_versionObject



122
123
124
# File 'lib/que/connection.rb', line 122

def server_version
  wrapped_connection.server_version
end