#!/usr/bin/perl -w

#
# message_to_rssacint
#
# Copyright (C) 2016 University of Southern California.
# All rights reserved.                                            
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License,
# version 2, as published by the Free Software Foundation.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
# 

=head1 NAME

message_to_rssacint - convert dnsanon messages to rssac intermediate format

=head1 SYNOPSIS

message_to_rssacint < foo.message.fsdb > foo.rssacint

=head1 DESCRIPTION

This program reads dnsanon message files and produces rssac
intermediate format, a simple key-value format that is all one needs
to compute RSSAC-002 statistics.

It runs in linear time and has constant memory usage.

=head1 OPTIONS

=over

=item B<--file-seqno>

File sequence number to pass along in output.  (Default: none.)
Format is either an integer, or site:integer.

=item B<--queries-each-second>

Report number of queries and responses, binned in each second.
(Default: off.)

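=item B<--no-active-ranges>

Do not report the time ranges in which traffic is received.
(Default: ranges are reported.  Ranges are always reported when
B<--queries-each-second> is given.)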

=item B<--no-cache-easy>

Disable caching of "easy" keys.  (Default: they are cached.)

=item B<-d>

Enable debugging output.

=item B<-v>

Enable verbose output.

=item B<--help>

Show help.

=item B<--man>

Show full manual.


=head1 SAMPLE USAGE

=head2 Input

	#fsdb -F t msgid time srcip srcport dstip dstport protocol id qr opcode aa tc rd ra z ad cd rcode qdcount ancount nscount arcount edns_present edns_udp_size edns_extended_rcode edns_version edns_z msglen 
	1	1451192632.254226	128.9.168.85	39142	192.228.79.201	53	udp	21461	0	0	0	0	1	0	0	0	1	0	1	0	0	0	1	4096	0	0	0	40
	2	1451192632.255816	192.228.79.201	53	128.9.168.85	39142	udp	21461	1	0	0	0	1	0	0	0	0	0	1	0	13	15	1	4096	0	0	0	528
	3	1451192637.645691	128.9.168.85	59556	192.228.79.201	53	udp	63206	0	0	0	0	1	0	0	0	1	0	1	0	0	0	1	4096	0	0	0	40
	4	1451192637.647542	192.228.79.201	53	128.9.168.85	59556	udp	63206	1	0	0	0	1	0	0	0	0	0	1	0	13	15	1	4096	0	0	0	525
	5	1451192652.006988	128.9.168.85	42548	192.228.79.201	53	tcp	33197	0	0	0	0	1	0	0	0	1	0	1	0	0	0	1	4096	0	0	0	32
	6	1451192652.008452	192.228.79.201	53	128.9.168.85	42548	tcp	33197	1	0	0	0	1	0	0	0	0	0	1	0	6	12	1	4096	0	0	0	434


=head2 Command

    ./message_to_rssacint --file-seqno=1

=head2 Output

	#fsdb -F t key count
	+64:128.9.168.85	1
	+64:128.9.168.85	1
	+64:128.9.168.85	1
	+3t04	1
	+3t14	1
	+3u04	2
	+3u14	2
	+4t0:32	1
	+4t1:434	1
	+4u0:40	2
	+4u1:525	1
	+4u1:528	1
	+50:0	3
	+51:0	3
	-rt	1451192632,1451192637,1451192652
	<ts	1451192632.254226
	=rfileseqno:-	1
	>te	1451192652.008452
	# message_to_rssacint.pl --file-seqno=1


=back

=cut

use strict;
use Pod::Usage;
use Getopt::Long;
use Net::IP;

#
# Command-line handling and file-level state.
#
Getopt::Long::Configure ("bundling");
pod2usage(2) if ($#ARGV >= 0 && $ARGV[0] eq '-?');
# Saved before GetOptions consumes @ARGV; echoed in the trailer
# comment written at the end of each file's output.
my(@orig_argv) = @ARGV;
my($prog) = $0;
my $debug = undef;
my $verbose = undef;
my $cache_easy = 1;
my $file_seqno = undef;
my $active_ranges = 1;
my $queries_each_second = undef;
GetOptions(
	'active-ranges!' => \$active_ranges,
	'cache-easy!' => \$cache_easy,
	# Taken as a string (not "=i"): the documented format is either
	# INTEGER or SITE:INTEGER, and process_file splits it on ":".
	'file-seqno=s' => \$file_seqno,
	'queries-each-second!' => \$queries_each_second,
	'help|?' => sub { pod2usage(1); },
	'man' => sub { pod2usage(-verbose => 2); },
	'd|debug+' => \$debug,
	'v|verbose+' => \$verbose) or pod2usage(2);

# Because the option is now a free-form string, check its shape here
# so a malformed value is rejected up front, as "=i" used to do.
if (defined($file_seqno) && $file_seqno !~ /\A(?:[^:]+:)?[-+]?\d+\z/) {
    pod2usage(-message => "$prog: --file-seqno must be INTEGER or SITE:INTEGER",
	      -exitval => 2);
};

# Per-second query counts are keyed by the same whole-second value
# that active-range tracking computes, so they force it on.
$active_ranges = 1 if ($queries_each_second);
my($in_schema) = "#fsdb -F t msgid time srcip srcport dstip dstport protocol id qr opcode aa tc rd ra z ad cd rcode qdcount ancount nscount arcount edns_present edns_udp_size edns_extended_rcode edns_version edns_z msglen";
# Older input schemas, kept for reference:
#"#fsdb -F t msgid time srcip srcport dstip dstport protocol id qr opcode aa tc rd ra z ad cd rcode qdcount ancount nscount arcount edns_present edns_size ends_version ends_flags msglen";
# "#fsdb msgid time srcip srcport dstip dstport protocol id qr opcode aa tc rd ra z ad cd rcode qdcount ancount nscount arcount edns_present edns_size ends_version ends_flags msglen";
my($out_schema) = "#fsdb -F t key count";

binmode STDOUT, ":utf8";
print $out_schema . "\n";


# Count of source addresses that could not be parsed, and the limit
# past which individual failures are no longer reported.
my($failed_ips) = 0;
my $MAX_FAILED_IPS = 1000;

# Cache of "easy" keys: key => value reduced so far.
my(%easy) = ();
# cache stats counting
# For one file, I see 147 fastpath new seconds, 6165597 fastpath same updates, and 21074 slow paths.
# I.e., caching works very well.
#foreach (qw(x+s x+fi x+fs)) {
#    $easy{$_} = 0;
#}

sub truncate_ip6($$$) {
    #
    # Reduce an IPv6 source address to its first four groups (its /64),
    # returned in Net::IP's short text form.
    #
    # Arguments:
    #   $srcip  - the address as text
    #   $msgid  - message id, used only to label a failure record
    #   $time   - message timestamp, used only to label a failure record
    #
    # Returns the shortened prefix string, or nothing (undef in scalar
    # context) when the address cannot be parsed.
    #
    # On failure the file-level $failed_ips counter is incremented and,
    # until it passes $MAX_FAILED_IPS, a "+eip-fail:" record is printed.
    #
    my($srcip, $msgid, $time) = @_;
    # As nice as Net::IP is, it's slow to build objects, so we avoid
    # it for IPv4.
    my $prefix = undef;
    my($srcip_ni) = Net::IP->new($srcip);
    if ($srcip_ni) {
	# Presumably ip() yields the fully expanded eight-group form, so
	# dropping the last four groups leaves the /64 followed by "::"
	# -- confirm against the Net::IP documentation.
	my $a_str = $srcip_ni->ip();
	$a_str =~ s/....:....:....:....$/:/;
	my $as = Net::IP->new($a_str);
	$prefix = $as->short() if ($as);
    };
    return $prefix if (defined($prefix));

    # Could not parse: count it, and report it unless there have
    # already been too many (the caller prints a summary in that case).
    $failed_ips++;
    return if ($failed_ips > $MAX_FAILED_IPS);
    print "+eip-fail:$msgid,$time,$srcip\t1\n";
    return;
}

######################################################################
# Rangelist code.
# same code in message_to_rssacinc.pl and rssacint_reduce.pl

#
# Take the string form of a rangelist and break it into start and end arrays.
#
sub decompose_rangelist($) {
    # Split a rangelist string such as "1-2,4,6-7" into two parallel
    # arrays: the start of every range and the end of every range.
    # A lone number is treated as a range that starts and ends on itself.
    # Returns (\@starts, \@ends); dies on an empty list element.
    my($rl_str) = @_;
    my(@starts, @ends);
    for my $range (split(/,/, $rl_str)) {
	my($first, $last) = split(/-/, $range);
	die "unparsable range $range\n" unless (defined($first));
	push(@starts, $first);
	push(@ends, defined($last) ? $last : $first);
    }
    return (\@starts, \@ends);
}

sub min($$) {
    # Numeric minimum of two values; on a tie the second is returned.
    my($x, $y) = @_;
    return $x if ($x < $y);
    return $y;
}

#
# Take two range lists (format like: 1-2,4,6-7)
# and merge them.
#
sub merge_ranges($$;$) {
    #
    # Merge two rangelist strings into one.
    #
    # Arguments:
    #   $_[0], $_[1] - rangelists in text form, e.g. "1-2,4,6-7"
    #   $_[2]        - optional overlap reporting: if true, "/e" is
    #                  appended to the result when the inputs overlap;
    #                  if greater than 1, an ":eoverlapping-regions"
    #                  record is also printed for each overlap found.
    #
    # Returns the merged rangelist as a string.
    #
    # NOTE(review): the slow path only ever compares the heads of the
    # two lists, so each input is presumably already sorted and free of
    # internal overlap -- confirm against callers.
    #
    my(@rangelists) = ($_[0], $_[1]);
    my($report_overlap_as_error) = $_[2];

    #
    # fast path
    # optimize appending a simple other on a ranged one
    # merge_ranges("1-2", "3") => "1-3"
    #
    # $other indexes the list that is a bare integer (if any);
    # $one indexes the remaining list.
    #
    my($one, $other) = (undef, undef);
    if ($rangelists[1] =~ /^\d+$/) {
	($one, $other) = (0, 1);
    } elsif ($rangelists[0] =~ /^\d+$/) {
	($one, $other) = (1, 0);
    };
    if (defined($other) && $rangelists[$one] =~ /\-(\d+)$/) {
	# can try to fastpath
	# one:  1-2   (or more complex)
	# other:    3 (hopefully)
	my($one_e) = $1;
	my($other_s) = $rangelists[$other];
        if ($one_e == $other_s) {
            # no change needed: other repeats the last value of one,
            # which counts as an overlap
	    # $easy{'x+fs'}++;
            if ($report_overlap_as_error) {
		print ":eoverlapping-regions\t$rangelists[$one]+$other_s\n" if ($report_overlap_as_error > 1);
		return $rangelists[$one] . "/e";
	    } else {
		return $rangelists[$one];
	    };
        } elsif ($one_e + 1 == $other_s) {
	    # other directly follows the end of one:
	    # rewrite the trailing number of one to extend the last range
	    # $easy{'x+fi'}++;
            $rangelists[$one] =~ s/(\D?)(\d+)$/$1$other_s/;
            return $rangelists[$one];
    	};
	# fall through for slow path
    };
    # $easy{'x+s'}++;

    #
    # slow path
    #
    # Decompose comma-separated list into array of ranges (start and ends).
    # $ss[$i] and $es[$i] are references to the start and end arrays
    # of input list $i.
    #
    my(@ss, @es);
    foreach (0..1) {
        ($ss[$_], $es[$_]) = decompose_rangelist($rangelists[$_]);
    };

    #
    # Count how many lists each range occurs in.
    # If there is overlap, make more intermediate ranges.
    #
    # On exit of this loop, we have ONE rangelist in an array, plus counts.
    # (@s and @e hold the starts and ends; @count holds 1 or 2 for each.)
    #
    # (And ick: this code is ALL corner cases.)
    #
    my(@count, @s, @e);
  buildcount:
    while (1) {
	#
	# Check if either side has drained.
	# If so, everything left on the other side is copied over
	# with a count of 1 and we are done.
	#
	foreach $one (0, 1) {
	    # print "checking $one for emtpy, is $#{$ss[$one]}\n";
	    $other = 1 - $one;
	    if ($#{$ss[$one]} == -1) {
		push(@count, (1) x ($#{$ss[$other]} + 1));
		push(@s, @{$ss[$other]});
		push(@e, @{$es[$other]});
		last buildcount;
	    };
	};
	#
	# assert(have stuff left in both)
	#
	# Make $one be the one that starts first
	# (so we only have a million cases, not four million.)
	#
	my($new_count) = 1;
	if ($ss[0][0] < $ss[1][0]) {
	    ($one, $other) = (0, 1);
	} elsif ($ss[0][0] > $ss[1][0]) {
	    ($one, $other) = (1, 0);
	} else {
	    # both start at same time
	    $new_count = 2;
	    # $one becomes the one that ends first
	    if ($es[0][0] <= $es[1][0]) {
		($one, $other) = (0, 1);
	    } else {
		($one, $other) = (1, 0);
	    };
	};
	#
	# assert($lists[$one] starts first (or at same time))
	#
	# The start of the new output piece is known now;
	# its end is decided by the cases below.
	#
	my($consume_one) = undef;
	push(@count, $new_count);
	push(@s, $ss[$one][0]);
	if ($ss[$one][0] < $ss[$other][0]) {
	    # one starts first
	    if ($es[$one][0] < $ss[$other][0]) {
		# and ends before other
		# +----+
		#         +----+
		# or abuts other (in which case we will merge later)
		# +----+
		#       +----+
		push(@e, $es[$one][0]);
		$consume_one = 1;
	    } elsif ($es[$one][0] >= $ss[$other][0]) {
		# and overlaps with other
		# +----+
		#      +----+
		# or
		# +----+
		#    +----+
		# Emit only the part before other starts, and trim one so
		# that the next pass sees both starting at the same point.
		push(@e, $ss[$other][0]-1);
		$ss[$one][0] = $ss[$other][0];
		$consume_one = 0;
	    } else {
		die "invariant violated: one $one starts first\n";
	    };
	} elsif ($ss[$one][0] == $ss[$other][0]) {
	    # start at same time
	    push(@e, $es[$one][0]);
	    $consume_one = 1;
	    if ($es[$one][0] < $es[$other][0]) {
		# but one ends first
		# +----+
		# +--------+
		# Trim other to the part that extends past one.
		$ss[$other][0] = $es[$one][0]+1;
	    } elsif ($es[$one][0] == $es[$other][0]) {
		# complete overlap
		# +----+
		# +----+
		#
		# so also consume other here:
		shift @{$ss[$other]};
		shift @{$es[$other]};
	    } else {
		die "invariant violated: one $one and other $other start at same time and other ends first\n";
	    };
	} else {
	    die "invariant violated: one $one starts after other $other\n";
	};
	if ($consume_one) {
	    shift @{$ss[$one]};
	    shift @{$es[$one]};
	};
    };

    #
    # We now have a clean, single rangelist in an array, with counts.
    #
    # Now concatenate adjacent ranges and report overlap.
    #
    my($out, $error_out) = ("", "");
    while ($#s != -1) {
	# A count of 2 means this piece was present in both inputs.
	if ($count[0] == 2) {
	    if ($report_overlap_as_error) {
		print ":eoverlapping-regions\t$s[0]-$e[0]\n" if ($report_overlap_as_error > 1);
		$error_out = "/e";
	    };
	};
	# merge?
	# If the next piece touches or follows this one directly, carry
	# this piece's start forward into it and drop this piece.
	if ($#s >= 1) {
	    if ($e[0]+1 >= $s[1]) {
		$s[1] = $s[0];
		shift @count;
		shift @s;
		shift @e;
		# no output
		next;
	    };
	};
	# Emit this piece: a bare number if it covers one value,
	# otherwise "start-end".
	$out .= "," if ($out ne "");
	$out .= ($s[0] == $e[0] ? $s[0] : $s[0] . "-" . $e[0]);
	shift @count;
	shift @s;
	shift @e;
    };
    return $out . $error_out;
}

######################################################################

# same code in message_to_rssacinc.pl and rssacint_reduce.pl
sub reduce_pair($$$;$$) {
    #
    # Combine a previously reduced value with a new value for one key.
    # The operator ($op) selects how:
    #   +   numeric sum
    #   !   count of distinct values (needs $last_unique and $unique)
    #   -   merge of two rangelists
    #   =   merge of two rangelists, reporting overlap
    #   <   lexically smaller of the two
    #   >   lexically larger of the two
    # Any other operator is reported and the previous value is kept.
    # Returns the reduced value.
    #
    my($op, $last_value, $value, $last_unique, $unique) = @_;

    return $value + $last_value if ($op eq '+');

    if ($op eq '!') {
	die "internal error: undef unique\n" unless (defined($unique));
	return 1 unless (defined($last_unique));
	return ($unique ne $last_unique) ? $last_value + 1 : $last_value;
    };

    # range
    return merge_ranges($last_value, $value) if ($op eq '-');

    # ranges with overlap reporting
    return merge_ranges($last_value, $value, 2) if ($op eq '=');

    # lexical comparison! (not numeric)
    if ($op eq '<') {
	return ($last_value lt $value) ? $last_value : $value;
    };
    if ($op eq '>') {
	return ($last_value gt $value) ? $last_value : $value;
    };

    print "+eunknown-op:$op\t1\n";
    print "$_\n";  # pass it anyway
    return $last_value;
}


######################################################################


sub cachable_output($$$) {
    #
    # Record one key/value observation.
    # With caching off the pair is printed immediately; otherwise it is
    # folded into the file-level %easy cache using reduce_pair and $op,
    # and printed later by flush_cachable_output.
    #
    my($op, $key, $value) = @_;

    unless ($cache_easy) {
	print "$key\t$value\n";
	return;
    };
    # First sighting of a key just stores the value; later ones reduce.
    $easy{$key} = defined($easy{$key})
	? reduce_pair($op, $easy{$key}, $value)
	: $value;
}

sub flush_cachable_output() {
    #
    # Print every cached key/value pair, in sorted key order, then
    # empty the cache.
    #
    # The cache must be emptied because this is called once per input
    # file: if values were left behind, the next file's flush would
    # print them again on top of its own, and additive ("+") counters
    # would be counted twice when the outputs are reduced together.
    #
    foreach my $key (sort keys %easy) {
	print "$key\t$easy{$key}\n";
    };
    %easy = ();
}

sub process_file($) {
    #
    # Read one dnsanon message file and print its rssacint records.
    # $file is a path, or "-" for standard input.
    # An unopenable file yields a "+eopen-fail:" record and nothing else.
    # Dies if the file declares an fsdb schema other than $in_schema.
    #
    my($file) = @_;

    # "-" attaches to file descriptor 0; anything else is opened by
    # name with a utf8 layer.
    my($open_mode, $open_place) = ($file eq '-') ? ("<&=", 0) : ("<:utf8", $file);
    my $in;
    unless (open($in, $open_mode, $open_place)) {
        print "+eopen-fail:$file\t1\n";
        return;
    }

    # extra: keep track of what files we scanned, with site
    # (a bare sequence number gets the placeholder site "-")
    if (defined($file_seqno)) {
        my(@fields) = split(/:/, $file_seqno);
        @fields = ("-", @fields) if (scalar(@fields) < 2);
        cachable_output("=", "=rfileseqno:$fields[0]", $fields[1]);
    }

    while (<$in>) {
        chomp;
        # Schema lines are checked and dropped;
        # other comment lines are passed through untouched.
        if (/^#fsdb/) {
            die "unexpected schema: $_\n" unless (/^$in_schema/);
            next;
        }
        if (/^#/) {
            print "$_\n";
            next;
        }

        # Columns follow $in_schema.  The limit of 29 is what split
        # applies implicitly when assigning to 28 scalars.
        my(@col) = split(/\s+/, $_, 29);
        my($msgid, $time, $srcip) = @col[0, 1, 2];
        my($protocol, $qr, $rcode) = @col[6, 8, 17];
        my($edns_extended_rcode, $msglen) = @col[24, 27];

        # extra: keep track of measurement period
        cachable_output("<", "<ts", $time);
        cachable_output(">", ">te", $time);

        # extra: keep track of which seconds have activity
        my $time_secs = undef;
        if ($active_ranges && $time =~ /^(\d+)(\.\d*)$/) {
            $time_secs = $1;
            cachable_output("-", "-rt", $time_secs);
        }

        # An address containing a dot is taken to be IPv4.
        my $v46 = ($srcip =~ /\./) ? 4 : 6;
        # First character of the protocol name.
        my $short_protocol = ($protocol =~ /^(.)/) ? $1 : undef;

        # rssac-002v2 section 2.3: number of queries
        cachable_output("+", "+3$short_protocol$qr$v46", 1);

        # extra: queries per second
        # (we don't split out v4 / v6)
        if ($queries_each_second && defined($time_secs)) {
            cachable_output("+", "+3S$qr:$time_secs", 1);
        }

        # rssac-002v2 section 2.4: query and response size distribution
        cachable_output("+", "+4$short_protocol$qr:$msglen", 1);

        # rssac-002v2 section 2.5: rcode distribution (responses only, but we do both)
        # An extended rcode of "-" or "0" means the plain rcode is used.
        my $final_rcode = $edns_extended_rcode;
        if ($edns_extended_rcode eq '-' || $edns_extended_rcode eq '0') {
            $final_rcode = $rcode;
        }
        cachable_output("+", "+5$qr:$final_rcode", 1);

        # rssac-002v2 section 2.6: unique sources
        # (queries only; IPv6 sources are also reported as a truncated prefix)
        next unless ($qr == 0);
        print "+6$v46:$srcip\t1\n";
        if ($v46 == 6) {
            my($ip6a) = truncate_ip6($srcip, $msgid, $time);
            print "+6a:$ip6a\t1\n" if ($ip6a);
        }
    }
    close $in;

    flush_cachable_output();
    if ($failed_ips > $MAX_FAILED_IPS) {
        print "+eip-fail:too-many\t$failed_ips\n";
    }
    # Trailer comment recording how this output was produced.
    print "# message_to_rssacint.pl " . join(" ", @orig_argv) . "\n";
};

# With no file arguments, read standard input.
@ARGV = ("-") unless (@ARGV);
process_file($_) foreach (@ARGV);

exit 0;

=head1 SEE ALSO

L<dnsanon(1)>,
L<message_to_rssacint(1)>,
L<rssacint_reduce(1)>,
L<rssacfin_to_rssacyaml(1)>


=head1 AUTHOR and COPYRIGHT

This program was written by John Heidemann.

Copyright (C) 2016 University of Southern California.

This program is distributed under terms of the GNU general
public license, version 2.  See the file COPYING
with the distribution for details.

=cut


