Class: MessageChannel::Mongodb
- Inherits:
-
Object
- Object
- MessageChannel::Mongodb
- Defined in:
- lib/message_channel/mongodb.rb
Constant Summary collapse
- SIZE =
8000- NAME =
"_event_queue"
Instance Method Summary collapse
- #get_event_queue ⇒ Object
- #get_event_tail(event_queue) ⇒ Object
-
#initialize(host: nil, port: nil, db: nil, size: nil, name: nil) ⇒ Mongodb
constructor
A new instance of Mongodb.
- #listen(*patterns, &block) ⇒ Object
- #listen_each(*patterns, &block) ⇒ Object
- #listen_once(*patterns) ⇒ Object
- #notify(topic, **items) ⇒ Object
- #unlisten(**patterns) ⇒ Object
Constructor Details
#initialize(host: nil, port: nil, db: nil, size: nil, name: nil) ⇒ Mongodb
Returns a new instance of Mongodb.
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/message_channel/mongodb.rb', line 11 def initialize( host: nil, port: nil, db: nil, size: nil, name: nil ) @host = host || "127.0.0.1" @port = ( port || 27017 ).to_i @db = db || "test" @size = ( size || SIZE ).to_i @name = name || NAME @url = "mongodb://#{ @host }:#{ @port }/#{ @db }" @client = ::Mongo::Client.new( @url ) @threads = {} @mutex = Mutex.new @event_queue = get_event_queue end |
Instance Method Details
#get_event_queue ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/message_channel/mongodb.rb', line 27 def get_event_queue event_queue = @mutex.synchronize do if @client.database.collection_names.include?( @name ) event_queue = @client[ @name ] else event_queue = @client[ @name, capped: true, size: @size ] event_queue.create now = Time.now doc = { topic: "reset", at: now.strftime("%Y%m%d.%H%M%S.%6L"), } event_queue.insert_one( doc ) event_queue end end end |
#get_event_tail(event_queue) ⇒ Object
45 46 47 48 49 50 51 52 53 |
# File 'lib/message_channel/mongodb.rb', line 45 def get_event_tail( event_queue ) filter = {} if ( enum = event_queue.find( {}, { sort: { "$natural" => -1 } } ).to_enum ) if ( doc = enum.next rescue nil ) filter = { "_id"=>{ "$gt"=>doc["_id"] } } end end event_tail = event_queue.find( filter, { cursor_type: :tailable_await } ).to_enum end |
#listen(*patterns, &block) ⇒ Object
108 109 110 111 112 113 114 115 116 |
# File 'lib/message_channel/mongodb.rb', line 108 def listen( *patterns, &block ) if block.nil? listen_once( *patterns ) else listen_each( *patterns ) do |topic, items| block.call( topic, items ) end end end |
#listen_each(*patterns, &block) ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/message_channel/mongodb.rb', line 86 def listen_each( *patterns, &block ) patterns.each do |pattern| @threads[pattern] = ::Thread.start( pattern ) do |pttrn| begin event_queue = get_event_queue event_tail = get_event_tail( event_queue ) while ( doc = event_tail.next ) items = JSON.parse( doc.to_json, symbolize_names: true ) topic = items[:topic] if File.fnmatch( pttrn, topic, File::FNM_PATHNAME ) items.delete( :_id ) items.delete( :topic ) block.call( topic, items ) end end rescue => e nil end end end end |
#listen_once(*patterns) ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/message_channel/mongodb.rb', line 55 def listen_once( *patterns ) queue = Queue.new threads = {} patterns.each do |pattern| threads[pattern] = ::Thread.start( pattern ) do |pttrn| event_queue = get_event_queue event_tail = get_event_tail( event_queue ) begin while ( doc = event_tail.next ) items = JSON.parse( doc.to_json, symbolize_names: true ) topic = items[:topic] if File.fnmatch( pttrn, topic, File::FNM_PATHNAME ) items.delete( :_id ) items.delete( :topic ) queue.push [topic, items] end end rescue => e nil end end end topic, items = queue.pop patterns.each do |pattern| threads[pattern].kill rescue nil threads.delete( pattern ) rescue nil end [topic, items] end |
#notify(topic, **items) ⇒ Object
125 126 127 128 |
# File 'lib/message_channel/mongodb.rb', line 125 def notify( topic, **items ) items[:topic] = topic @event_queue.insert_one( items ) end |
#unlisten(**patterns) ⇒ Object
118 119 120 121 122 123 |
# File 'lib/message_channel/mongodb.rb', line 118 def unlisten( **patterns ) patterns.each do |pattern| @threads[pattern].kill rescue nil @threads.delete( pattern ) rescue nil end end |