Class: Pollen::FiberBody

Inherits:
Object
  • Object
show all
Defined in:
lib/pollen/fiber_body.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ FiberBody

Returns a new instance of FiberBody.



5
6
7
# File 'lib/pollen/fiber_body.rb', line 5

# Builds a new FiberBody and records the current time as the moment of the
# latest heartbeat.
#
# The timestamp is stored as whole seconds since the Unix epoch
# (Time#to_i), which is the unit the heartbeat check compares against.
def initialize
  created_at = Time.now
  @latest_heartbeat = created_at.to_i
end

Instance Method Details

#completed?(event) ⇒ Boolean

Returns:

  • (Boolean)


37
38
39
# File 'lib/pollen/fiber_body.rb', line 37

# Tells whether an event name marks the end of the stream.
#
# Only the exact strings 'completed' and 'failed' count as terminal.
#
# @param event [Object] the event name to check
# @return [Boolean] true for 'completed' or 'failed', false otherwise
def completed?(event)
  case event
  when 'completed', 'failed' then true
  else false
  end
end

#execute!(connection) ⇒ Object

TODO: Assign a Fiber scheduler to handle blocking kernel calls



10
11
12
13
14
15
16
17
18
# File 'lib/pollen/fiber_body.rb', line 10

# Streams a connection from start to finish.
#
# Steps, in order:
# 1. init!(connection) stores the connection and writes the headers.
# 2. The connection's payload is pushed under the connection's event.
# 3. loop! runs only while that initial event is still pending.
# 4. A nil payload is pushed with the 'terminated' event and the pusher is
#    closed.
#
# @param connection [Object] must respond to #payload and #event (and to
#   whatever init!, loop! and pusher read from it)
# @return [Object, nil] the result of pusher.close, or nil when the stream
#   was abandoned because the socket could no longer be written to
def execute!(connection)
  init!(connection)
  pusher.push(connection.payload, event: connection.event)
  loop! if pending?(connection.event)
  pusher.push(nil, event: 'terminated')
  pusher.close
rescue IOError, Errno::EPIPE, Errno::ECONNRESET
  # The Fiber should die when it can't write to socket.
  # IOError covers a stream that was closed on our side; a peer that hung up
  # surfaces as Errno::EPIPE / Errno::ECONNRESET, which are SystemCallErrors
  # and would otherwise escape this rescue.
end

#heartbeat! ⇒ Object



45
46
47
48
49
50
# File 'lib/pollen/fiber_body.rb', line 45

# Sends a keep-alive comment through the pusher once the configured
# heartbeat interval has been exceeded.
#
# The interval is read from Pollen.server.configuration.heartbeat and is
# added to @latest_heartbeat, which holds whole epoch seconds. Nothing is
# sent while the current time is at or before that deadline.
#
# @return [Integer, nil] the refreshed @latest_heartbeat when a comment was
#   sent, nil otherwise
def heartbeat!
  current = Time.now.to_i
  deadline = @latest_heartbeat + Pollen.server.configuration.heartbeat

  if current > deadline
    pusher.comment
    # Re-read the clock so the stamp reflects the time after the write.
    @latest_heartbeat = Time.now.to_i
  end
end

#init!(connection) ⇒ Object



20
21
22
23
# File 'lib/pollen/fiber_body.rb', line 20

# Binds this body to a connection and writes the response headers.
#
# The connection is stored in @connection before the pusher is touched,
# because the pusher is built from @connection's socket on first use.
#
# @param connection [Object] the connection to stream to
# @return [Object] whatever pusher.write_headers returns
def init!(connection)
  @connection = connection
  header_writer = pusher
  header_writer.write_headers
end

#loop! ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
# File 'lib/pollen/fiber_body.rb', line 25

# Relays events handed to the Fiber until the connection expires or a
# terminal event arrives.
#
# Each pass sends a heartbeat if one is due, then suspends with Fiber.yield.
# The value the Fiber is resumed with is treated as the next event: a falsy
# value is skipped, anything else is read via [:data] and [:event] and
# pushed. Pushing an event also resets the heartbeat timer. The loop ends
# once the current time reaches @connection.terminate_at or the pushed event
# is completed?.
#
# @return [nil]
def loop!
  until Time.now.to_i >= @connection.terminate_at
    heartbeat!
    message = Fiber.yield

    if message
      data = message[:data]
      name = message[:event]
      pusher.push(data, event: name)
      # Real traffic counts as a heartbeat, so restart the timer.
      @latest_heartbeat = Time.now.to_i
      break if completed?(name)
    end
  end
end

#pending?(event) ⇒ Boolean

Returns:

  • (Boolean)


41
42
43
# File 'lib/pollen/fiber_body.rb', line 41

# Tells whether an event name leaves the stream open, i.e. it is not
# completed?.
#
# @param event [Object] the event name to check
# @return [Boolean] false when completed?(event) holds, true otherwise
def pending?(event)
  return false if completed?(event)

  true
end

#pusher ⇒ Object



52
53
54
# File 'lib/pollen/fiber_body.rb', line 52

# Returns the Pusher for the current connection's socket.
#
# The Pusher is created on first call from @connection.socket and then
# reused; later calls return the same object even if @connection changes.
#
# @return [Pusher]
def pusher
  return @pusher if @pusher

  @pusher = Pusher.new(@connection.socket)
end