Class: Profit::Server
- Inherits:
-
Object
- Object
- Profit::Server
- Defined in:
- lib/profit/server.rb
Constant Summary collapse
- METRIC_LIMIT_PER_KEY =
100
Instance Attribute Summary collapse
-
#ctx ⇒ Object
readonly
Returns the value of attribute ctx.
Instance Method Summary collapse
-
#initialize(options = {}) ⇒ Server
constructor
A new instance of Server.
- #run ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Server
Returns a new instance of Server.
9 10 11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/profit/server.rb', line 9 def initialize( = {}) @options = {} @options[:redis_address] = [:redis_address] || "127.0.0.1" @options[:redis_port] = [:redis_port] || 6379 @options[:zmq_address] = [:zmq_address] || "tcp://*:5556" @options[:pool_size] = [:pool_size] || 10 @options[:log_path] = [:log_path] || STDOUT @options[:log_level] = [:log_level] || :error logger.level = log_level @ctx = ZMQ::Context.new logger.info "Starting profit_server with options: #{@options}" end |
Instance Attribute Details
#ctx ⇒ Object (readonly)
Returns the value of attribute ctx.
5 6 7 |
# File 'lib/profit/server.rb', line 5 def ctx @ctx end |
Instance Method Details
#run ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/profit/server.rb', line 23 def run EM.run do # startup the EM::Hiredis connections spawn_redis_connections # gives us a graceful exit setup_interrupt_handling # this is the entry to message handling EM.add_periodic_timer do # blocking ZMQ socket = puller.recv || "" # take a worker from the pool to save the metric to Redis redis_pool.perform do |conn| = JSON.parse() metric_key = ["profit", "metric", .delete("metric_key")].join(":") add_key_response = conn.sadd("profit:keys", metric_key) add_key_response.callback { |resp| logger.debug "adding metric key callback: #{resp}" } add_key_response.errback { |resp| logger.error "adding metric key error: #{resp}"} push_metric_response = conn.lpush metric_key, .to_json push_metric_response.callback { |resp| logger.debug "push metric callback: #{resp}" } push_metric_response.errback { |resp| logger.error "push metric error: #{resp}"} trim_list_response = conn.ltrim(metric_key, 0, METRIC_LIMIT_PER_KEY - 1) trim_list_response.callback { |resp| logger.debug "trim list callback: #{resp}" } trim_list_response.errback { |resp| logger.error "trim list error: #{resp}"} trim_list_response end end end end |