#!/usr/bin/env ruby

require 'rubygems'
require 'eventmachine'
require 'json'
require 'active_support'
# require 'nokogiri'
require 'rexml/parsers/baseparser'
require 'rexml/parsers/streamparser'
require 'rexml/streamlistener'
require 'rexml/text'
require 'open-uri'
require 'mq'

Signal.trap('INT') { AMQP.stop{ EM.stop } }
Signal.trap('TERM'){ AMQP.stop{ EM.stop } }

class Listener
  
  attr :status
  attr :callbacks
  
  def initialize
    @status = ""
    @callbacks = []
  end

  def each_tweet(&blk)
    @callbacks << blk
    puts "Registered callback"
  end

  def tag_start(name, attrs=[])
    @status += "<#{name}>" # Twitter's feed doesn't use attrs.
  end

  def tag_end(name)
    @status += "</#{name}>"
    if name=="status"
      json = Hash.from_xml(@status)["status"].to_json
      puts "#{json}\r\n"
      begin
        @callbacks.each {|c| c.call(json)}
      rescue Exception => e
        puts e
      end
      @status = ""
    end
  end

  def text(text)
   @status += REXML::Text.new(text).to_s
  end

  def reset
    @status = ""
  end
  
  def method_missing(method, *args, &block)
  end
end

begin
EventMachine::run {


  listener = Listener.new
  AMQP.start(:host => 'broadway.citemine.com', :port => 5672) do
    queue = MQ.queue("tweets")

    listener.each_tweet do |tweet|
      queue.publish(tweet)
      puts "Queued tweet"
    end

  end

  Thread.new do
    Thread.pass
    ARGV.each_with_index do |url, i|
      puts "Processing #{url}"
      open(url) do |source|
        listener.reset
        parser = REXML::Parsers::StreamParser.new(source, listener)
        parser.ignore_undefined_namespaces = true
        begin
          parser.parse
        rescue
          puts "Parser error. Continuing."
        end
      end
      puts "#{i+1} URLs processed"
    end
    AMQP.stop do
      EM.stop
    end
  end
}
rescue Exception => e
  puts "Broken: #{e}"
  e.backtrace.each do |b|
    puts b
  end
end

