Class: NeverBlock::DB::FiberedPostgresConnection

Inherits:
PGconn
  • Object
Defined in:
lib/never_block/db/fibered_postgres_connection.rb

Overview

A modified PostgreSQL connection driver that builds on the original pg driver. It can register its socket with an event-loop backend (EM or Rev). Whenever a query is executed within the scope of a friendly fiber (one with :neverblock set to true), the query runs in async mode and the fiber yields until data is available on the socket.
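The yield-and-resume flow described above can be sketched with plain Ruby fibers. This is a minimal illustration, not the driver itself: FakeAsyncConnection and its log are hypothetical stand-ins, and the manual call to resume_command plays the role of the event loop's readiness callback.

```ruby
require 'fiber'

# Hypothetical stand-in for the connection; it only models the control flow.
class FakeAsyncConnection
  attr_reader :log

  def initialize
    @log = []
  end

  # "Send" the query, then park the calling fiber until data arrives.
  def exec(sql)
    @log << "sent: #{sql}"
    @fiber = Fiber.current
    Fiber.yield                 # control returns to whoever resumed this fiber
    @log << "result ready"
    "rows for #{sql}"
  end

  # What the event loop would call once the socket becomes readable.
  def resume_command
    @fiber.resume
  end
end

conn = FakeAsyncConnection.new
result = nil
worker = Fiber.new { result = conn.exec("SELECT 1") }

worker.resume                   # runs until Fiber.yield inside exec
conn.log << "event loop is free to do other work"
conn.resume_command             # simulated readiness notification
```

The caller of exec sees an ordinary return value, while the thread was free to service other fibers between the yield and the resume.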


Constructor Details

#initialize(*args) ⇒ FiberedPostgresConnection

Creates a new PostgreSQL connection and wraps the socket descriptor in an IO object. The call that would set the connection to nonblocking mode is commented out in the source shown here.



# File 'lib/never_block/db/fibered_postgres_connection.rb', line 21

def initialize(*args)
  super(*args)
  init_descriptor
  #setnonblocking(true)
end

Instance Attribute Details

#fd ⇒ Object (readonly)

Needed by the event loop to access the socket.



# File 'lib/never_block/db/fibered_postgres_connection.rb', line 16

def fd
  @fd
end

#io ⇒ Object (readonly)

Needed by the event loop to access the socket.



# File 'lib/never_block/db/fibered_postgres_connection.rb', line 16

def io
  @io
end

Instance Method Details

#exec(sql) ⇒ Object

Assumes that the NeverBlock fiber extensions are in use and that exec runs in the context of a fiber whose :neverblock value is set to true. All NeverBlock IO classes check this value; setting it to false forces execution in blocking mode.



# File 'lib/never_block/db/fibered_postgres_connection.rb', line 35

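def exec(sql)
  begin
    # Go async only inside a fiber flagged with :neverblock
    if Fiber.respond_to? :current and Fiber.current[:neverblock]
      send_query sql
      @fiber = Fiber.current
      # Hand control back until the event loop calls resume_command
      Fiber.yield
      while is_busy
        consume_input
        Fiber.yield if is_busy
      end
      # Drain all pending results and return the last one
      res, data = 0, []
      while res != nil
        res = self.get_result
        data << res unless res.nil?
      end
      data.last
    else
      super(sql)
    end
  rescue Exception => e
    # Reconnect if the connection was lost, then re-raise
    reset if e.message.include? "not connected"
    raise e
  end
end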
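The :neverblock check that exec performs can be illustrated in isolation. The sketch below assumes a tiny, hypothetical version of the fiber extensions (a per-fiber hash reached through [] and []=); the real NeverBlock extensions may be implemented differently. execution_mode is likewise a made-up helper that mirrors the branch taken in exec.

```ruby
require 'fiber'

# Assumption: a minimal stand-in for NeverBlock's fiber extensions,
# giving every fiber its own small attribute hash.
class Fiber
  def [](key)
    (@locals ||= {})[key]
  end

  def []=(key, value)
    (@locals ||= {})[key] = value
  end
end

# Hypothetical helper mirroring the condition at the top of exec.
def execution_mode
  Fiber.current[:neverblock] ? :async : :blocking
end

modes = []
fiber = Fiber.new do
  Fiber.current[:neverblock] = true
  modes << execution_mode       # flag set: async path
  Fiber.current[:neverblock] = false
  modes << execution_mode       # flag cleared: blocking path
end
fiber.resume
modes << execution_mode         # root fiber never set the flag: blocking path
```

Because the flag lives on the fiber rather than on the connection, the same connection object can serve both fibered and ordinary callers.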

#init_descriptor ⇒ Object



# File 'lib/never_block/db/fibered_postgres_connection.rb', line 27

def init_descriptor
  @fd = socket
  @io = IO.new(socket)
end
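Wrapping a raw descriptor in an IO object, as init_descriptor does, is what lets an event loop watch the socket for readiness. The sketch below shows the idea with a pipe standing in for the database socket and IO.select standing in for the event loop; both substitutions are assumptions made so the example runs without a database.

```ruby
# A pipe stands in for the PostgreSQL socket in this sketch.
reader, writer = IO.pipe

fd = reader.fileno
# Wrap the bare descriptor, as init_descriptor does with the pg socket.
# autoclose: false leaves ownership of the descriptor with `reader`.
io = IO.new(fd, "r", autoclose: false)

# Nothing has been written yet, so a zero-timeout select reports no readiness.
ready_before = IO.select([io], nil, nil, 0)

writer.write("row data")
writer.flush

# Now the wrapped descriptor is readable; an event loop would fire its callback.
ready_after = IO.select([io], nil, nil, 1)
data = io.read_nonblock(64)

writer.close
reader.close
```

A reactor such as EventMachine performs the equivalent of the select call internally and invokes a handler instead of returning a list.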

#register_with_event_loop(loop) ⇒ Object

Attaches the connection socket to an event loop. Currently only supports EM, but Rev support will be completed soon.



# File 'lib/never_block/db/fibered_postgres_connection.rb', line 73

def register_with_event_loop(loop)
  if loop == :em
    unless EM.respond_to?(:attach)
      puts "invalid EM version, please download the modified gem from: (http://github.com/riham/eventmachine)"
      exit
    end
    if EM.reactor_running?
      @em_connection = EM::attach(@io, EMConnectionHandler, self)
    else
      raise "REACTOR NOT RUNNING YA ZALAMA"
    end
  elsif loop.class.name == "REV::Loop"
    loop.attach(RevConnectionHandler.new(socket))
  else
    raise "could not register with the event loop"
  end
  @loop = loop
end

#reset ⇒ Object

Resets the connection and reattaches it to the event loop.



# File 'lib/never_block/db/fibered_postgres_connection.rb', line 63

def reset
  unregister_from_event_loop
  super
  init_descriptor
  register_with_event_loop(@loop)    
end

#resume_command ⇒ Object

The callback invoked whenever data is available on the socket. It resumes the fiber that is waiting on the query.



# File 'lib/never_block/db/fibered_postgres_connection.rb', line 104

def resume_command
  # Let the fiber continue its work
  @fiber.resume
end

#unregister_from_event_loop ⇒ Object

Detaches the connection socket from the event loop. As with register_with_event_loop, EM is the only backend supported for now.



# File 'lib/never_block/db/fibered_postgres_connection.rb', line 94

def unregister_from_event_loop
  if @loop == :em
    @em_connection.unattach(false)
  else
    raise NotImplementedError.new("unregister_from_event_loop not implemented for #{@loop}")
  end
end