Module: Serfx::Commands

Included in:
Connection
Defined in:
lib/serfx/commands.rb

Overview

Implements all of Serf's RPC commands using the Serfx::Connection#request method.

Instance Method Details

#auth ⇒ Response

Authenticates against the Serf agent. If RPC credentials are set up, `auth` has to be the second command, immediately after `handshake`.

Returns:

  • (Response)

# File 'lib/serfx/commands.rb', line 24

def auth
  tcp_send(:auth, 'AuthKey' => @authkey)
  read_response(:auth)
end
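
The ordering rule above can be tried without a running agent. The sketch below uses a hypothetical `RecordingSession` class (not part of Serfx) that reuses the `handshake` and `auth` bodies from this page and replaces `tcp_send` and `read_response` with stubs, so commands are only recorded. The `start` method and the `authkey:` keyword are assumptions made for illustration.

```ruby
# Hypothetical stand-in for a Serfx connection; it records commands
# instead of writing them to the agent's socket.
class RecordingSession
  attr_reader :sent

  def initialize(authkey: nil)
    @authkey = authkey
    @sent = []
  end

  # Stub: remembers what would have been sent.
  def tcp_send(command, body = nil)
    @sent << [command, body]
  end

  # Stub: pretends the agent answered without an error.
  def read_response(command)
    { command: command, error: '' }
  end

  # Bodies copied from the listings on this page.
  def handshake
    tcp_send(:handshake, 'Version' => 1)
    read_response(:handshake)
  end

  def auth
    tcp_send(:auth, 'AuthKey' => @authkey)
    read_response(:auth)
  end

  # Assumed start-up order: handshake first, auth second and only
  # when a key has been configured.
  def start
    handshake
    auth unless @authkey.nil?
    self
  end
end

session = RecordingSession.new(authkey: 'secret').start
p session.sent.map(&:first) # prints [:handshake, :auth]
```

Without an auth key the same sketch records only `:handshake`.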

#event(name, payload = nil, coalesce = true) ⇒ Response

fires a user event

Parameters:

  • name (String)

    a string representing the name of the event

  • payload (String) (defaults to: nil)

    event payload; defaults to nil

  • coalesce (Boolean) (defaults to: true)

    whether Serf should coalesce events with the same name fired within a similar time frame

Returns:

  • (Response)

# File 'lib/serfx/commands.rb', line 35

def event(name, payload = nil, coalesce = true)
  event = {
    'Name' => name,
    'Coalesce' => coalesce
  }
  event['Payload'] = payload unless payload.nil?
  request(:event, event)
end
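
To make the optional payload handling concrete, here is a minimal sketch of the request body that `event` builds. `event_body` is a hypothetical helper introduced only for this example; it repeats the hash construction from the listing above and returns the hash instead of sending it.

```ruby
# Hypothetical helper: same hash construction as `event`, but the body
# is returned rather than passed to `request`.
def event_body(name, payload = nil, coalesce = true)
  body = {
    'Name' => name,
    'Coalesce' => coalesce
  }
  # 'Payload' is only present when a payload was given.
  body['Payload'] = payload unless payload.nil?
  body
end

without_payload = event_body('deploy')
with_payload = event_body('deploy', 'v1.2.3', false)

p without_payload
p with_payload
```

The first call yields a body with no `'Payload'` key at all, which is different from sending an empty string.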

#force_leave(node) ⇒ Response

force a failed node to leave the cluster

Parameters:

  • node (String)

    name of the failed node

Returns:

  • (Response)

# File 'lib/serfx/commands.rb', line 48

def force_leave(node)
  request(:force_leave, 'Node' => node)
end

#get_coordinate(node) ⇒ Response

obtain network coordinate of a node

Parameters:

  • node (String)

    Name of the node

Returns:

  • (Response)

# File 'lib/serfx/commands.rb', line 217

def get_coordinate(node)
  request(:get_coordinate, 'Node' => node)
end

#handshake ⇒ Response

Performs the initial handshake of an RPC session. Handshake has to be the first command invoked during an RPC session.

Returns:

  • (Response)

# File 'lib/serfx/commands.rb', line 15

def handshake
  tcp_send(:handshake, 'Version' => 1)
  read_response(:handshake)
end

#install_key(key) ⇒ Response

install a new encryption key onto the cluster’s keyring

Parameters:

  • key (String)

    16 bytes of base64-encoded data.

Returns:

  • (Response)

# File 'lib/serfx/commands.rb', line 179

def install_key(key)
  request(:install_key, 'Key' => key)
end
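
The key parameter is described as 16 bytes of base64-encoded data. As a rough illustration of that shape only, the sketch below builds such a string with Ruby's standard `SecureRandom` and checks its decoded size. Whether keys for a real cluster should be generated this way is not stated on this page, so treat it as an assumption.

```ruby
require 'securerandom'

# 16 random bytes, base64-encoded (24 characters including padding).
key = SecureRandom.base64(16)

# 'm0' decodes strict base64 and raises ArgumentError on malformed input.
decoded = key.unpack1('m0')

puts key
puts decoded.bytesize # prints 16
```

A string of this form is what `install_key`, `use_key` and `remove_key` pass through as `'Key'`.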

#join(existing, replay = false) ⇒ Response

join an existing cluster.

Parameters:

  • existing (Array)

    an array of existing serf agents

  • replay (Boolean) (defaults to: false)

    Whether events should be replayed upon joining

Returns:

  • (Response)

# File 'lib/serfx/commands.rb', line 57

def join(existing, replay = false)
  request(:join, 'Existing' => existing, 'Replay' => replay)
end

#leave ⇒ Response

leave is used to trigger a graceful leave and shutdown of the current agent

Returns:

  • (Response)

# File 'lib/serfx/commands.rb', line 135

def leave
  request(:leave)
end

#list_keys ⇒ Response

return a list of all encryption keys currently in use on the cluster

Returns:

  • (Response)

# File 'lib/serfx/commands.rb', line 202

def list_keys
  request(:list_keys)
end

#members ⇒ Response

obtain the list of existing members

Returns:

  • (Response)

# File 'lib/serfx/commands.rb', line 64

def members
  request(:members)
end

#members_filtered(tags, status = 'alive', name = nil) ⇒ Response

obtain the list of cluster members, filtered by tags.

Parameters:

  • tags (Array)

    an array of tags to filter by

  • status (String) (defaults to: 'alive')

    filter members based on their status

  • name (String) (defaults to: nil)

    filter based on exact name or pattern.

Returns:

  • (Response)

# File 'lib/serfx/commands.rb', line 75

def members_filtered(tags, status = 'alive', name = nil)
  filter = {
    'Tags' => tags,
    'Status' => status
  }
  filter['Name'] = name unless name.nil?
  request(:members_filtered, filter)
end

#monitor(loglevel = 'debug') ⇒ Response

monitor is similar to the stream command, but instead of events it subscribes the channel to log messages from the agent

Parameters:

  • loglevel (String) (defaults to: 'debug')

Returns:

  • (Response)

# File 'lib/serfx/commands.rb', line 123

def monitor(loglevel = 'debug')
  request(:monitor, 'LogLevel' => loglevel.upcase)
end

#query(name, payload, opts = {}, &block) ⇒ Response

query is used to issue a new query

Parameters:

  • name (String)

    name of the query

  • payload (String)

    payload for this query event

  • opts (Hash) (defaults to: {})

    additional query options

Returns:

  • (Response)

# File 'lib/serfx/commands.rb', line 145

def query(name, payload, opts = {}, &block)
  params = { 'Name' => name, 'Payload' => payload }
  params.merge!(opts)
  res = request(:query, params)
  loop do
    header = read_data
    check_rpc_error!(header)
    ev = read_data

    # Stop reading when a response has an empty From field
    break if ev['From'].nil? || ev['From'].empty?

    if ev['Type'] == 'done'
      break
    else
      block.call(ev) if block
    end
  end
  res
end
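
The read loop in `query` is easier to follow with canned data. The sketch below is a simulation, not Serfx itself: `FakeQueryConnection` is a hypothetical class whose `request`, `read_data` and `check_rpc_error!` are simple stubs, and the frame hashes are made-up examples limited to the keys the loop inspects (`'Error'`, `'From'`, `'Type'`). The `query` body is copied from the listing above.

```ruby
# Hypothetical in-memory connection that replays prepared frames.
class FakeQueryConnection
  def initialize(frames)
    @frames = frames.dup
  end

  # Stub: returns a plain hash in place of a Response object.
  def request(command, body = nil)
    { command: command, body: body }
  end

  # Stub: hands out the next prepared frame.
  def read_data
    @frames.shift
  end

  # Stub: raises when the header carries a non-empty error string.
  def check_rpc_error!(header)
    raise header['Error'] unless header['Error'].to_s.empty?
  end

  def query(name, payload, opts = {}, &block)
    params = { 'Name' => name, 'Payload' => payload }
    params.merge!(opts)
    res = request(:query, params)
    loop do
      header = read_data
      check_rpc_error!(header)
      ev = read_data
      # An empty From field ends the loop.
      break if ev['From'].nil? || ev['From'].empty?

      if ev['Type'] == 'done'
        break
      else
        block.call(ev) if block
      end
    end
    res
  end
end

# Each reply is a header frame followed by an event frame.
frames = [
  { 'Seq' => 1, 'Error' => '' }, { 'Type' => 'ack', 'From' => 'node1' },
  { 'Seq' => 1, 'Error' => '' }, { 'Type' => 'response', 'From' => 'node2', 'Payload' => 'up 3 days' },
  { 'Seq' => 1, 'Error' => '' }, { 'Type' => 'done', 'From' => '' }
]

seen = []
result = FakeQueryConnection.new(frames).query('uptime', '') do |ev|
  seen << [ev['Type'], ev['From']]
end

p seen
p result[:command]
```

The block runs once per reply that carries a non-empty `From` and is not of type `done`; the value returned by `query` is the response to the initial request, not the collected replies.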

#remove_key(key) ⇒ Response

remove a key from the cluster’s keyring

Parameters:

  • key (String)

    16 bytes of base64-encoded data.

Returns:

  • (Response)

# File 'lib/serfx/commands.rb', line 195

def remove_key(key)
  request(:remove_key, 'Key' => key)
end

#respond(id, payload) ⇒ Response

respond is used with `stream` to subscribe to queries and then respond.

Parameters:

  • id (Integer)

    an opaque value that is assigned by the IPC layer

  • payload (String)

    payload for the response event

Returns:

  • (Response)

# File 'lib/serfx/commands.rb', line 171

def respond(id, payload)
  request(:respond, 'ID' => id, 'Payload' => payload)
end

#stats ⇒ Response

obtain stats about the agent (same as the info command)

Returns:

  • (Response)

# File 'lib/serfx/commands.rb', line 209

def stats
  request(:stats)
end

#stop(sequence_number) ⇒ Object

stop is used to stop either a stream or monitor



# File 'lib/serfx/commands.rb', line 128

def stop(sequence_number)
  tcp_send(:stop, 'Stop' => sequence_number)
end

#stream(types, &block) ⇒ Thread, Response

subscribe to a stream of all events matching a given type filter. Events will continue to be sent until the stream is stopped.

Parameters:

  • types (String)

    comma-separated list of event types

Returns:

  • (Thread, Response)

    returned as the two-element array `[response, thread]`

# File 'lib/serfx/commands.rb', line 101

def stream(types, &block)
  res = request(:stream, 'Type' => types)
  t = Thread.new do
    loop do
      header = read_data
      check_rpc_error!(header)
      if header['Seq'] == res.header.seq
        ev = read_data
        block.call(ev) if block
      else
        break
      end
    end
  end
  [res, t]
end
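
The following sketch shows how the reader thread in `stream` behaves, using prepared frames instead of a live agent. Everything except the `stream` body is an assumption for illustration: `FakeStreamConnection`, `FakeHeader` and `FakeResponse` are hypothetical stand-ins, and the frame hashes contain only the keys the loop reads plus a made-up `'Event'` field.

```ruby
# Hypothetical stand-ins for the response object that `request` returns.
FakeHeader = Struct.new(:seq)
FakeResponse = Struct.new(:header)

class FakeStreamConnection
  def initialize(frames, seq: 7)
    @frames = frames.dup
    @seq = seq
  end

  # Stub: answers every request with a response carrying a fixed sequence number.
  def request(_command, _body = nil)
    FakeResponse.new(FakeHeader.new(@seq))
  end

  # Stub: hands out the next prepared frame.
  def read_data
    @frames.shift
  end

  # Stub: raises when the header carries a non-empty error string.
  def check_rpc_error!(header)
    raise header['Error'] unless header['Error'].to_s.empty?
  end

  # Body copied from the listing above.
  def stream(types, &block)
    res = request(:stream, 'Type' => types)
    t = Thread.new do
      loop do
        header = read_data
        check_rpc_error!(header)
        if header['Seq'] == res.header.seq
          ev = read_data
          block.call(ev) if block
        else
          break
        end
      end
    end
    [res, t]
  end
end

frames = [
  { 'Seq' => 7, 'Error' => '' }, { 'Event' => 'user', 'Name' => 'deploy' },
  { 'Seq' => 7, 'Error' => '' }, { 'Event' => 'member-join' },
  { 'Seq' => 8, 'Error' => '' } # a header with a different Seq ends the loop
]

events = []
res, thread = FakeStreamConnection.new(frames).stream('user,member-join') do |ev|
  events << ev['Event']
end
thread.join # wait until the reader thread has drained the frames

p events
p res.header.seq
```

Note the order of the returned pair: the response comes first and the thread second. In this sketch the sequence number held in `res.header.seq` would be the value to hand to `stop`.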

#tags(tags, delete_tags = []) ⇒ Response

alter the tags on a Serf agent while it is running. A member-update event will be triggered immediately to notify the other agents in the cluster of the change. The tags command can add new tags, modify existing tags, or delete tags

Parameters:

  • tags (Hash)

    a hash representing tags as key-value pairs

  • delete_tags (Array) (defaults to: [])

    an array of tags to be deleted

Returns:

  • (Response)

# File 'lib/serfx/commands.rb', line 92

def tags(tags, delete_tags = [])
  request(:tags, 'Tags' => tags, 'DeleteTags' => delete_tags)
end
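
As a small illustration of the two arguments, the sketch below records the request body that `tags` would send. `TagRecorder` is a hypothetical class with a stubbed `request`; the tag names and values are invented for the example.

```ruby
# Hypothetical recorder: `request` stores the command and body instead of
# talking to an agent.
class TagRecorder
  attr_reader :requests

  def initialize
    @requests = []
  end

  def request(command, body = nil)
    @requests << [command, body]
    :recorded
  end

  # Body copied from the listing above.
  def tags(tags, delete_tags = [])
    request(:tags, 'Tags' => tags, 'DeleteTags' => delete_tags)
  end
end

recorder = TagRecorder.new
recorder.tags({ 'role' => 'web', 'dc' => 'east' }) # add or modify tags
recorder.tags({}, ['dc'])                          # delete a tag only

recorder.requests.each { |command, body| p [command, body] }
```

Adding and deleting can also be combined in a single call by passing both a non-empty hash and a non-empty array.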

#use_key(key) ⇒ Response

change the primary key, which is used to encrypt messages

Parameters:

  • key (String)

    16 bytes of base64-encoded data.

Returns:

  • (Response)

# File 'lib/serfx/commands.rb', line 187

def use_key(key)
  request(:use_key, 'Key' => key)
end