#!/usr/bin/env ruby

require 'rubygems'
require 'mq'
require 'solr'
require 'tweetstream'
require 'json'
require 'optparse'
require 'pp'
require 'time'

# Install one shared shutdown handler for both INT and TERM. When either
# signal arrives the handler sends KILL to every pid listed by EM.forks,
# asks AMQP to stop (passing it a block that calls EM.stop) and then exits
# the current process with status 0.
%w[INT TERM].each do |signal_name|
  Signal.trap(signal_name) do
    worker_pids = EM.forks
    worker_pids.each { |worker_pid| Process.kill('KILL', worker_pid) } unless worker_pids.empty?
    AMQP.stop { EM.stop }
    exit(0)
  end
end

# Debug output is enabled when the DEBUG environment variable is set to
# anything other than the literal string "false". The constant is nil when
# the variable is unset, false when it is "false", and true otherwise.
debug_setting = ENV['DEBUG']
DEBUG = debug_setting && debug_setting != "false"

# Parses the command-line switches into an options hash.
#
# args - Array of argument strings (the script passes ARGV). Recognised
#        switches are removed from it in place by OptionParser#parse!.
#
# Returns a Hash with the keys :qhost, :qport, :qname, :shost, :sport and
# :workers. :qport, :sport and :workers are always Integers, whether they
# come from the defaults or from the command line.
#
# Note that -h is the short form of --qhost; the usage text is only shown
# for the long --help switch, which prints it and exits.
def parse_args(args)
  options = {:qhost => 'localhost', :qport => 5672, :qname => "tweets", :shost => "localhost", :sport => 8983, :workers => 1 }

  parser = OptionParser.new do |o|
    o.banner = "Usage: mq2solr.rb [options]"
    o.separator ""
    o.separator "Options:"

    o.on("-n", "--numworkers NUMBER", "The number of workers to start. Default is #{options[:workers]}") do |workers|
      options[:workers] = workers.to_i
    end

    o.on("-h", "--qhost HOSTNAME", "The AMQP server. Default is #{options[:qhost]}") do |hostname|
      options[:qhost] = hostname
    end

    o.on("-r", "--qport PORT", "The port number to connect to on the AMQP server. Default is #{options[:qport]}") do |port|
      # Convert to an Integer so a user-supplied port has the same type as
      # the default (the raw option value is a String).
      options[:qport] = port.to_i
    end

    # This script consumes from the queue, so describe it as the source.
    o.on("-q", "--qname QUEUE", "The name of the queue from which to read the tweets. Default is \"#{options[:qname]}\"") do |queue|
      options[:qname] = queue
    end

    o.on("-s", "--shost HOSTNAME", "The Solr server. Default is #{options[:shost]}") do |hostname|
      options[:shost] = hostname
    end

    o.on("-t", "--sport PORT", "The port number to connect to on the Solr server. Default is #{options[:sport]}") do |port|
      # Same Integer normalisation as --qport.
      options[:sport] = port.to_i
    end

    o.on_tail("--help", "Show this message") do
      puts o
      exit
    end
  end

  parser.parse!(args)
  options
end

# Decodes one message taken from the queue and dispatches it.
#
# item - String containing the JSON payload of the message.
#
# A payload that is not valid JSON, is not a JSON object, or cannot be
# wrapped in a TweetStream::Hash is skipped (the error, if any, is printed).
# Otherwise the message is routed by shape:
#   * :delete with a :status  -> delete_tweet(status id)
#   * :limit with a :track    -> rate_limited(track count)
#   * :text and :user present -> index_tweet(hash)
# Anything else is ignored.
def process_item(item)
  begin
    raw = JSON.parse(item)
  rescue JSON::ParserError => e
    # One malformed message must not raise out of the subscriber callback;
    # report it and move on to the next message.
    puts e
    return
  end

  if DEBUG
    puts "="*20
    puts raw.class.name
    pp raw
  end

  return unless raw.is_a?(Hash)

  begin
    hash = TweetStream::Hash.new(raw)
  rescue StandardError => e
    # StandardError rather than Exception, so SystemExit (raised by the
    # signal handlers' exit) and signals are not swallowed here.
    puts e
    return
  end

  # Diagnostic dump only; gated on DEBUG like the other debug output.
  if DEBUG
    puts "Hash is:"
    pp hash
  end

  if hash[:delete] && hash[:delete][:status]
    delete_tweet(hash[:delete][:status][:id])
  elsif hash[:limit] && hash[:limit][:track]
    rate_limited(hash[:limit][:track])
  elsif hash[:text] && hash[:user]
    index_tweet(hash)
  end
