Class: Listen::Adapter::TCP

Inherits:
Base
  • Object
show all
Includes:
Celluloid::IO
Defined in:
lib/listen/adapter/tcp.rb

Overview

Adapter to receive file system modifications over TCP

Constant Summary collapse

OS_REGEXP =

match any

//
DEFAULTS =
{
  host: 'localhost',
  port: '4000'
}
RECEIVE_WINDOW =

Number of bytes to receive at a time

1024

Instance Attribute Summary collapse

Attributes inherited from Base

#options

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Base

#configure, #initialize, usable?

Constructor Details

This class inherits a constructor from Listen::Adapter::Base

Instance Attribute Details

#bufferObject (readonly)

Returns the value of attribute buffer.



19
20
21
# File 'lib/listen/adapter/tcp.rb', line 19

def buffer
  @buffer
end

#socketObject (readonly)

Returns the value of attribute socket.



19
20
21
# File 'lib/listen/adapter/tcp.rb', line 19

def socket
  @socket
end

Class Method Details

.local_fs?Boolean

Returns:

  • (Boolean)


83
84
85
# File 'lib/listen/adapter/tcp.rb', line 83

def self.local_fs?
  false
end

Instance Method Details

#finalizeObject

Cleans up buffer and socket



45
46
47
48
49
50
51
# File 'lib/listen/adapter/tcp.rb', line 45

def finalize
  @buffer = nil
  return unless @socket

  @socket.close
  @socket = nil
end

#handle_data(data) ⇒ Object

Buffers incoming data and handles messages accordingly



64
65
66
67
68
69
70
71
72
73
# File 'lib/listen/adapter/tcp.rb', line 64

def handle_data(data)
  @buffer << data
  while (message = Listen::TCP::Message.from_buffer(@buffer))
    handle_message(message)
  end
rescue
  _log :error, format('TCP.handle_data crashed: %s:%s', $ERROR_INFO,
                      $ERROR_POSITION * "\n")
  raise
end

#handle_message(message) ⇒ Object

Handles incoming message by notifying of path changes



76
77
78
79
80
81
# File 'lib/listen/adapter/tcp.rb', line 76

def handle_message(message)
  type, change, dir, path, _ = message.object
  _log(:debug) { "TCP message: #{[type, change, dir, path].inspect}" }

  _queue_change(type.to_sym, Pathname(dir), path, change: change.to_sym)
end

#runObject

Continuously receive and asynchronously handle data



57
58
59
60
61
# File 'lib/listen/adapter/tcp.rb', line 57

def run
  while (data = @socket.recv(RECEIVE_WINDOW))
    async.handle_data(data)
  end
end

#startObject

Initializes and starts a Celluloid::IO-powered TCP-recipient



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/listen/adapter/tcp.rb', line 22

def start
  attempts ||= 3
  _log :info, "TCP: opening socket #{options.host}:#{options.port}"
  @socket = TCPSocket.new(options.host, options.port)
  @buffer = ''
  async.run
rescue Celluloid::Task::TerminatedError
  _log :debug, "TCP adapter was terminated: #{$ERROR_INFO.inspect}"
rescue Errno::ECONNREFUSED
  sleep 1
  attempts -= 1
  _log :warn, "TCP.start: #{$ERROR_INFO.inspect}"
  retry if attempts > 0
  _log :error, format('TCP.start: %s:%s', $ERROR_INFO.inspect,
                      $ERROR_POSITION * "\n")
  raise
rescue
  _log :error, format('TCP.start: %s:%s', $ERROR_INFO.inspect,
                      $ERROR_POSITION * "\n")
  raise
end