Class: Publisher
- Inherits:
-
Object
- Object
- Publisher
- Defined in:
- lib/nchan_tools/pubsub.rb
Defined Under Namespace
Classes: PublisherError
Instance Attribute Summary collapse
-
#accept ⇒ Object
Returns the value of attribute accept.
-
#channel_info ⇒ Object
Returns the value of attribute channel_info.
-
#channel_info_type ⇒ Object
Returns the value of attribute channel_info_type.
-
#extra_headers ⇒ Object
Returns the value of attribute extra_headers.
-
#messages ⇒ Object
Returns the value of attribute messages.
-
#nofail ⇒ Object
Returns the value of attribute nofail.
-
#response ⇒ Object
Returns the value of attribute response.
-
#response_body ⇒ Object
Returns the value of attribute response_body.
-
#response_code ⇒ Object
Returns the value of attribute response_code.
-
#url ⇒ Object
Returns the value of attribute url.
-
#verbose ⇒ Object
Returns the value of attribute verbose.
-
#ws ⇒ Object
Returns the value of attribute ws.
Instance Method Summary collapse
- #delete ⇒ Object
- #get(accept_header = nil) ⇒ Object
-
#initialize(url, opt = {}) ⇒ Publisher
constructor
A new instance of Publisher.
- #on_complete(&block) ⇒ Object
- #on_response(&block) ⇒ Object
- #parse_channel_info(data, content_type = nil) ⇒ Object
- #post(body, content_type = nil, es_event = nil, &block) ⇒ Object
- #put(body, content_type = nil, es_event = nil, &block) ⇒ Object
- #reset ⇒ Object
- #submit(body, method = :POST, content_type = :'text/plain', eventsource_event = nil, &block) ⇒ Object
- #terminate ⇒ Object
- #with_url(alt_url) ⇒ Object
Constructor Details
#initialize(url, opt = {}) ⇒ Publisher
Returns a new instance of Publisher.
1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 |
# File 'lib/nchan_tools/pubsub.rb', line 1653 def initialize(url, opt={}) @url= url unless opt[:nostore] @messages = MessageStore.new :noid => true @messages.name = "pub" end @timeout = opt[:timeout] @accept = opt[:accept] @verbose = opt[:verbose] @on_response = opt[:on_response] @ws_wait_until_response = true if opt[:ws] || opt[:websocket] @ws = Subscriber.new url, 1, timeout: 100000, client: :websocket, permessage_deflate: opt[:permessage_deflate] @ws_sent_msg = [] @ws. do |msg| sent = @ws_sent_msg.shift if @messages && sent @messages << sent[:msg] end self.response=Typhoeus::Response.new self.response_code=200 #fake it self.response_body=msg sent[:response] = self.response sent[:condition].signal true if sent[:condition] @on_response.call(self.response_code, self.response_body) if @on_response end @ws.on_failure do |err| raise PublisherError, err end @ws.run @ws.wait :ready end end |
Instance Attribute Details
#accept ⇒ Object
Returns the value of attribute accept.
1652 1653 1654 |
# File 'lib/nchan_tools/pubsub.rb', line 1652 def accept @accept end |
#channel_info ⇒ Object
Returns the value of attribute channel_info.
1652 1653 1654 |
# File 'lib/nchan_tools/pubsub.rb', line 1652 def channel_info @channel_info end |
#channel_info_type ⇒ Object
Returns the value of attribute channel_info_type.
1652 1653 1654 |
# File 'lib/nchan_tools/pubsub.rb', line 1652 def channel_info_type @channel_info_type end |
#extra_headers ⇒ Object
Returns the value of attribute extra_headers.
1652 1653 1654 |
# File 'lib/nchan_tools/pubsub.rb', line 1652 def extra_headers @extra_headers end |
#messages ⇒ Object
Returns the value of attribute messages.
1652 1653 1654 |
# File 'lib/nchan_tools/pubsub.rb', line 1652 def @messages end |
#nofail ⇒ Object
Returns the value of attribute nofail.
1652 1653 1654 |
# File 'lib/nchan_tools/pubsub.rb', line 1652 def nofail @nofail end |
#response ⇒ Object
Returns the value of attribute response.
1652 1653 1654 |
# File 'lib/nchan_tools/pubsub.rb', line 1652 def response @response end |
#response_body ⇒ Object
Returns the value of attribute response_body.
1652 1653 1654 |
# File 'lib/nchan_tools/pubsub.rb', line 1652 def response_body @response_body end |
#response_code ⇒ Object
Returns the value of attribute response_code.
1652 1653 1654 |
# File 'lib/nchan_tools/pubsub.rb', line 1652 def response_code @response_code end |
#url ⇒ Object
Returns the value of attribute url.
1652 1653 1654 |
# File 'lib/nchan_tools/pubsub.rb', line 1652 def url @url end |
#verbose ⇒ Object
Returns the value of attribute verbose.
1652 1653 1654 |
# File 'lib/nchan_tools/pubsub.rb', line 1652 def verbose @verbose end |
#ws ⇒ Object
Returns the value of attribute ws.
1652 1653 1654 |
# File 'lib/nchan_tools/pubsub.rb', line 1652 def ws @ws end |
Instance Method Details
#delete ⇒ Object
1882 1883 1884 |
# File 'lib/nchan_tools/pubsub.rb', line 1882 def delete submit nil, :DELETE end |
#get(accept_header = nil) ⇒ Object
1877 1878 1879 1880 1881 |
# File 'lib/nchan_tools/pubsub.rb', line 1877 def get(accept_header=nil) self.accept=accept_header submit nil, :GET self.accept=nil end |
#on_complete(&block) ⇒ Object
1761 1762 1763 1764 |
# File 'lib/nchan_tools/pubsub.rb', line 1761 def on_complete(&block) raise ArgumentError, "block must be given" unless block @on_complete = block end |
#on_response(&block) ⇒ Object
1756 1757 1758 1759 |
# File 'lib/nchan_tools/pubsub.rb', line 1756 def on_response(&block) @on_response = block if block_given? @on_response end |
#parse_channel_info(data, content_type = nil) ⇒ Object
1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 |
# File 'lib/nchan_tools/pubsub.rb', line 1704 def parse_channel_info(data, content_type=nil) info = {} case content_type when "text/plain" mm = data.match(/^queued messages: (.*)\r$/) info[:messages] = mm[1].to_i if mm mm = data.match(/^last requested: (.*) sec\. ago\r$/) info[:last_requested] = mm[1].to_i if mm mm = data.match(/^active subscribers: (.*)\r$/) info[:subscribers] = mm[1].to_i if mm mm = data.match(/^last message id: (.*)$/) info[:last_message_id] = mm[1] if mm return info, :plain when "text/json", "application/json" begin info_json=JSON.parse data rescue JSON::ParserError => e return nil end info[:messages] = info_json["messages"].to_i info[:last_requested] = info_json["requested"].to_i info[:subscribers] = info_json["subscribers"].to_i info[:last_message_id] = info_json["last_message_id"] return info, :json when "application/xml", "text/xml" ix = Oga.parse_xml(data, :strict => true) info[:messages] = ix.at_xpath('//messages').text.to_i info[:last_requested] = ix.at_xpath('//requested').text.to_i info[:subscribers] = ix.at_xpath('//subscribers').text.to_i info[:last_message_id] = ix.at_xpath('//last_message_id').text return info, :xml when "application/yaml", "text/yaml" begin yam=YAML.load data rescue return nil end info[:messages] = yam["messages"].to_i info[:last_requested] = yam["requested"].to_i info[:subscribers] = yam["subscribers"].to_i info[:last_message_id] = yam["last_message_id"] return info, :yaml when nil ["text/plain", "text/json", "text/xml", "text/yaml"].each do |try_content_type| ret, type = parse_channel_info data, try_content_type return ret, type if ret end else raise PublisherError, "Unexpected content-type #{content_type}" end end |
#post(body, content_type = nil, es_event = nil, &block) ⇒ Object
1885 1886 1887 |
# File 'lib/nchan_tools/pubsub.rb', line 1885 def post(body, content_type=nil, es_event=nil, &block) submit body, :POST, content_type, es_event, &block end |
#put(body, content_type = nil, es_event = nil, &block) ⇒ Object
1888 1889 1890 |
# File 'lib/nchan_tools/pubsub.rb', line 1888 def put(body, content_type=nil, es_event=nil, &block) submit body, :PUT, content_type, es_event, &block end |
#reset ⇒ Object
1892 1893 1894 |
# File 'lib/nchan_tools/pubsub.rb', line 1892 def reset @messages.clear end |
#submit(body, method = :POST, content_type = :'text/plain', eventsource_event = nil, &block) ⇒ Object
1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 |
# File 'lib/nchan_tools/pubsub.rb', line 1787 def submit(body, method=:POST, content_type= :'text/plain', eventsource_event=nil, &block) self.response=nil self.response_code=nil self.response_body=nil if Enumerable===body i=0 body.each{|b| i+=1; submit(b, method, content_type, &block)} return i end return submit_ws body, content_type, &block if @ws headers = {:'Content-Type' => content_type, :'Accept' => accept} headers[:'X-Eventsource-Event'] = eventsource_event if eventsource_event headers.merge! @extra_headers if @extra_headers post = Typhoeus::Request.new( @url, headers: headers, method: method, body: body, timeout: @timeout || PUBLISH_TIMEOUT, connecttimeout: @timeout || PUBLISH_TIMEOUT, verbose: @verbose ) if body && @messages msg=Message.new body msg.content_type=content_type msg.eventsource_event=eventsource_event end if @on_complete post.on_complete @on_complete else post.on_complete do |response| self.response=response self.response_code=response.code self.response_body=response.body if response.success? #puts "published message #{msg.to_s[0..15]}" @channel_info, @channel_info_type = parse_channel_info response.body, response.headers["Content-Type"] if @messages && msg msg.id = @channel_info[:last_message_id] if @channel_info @messages << msg end elsif response.timed_out? # aw hell no #puts "publisher err: timeout" pub_url=URI.parse_possibly_unix_socket(response.request.url) pub_url = "#{pub_url.path}#{pub_url.query ? "?#{pub_url.query}" : nil}" raise PublisherError, "Publisher #{response.request.[:method]} to #{pub_url} timed out." elsif response.code == 0 # Could not get an http response, something's wrong. #puts "publisher err: #{response.return_message}" errmsg="No HTTP response: #{response.}" unless self.nofail then raise PublisherError, errmsg end else # Received a non-successful http response. #puts "publisher err: #{response.code.to_s}" errmsg="HTTP request failed: #{response.code.to_s}" unless self.nofail then raise PublisherError, errmsg end end block.call(self.response_code, self.response_body) if block on_response.call(self.response_code, self.response_body) if on_response end end #puts "publishing to #{@url}" begin post.run rescue Exception => e last=nil, i=0 e.backtrace.select! do |bt| if bt.match(/(gems\/(typhoeus|ethon)|pubsub\.rb)/) last=i false else i+=1 true end end e.backtrace.insert last, "..." raise PublisherError, e end end |
#terminate ⇒ Object
1783 1784 1785 |
# File 'lib/nchan_tools/pubsub.rb', line 1783 def terminate @ws.terminate if @ws end |
#with_url(alt_url) ⇒ Object
1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 |
# File 'lib/nchan_tools/pubsub.rb', line 1693 def with_url(alt_url) prev_url=@url @url=alt_url if block_given? yield @url=prev_url else self end end |