Saturday 4 February 2012

MRTG Log Aggregator

Occasionally, I have needed to provide percentiles on a combined set of interfaces.
This requires a way of adding together samples from a number of log files, even though the sample timestamps might differ from file to file by a few minutes.

Here, then, is my current hack for doing this. The merged data set is implemented as a doubly-linked list using nested hashes, not because the links are needed here, but because I lifted the structure from one of my other log-manipulation tools. I will probably return to clean it up as time goes on.
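
For reference, each entry in the merged data set is keyed on the sample timestamp and looks something like this (a sketch only, with made-up numbers; PREV and NEXT hold the timestamps of the neighbouring samples, and TUPLE holds the four traffic figures):

$samples{1328313300} = {
    PREV  => 1328313600,                    # Sample read before this one (MRTG logs run newest-first)
    NEXT  => 1328313000,                    # Sample read after this one
    TUPLE => [4800, 9900, 5600, 11000],     # [in_average, out_average, in_maximum, out_maximum]
};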


#!/usr/bin/env perl
#
# NAME:         aggregate.pl
#
# AUTHOR:       Philip Damian-Grint
#
# DESCRIPTION:  Synthesize a new MRTG log file from 2 or more other log files.
#
#               This utility expects and generates version 2 MRTG log files
#               (see http://oss.oetiker.ch/mrtg/doc/mrtg-logfile.en.html), based on a
#               default sampling time of 5 minutes.
#
#               In general there are 600 samples at each of the 5min, 30min, 120min
#               and 1440min (daily) granularities. Each dataset is a quintuple:
#               {epoch, in_average, out_average, in_maximum, out_maximum}
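#
#               For example, the first few lines of a (hypothetical) version 2 log file
#               might look like this: a 3-field header line, followed by 5-field sample
#               records running backwards in time:
#
#                   1328313600 123456789 987654321
#                   1328313600 5120 10240 6144 12288
#                   1328313300 4800 9900 5600 11000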
#
#               The file with the newest timestamp is used as a template for generating
#               the output file, processed backwards in time.
#
#               Samples from the second and further logfiles are combined with the template
#               according to the following rules:
#
#               1.  Samples from the input logfile which fall between two samples in the
#                   template are combined into the nearest template sample with an equal
#                   or lower timestamp
#
#               2.  Samples are combined using basic addition only
#
#               Each of the input files is checked for time synchronisation. If the
#               header timestamp of any of the second or subsequent input files is more
#               than 5 minutes adrift from that of the first input file, the utility aborts.
#
# INPUTS:       Options, Logfile1, Logfile2, ...
#               aggregate.pl [--verbose] Logfile1 Logfile2 [Logfile3 ...]
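#               e.g. (hypothetical filenames):
#               aggregate.pl core1_eth0.log core2_eth0.log > combined.log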
#
# OUTPUTS:      Logfile in MRTG format version 2
#               This is written to STDOUT
#
# NOTES:        1.   It should go without saying that running this against live log files while
#                    MRTG is running will have unpredictable results - copy the logfiles to
#                    a location where they will not be disturbed while being processed.
#
#               2.  It is possible that due to occasional variations at sample period
#                   boundaries (e.g. 5mins / 30 mins) and between files, some "samples" in the
#                   merged file might combine one or two samples more than expected.
#                   It would be possible to avoid this by, say, adding a further field to each hash
#                   record to count and possibly restrict the samples combined from subsequent files.
#
# HISTORY:      3/2/2012: v1.0 created
#               8/2/2012: v1.1 header detection corrected
#

# PRAGMAS
use strict;
use warnings;

# GLOBALS
local $| = 1;                               # Autoflush STDOUT

# MODULES
use Getopt::Long;

# VARIABLES

# Parameters
my $verbose;

# Working Storage
my @fields;                                 # Holds fields from last record read
my $file_no;                                # Tracks current file being processed
my $inbytes_master;                         # Inbytes counter from the first file
my @keys;                                   # Holds sorted keys for merged dataset
my $outbytes_master;                        # Outbytes counter from the first file
my $prev_time;                              # Remember our previous timestamp
my $record_no;                              # Tracks last record read from current file
my $time_master;                            # First timestamp from first file
my $run_state;                              # Tracks processing phase (first file, subsequent file...)
my %samples;                                # Doubly-linked list representing merged file

# Subroutines
sub record_count {
    $record_no++;                           # Count every record read, verbose or not
    print STDERR "\r".$record_no." of ".$file_no if ($verbose); # Progress display on STDERR
}

# INITIALISATION

GetOptions ("verbose" => \$verbose );       # Check for verbosity
$prev_time = 0;                             # Reset previous timestamp copy
$run_state = 'INIT';                        # Reset state
$time_master = 0;                           # Reset starting epoch

# MAIN BODY

# Process All Logfiles
while (<>) {
    chomp();                                # Remove carriage return etc
    @fields = ();                           # Clear our temporary holding area
    @fields = (split);                      # Split up our tuple

    # Start of File Processing    
    if (scalar(@fields) == 3) {             # Check for start of file
        print "\nStart of input file, datestamp: ".(scalar localtime(@fields[0]))."\n" if ($verbose);
        $record_no = 0;                     # Reset record counter

        # First file
        if ($run_state eq 'INIT') {         # If this is our first file
            $time_master = $fields[0];      # Capture the header timestamp
            $inbytes_master = $fields[1];   # Capture the header inbytes
            $outbytes_master = $fields[2];  # Capture the header outbytes
            $run_state = 'FIRST';           # And update our state
            $file_no = 1;                   # Start counting input files

        # Subsequent files
        } else {
            # At the end of the first file (only)
            if ($run_state eq 'FIRST') {
                @keys = reverse sort { $a <=> $b } (keys %samples); # Sort our keys
                $run_state = 'SUBSQ';                               # Note that first file has ended
            }
            # And in all cases
            $file_no++;                     # Count input files
            $inbytes_master += $fields[1];  # Add header inbytes to master
            $outbytes_master += $fields[2]; # Add header outbytes to master

            # Other files must be within 5 minutes of the first
            die("Header timestamp difference > 5 minutes found in file ".$file_no."\n") if (abs($time_master - $fields[0]) > 300);
        }
        &record_count if ($verbose);        # Update our on-screen counter
        $prev_time = @fields[0];            # Take a copy of this timestamp
        next;                               # Now start processing non-header records
    }

    # Check for "all-files" data mangling
    die("\nIncreasing timestamp found in record ".$record_no." of file ".$file_no."\n") if (@fields[0] > $prev_time);
        
    # First file just populates our template
    if ($run_state eq 'FIRST') {

        # Check for "first-file" data mangling
        die("\nDuplicate timestamp found in record ".$record_no." of file ".$file_no."\n") if (exists ($samples{@fields[0]}));

        # Create a hash entry indexed by datestamp
        $samples{$fields[0]} = {PREV => ($prev_time == $fields[0]) ? undef : $prev_time, NEXT => undef, TUPLE => [$fields[1], $fields[2], $fields[3], $fields[4]]};

        # If not the first item in the list, update the last item's NEXT pointer
        $samples{$prev_time}{NEXT} = $fields[0] if ($record_no > 1);

    # Subsequent files must be merged
    } else {
        foreach (@keys) {
            if ($_ <= @fields[0]) {
                $samples{$_}{TUPLE}[0] += @fields[1];
                $samples{$_}{TUPLE}[1] += @fields[2];
                $samples{$_}{TUPLE}[2] += @fields[3];
                $samples{$_}{TUPLE}[3] += @fields[4];
                last;
            } 
        }
    }
    $prev_time = $fields[0];                # Take a copy of this timestamp
    &record_count;                          # Count this record; show progress if verbose
}

# Were we only given one file? @keys only populated on detection of a second file
die("\nError - only one input file supplied\n") unless (@keys);

# Output Merged File

# First our updated header record
print "$time_master $inbytes_master $outbytes_master\n";

# And then our records in reverse order
foreach (@keys) {
    print "$_ $samples{$_}{TUPLE}[0] $samples{$_}{TUPLE}[1] $samples{$_}{TUPLE}[2] $samples{$_}{TUPLE}[3]\n";
}
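
As for the percentiles that prompted all this: once the merged log exists, a quick way to pull out, say, a 95th-percentile figure from its 5-minute samples is something along the lines of the sketch below. The script name, the choice of the out_average column (converted from bytes/s to bits/s) and the cut-off of 600 samples are all assumptions to adjust to taste:

#!/usr/bin/env perl
# percentile.pl (name is arbitrary): 95th percentile of out_average from a merged MRTG log
use strict;
use warnings;

my @rates;
my $count = 0;

my $header = <>;                            # Read (and ignore) the 3-field header line
while (<>) {
    my @fields = split;
    next unless (scalar(@fields) == 5);     # Only consider complete 5-field samples
    push @rates, $fields[2] * 8;            # out_average, bytes/s converted to bits/s
    last if (++$count >= 600);              # Stick to (roughly) the 5-minute section
}

die("No samples read\n") unless (@rates);

@rates = sort { $a <=> $b } @rates;         # Sort ascending
my $index = int(0.95 * scalar(@rates));     # Position of the 95th percentile sample
$index = $#rates if ($index > $#rates);     # Clamp to the last element
print "95th percentile: ".$rates[$index]." bits/s\n";

Run it against the aggregated output, e.g. perl percentile.pl combined.log (filename as in the earlier example).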