Class: Distributor::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/distributor/client.rb

Instance Method Summary collapse

Constructor Details

#initialize(input, output = input) ⇒ Client

Returns a new instance of Client.



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/distributor/client.rb', line 9

# Builds a client that feeds data read from +input+ into a multiplexer and
# writes multiplexed data to +output+.
#
# Channel 0 is reserved as the command channel. Messages decoded from it are
# dispatched by their "command" field: "hello", "close" and "ack" are handled
# here, anything else is passed to the on_command callback.
#
# input  - stream registered with the connector; its data goes to the
#          multiplexer's input.
# output - object given to Distributor::Multiplexer.new (defaults to input).
def initialize(input, output=input)
  @connector   = Distributor::Connector.new
  @multiplexer = Distributor::Multiplexer.new(output)
  @handlers    = {}   # message id => block waiting for the matching "ack"
  @processes   = []   # channels that have been acknowledged
  @on_close    = Hash.new { |hash,key| hash[key] = Array.new }
  @on_hello    = []
  @on_command  = Proc.new {}
  @hookup_lock = Mutex.new

  # reserve a command channel
  @multiplexer.reserve(0)

  # feed data from the input channel into the multiplexer
  @connector.handle(input) do |io|
    @multiplexer.input io
  end

  # the connector's on_close callback for input exits the process with status 0
  @connector.on_close(input) do |io|
    exit 0
  end

  # handle the command channel of the multiplexer
  @connector.handle(@multiplexer.reader(0)) do |io|
    append_json(io.readpartial(4096))

    dequeue_json do |data|
      case command = data["command"]
      when "hello" then
        @on_hello.each { |c| c.call }
      when "close" then
        ch = data["ch"]
        @on_close[ch].each { |c| c.call(ch) }
      when "ack" then
        ch = data["ch"]
        @multiplexer.reserve ch
        # An ack can carry an id with no stored block (unknown id, or run/tunnel
        # called without a block). Skip the call rather than raising
        # NoMethodError on nil and aborting the rest of the dispatch.
        handler = @handlers[data["id"]]
        handler.call(ch) if handler
        @handlers.delete(data["id"])
        @processes << ch
      else
        @on_command.call(command, data)
      end
    end
  end
end

Instance Method Details

#command(command, data = {}) ⇒ Object



59
60
61
62
63
64
# File 'lib/distributor/client.rb', line 59

# Sends a command message on the command channel (channel 0).
#
# The message is a copy of +data+ with "command" set to +command+ and, unless
# the caller already supplied one, an "id" obtained from the multiplexer. It is
# encoded with Distributor::OkJson before being written.
#
# The caller's hash is not modified, so frozen or reused hashes may be passed.
#
# command - value stored under the "command" key.
# data    - Hash of additional message fields (defaults to an empty Hash).
#
# Returns the id of the message that was sent.
def command(command, data={})
  # work on a copy: writing "id"/"command" into the argument would leak into
  # the caller's hash and fail outright on a frozen one
  payload = data.dup
  payload["id"] ||= @multiplexer.generate_id
  payload["command"] = command
  @multiplexer.output 0, Distributor::OkJson.encode(payload)
  payload["id"]
end

#hookup(ch, input, output = input) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/distributor/client.rb', line 76

# Connects channel +ch+ of the multiplexer to a local pair of streams.
#
# Data read from the multiplexer's reader for +ch+ is written to +output+ and
# flushed; data read from +input+ is sent out on channel +ch+.
#
# ch     - channel identifier passed to the multiplexer.
# input  - stream registered with the connector and read with readpartial.
# output - stream that receives the channel's data (defaults to input).
#
# Returns the result of the connector's handle call for +input+.
def hookup(ch, input, output=input)
  # multiplexer -> output; end of stream on the channel reader is reported by
  # sending a "close" command for the channel
  remote_to_local = proc do |reader|
    begin
      chunk = reader.readpartial(4096)
      output.write chunk
      output.flush
    rescue EOFError
      command "close", "ch" => ch
    end
  end

  # input -> multiplexer; end of stream runs the channel's close callbacks and
  # then closes the stream through the connector
  local_to_remote = proc do |source|
    begin
      @multiplexer.output ch, source.readpartial(4096)
    rescue EOFError
      @on_close[ch].each { |callback| callback.call(ch) }
      @connector.close(source)
    end
  end

  # only the registration of the channel reader happens under the hookup lock
  @hookup_lock.synchronize do
    @connector.handle(@multiplexer.reader(ch), &remote_to_local)
  end

  @connector.handle(input, &local_to_remote)
end

#on_close(ch, &blk) ⇒ Object



103
104
105
# File 'lib/distributor/client.rb', line 103

# Adds a callback to the list kept for channel +ch+.
#
# Callbacks on this list are invoked with the channel identifier. Several
# callbacks may be registered for the same channel.
#
# Returns the list of callbacks registered for +ch+.
def on_close(ch, &callback)
  @on_close[ch].push(callback)
end

#on_command(&blk) ⇒ Object



111
112
113
# File 'lib/distributor/client.rb', line 111

# Installs the single on_command callback, replacing the previous one.
#
# Returns the block that was installed.
def on_command(&handler)
  @on_command = handler
end

#on_hello(&blk) ⇒ Object



107
108
109
# File 'lib/distributor/client.rb', line 107

# Appends a callback to the on_hello list. Several callbacks may be registered.
#
# Returns the list of registered on_hello callbacks.
def on_hello(&callback)
  @on_hello.push(callback)
end

#output(ch, data) ⇒ Object



55
56
57
# File 'lib/distributor/client.rb', line 55

# Sends +data+ on channel +ch+ by delegating to the multiplexer.
#
# Returns whatever the multiplexer's output call returns.
def output(ch, data)
  @multiplexer.output(ch, data)
end

#run(command, &handler) ⇒ Object



66
67
68
69
# File 'lib/distributor/client.rb', line 66

# Sends a "run" command carrying +command+ under the "args" key and stores the
# block in the handler table under the id of that message.
#
# Returns the block.
def run(command, &handler)
  request_id = command("run", "args" => command)
  @handlers.store(request_id, handler)
end

#start ⇒ Object



115
116
117
# File 'lib/distributor/client.rb', line 115

# Calls the connector's listen method over and over; this call blocks the
# caller for as long as the loop keeps running.
def start
  loop do
    @connector.listen
  end
end

#tunnel(port, &handler) ⇒ Object



71
72
73
74
# File 'lib/distributor/client.rb', line 71

# Sends a "tunnel" command carrying +port+ under the "port" key and stores the
# block in the handler table under the id of that message.
#
# Returns the block.
def tunnel(port, &handler)
  request_id = command("tunnel", "port" => port)
  @handlers.store(request_id, handler)
end