Class: Splash::Transports::Rabbitmq::Client
- Includes:
- Config, Loggers, Splash::Transports
- Defined in:
- lib/splash/transports/rabbitmq.rb
Overview
RabbitMQ client operating in publish/get mode.
Constant Summary
Constants included from Loggers
Loggers::ALIAS, Loggers::LEVELS
Constants included from Constants
Constants::AUTHOR, Constants::BACKENDS_STRUCT, Constants::CONFIG_FILE, Constants::COPYRIGHT, Constants::DAEMON_LOGMON_SCHEDULING, Constants::DAEMON_METRICS_SCHEDULING, Constants::DAEMON_PID_FILE, Constants::DAEMON_PROCESS_NAME, Constants::DAEMON_PROCMON_SCHEDULING, Constants::DAEMON_STDERR_TRACE, Constants::DAEMON_STDOUT_TRACE, Constants::DEFAULT_RETENTION, Constants::EMAIL, Constants::EXECUTION_TEMPLATE, Constants::EXECUTION_TEMPLATE_TOKENS_LIST, Constants::LICENSE, Constants::LOGGERS_STRUCT, Constants::PID_PATH, Constants::PROMETHEUS_ALERTMANAGER_URL, Constants::PROMETHEUS_PUSHGATEWAY_URL, Constants::PROMETHEUS_URL, Constants::TRACE_PATH, Constants::TRANSPORTS_STRUCT, Constants::VERSION, Constants::WEBADMIN_IP, Constants::WEBADMIN_PID_FILE, Constants::WEBADMIN_PID_PATH, Constants::WEBADMIN_PORT, Constants::WEBADMIN_PROCESS_NAME, Constants::WEBADMIN_PROXY, Constants::WEBADMIN_STDERR_TRACE, Constants::WEBADMIN_STDOUT_TRACE
Instance Method Summary
- #ack(ack) ⇒ Boolean
  Acknowledge a specific message by its delivery tag (manual acknowledgement mode).
- #close ⇒ Object
  Close the RabbitMQ connection.
- #execute(order) ⇒ Void
  Send an execution order message (verb + payload) via RabbitMQ to a Splash input queue.
- #get(options = {}) ⇒ Hash
  Get a message from a RabbitMQ queue.
- #initialize ⇒ Client (constructor)
  Initialize a Bunny client.
- #publish(options = {}) ⇒ Object
  Publish a message to a queue.
- #purge(options) ⇒ Object
  Purge a queue.
Methods included from Loggers
#change_logger, #get_logger, #get_session
Methods included from Config
Methods included from ConfigUtilities
#addservice, #checkconfig, #flush_backend, #setupsplash
Methods included from Helpers
#check_unicode_term, #daemonize, #format_by_extensions, #format_response, #get_processes, #group_root, #install_file, #is_root?, #make_folder, #make_link, #run_as_root, #search_file_in_gem, #user_root, #verify_file, #verify_folder, #verify_link, #verify_service
Methods included from Splash::Transports
#get_default_client, #get_default_subscriber
Constructor Details
#initialize ⇒ Client
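Initializes a Bunny client. It reads the :rabbitmq transport settings (vhost defaults to '/', user and password default to 'guest'), starts the connection, and opens a channel. If Bunny raises an exception, it exits through splash_exit with :service_dependence_missing.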
# File 'lib/splash/transports/rabbitmq.rb', line 52

def initialize
  @config = get_config.transports
  host = @config[:rabbitmq][:host]
  port = @config[:rabbitmq][:port]
  vhost = (@config[:rabbitmq][:vhost])? @config[:rabbitmq][:vhost] : '/'
  passwd = (@config[:rabbitmq][:passwd])? @config[:rabbitmq][:passwd] : 'guest'
  user = (@config[:rabbitmq][:user])? @config[:rabbitmq][:user] : 'guest'
  conf = { :host => host, :vhost => vhost, :user => user, :password => passwd, :port => port.to_i}

  begin
    @connection = Bunny.new conf
    @connection.start
    @channel = @connection.create_channel
  rescue Bunny::Exception
    splash_exit case: :service_dependence_missing, more: "RabbitMQ Transport not available."
  end
end
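The fallback logic in the constructor can be sketched in isolation, without Bunny or a broker. The helper name `rabbitmq_conf` and the sample settings are assumptions made for illustration and are not part of Splash; only the key names and default values come from the listing.

```ruby
# Hypothetical helper (not part of Splash): builds the options hash that the
# constructor hands to Bunny.new, applying the same fallbacks as the listing.
def rabbitmq_conf(settings)
  {
    host: settings[:host],
    vhost: settings[:vhost] || '/',          # default virtual host
    user: settings[:user] || 'guest',        # default user
    password: settings[:passwd] || 'guest',  # note: the config key is :passwd
    port: settings[:port].to_i               # port may be stored as a string
  }
end

# Sample settings are made up for this sketch.
conf = rabbitmq_conf(host: 'localhost', port: '5672')
```

A missing :port yields 0 here because nil.to_i is 0, so a real configuration would presumably always set it.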
Instance Method Details
#ack(ack) ⇒ Boolean
Acknowledge a single message by its delivery tag, for messages fetched in manual acknowledgement mode.
# File 'lib/splash/transports/rabbitmq.rb', line 88

def ack(ack)
  return @channel.acknowledge(ack, false)
end
#close ⇒ Object
close the RabbitMQ connection
# File 'lib/splash/transports/rabbitmq.rb', line 126

def close
  @connection.close
end
#execute(order) ⇒ Void
Send an execution order message (verb + payload) via RabbitMQ to a Splash input queue, then block until a reply arrives on the order[:return_to] queue and return the decoded reply.
# File 'lib/splash/transports/rabbitmq.rb', line 96

def execute(order)
  queue = order[:return_to]
  lock = Mutex.new
  res = nil
  condition = ConditionVariable.new
  get_default_subscriber(queue: queue).subscribe do |delivery_info, properties, payload|
    res = YAML::load(payload)
    lock.synchronize { condition.signal }
  end
  get_logger.send "Verb : #{order[:verb].to_s} to queue : #{order[:queue]}."
  get_default_client.publish queue: order[:queue], message: order.to_yaml
  lock.synchronize { condition.wait(lock) }
  return res
end
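The wait-for-reply pattern used by #execute can be sketched with the standard library alone. This is a variant under stated assumptions, not the Splash implementation: a thread stands in for the subscriber callback, the reply payload is made up, and the waiter re-checks `res` in a loop with a timeout so that a reply arriving before the wait starts is not missed. The name `wait_for_reply` is hypothetical.

```ruby
require 'yaml'

# Hypothetical sketch: a thread plays the role of the subscriber callback.
def wait_for_reply(timeout: 2)
  lock = Mutex.new
  condition = ConditionVariable.new
  res = nil

  responder = Thread.new do
    payload = { 'status' => 'ok' }.to_yaml  # simulated reply payload
    lock.synchronize do
      res = YAML.safe_load(payload)         # decode under the lock
      condition.signal                      # wake the waiting caller
    end
  end

  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  lock.synchronize do
    # Loop on the predicate: covers an early reply and spurious wakeups.
    while res.nil?
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      break if remaining <= 0
      condition.wait(lock, remaining)
    end
  end

  responder.join
  res
end

reply = wait_for_reply
```

Assigning `res` inside the synchronized block and testing it before waiting is the design choice that matters here: a bare `condition.wait(lock)` with no predicate can block indefinitely if the signal was sent first.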
#get(options = {}) ⇒ Hash
Get a message from a RabbitMQ queue
# File 'lib/splash/transports/rabbitmq.rb', line 116

def get(options = {})
  queue = @channel.queue(options[:queue])
  opt = {}; opt[:manual_ack] = (options[:manual_ack])? true : false
  delivery_info, properties, payload = queue.pop
  res = {:message => payload}
  res[:ack] = delivery_info.delivery_tag if options[:manual_ack]
  return res
end
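The shape of the hash returned by #get can be illustrated without a broker. Everything named `FakeQueue`, `DeliveryInfo`, and `fetch_message` below is a hypothetical stand-in, not Bunny or Splash code. The sketch assumes, as Bunny's documentation describes, that a pop yields a [delivery_info, properties, payload] triple whose elements are all nil when the queue is empty, and it passes the manual_ack flag through to pop, which the listing builds in `opt` but does not appear to pass.

```ruby
# Hypothetical stand-ins so the sketch runs without RabbitMQ.
DeliveryInfo = Struct.new(:delivery_tag)

class FakeQueue
  attr_reader :last_pop_options

  def initialize(messages)
    @messages = messages
    @next_tag = 0
  end

  # Returns [delivery_info, properties, payload]; all nil when empty.
  def pop(options = {})
    @last_pop_options = options
    payload = @messages.shift
    return [nil, nil, nil] if payload.nil?

    @next_tag += 1
    [DeliveryInfo.new(@next_tag), {}, payload]
  end
end

# Same return shape as #get: :message always, :ack only in manual-ack mode.
def fetch_message(queue, manual_ack: false)
  delivery_info, _properties, payload = queue.pop(manual_ack: manual_ack)
  res = { message: payload }
  # Guard against an empty queue, where delivery_info is nil.
  res[:ack] = delivery_info.delivery_tag if manual_ack && delivery_info
  res
end

queue = FakeQueue.new(['hello'])
first = fetch_message(queue, manual_ack: true)
empty = fetch_message(queue, manual_ack: true)
```

The value stored under :ack is what a caller would later hand to #ack.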
#publish(options = {}) ⇒ Object
publish to a queue
# File 'lib/splash/transports/rabbitmq.rb', line 81

def publish(options = {})
  return @channel.default_exchange.publish(options[:message], :routing_key => options[:queue])
end
#purge(options) ⇒ Object
purge a queue
# File 'lib/splash/transports/rabbitmq.rb', line 73

def purge(options)
  @channel.queue(options[:queue]).purge
end