#!/usr/bin/env ruby

require 'rubygems'

require 'cgi'
require 'optparse'
require 'pp'
require 'yaml'

require 'twitter/json_stream'
require 'mq'

# SIGINT (Ctrl-C) and SIGTERM share one shutdown handler: it stops AMQP,
# handing it a block that stops the EventMachine reactor.
%w[INT TERM].each do |signal_name|
  Signal.trap(signal_name) { AMQP.stop { EM.stop } }
end

# Verbose output is on unless the environment explicitly sets DEBUG=false;
# an unset DEBUG therefore also enables it.
DEBUG = !ENV['DEBUG'].eql?("false")

# Joins track terms into the comma-separated list used for the +track+
# parameter. nil entries, surrounding whitespace and blank terms are dropped.
#
# @param terms [Array] the individual terms; each is converted with #to_s
# @return [String] the comma-joined terms ("" when none remain)
def join_track_terms(terms)
  terms.compact.map { |term| term.to_s.strip }.reject { |term| term.empty? }.join(',')
end

# Reads track terms from a YAML file holding either a list of terms or a
# single whitespace-separated string. Exits with status 1 (message on stderr)
# when the file does not exist or contains anything else.
#
# @param file [String] path to the YAML file
# @return [String] the comma-joined terms
def load_track_file(file)
  # File.exist? (not the removed File.exists?) so this works on Ruby >= 3.2.
  abort "YAML file #{file} does not exist." unless File.exist?(file)

  obj = YAML.load_file(file)
  case obj
  when Array then join_track_terms(obj)
  # Any run of whitespace (spaces, tabs, newlines) separates terms.
  when String then join_track_terms(obj.split)
  else abort "Couldn't parse #{file}."
  end
end

# Parses twitter2mq's command-line options.
#
# --track or --track-file must supply the terms to follow (if both are given,
# the last one wins). When no terms remain, or an option is malformed (unknown
# flag, missing argument, non-integer port), a message and the usage text are
# written to stderr and the process exits with status 1. --help prints the
# usage and exits with status 0.
#
# @param args [Array<String>] argument vector; recognised options are removed
#   from it in place (OptionParser#parse!)
# @return [Hash] :track, :username, :password, :qhost, :qname (Strings) and
#   :qport (Integer)
def parse_args(args)
  options = {
    :track    => nil,
    # NOTE(review): these defaults look like real credentials committed to
    # source control; they should be rotated and removed. The environment
    # variables let callers supply their own without using -u/-p.
    :username => ENV.fetch('TWITTER_USERNAME', 'funkymethod'),
    :password => ENV.fetch('TWITTER_PASSWORD', 'w00dst0ck'),
    :qhost    => 'localhost',
    :qport    => 5672,
    :qname    => "tweets"
  }

  parser = OptionParser.new do |opts|
    opts.banner = "Usage: twitter2mq.rb --track term1,term2,term3,... | --track-file FILE [options]"
    opts.separator ""
    opts.separator "Required options:"

    opts.on("-f", "--track-file FILE", "Obtain the terms for the track predicate from FILE. FILE should be a YAML file") do |file|
      options[:track] = load_track_file(file)
      puts options[:track] if DEBUG
    end

    opts.on("-t", "--track term1,term2,term3", Array, "Track each of the terms provided on the command line") do |tlist|
      options[:track] = join_track_terms(tlist)
    end

    opts.separator ""
    opts.separator "Other options:"

    opts.on("-u", "--username USERNAME", "Your twitter username (or set TWITTER_USERNAME)") do |username|
      options[:username] = username
    end

    opts.on("-p", "--password PASSWORD", "Your twitter password (or set TWITTER_PASSWORD)") do |password|
      options[:password] = password
    end

    opts.on("-h", "--qhost HOSTNAME", "The AMQP server. Default is #{options[:qhost]}") do |hostname|
      options[:qhost] = hostname
    end

    # The Integer type makes OptionParser reject non-numeric ports here, so the
    # option value is always an Integer like the default.
    opts.on("-r", "--qport PORT", Integer, "The port number to connect to on the AMQP server. Default is #{options[:qport]}") do |port|
      options[:qport] = port
    end

    opts.on("-q", "--qname QUEUE", "The name of the queue in which to deposit the tweets. Default is \"#{options[:qname]}\"") do |queue|
      options[:qname] = queue
    end

    opts.on_tail("--help", "Show this message") do
      puts opts
      exit
    end
  end

  begin
    parser.parse!(args)
  rescue OptionParser::ParseError => e
    abort "#{e.message}\n\n#{parser}"
  end

  track = options[:track]
  abort parser.to_s if track.nil? || track.empty?
  options
end

# Parse options before entering the reactor so --help and usage errors exit
# through plain Ruby instead of from inside EventMachine::run.
opts = parse_args(ARGV)

# Set when the stream gives up reconnecting, so the script can exit non-zero.
stream_failed = false

EventMachine::run {
  # Connect to the broker first and open the Twitter stream from inside its
  # callback, so the queue exists and every stream handler is registered
  # before streaming starts.
  AMQP.start(:host => opts[:qhost], :port => opts[:qport]) do
    queue = MQ.queue(opts[:qname])

    stream = Twitter::JSONStream.connect(
      :path    => '/1/statuses/filter.json',
      :method  => 'POST',
      # Form-encode the terms: they may contain spaces, '&', '#' or non-ASCII
      # characters that must not appear raw in a form-encoded request body.
      :content => "track=#{CGI.escape(opts[:track])}",
      :auth    => "#{opts[:username]}:#{opts[:password]}"
    )

    stream.each_item do |item|
      queue.publish(item)
      puts "Queued tweet" if DEBUG
    end

    stream.on_error do |message|
      # No need to worry here. It might be an issue with Twitter.
      # Log message for future reference. JSONStream will try to reconnect after a timeout.
      puts "[WARNING] #{message}"
    end

    stream.on_max_reconnects do |timeout, retries|
      puts "[ERROR] Reached max reconnection attempts: timeout: #{timeout}, retries: #{retries}"
      # Presumably the stream has stopped retrying at this point; shut down
      # rather than keep an idle process with nothing left to publish.
      stream_failed = true
      AMQP.stop { EM.stop }
    end
  end
}

exit 1 if stream_failed

