#!/usr/bin/perl -w

#
# rssacint_reduce.pl
#
# Copyright (C) 2016-2021 University of Southern California.
# All rights reserved.                                            
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License,
# version 2, as published by the Free Software Foundation.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#                                                                

=head1 NAME

rssacint_reduce.pl - summarize rssacint statistics into summarized rssacint stats

=head1 SYNOPSIS

rssacint_reduce.pl foo.rssacint bar.rssacint >foobar.rssacint

=head1 DESCRIPTION

Summarizes rssacint stats, summing counts and taking minimums and
maximums---everything but computing unique values.

We assume all records with the same key are adjacent.
If input is sorted, this is a full-fledged reducer and will output correct results.
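If not,
"hard" keys (section 2.6, whose number scales with the number of IPs seen,
and the C<q> and C<Q> operators) are merged only with adjacent records
that share the same key, so they may pass through only partially reduced.
"Easy" keys (all others) are cached in memory and fully reduced
(unless caching is disabled with B<--no-cache-easy>).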

=head1 OPTIONS

=over

=item B<--count-ips>

Count unique IPs (RSSAC002v2 section 2.6).

=item B<--check-sort>

Verify that input is sorted.  (By default, off.)

=item B<--no-cache-easy>

Disable caching of "easy" keys.  (By default, caching is on.)

=item B<--progress>

Show progress in processing (to stderr).  (By default: processing is quiet.)

=item B<-d>

Enable debugging output.

=item B<-v>

Enable verbose output.

=item B<--help>

Show help.

=item B<--man>

Show full manual.

=back

=head1 FILE FORMAT AND OPERATORS

After the header, each line of rssacint format is of
the format (OPERATOR)(KEY) (VALUE).
The operator is one character, key is any string, and value is usually 
an integer count, but can be other things.

Adjacent rows with the same key and operator are merged in an operator-specific manner.

=head2 OPERATORS

The operators are:

=over

=item B<sum (+)>

add value of adjacent rows with the same key (word count)

=item B<min (E<lt>)> and B<max (E<gt>)>

take the minimum (or maximum) value of rows with the same key.
Values are compared as strings (lexically), not numerically,
so integer or floating-point values compare correctly only
when they have the same number of digits before the decimal point
(true, for example, of Unix epoch timestamps from the same era).

=item B<count unique (!)>

count the number of unique keys and report that as the new value.
(Values that are given are ignored.)

=item B<list with duplication (-)>

Values are integers; we assemble them into a list if possible.
So 1 and 2 and 3 are merged to 1-3;
1,2,3,3 also becomes 1-3;
and 1,2,3,5 become 1-3,5.

=item B<list with no duplication (=)>

Values are integers; we assemble them into a list if possible.
However, duplicate values result in an error.
So 1 and 2 and 3 are merged to 1-3;
and 1,2,3,5 become 1-3,5,
but 1,2,3,3 is an error.

=item B<pass through (:)>

The value is passed through unchanged.
(Used for error propagation.)

=back

These operators are generic (none are specific to RSSAC).

However, with the B<--count-ips> option, input lines of the format +6(KEY1:KEY2)
are converted to !6(KEY1:KEY2) so they can be uniquely counted, then converted
back to +6(KEY1) to implement unique IP counting for RSSAC.

=head2 KEYS

The (KEY)s have no meaning to this program and can be anything.
However, we usually use it to process RSSAC002v2 data, and there specific keys
have the following semantics:

=over

=item B<+64:192.0.2.0>

How many queries are sent from a given IPv4 address.
IPv4 addresses must be a consistent format
(typically dotted-quad format with no leading zeros).
(Per-IP data for num-sources-ipv4 from RSSAC002v2 section 2.6.)

=item B<+66:2001:db8::1>

How many queries are sent from a given IPv6 address.
IPv6 addresses must be a consistent format
(typically IPv6 preferred form with zero replacement, form 2 in RFC-1884).
(Per-IP data for num-sources-ipv6 from RSSAC002v2 section 2.6.)

=item B<+6a:2001:db8::>

How many queries are sent from a given IPv6 /64 prefix.
IPv6 prefixes must be a consistent format
(typically IPv6 preferred form with zero replacement for the low 64 bits,
form 2 in RFC-1884).
(Per-IP-prefix data for num-sources-ipv6-aggregate RSSAC002v2 section 2.6.)

=item B<+3t04>, B<+3t14>,  B<+3t06>, B<+3t16>, B<+3u04>, B<+3u14>,  B<+3u06>, B<+3u16>

How many TCP (t) or UDP (u)
queries were received (0) or responses were sent (1)
over IPv4 (4) or IPv6 (6).
(RSSAC002v2 section 2.3.)

=item B<+4t0:100>, B<+4t1:100>, B<+4u0:100>, B<+4u1:100>

How many TCP (t) or UDP (u)
queries were received (0) or responses were sent (1)
of a given size (the number after the :).
(These keys carry no IP-version digit, so IPv4 and IPv6 are not distinguished.)
We process all sizes, and leave binning to L<rssacfin_to_rssacyaml(1)>.
(RSSAC002v2 section 2.4.)

=item B<+50:2>, B<+51:2>

How many
queries were received (0) or responses were sent (1)
with each given DNS reply code (the reply code is the value after the colon).
(RSSAC002v2 section 2.5.)

=item B<E<lt>ts>

The earliest timestamp (in Unix epoch seconds) seen.

=item B<E<gt>ts>

The latest timestamp (in Unix epoch seconds) seen.

=item B<-ts>

A list of all seconds (in Unix epoch seconds) that see traffic.

=item B<=rfileseqno:site>

A list of all file sequence numbers (given as integers)
seen for a given site.


=back

=head1 SAMPLE USAGE

=head2 Input

	#fsdb -F t key count
	+64:128.9.168.85	1
	+64:128.9.168.85	1
	+64:128.9.168.85	1
	+3t04	1
	+3t14	1
	+3u04	2
	+3u14	2
	+4t0:32	1
	+4t1:434	1
	+4u0:40	2
	+4u1:525	1
	+4u1:528	1
	+50:0	3
	+51:0	3
	-rt	1451192632,1451192637,1451192652
	<ts	1451192632.254226
	=rfileseqno:-	1
	>te	1451192652.008452
	# message_to_rssacint.pl --file-seqno=1

=head2 Command

    < $< LC_COLLATE=C sort -k 1,1 | ../rssacint_reduce >$@

=head2 Output

	#fsdb -F t key count
	# message_to_rssacint.pl --file-seqno=1
	+64:128.9.168.85	3
	+3t04	1
	+3t14	1
	+3u04	2
	+3u14	2
	+4t0:32	1
	+4t1:434	1
	+4u0:40	2
	+4u1:525	1
	+4u1:528	1
	+50:0	3
	+51:0	3
	-rt	1451192632,1451192637,1451192652
	<ts	1451192632.254226
	=rfileseqno:-	1
	>te	1451192652.008452
	# rssacint_reduce.pl -

=cut

use strict;
use Pod::Usage;
use Getopt::Long;
use Net::IP;

#
# Option parsing.  "bundling" lets single-letter flags be combined (-dv).
#
Getopt::Long::Configure("bundling");
pod2usage(2) if (@ARGV && $ARGV[0] eq '-?');
#my(@orig_argv) = @ARGV;
my $prog = $0;
my $debug;                # -d / --debug, may be repeated (not otherwise read here)
my $verbose;              # -v / --verbose, may be repeated (not otherwise read here)
my $cache_easy = 1;       # --[no-]cache-easy: aggregate "easy" keys in memory
my $check_sort = 0;       # --check-sort: die if input keys are out of order
my $count_ips = 0;        # --count-ips: rewrite +6 keys to !6 for unique counting
my $show_progress = 0;    # --progress: periodic record counts on stderr
GetOptions(
    'cache-easy!' => \$cache_easy,
    'check-sort!' => \$check_sort,
    'count-ips!'  => \$count_ips,
    'progress!'   => \$show_progress,
    'help|?'      => sub { pod2usage(1); },
    'man'         => sub { pod2usage(-verbose => 2); },
    'd|debug+'    => \$debug,
    'v|verbose+'  => \$verbose,
) or pod2usage(2);

# Any "#fsdb" header line in the input must match this; the output uses
# the same header.
my $in_schema = "#fsdb -F t key count";
my $out_schema = $in_schema;
# Operators that reduce_pair() did not recognize, tallied for a warning.
my %unknown_ops;

# Hard sections are not cachable because they have many possible keys
# (more than fit in memory).  Hard ops are likewise never cached.
my $hard_sections = "6";
my $hard_ops = "qQ";


binmode(STDOUT, ":utf8");
print "$out_schema\n";


######################################################################
# Rangelist code.
# Same code in message_to_rssacint.pl and rssacint_reduce.pl.

#
# decompose_rangelist: split a rangelist string such as "1-3,5,7-9" into
# two parallel array refs: range starts and range ends.
# A bare number ("5") is a degenerate range whose start equals its end.
# Dies if an element yields no start value (e.g. an empty element "1,,2").
#
sub decompose_rangelist($) {
    my $rl_str = shift;
    my (@starts, @ends);
    for (split(/,/, $rl_str)) {
        my ($lo, $hi) = split(/-/, $_);
        die "unparsable range $_\n" unless (defined($lo));
        push(@starts, $lo);
        push(@ends, $hi // $lo);
    }
    return (\@starts, \@ends);
}

# min: numeric minimum of two values (the second is returned on a tie).
# Not currently called elsewhere in this script.
sub min($$) {
    my ($x, $y) = @_;
    return ($x < $y) ? $x : $y;
}

#
# merge_ranges: merge two rangelists (format like: 1-2,4,6-7) into one.
#
# Arguments:
#   $_[0], $_[1] - the two rangelist strings to merge.
#   $_[2]        - optional overlap-reporting level.  When true, a value
#                  present in both inputs makes the slow path append "/e"
#                  to the result; when > 1, an ":eoverlapping-regions"
#                  record is also printed to STDOUT.
# Returns the merged rangelist string, with adjacent and overlapping
# ranges coalesced (e.g. "1-2" + "3" => "1-3"; "1-3" + "5" => "1-3,5").
#
# NOTE(review): appears to assume each input is in ascending order with
# non-overlapping ranges, and that values are non-negative integers
# ("-" is the range separator) -- TODO confirm against producers.
#
sub merge_ranges($$;$) {
    my(@rangelists) = ($_[0], $_[1]);
    my($report_overlap_as_error) = $_[2];

    #
    # fast path
    # optimize appending a simple other on a ranged one
    # merge_ranges("1-2", "3") => "1-3"
    #
    # $other indexes the input that is a single bare number (if any);
    # $one indexes the remaining input.
    my($one, $other) = (undef, undef);
    if ($rangelists[1] =~ /^\d+$/) {
	($one, $other) = (0, 1);
    } elsif ($rangelists[0] =~ /^\d+$/) {
	($one, $other) = (1, 0);
    };
    # Only applies when $one ends with a "start-end" range.
    if (defined($other) && $rangelists[$one] =~ /\-(\d+)$/) {
	# can try to fastpath
	# one:  1-2   (or more complex)
	# other:    3 (hopefully)
	my($one_e) = $1;
	my($other_s) = $rangelists[$other];
        if ($one_e == $other_s) {
            # no change needed
            # (other duplicates the last value of one: an overlap)
            # NOTE(review): unlike the slow path below, this overlap case
            # does not append "/e" to the returned value.
            if ($report_overlap_as_error) {
		print ":eoverlapping-regions\t$rangelists[$one]+$other_s\n" if ($report_overlap_as_error > 1);
		return $rangelists[$one];
	    } else {
		return $rangelists[$one];
	    };
        } elsif ($one_e + 1 == $other_s) {
            # other directly follows one's last range: extend that range's end
            $rangelists[$one] =~ s/(\D?)(\d+)$/$1$other_s/;
            return $rangelists[$one];
    	};
	# fall through for slow path
    };

    #
    # slow path
    #
    # Decompose comma-separated list into array of ranges (start and ends).
    #    
    my(@ss, @es);
    foreach (0..1) {
        ($ss[$_], $es[$_]) = decompose_rangelist($rangelists[$_]);
    };

    #
    # Count how many lists each range occurs in.
    # If there is overlap, make more intermediate ranges.
    #
    # On exit of this loop, we have ONE rangelist in an array, plus counts.
    #
    # (And ick: this code is ALL corner cases.)
    #
    # $count[i] is 2 where range (s[i], e[i]) is covered by both inputs,
    # 1 where it comes from only one of them.
    my(@count, @s, @e);
  buildcount:
    while (1) {
	#
	# Check if either side has drained.
	#
	foreach $one (0, 1) {
	    # print "checking $one for emtpy, is $#{$ss[$one]}\n";
	    $other = 1 - $one;
	    if ($#{$ss[$one]} == -1) {
		# copy whatever is left of the other side, each with count 1
		push(@count, (1) x ($#{$ss[$other]} + 1));
		push(@s, @{$ss[$other]});
		push(@e, @{$es[$other]});
		last buildcount;
	    };
	};
	#
	# assert(have stuff left in both)
	#
	# Make $one be the one the starts first
	# (so we only have a million cases, not four million.)
	#
	my($new_count) = 1;
	if ($ss[0][0] < $ss[1][0]) {
	    ($one, $other) = (0, 1);
	} elsif ($ss[0][0] > $ss[1][0]) {
	    ($one, $other) = (1, 0);
	} else {
	    # both start at same time
	    $new_count = 2;
	    # $one becomes the one that ends first
	    if ($es[0][0] <= $es[1][0]) {
		($one, $other) = (0, 1);
	    } else {
		($one, $other) = (1, 0);
	    };
	};
	#
	# assert($lists[$one] starts first (or at same time))
	#
	# Emit one output range starting at $one's start; its end depends
	# on how $one relates to $other.
	my($consume_one) = undef;
	push(@count, $new_count);
	push(@s, $ss[$one][0]);
	if ($ss[$one][0] < $ss[$other][0]) {
	    # one starts first
	    if ($es[$one][0] < $ss[$other][0]) {
		# and ends before other
		# +----+
		#         +----+
		# or abutts other (in which case we will merge later)
		# +----+
		#       +----+
		push(@e, $es[$one][0]);
		$consume_one = 1;
	    } elsif ($es[$one][0] >= $ss[$other][0]) {
		# and overlaps with other
		# +----+
		#      +----+
		# or
		# +----+
		#    +----+
		# Emit only the non-overlapping prefix; the remainder of one now
		# starts together with other and is handled next iteration.
		push(@e, $ss[$other][0]-1);
		$ss[$one][0] = $ss[$other][0];
		$consume_one = 0;
	    } else {
		die "invariant violated: one $one starts first\n";
	    };
	} elsif ($ss[$one][0] == $ss[$other][0]) {
	    # start at same time
	    push(@e, $es[$one][0]);
	    $consume_one = 1;
	    if ($es[$one][0] < $es[$other][0]) {
		# but one ends first
		# +----+
		# +--------+
		# trim the shared part off the front of other
		$ss[$other][0] = $es[$one][0]+1;
	    } elsif ($es[$one][0] == $es[$other][0]) {
		# complete overlap
		# +----+
		# +----+
		#
		# so also consume other here:
		shift @{$ss[$other]};
		shift @{$es[$other]};
	    } else {
		die "invariant violated: one $one and other $other start at same time and other ends first\n";
	    };
	} else {
	    die "invariant violated: one $one starts after other $other\n";
	};
	if ($consume_one) {
	    shift @{$ss[$one]};
	    shift @{$es[$one]};
	};
    };

    #
    # We now have a clean, single rangelist in an array, with counts.
    #
    # Now concatinate adjacent ranges and report overlap.
    #
    # Any range with count 2 was in both inputs; with overlap reporting
    # on, the result gets a trailing "/e" marker.
    my($out, $error_out) = ("", "");
    while ($#s != -1) {
	if ($count[0] == 2) {
	    if ($report_overlap_as_error) {
		print ":eoverlapping-regions\t$s[0]-$e[0]\n" if ($report_overlap_as_error > 1);
		$error_out = "/e";
	    };
	};
	# merge?
	# (adjacent ranges are folded into the next one; nothing printed yet)
	if ($#s >= 1) {
	    if ($e[0]+1 >= $s[1]) {
		$s[1] = $s[0];
		shift @count;
		shift @s;
		shift @e;
		# no output
		next;
	    };
	};
	$out .= "," if ($out ne "");
	$out .= ($s[0] == $e[0] ? $s[0] : $s[0] . "-" . $e[0]);
	shift @count;
	shift @s;
	shift @e;
    };
    # NOTE(review): if a "/e"-marked result is merged again,
    # decompose_rangelist splits only on "," and "-", so "/e" ends up
    # inside a range bound used in numeric comparisons -- TODO confirm
    # downstream handling.
    return $out . $error_out;
}

######################################################################

# Per-query state for the "q" (query/response matching) operator.
# reduce_pair() updates these while folding rows that share one key;
# output_last() reads $current_q_rtt when it emits the "Q" record.
my $current_q_matcher;
my $current_q_query_retx;
my $current_q_response_retx;
my $current_q_rtt;
my $current_q_final_rtt;
my $current_q_qr;
my $current_q_time;
my $current_q_last_qr;
my $current_q_last_time;

#
# reset_current_q: begin tracking a new query key, discarding all state
# accumulated for the previous one.  Every field except the matcher
# becomes undef.
#
sub reset_current_q($) {
    my $matcher = shift;
    ($current_q_matcher,
     $current_q_query_retx, $current_q_response_retx,
     $current_q_rtt, $current_q_final_rtt,
     $current_q_qr, $current_q_time,
     $current_q_last_qr, $current_q_last_time) = ($matcher);
    return;
}

# Same code in message_to_rssacint.pl and rssacint_reduce.pl.
#
# reduce_pair: merge the value of a new row into the accumulated value of
# earlier rows that share the same matcher, according to operator $op.
#
# Arguments:
#   $op                   - one-character operator (+ ! - = < > : q)
#   $matcher              - the key being reduced ("q" uses it to notice a
#                           new query)
#   $last_value, $value   - the accumulated value and the new row's value
#   $last_unique, $unique - for "!": the unique parts of the previous and
#                           current keys ($unique is required for "!")
# Returns the merged value, or undef if $op is not recognized (the op is
# then tallied in %unknown_ops and the rows must not be merged).
# Dies if "!" is given no $unique, and on unexpected "q" orderings
# (a response followed by a query is reported as "query reuse").
#
sub reduce_pair($$$$;$$) {
    my($op, $matcher, $last_value, $value, $last_unique, $unique) = @_;
    # reduce!
    # return undef if should not be reduced
    if ($op eq '+') {
        # sum
        $value += $last_value;
    } elsif ($op eq '!') {
        # count unique: bump the count whenever the unique part changes
        # (so equal uniques must be adjacent)
        die "internal error: undef unique\n" if (!defined($unique));
        if (!defined($last_unique)) {
            $value = 1;
        } else {
            if ($unique ne $last_unique) {
                $value = $last_value + 1;
            } else {
                $value = $last_value;
            };
        };
    } elsif ($op eq '-') {
        # range
        $value = merge_ranges($last_value, $value);
    } elsif ($op eq '=') {
        # ranges with overlap reporting
        $value = merge_ranges($last_value, $value, 2);
    } elsif ($op eq '<') {
        # lexical comparison! (not numeric)
        $value = $last_value if ($last_value lt $value);
    } elsif ($op eq '>') {
        # lexical comparison! (not numeric)
        $value = $last_value if ($last_value gt $value);
    } elsif ($op eq ':') {
        # pass it through (the newest value is kept)
    } elsif ($op eq 'q') {
        # Query/response matching.  Values are "QR,TIME" where QR is 0 for
        # a query and 1 for a response; state lives in $current_q_*.
        if (!defined($current_q_matcher) || $current_q_matcher ne $matcher) {
            # new query, reset
            reset_current_q($matcher);
            ($current_q_last_qr, $current_q_last_time) = split(/,/, $last_value);
        };
        ($current_q_qr, $current_q_time) = split(/,/, $value);
        if ($current_q_last_qr eq '0' && $current_q_qr eq '1') {
            # query -> response => happiness
            $current_q_rtt = sprintf("%.6f", $current_q_time - $current_q_last_time);
            # Remember that a response has now been seen, so later
            # responses take the response/response branch (and update
            # final_rtt) instead of overwriting the first-response RTT.
            # $current_q_last_time deliberately stays at the query time.
            $current_q_last_qr = $current_q_qr;
        } elsif ($current_q_last_qr eq '0' && $current_q_qr eq '0') {
            # query query
            $current_q_query_retx++;
            $current_q_last_time = $current_q_time;
        } elsif ($current_q_last_qr eq '1' && $current_q_qr eq '1') {
            # response response (probably AXFR)
            $current_q_response_retx++;
            $current_q_final_rtt = $current_q_time - $current_q_last_time;
        } elsif ($current_q_last_qr eq '1' && $current_q_qr eq '0') {
            die "query reuse\n";
        } else {
            die "rssacint_reduce: unknown combation of current and old queries\n";
        };
    } else {
        # pass other operators through unchanged
        $unknown_ops{$op} //= 0;
        $unknown_ops{$op}++;
        # print "+eunknown-op:$op\t1\n";
        return undef;
    };
    return $value;
}

######################################################################

#
# output_last: print one finished "key<TAB>value" record to STDOUT.
#
# Does nothing when $key is undef (no record pending).
# With --count-ips, "!6..." keys containing no ":" (already reduced to a
# unique count) are renamed back to "+6..." so they sum downstream.
# "q..." keys are emitted as "Q..." whose value is the RTT measured for
# that same key, or "-" if none was measured.
#
sub output_last($$) {
    my($key, $value) = @_;
    return if (!defined($key));   # no real value there
    if ($count_ips) {
        # hack: go back to summing
        $key =~ s/^\!6([^:]*)$/\+6$1/;
    };
    if ($key =~ /^q/) {
        # The q state is global and is only reset when reduce_pair() sees
        # a second row for a key.  A key seen only once would otherwise
        # report the previous key's RTT, so use the state only if it is ours.
        my $rtt = (defined($current_q_matcher) && $current_q_matcher eq $key)
            ? $current_q_rtt
            : undef;
        # output queued RTT information
        substr($key, 0, 1) = "Q";
        $value = $rtt // "-";
    };
    print "$key\t$value\n";
}

#
# process_file: read one rssacint file ("-" means stdin), reduce records
# that share a key, and print the results to STDOUT.
#
# "#fsdb" header lines must match $in_schema and are dropped (the output
# header was printed at startup); other comment lines pass through.
# "Easy" keys are accumulated in %easy and printed, sorted by key, after
# all streamed records.  "Hard" keys (and all keys with --no-cache-easy)
# are merged only with the immediately preceding record of the same
# matcher.  Ends with a "# rssacint_reduce.pl $file" trailer comment.
#
# On open failure, prints an ":eopen-fail" record and returns.
# Dies on an unexpected schema line or, with --check-sort, unsorted input.
#
sub process_file($) {
    my($file) = @_;
    my($open_mode, $open_place) = ("<:utf8", $file);
    if ($file eq '-') {
        ($open_mode, $open_place) = ("<&=", 0);
    };
    my $in;
    if (!open($in, $open_mode, $open_place)) {
        print ":eopen-fail\t$file\n";
        return;
    };
    # The "<&=" dup of stdin gets no :utf8 layer from its mode, while named
    # files and STDOUT are :utf8.  Decode stdin the same way so non-ASCII
    # input is not double-encoded on output.
    binmode($in, ":utf8") if ($file eq '-');

    my($last_matcher, $last_unique, $last_value) = (undef, undef, undef);
    my($last_sorting_key);
    my(%easy) = ();
    my $record_count = 0;
    my $bytes_count = 0;
    while (<$in>) {
        chomp;
        if (/^#/) {
            if (/^#fsdb/) {
                die "unexpected schema: $_\n"
                    if ($_ !~ /^$in_schema/);
                next;
            };
            print "$_\n";
            next;
        };
        if ($show_progress) {
            $bytes_count += length($_) + 1;
            # report every 2**19 records
            if (((++$record_count) & 0x7ffff) == 0) {
                my($bytes_count_str) = $bytes_count;
                # insert "_" as a thousands separator
                $bytes_count_str =~ s/(^[-+]?\d+?(?=(?>(?:\d{3})+)(?!\d))|\G\d{3}(?=\d))/${1}_/g;
                print STDERR "progress: $record_count records ($bytes_count_str bytes)\n";
            };
        };

        my($key, $value) = split(/\t/);

        if ($check_sort) {
            if (defined($last_sorting_key)) {
                die "input is not sorted, compare:\n\t$last_sorting_key\n\t$key\n"
                    if ($last_sorting_key gt $key);
            };
            # keep our own copy so as not get confused by the cache
            $last_sorting_key = $key;
        };

        if ($count_ips && $key =~ /^.6/) {
            #
            # Initially, section 2.6 things are of the form:
            #    +64:1.2.3.4 10
            #    +64:1.2.3.4 5
            #    +64:1.2.3.5 11
            #    +66:1:2:3:4::8 12
            #    +66:a:b:c:d::f 13
            #    +6a:1:2:3:4:: 14
            #    +6a:a:b:c:d:: 15
            # to let us count how many times each contacts us.
            #
            # With --count-ips we collapse that into:
            #    !64:1.2.3.4 1
            #    !64:1.2.3.4 1
            #    !64:1.2.3.5 1
            #    !66:1:2:3:4::8 1
            #    !66:a:b:c:d::f 1
            #    !6a:1:2:3:4:: 1
            #    !6a:a:b:c:d:: 1
            #
            # so that our ! code can then count them.
            #
            $key =~ s/^\+6/\!6/;
            $value = 1;
        };

        my($op, $section, $rest) = ($key =~ /^(.)(.)(.*)$/);
        if (!defined($op)) {
            print ":ecannot-parse:$key\t1\n";
            next;
        };
        if ($op eq ':') {
            # just propagate :
            print "$_\n";
            next;
        };
        my($matcher, $subkey, $unique);
        if ($op eq '!') {
            ($subkey, $unique) = ($rest =~ /^([^:]+):(.*)$/);
            if (!defined($unique)) {
                print ":ecannot-rest:$rest\t1\n";
                next;
            };
            $matcher = "$op$section$subkey";
        } else {
            $matcher = $key;
        };

        #
        # if it's easy, keep it in the cache
        #
        # "!" keys are never cached: the cache keeps no per-unique state,
        # and reduce_pair() dies when asked to reduce "!" without one.
        if ($cache_easy && $op ne '!'
            && index($hard_ops, $op) == -1
            && index($hard_sections, $section) == -1) {
            # assert($matcher eq $key);
            # assert($op ne '!')
            if (defined($easy{$matcher})) {
                my $reduced = reduce_pair($op, $matcher, $easy{$matcher}, $value, undef, undef);
                if (defined($reduced)) {
                    $easy{$matcher} = $reduced;
                } else {
                    # Unknown op (tallied by reduce_pair): as in the
                    # streaming path below, output the old value unmerged
                    # and keep the new one.
                    print "$matcher\t$easy{$matcher}\n";
                    $easy{$matcher} = $value;
                };
            } else {
                $easy{$matcher} = $value;
            };
            next;
        };
        #
        # can we do it?
        #
        if ($last_matcher) {
            if ($matcher eq $last_matcher) {
                my $new_value = reduce_pair($op, $matcher, $last_value, $value, $last_unique, $unique);
                if (defined($new_value)) {
                    # success
                    $value = $new_value;
                } else {
                    # reduce fail, so output both
                    output_last($last_matcher, $last_value);
                };
            } else {
                # moved on, so dump saved value
                output_last($last_matcher, $last_value);
            };
        };
        $last_matcher = $matcher;
        $last_unique = $unique;
        $last_value = $value;
    };
    output_last($last_matcher, $last_value);
    close $in;
    #
    # dump the cache
    #
    foreach (sort keys %easy) {
        print "$_\t$easy{$_}\n";
    };
    if (%unknown_ops) {
        # sorted so the warning text is deterministic
        print "# warning: found the following unkown ops: " . join('', sort keys %unknown_ops) . "\n";
    };
    print "# rssacint_reduce.pl $file\n";
}

# Reduce every file named on the command line, or stdin when none is given.
@ARGV = ("-") if (!@ARGV);
foreach my $input_file (@ARGV) {
    process_file($input_file);
}

exit 0;

=head1 SEE ALSO

L<dnsanon(1)>,
L<message_to_rssacint(1)>,
L<rssacint_reduce(1)>,
L<rssacfin_to_rssacyaml(1)>


=head1 AUTHOR and COPYRIGHT

This program was written by John Heidemann.

Copyright (C) 2016-2021 University of Southern California.

This program is distributed under terms of the GNU general
public license, version 2.  See the file COPYING
with the distribution for details.

=cut