end

# Converts a tweet into a Solr document and adds it through @solr.
#
# tweet - tweet payload accepted by TweetStream::Status.new (process_item
#         passes a TweetStream::Hash).
#
# The document contains :id, :fromID, :fromName, :toID / :toName when the
# tweet is a reply, :isRT plus :originID / :originName for retweets,
# :timestamp (created_at converted to UTC in XML-schema format), :text,
# :followers and :following. After a successful add, @tweet_count is
# incremented. Any StandardError raised along the way is printed and the
# tweet is skipped.
def index_tweet(tweet)
  status = TweetStream::Status.new(tweet)

  pp status if DEBUG

  doc = {}

  doc[:id] = status.id
  doc[:fromID] = status.user.id
  doc[:fromName] = status.user.screen_name
  doc[:toID] = status.in_reply_to_user_id unless status.in_reply_to_user_id.nil?
  doc[:toName] = status.in_reply_to_screen_name unless status.in_reply_to_screen_name.nil?

  # Look the retweet up with a Symbol key: process_item reads this same hash
  # type with Symbol keys, so the previous String lookup
  # ('retweeted_status') presumably never matched. A nil value or a missing
  # user falls through to the text-based check below.
  retweeted = status[:retweeted_status]
  origin_user = retweeted && retweeted[:user]

  if origin_user
    # New-style retweet: the original author is embedded in the payload.
    doc[:isRT] = true
    doc[:originID] = origin_user[:id]
    doc[:originName] = origin_user[:screen_name]
  elsif (match = status.text.match(/^ *RT *:? *@ *(\w+)/))
    # Old-style retweet: only the screen name can be recovered from the text.
    doc[:originName] = match[1]
    doc[:isRT] = true
  else
    doc[:isRT] = false
  end

  doc[:timestamp] = Time.parse(status.created_at).utc.xmlschema
  doc[:text] = status.text

  doc[:followers] = status.user.followers_count
  doc[:following] = status.user.friends_count

  pp doc if DEBUG

  @solr.add(doc)

  @tweet_count += 1

  puts "Processed #{@tweet_count} tweets" if DEBUG
rescue StandardError => e
  # StandardError rather than Exception, so SystemExit and signals still
  # propagate while a single bad tweet is only reported.
  puts e
end

# Reports a rate-limit notice. Prints how many tweets were skipped, but only
# when DEBUG is enabled; otherwise does nothing.
#
# skipped - the skipped-tweet count taken from the limit notice.
def rate_limited(skipped)
  return unless DEBUG

  puts "Skipped #{skipped} tweets due to rate limiting"
end

# Removes a tweet from the index by calling @solr.delete with its id, then
# prints a confirmation line when DEBUG is enabled.
#
# id - the id of the tweet to delete.
def delete_tweet(id)
  @solr.delete(id)
  return unless DEBUG

  puts "Deleted tweet with ID #{id}"
end

# Entry point. Parse the command line, then give EM.fork the worker block
# along with opts[:workers] (presumably the number of worker processes to
# start -- confirm against the EM.fork implementation in use).
opts = parse_args(ARGV)

EM.fork(opts[:workers]) do
  # Counter reported by index_tweet in debug mode.
  @tweet_count = 0

  # Connect to Solr and bail out of this worker if it does not answer a ping.
  @solr = Solr::Connection.new("http://#{opts[:shost]}:#{opts[:sport]}/solr", :autocommit => :off)
  unless @solr.ping
    puts "Couldn't connect to Solr server. Are you sure this is the correct URL: http://#{opts[:shost]}:#{opts[:sport]}/solr"
    exit
  end

  # Subscribe to the configured queue and feed every message to process_item.
  AMQP.start(:host => opts[:qhost], :port => opts[:qport]) do
    MQ.queue(opts[:qname]).subscribe do |message|
      process_item(message)
    end
  end
end

# Keep this process alive, polling every 5 seconds, for as long as EM.forks
# still lists any pids.
sleep(5) until EM.forks.empty?

