Class: MQTT::Proxy

Inherits:
Object
Defined in:
lib/mqtt/proxy.rb

Overview

Class for implementing a proxy to filter/mangle MQTT packets.

Constructor Details

#initialize(args = {}) ⇒ Proxy

Create a new MQTT Proxy instance.

Possible argument keys:

:local_host      Address to bind the listening socket to (default: '0.0.0.0').
:local_port      Port to bind the listening socket to (default: MQTT::DEFAULT_PORT).
:server_host     Address of the upstream server to forward packets to (no default).
:server_port     Port of the upstream server to forward packets to (default: 18830).
:select_timeout  Time in seconds before disconnecting an idle connection (default: 60).
:logger          Ruby Logger object to send informational messages to (default: a Logger on $stdout at INFO level).

NOTE: make sure :server_host and :server_port do not point back at the proxy's own listening socket!



# File 'lib/mqtt/proxy.rb', line 42

def initialize(args = {})
  @local_host = args[:local_host] || '0.0.0.0'
  @local_port = args[:local_port] || MQTT::DEFAULT_PORT
  @server_host = args[:server_host]
  @server_port = args[:server_port] || 18_830
  @select_timeout = args[:select_timeout] || 60

  # Setup a logger
  @logger = args[:logger]
  if @logger.nil?
    @logger = Logger.new($stdout)
    @logger.level = Logger::INFO
  end

  # Default is not to have any filters
  @client_filter = nil
  @server_filter = nil

  # Create TCP server socket
  @server = TCPServer.open(@local_host, @local_port)
  @logger.info "MQTT::Proxy listening on #{@local_host}:#{@local_port}"
end
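
The constructor's warning about the proxy connecting to itself can be turned into a rough pre-flight check. The sketch below is not part of MQTT::Proxy: `proxy_loops_back?`, `DEFAULT_MQTT_PORT` and `LOOPBACK_HOSTS` are hypothetical names, and 1883 is assumed to be the value of MQTT::DEFAULT_PORT. It applies the same defaults as the constructor above and only catches the obvious cases; a server address that names the machine's external interface would not be detected.

```ruby
# Hypothetical helper, not part of MQTT::Proxy.
DEFAULT_MQTT_PORT = 1883 # assumed value of MQTT::DEFAULT_PORT
LOOPBACK_HOSTS = ['0.0.0.0', '127.0.0.1', 'localhost', '::1'].freeze

# Returns true when the upstream endpoint looks like the proxy's own listener.
def proxy_loops_back?(args = {})
  # Apply the same defaults the constructor applies.
  local_host  = args[:local_host] || '0.0.0.0'
  local_port  = args[:local_port] || DEFAULT_MQTT_PORT
  server_host = args[:server_host]
  server_port = args[:server_port] || 18_830

  # Different ports can never collide.
  return false unless local_port == server_port

  # Same port: a loop exists if the hosts match or both sides are local.
  server_host == local_host ||
    (LOOPBACK_HOSTS.include?(local_host) && LOOPBACK_HOSTS.include?(server_host))
end

args = { server_host: 'localhost', server_port: 1883 }
warn 'Refusing to start: the proxy would connect to itself' if proxy_loops_back?(args)
```

Running a check like this before calling MQTT::Proxy.new keeps a misconfigured proxy from binding its port and then forwarding every client to itself.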

Instance Attribute Details

#client_filter=(value) ⇒ Object (writeonly)

A filter Proc for packets coming from the client (to the server).



# File 'lib/mqtt/proxy.rb', line 25

def client_filter=(value)
  @client_filter = value
end
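
A client filter might look like the following sketch. This page does not state the filter's calling convention, so the sketch assumes the Proc is called with each packet and that its return value is what gets forwarded, with nil meaning "drop this packet". `FakePublish` and `FakePing` are stand-ins invented here so the example runs without a broker; a real filter would receive MQTT packet objects.

```ruby
# Stand-ins for real packets (hypothetical, for illustration only).
FakePublish = Struct.new(:topic, :payload)
FakePing    = Struct.new(:id) # a packet type that has no topic

# Assumed contract: return the packet to forward it, or nil to drop it.
drop_private = proc do |packet|
  if packet.respond_to?(:topic) && packet.topic.start_with?('private/')
    nil    # swallow publishes to private/ topics
  else
    packet # pass everything else through unchanged
  end
end

blocked = drop_private.call(FakePublish.new('private/door', 'open'))
allowed = drop_private.call(FakePublish.new('sensors/temp', '21.5'))
ping    = drop_private.call(FakePing.new(1))

# With a proxy instance this would be attached as:
#   proxy.client_filter = drop_private
```

The `respond_to?(:topic)` guard matters because not every packet type carries a topic; without it the filter would raise on the first packet that lacks one.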

#local_host ⇒ Object (readonly)

Address to bind listening socket to.



# File 'lib/mqtt/proxy.rb', line 7

def local_host
  @local_host
end

#local_port ⇒ Object (readonly)

Port to bind listening socket to.



# File 'lib/mqtt/proxy.rb', line 10

def local_port
  @local_port
end

#logger ⇒ Object (readonly)

Ruby Logger object to send informational messages to.



# File 'lib/mqtt/proxy.rb', line 22

def logger
  @logger
end

#select_timeout ⇒ Object (readonly)

Time in seconds before disconnecting an idle connection.



# File 'lib/mqtt/proxy.rb', line 19

def select_timeout
  @select_timeout
end

#server_filter=(value) ⇒ Object (writeonly)

A filter Proc for packets coming from the server (to the client).



# File 'lib/mqtt/proxy.rb', line 28

def server_filter=(value)
  @server_filter = value
end
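
A server filter can also rewrite ("mangle") packets rather than drop them. The sketch below makes two assumptions not confirmed on this page: the Proc is called with each packet and its return value is forwarded to the client, and a publish packet's topic is writable. `StubPublish` is a stand-in invented here, and the 'tenant-a/' prefix is an arbitrary example value.

```ruby
# Stand-in for a publish packet coming from the server (hypothetical).
StubPublish = Struct.new(:topic, :payload)

# Strip an internal topic prefix before the client sees the packet.
strip_prefix = proc do |packet|
  if packet.respond_to?(:topic) && packet.topic.start_with?('tenant-a/')
    packet.topic = packet.topic.delete_prefix('tenant-a/')
  end
  packet # always forward, modified or not
end

rewritten = strip_prefix.call(StubPublish.new('tenant-a/sensors/temp', '21.5'))
untouched = strip_prefix.call(StubPublish.new('other/topic', 'x'))

# With a proxy instance this would be attached as:
#   proxy.server_filter = strip_prefix
```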

#server_host ⇒ Object (readonly)

Address of the upstream server to forward packets to.



# File 'lib/mqtt/proxy.rb', line 13

def server_host
  @server_host
end

#server_port ⇒ Object (readonly)

Port of the upstream server to forward packets to.



# File 'lib/mqtt/proxy.rb', line 16

def server_port
  @server_port
end

Instance Method Details

#run ⇒ Object

Start accepting connections and processing packets.



# File 'lib/mqtt/proxy.rb', line 66

def run
  loop do
    # Wait for a client to connect and then create a thread for it
    Thread.new(@server.accept) do |client_socket|
      logger.info "Accepted client: #{client_socket.peeraddr.join(':')}"
      server_socket = TCPSocket.new(@server_host, @server_port)
      begin
        process_packets(client_socket, server_socket)
      rescue => e
        logger.error e.to_s
      end
      logger.info "Disconnected: #{client_socket.peeraddr.join(':')}"
      server_socket.close
      client_socket.close
    end
  end
end
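
The thread-per-connection pattern used by #run can be tried in isolation with only the standard library. The sketch below is not the library's code: `serve` is a hypothetical helper, and the upcasing handler stands in for the proxy's packet processing. It differs from the listing above in one respect: the socket is closed in an `ensure` clause, so it is released even when the handler raises. In the listing above the upstream TCPSocket.new call sits outside the begin/rescue, so a failed upstream connect would end the thread before the close calls are reached.

```ruby
require 'socket'

# Accept connections on `server` and handle each one in its own thread.
# The caller's block receives the connected client socket.
def serve(server)
  loop do
    # accept blocks here; the new socket is handed to a fresh thread
    Thread.new(server.accept) do |client|
      begin
        yield client
      rescue StandardError => e
        warn "connection error: #{e.message}"
      ensure
        client.close unless client.closed?
      end
    end
  end
end

# Usage: bind to an ephemeral loopback port and upcase one line per client.
server = TCPServer.new('127.0.0.1', 0)
port = server.addr[1]
acceptor = Thread.new { serve(server) { |client| client.write(client.gets.upcase) } }

socket = TCPSocket.new('127.0.0.1', port)
socket.puts 'ping'
reply = socket.gets
socket.close

acceptor.kill
server.close
```

Because #run loops forever, calling it blocks the current thread; wrapping the call in a Thread, as the usage part of the sketch does for `serve`, is one way to keep the caller free.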