Class: WSDirector::Protocols::Base
- Inherits:
-
Object
- Object
- WSDirector::Protocols::Base
- Includes:
- Utils
- Defined in:
- lib/wsdirector/protocols/base.rb
Overview
Base protocol describes basic actions
Direct Known Subclasses
Constant Summary
Constants included from Utils
Instance Method Summary collapse
-
#debug(step) ⇒ Object
Prints provided message.
- #handle_step(step) ⇒ Object
- #init_client(**options) ⇒ Object
-
#initialize(task) ⇒ Base
constructor
A new instance of Base.
- #receive(step) ⇒ Object
-
#receive_all(step) ⇒ Object
rubocop: disable Metrics/CyclomaticComplexity.
-
#send(step) ⇒ Object
rubocop: enable Metrics/CyclomaticComplexity.
-
#sleep(step) ⇒ Object
Sleeps for a specified number of seconds.
- #to_proc ⇒ Object
- #wait_all(_step) ⇒ Object
Methods included from Utils
Constructor Details
#initialize(task) ⇒ Base
Returns a new instance of Base.
11 12 13 |
# File 'lib/wsdirector/protocols/base.rb', line 11 def initialize(task) @task = task end |
Instance Method Details
#debug(step) ⇒ Object
Prints provided message
46 47 48 |
# File 'lib/wsdirector/protocols/base.rb', line 46 def debug(step) print(step.fetch("message")) end |
#handle_step(step) ⇒ Object
19 20 21 22 23 24 25 26 |
# File 'lib/wsdirector/protocols/base.rb', line 19 def handle_step(step) type = step.delete("type") raise Error, "Unknown step: #{type}" unless respond_to?(type) return unless task.sampled?(step) public_send(type, step) end |
#init_client(**options) ⇒ Object
15 16 17 |
# File 'lib/wsdirector/protocols/base.rb', line 15 def init_client(**) @client = build_client(**) end |
#receive(step) ⇒ Object
50 51 52 53 54 55 56 57 |
# File 'lib/wsdirector/protocols/base.rb', line 50 def receive(step) expected = step.fetch("data") received = client.receive raise UnmatchedExpectationError, prepare_receive_error(expected, received) unless receive_matches?(expected, received) rescue ThreadError raise NoMessageError, "Expected to receive #{expected} but nothing has been received" end |
#receive_all(step) ⇒ Object
rubocop: disable Metrics/CyclomaticComplexity
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/wsdirector/protocols/base.rb', line 60 def receive_all(step) = step.delete("messages") raise ArgumentError, "Messages array must be specified" if .nil? || .empty? expected = Hash[.map do |msg| multiplier = parse_multiplier(msg.delete("multiplier") || "1") [msg["data"], multiplier] end] total_expected = expected.values.sum total_received = 0 total_expected.times do received = client.receive total_received += 1 match = expected.find { |k, _| receive_matches?(k, received) } raise UnexpectedMessageError, "Unexpected message received: #{received}" if match.nil? expected[match.first] -= 1 expected.delete(match.first) if expected[match.first].zero? end rescue ThreadError raise NoMessageError, "Expected to receive #{total_expected} messages " \ "but received only #{total_received}" end |
#send(step) ⇒ Object
rubocop: enable Metrics/CyclomaticComplexity
94 95 96 97 98 |
# File 'lib/wsdirector/protocols/base.rb', line 94 def send(step) data = step.fetch("data") data = JSON.generate(data) if data.is_a?(Hash) client.send(data) end |
#sleep(step) ⇒ Object
Sleeps for a specified number of seconds.
If “shift” is provided than the initial value is shifted by random number from (-shift, shift).
Set “debug” to true to print the delay time.
34 35 36 37 38 39 40 41 42 43 |
# File 'lib/wsdirector/protocols/base.rb', line 34 def sleep(step) delay = step.fetch("time").to_f shift = step.fetch("shift", 0).to_f delay = delay - shift * rand + shift * rand print("Sleep for #{delay}s") if step.fetch("debug", false) Kernel.sleep delay if delay > 0 end |
#to_proc ⇒ Object
104 105 106 |
# File 'lib/wsdirector/protocols/base.rb', line 104 def to_proc proc { |step| handle_step(step) } end |
#wait_all(_step) ⇒ Object
100 101 102 |
# File 'lib/wsdirector/protocols/base.rb', line 100 def wait_all(_step) task.global_holder.wait_all end |