Class: WSDirector::Protocols::Base
- Inherits:
-
Object
- Object
- WSDirector::Protocols::Base
- Includes:
- Utils
- Defined in:
- lib/wsdirector/protocols/base.rb
Overview
Base protocol describes basic actions
Direct Known Subclasses
Constant Summary
Constants included from Utils
Instance Method Summary collapse
-
#debug(step) ⇒ Object
Prints provided message.
- #handle_step(step) ⇒ Object
- #init_client(**options) ⇒ Object
-
#initialize(task) ⇒ Base
constructor
A new instance of Base.
- #receive(step) ⇒ Object
-
#receive_all(step) ⇒ Object
rubocop: disable Metrics/CyclomaticComplexity.
-
#send(step) ⇒ Object
rubocop: enable Metrics/CyclomaticComplexity.
-
#sleep(step) ⇒ Object
Sleeps for a specified number of seconds.
- #to_proc ⇒ Object
- #wait_all(_step) ⇒ Object
Methods included from Utils
Constructor Details
#initialize(task) ⇒ Base
Returns a new instance of Base.
11 12 13 |
# File 'lib/wsdirector/protocols/base.rb', line 11
# Creates a protocol handler for the given task.
#
# @param task [Object] kept in @task for later use by this instance
def initialize(task)
  @task = task
end
Instance Method Details
#debug(step) ⇒ Object
Prints provided message
43 44 45 |
# File 'lib/wsdirector/protocols/base.rb', line 43
# Prints the message carried by the step.
#
# @param step [Hash] must contain a "message" key
# @raise [KeyError] when "message" is missing
def debug(step)
  message = step.fetch("message")
  print(message)
end
#handle_step(step) ⇒ Object
19 20 21 22 23 |
# File 'lib/wsdirector/protocols/base.rb', line 19
# Dispatches a scenario step to the public method named by its "type".
#
# Note: the "type" key is removed from +step+ before the handler is called,
# so the handler receives only the remaining keys.
#
# @param step [Hash] step definition; "type" names the handler method
# @return [Object] whatever the handler returns
# @raise [Error] when "type" is missing, is not a String/Symbol, or does not
#   name a public method of this object
def handle_step(step)
  type = step.delete("type")

  # respond_to? raises TypeError for anything that is not a String or Symbol
  # (e.g. a step without "type"), so check the kind first and report such
  # steps as unknown as well.
  known = (type.is_a?(String) || type.is_a?(Symbol)) && respond_to?(type)
  raise Error, "Unknown step: #{type}" unless known

  public_send(type, step)
end
#init_client(**options) ⇒ Object
15 16 17 |
# File 'lib/wsdirector/protocols/base.rb', line 15
# Builds the client and keeps it in @client.
#
# @param options [Hash] keyword arguments forwarded unchanged to build_client
# @return [Object] the value returned by build_client
def init_client(**options)
  @client = build_client(**options)
end
#receive(step) ⇒ Object
47 48 49 50 51 52 53 54 |
# File 'lib/wsdirector/protocols/base.rb', line 47
# Receives one message from the client and checks it against step["data"].
#
# @param step [Hash] must contain the expected payload under "data"
# @return [nil] when the received message matches
# @raise [KeyError] when "data" is missing
# @raise [UnmatchedExpectationError] when the received message does not match
# @raise [NoMessageError] when a ThreadError is raised while receiving
#   (presumably the client has nothing queued - confirm against the client)
def receive(step)
  expected = step.fetch("data")
  actual = client.receive
  return if receive_matches?(expected, actual)

  raise UnmatchedExpectationError, prepare_receive_error(expected, actual)
rescue ThreadError
  raise NoMessageError, "Expected to receive #{expected} but nothing has been received"
end
#receive_all(step) ⇒ Object
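Receives every message listed in the step, in any order. (The source annotates this method with "rubocop: disable Metrics/CyclomaticComplexity".)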
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/wsdirector/protocols/base.rb', line 57
# Receives a batch of messages that may arrive in any order.
#
# Every entry of step["messages"] is a Hash holding the expected "data" and
# an optional "multiplier" (defaults to "1" and is interpreted by
# parse_multiplier). As many messages as the multipliers add up to are read
# from the client, and each one must match an entry that is still outstanding.
#
# Note: "messages" is removed from +step+ and "multiplier" from each entry.
#
# @param step [Hash] step definition with a non-empty "messages" array
# @return [Integer] the number of messages that were expected
# @raise [ArgumentError] when "messages" is missing or empty
# @raise [UnexpectedMessageError] when a received message matches no
#   outstanding expectation
# @raise [NoMessageError] when a ThreadError is raised while receiving
#   (presumably the client has nothing queued - confirm against the client)
def receive_all(step)
  messages = step.delete("messages")
  raise ArgumentError, "Messages array must be specified" if messages.nil? || messages.empty?

  # Accumulate counts instead of assigning them, so that listing the same
  # data twice expects it twice rather than overwriting the first entry.
  expected = messages.each_with_object(Hash.new(0)) do |msg, counts|
    counts[msg["data"]] += parse_multiplier(msg.delete("multiplier") || "1")
  end

  total_expected = expected.values.sum
  total_received = 0

  total_expected.times do
    received = client.receive
    total_received += 1

    match = expected.find { |candidate, _| receive_matches?(candidate, received) }
    raise UnexpectedMessageError, "Unexpected message received: #{received}" unless match

    # Tick off one occurrence; fully satisfied expectations are dropped so
    # that surplus copies are reported as unexpected.
    data = match.first
    expected[data] -= 1
    expected.delete(data) if expected[data].zero?
  end
rescue ThreadError
  raise NoMessageError, "Expected to receive #{total_expected} messages " \
                        "but received only #{total_received}"
end
#send(step) ⇒ Object
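Sends the step's "data" to the client. (The "rubocop: enable Metrics/CyclomaticComplexity" directive in the source re-enables the check disabled before #receive_all; it does not describe #send.)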
91 92 93 94 95 |
# File 'lib/wsdirector/protocols/base.rb', line 91
# Sends the step's "data" through the client.
#
# Hash payloads are serialized with JSON.generate first; any other value is
# handed to the client as is.
#
# @param step [Hash] must contain the payload under "data"
# @return [Object] whatever client.send returns
# @raise [KeyError] when "data" is missing
def send(step)
  payload = step.fetch("data")
  encoded = payload.is_a?(Hash) ? JSON.generate(payload) : payload
  client.send(encoded)
end
#sleep(step) ⇒ Object
Sleeps for a specified number of seconds.
If “shift” is provided, then the initial value is shifted by a random number from (-shift, shift).
Set “debug” to true to print the delay time.
31 32 33 34 35 36 37 38 39 40 |
# File 'lib/wsdirector/protocols/base.rb', line 31
# Pauses for the number of seconds given in step["time"].
#
# When "shift" is given, two independent random offsets in [0, shift) are
# drawn: one is subtracted from the delay and one is added, so the result
# lies within (time - shift, time + shift). With "debug" set, the final delay
# is printed before sleeping. Non-positive delays do not sleep at all.
#
# @param step [Hash] "time" (required), "shift" and "debug" (optional)
# @return [Integer, nil] the result of Kernel.sleep, or nil when no sleep
#   was performed
# @raise [KeyError] when "time" is missing
def sleep(step)
  base = step.fetch("time").to_f
  shift = step.fetch("shift", 0).to_f

  # Draw the offsets in the same order they are applied: minus first, then plus.
  backward = shift * rand
  forward = shift * rand
  delay = base - backward + forward

  print("Sleep for #{delay}s") if step.fetch("debug", false)
  return unless delay > 0

  Kernel.sleep(delay)
end
#to_proc ⇒ Object
101 102 103 |
# File 'lib/wsdirector/protocols/base.rb', line 101
# Wraps handle_step in a proc, so the protocol can be passed with & to
# iterators (e.g. steps.each(&protocol)).
#
# @return [Proc] a proc that forwards its step argument to handle_step
def to_proc
  proc do |step|
    handle_step(step)
  end
end
#wait_all(_step) ⇒ Object
97 98 99 |
# File 'lib/wsdirector/protocols/base.rb', line 97
# Delegates to wait_all on the task's global holder.
#
# @param _step [Hash] ignored
# @return [Object] whatever the holder's wait_all returns
def wait_all(_step)
  holder = task.global_holder
  holder.wait_all
end