#!/usr/bin/env ruby

require 'rubygems'
require 'eventmachine'
require 'json'
require 'active_support'
require 'open-uri'
require 'mq'

# On INT (Ctrl-C) or TERM, ask the AMQP client to stop and, from the block
# handed to AMQP.stop, stop the EventMachine reactor.
%w(INT TERM).each do |signal_name|
  Signal.trap(signal_name) do
    AMQP.stop { EM.stop }
  end
end

# Minimal fan-out registry for tweet handlers: blocks registered with
# #each_tweet are called, in registration order, for every payload handed
# to #tweet.
class Listener

  # Array of the registered handler blocks, in registration order.
  attr_reader :callbacks

  def initialize
    @callbacks = []
  end

  # Registers a handler to be called with every payload passed to #tweet.
  #
  # Raises ArgumentError when called without a block; storing nil here
  # would make every later #tweet call fail with NoMethodError.
  def each_tweet(&blk)
    raise ArgumentError, "each_tweet requires a block" unless blk

    @callbacks << blk
    puts "Registered callback"
  end

  # Calls every registered handler with +json+. The value is passed through
  # untouched; this class does not parse or validate it.
  def tweet(json)
    @callbacks.each { |c| c.call(json) }
  end

end

# Main script: starts the EventMachine reactor, connects to the AMQP broker,
# then reads every URL given on the command line line by line and publishes
# each line that parses as JSON to the "tweets" queue. Once all URLs have
# been read (or the reader thread dies) the AMQP client and the reactor are
# stopped. Errors escaping the reactor are printed with their backtrace.
begin
  EventMachine.run do

    listener = Listener.new
    AMQP.start(:host => 'broadway.citemine.com', :port => 5672) do
      queue = MQ.queue("tweets")

      listener.each_tweet do |tweet|
        # Handlers are invoked from the reader thread below. Hand the publish
        # over to the reactor thread: EventMachine-driven I/O should not be
        # called from other threads, and EM.next_tick is the thread-safe way
        # to schedule work on the reactor.
        EM.next_tick do
          queue.publish(tweet)
          puts "Queued tweet"
        end
      end

      # Start reading only after the publishing handler is registered, so no
      # tweet is dispatched to an empty handler list and silently dropped.
      Thread.new do
        begin
          ARGV.each_with_index do |url, i|
            puts "Processing #{url}"
            begin
              # NOTE(review): with open-uri, Kernel#open stopped accepting URLs
              # in Ruby 3.0 (URI.open is the replacement) and it treats a
              # leading "|" as a command to run; revisit once the target Ruby
              # version is confirmed.
              open(url) do |source|
                source.each do |line|
                  # Is it valid json? Skip just this line when it is not, so
                  # one bad (or blank) line does not abandon the whole stream.
                  begin
                    JSON.parse(line)
                  rescue JSON::ParserError
                    puts "Parser error. Continuing."
                    next
                  end
                  listener.tweet(line)
                  puts(line)
                end
              end
            rescue StandardError => e
              # Best effort: a URL that cannot be opened or read must not
              # stop the remaining URLs from being processed.
              puts "Failed to process #{url}: #{e}"
            end
            puts "#{i+1} URLs processed"
          end
        ensure
          # Always shut down, even if the reader dies, so the process does not
          # hang with an idle reactor. Scheduled after the pending publishes,
          # and executed on the reactor thread.
          EM.next_tick do
            AMQP.stop do
              EM.stop
            end
          end
        end
      end

    end
  end
rescue StandardError => e
  # StandardError rather than Exception, so SystemExit and signal
  # exceptions are not swallowed here.
  puts "Broken: #{e}"
  e.backtrace.each do |b|
    puts b
  end
end

