#!/usr/bin/perl

=pod

=head1 NAME

ng-mysqld-log-filter

=head1 OPTIONS

This program does not take any arguments

=head1 DESCRIPTION

MySQL 5.6 log filter that compresses or suppresses "Unsafe statement" log entries
produced when binary logging runs in statement mode.  Log entries are read from
standard input, processed, and (possibly) written to standard output.

This filter compresses unsafe query entries into 'signature' lines, greatly
reducing log volume.  It can also be configured to completely
ignore specific query signatures.

Finally, the filter can track and dump query frequency information for the
most frequently encountered unsafe query signatures (including both their standard
and generalised forms).

=head1 FILES

=over 4

=item B</etc/mysql/logfilter.conf>

At startup configuration options are read from this file, which is in case-insensitive
extended ini-format, see CONFIGURATION FORMAT for details of the format and CONFIGURATION
OPTIONS for the supported options.

=item B</etc/mysql/logfilter.d/*.conf>

Additional configuration files may optionally be added to the logfilter.d directory and
will be read, in glob order, after the main configuration file.  Configuration options
are processed in the order they are encountered and will override any earlier instance
of the same configuration option.

=back

=head1 CONFIGURATION FORMAT

=over 4

=item B<Comments>

Comments are introduced by either the '#' or ';' character and may occur either as standalone
lines or at the end of a line (following a section or parameter)

=item B<Sections>

Sections are indicated by a line of the form [<section-name>] as in traditional ini files, but
it is also possible to specify a default value for a section using the form [<section-name>=<value>].
This value will be used for any parameters in the section which omit a value

=item B<Parameters>

Parameters are of the form "<name>=<value>", however the "=<value>" portion may be omitted if the
parameter occurs in a section which specifies a default value (see Sections above) in which case the
default value for the section will be used.

=back

=head1 CONFIGURATION OPTIONS

=over 4

=item B<[Main] Section>

=over 4

=item I<IOTimeout> (fractional seconds) [1.0]

Log entries can be multiple lines long and can only be definitively terminated by the beginning of a new
log entry.  IOTimeout specifies how long to wait for additional input before assuming that a log entry is
complete.

=item I<TrackTopN> (queries) [10]

The filter will track and maintain detailed statistics on the most common query signatures that are encountered;
TrackTopN specifies for how many queries detailed statistics should be maintained.  These statistics will be
dumped to standard out and, if defined, to I<StatsFile> when the filter shuts down.  They can additionally be dumped
to I<StatsFile> at any time by sending a SIGUSR1 to the process.

=item I<RelogInterval> (seconds) [300]

By default each query signature is logged to standard out in detail, with the full and normalized query, the first
time it is encountered.  From that point forward each instance of a query with a matching signature is logged by
signature only, unless more than I<RelogInterval> seconds have passed since the query was last logged in
full, in which case a full entry is logged once, then only signatures are logged again until the next
I<RelogInterval> has passed, and so on.

=item I<StatsFile> (path) [/var/log/mysql/mysqld-log-filter.stats]

Statistics regarding the filtered queries, including detailed information about the I<TrackTopN> most common queries,
are dumped to standard output and I<StatsFile> when the filter shuts down.  Additionally, sending a SIGUSR1 to the filter
at any time will cause it to dump updated statistics to the I<StatsFile>.  The I<StatsFile> can be suppressed by
configuring it to point to /dev/null.

=back

=item B<[Query]> section

The [query] section of the configuration allows actions to be configured for specific query signatures.  The
supported actions are:

=over 4

=item I<E<lt>query-signatureE<gt>=ignore>

Query signatures mapped to the ignore action will be silently discarded without processing or further output.
These queries will be included in the "Entries Processed" and "Unsafe queries ignored" statistics, but will
otherwise be completely ignored.

=item I<E<lt>query-signatureE<gt>>

Query signatures are the SHA-1 hash of the warning's problem description and the normalized query, written as a
base-32 number (digits 0-9 and a-v); they are case insensitive.

=back

=back

=head1 B<SIGNALS>

=over 4

=item I<SIGUSR1>

When a SIGUSR1 is received the filter process will write out current statistics to the file defined by the I<StatsFile>
configuration option.

=back

=cut

use strict;

use Time::HiRes qw(time);
use Config::IniFiles;
use Tie::ToObject;
use Digest::SHA qw(sha1_hex);
use POSIX qw(mktime);
use IO::Handle;
use IO::Select;
use List::Util qw(max);
use List::MoreUtils qw(natatime);
use Math::GMP;

# Directory holding logfilter.conf and the logfilter.d/ drop-in directory
my $CFG_ROOT = '/etc/mysql/';

#####################################################################################################
# <evil>
#
# Monkey patch non-parameter support into Config::IniFiles so we can do
#
#   [<section>=<value>]
#   key1
#   key2
#
# As an equivalent to
#
#   [<section>]
#   key1=<value>
#   key2=<value>
#
# Bare keys found inside a "[<section>=<value>]" section are recorded in
# <section> itself (not in the literal "<section>=<value>" section) so that
# overrides from later config files stack onto the same entries.
#
my $_orig_ReadConfig_handle_line = \&Config::IniFiles::_ReadConfig_handle_line;
local *Config::IniFiles::_ReadConfig_handle_line = sub {
	my ($self, $fh, $line) = @_;

	my $sect = $self->_curr_sect;
	if (defined $sect && $sect =~ m/\A([^=]+)=([^=]+)$/) {
		my ($list, $val) = ($1, $2);
		# Tolerate whitespace around the '=' in the header, e.g. [query = ignore]
		for ($list, $val) {
			s/\A\s+//;
			s/\s+\z//;
		}

		# Translate <param>[comment] => <param> = <value>[comment]
		# Lines starting with a comment character are genuine comments and
		# lines already containing '=' are normal parameters; both are left
		# for Config::IniFiles to handle.
		my $cmt = quotemeta($self->{allowed_comment_char});
		if ($line =~ s/\A(\s*([^\[=\s$cmt][^=\s]*)\s*)([$cmt].*)?\z/${1} = ${val}${3}/) {
			# If we translated then copy into the <list> section so that
			# overrides will stack properly.  Use the bare key ($2): $1 also
			# holds the surrounding whitespace, which would corrupt the name.
			my $key = $2;
			$self->AddSection($list);
			if (!$self->exists($list, $key)) {
				$self->newval($list, $key, $val);
			} else {
				$self->setval($list, $key, $val);
			}
			# We could optionally let this fall through to preserve the original
			# entry too if we wanted to
			$line = '';
		}
	}

	# Now let Config::IniFiles do its thing
	return $_orig_ReadConfig_handle_line->($self, $fh, $line);
};

# Convenience wrapper for -importing config files
# on top of each other.....
#
# Returns a new Config::IniFiles object that layers $path over $self, built
# with the same constructor options $self was created with.  Dies with the
# parser diagnostics (@Config::IniFiles::errors) if $path cannot be loaded.
local *Config::IniFiles::stack = sub {
	my ($self, $path) = @_;
	# Work on a copy so $self's own startup_settings are left untouched (and
	# do not end up holding a reference back to $self).
	my %parms = %{ $self->{startup_settings} || {} };
	$parms{-import} = $self;
	$parms{-file}   = $path;
	my $stacked = Config::IniFiles->new(%parms)
		or die "Failed to load config file '$path': "
			. join('; ', @Config::IniFiles::errors) . "\n";
	return $stacked;
};


# </evil>
########################################################################################################


#
# Use /dev/null as a substitute config file
# if the normal one isn't present
#
my $cfg_file = '/dev/null';
if ( -e "${CFG_ROOT}/logfilter.conf" ) {
	$cfg_file = "${CFG_ROOT}/logfilter.conf";
}

#
# Parse our main config file, this one is mandatory
# but it can be empty and we use relaxed defaults.
# Config::IniFiles->new returns undef on failure and leaves the reasons in
# @Config::IniFiles::errors; report them here rather than crashing later
# with a method call on an undefined $cfg.
#
my $cfg = Config::IniFiles->new(
	-file                    => $cfg_file,
	-fallback                => 'main',
	-handle_trailing_comment => 1,
	-nocase                  => 1,
	-allowempty              => 1,
) or die "Failed to load config file '$cfg_file': "
	. join('; ', @Config::IniFiles::errors) . "\n";

#
# Stack any optional configs on top (in glob order, later files win)
#
for my $f (glob("${CFG_ROOT}/logfilter.d/*.conf")) {
	$cfg = $cfg->stack($f)
		or die "Failed to load config file '$f': "
			. join('; ', @Config::IniFiles::errors) . "\n";
}


############################################################################
# Global config variables (pre-extracted for performance)
############################################################################
# Extract a flat hash of query actions for performance.  Both signatures and
# actions are folded to lower case: generated signatures are lower-case
# base-32 strings, and the configuration format is documented as
# case-insensitive (so "Ignore" is honoured just like "ignore").
my %queries;
{
	my %config;
	tie %config, 'Tie::ToObject', $cfg;
	if (exists $config{'query'}) {
		my %section = %{ $config{'query'} };
		for my $sig (keys %section) {
			$queries{lc $sig} = lc $section{$sig};
		}
	}
}
# How long to wait before assuming an entry is complete
my $io_timeout = $cfg->val('main', 'IOTimeout', 1.0);
# How many queries to track in detail
my $top_n_count = $cfg->val('main', 'TrackTopN', 10);
# How often to re-log entries in detail (seconds)
my $relog_interval = $cfg->val('main', 'RelogInterval', 300);
# Where to dump stats (other than to the log file)
# Use /dev/null to supress or /dev/stderr to dump to the syslog
my $stats_file = $cfg->val('main', 'StatsFile', '/var/log/mysql/mysqld-log-filter.stats');



############################################################################
# Global state variables
############################################################################

# Per-signature statistics for every unsafe query seen, keyed by signature
my $STATS = {};

# Detailed tracking of the most common unsafe queries:
#   min     - count threshold consulted before a new signature may enter
#   entries - signature => full log entry retained for it
my $TOP_N = {
	min     => 0,
	entries => {},
};

# Counters reported by the statistics dump
my $ignored       = 0;        # Unsafe queries dropped by an "ignore" config entry
my $start_time    = time();   # Hi-res epoch time at which the filter started
my $total_entries = 0;        # Log entries seen (of any kind)
my $processed     = 0;        # Unsafe-query entries handled
my $lines         = 0;        # Input lines read
my $bytes         = 0;        # Input bytes read
my $bytes_out     = 0;        # Output bytes produced



############################################################################
# Utility functions
############################################################################
#
# Return the CPU time consumed so far by this process (user + system), in
# seconds.
#
# Perl's builtin times() reports the same user/system accounting the kernel
# exposes as utime/stime (fields 14 and 15 of /proc/<pid>/stat), already
# converted from clock ticks to seconds.  It needs no /proc filesystem and no
# parsing of the stat line (whose command-name field may contain spaces).
#
sub cputime {
	my ($user, $system) = times();
	return $user + $system;
}

#
# Translate an SQL statement into a generic form for comparison,
# similar to mysqldumpslow or pt-fingerprint.
#
# NOTE: the result is hashed into the query signatures that users list in the
# [query] config section, so changing any rule below changes existing
# signatures.
#
sub generalise_sql {
	# Work on a localised $_ so the caller's $_ is neither clobbered nor
	# (when it is aliased to a constant) written to.
	local $_ = shift;

	# Numeric literals: decimal, then hexadecimal
	s/\b\d+\b/N/g;
	s/\b0x[0-9A-Fa-f]+\b/N/g;
	# Collapse empty string literals
	s/''/'S'/g;
	s/""/"S"/g;
	# Strip escaped quotes
	s/(\\')//g;
	s/(\\")//g;
	# Collapse date and datetime literals
	s/'N-N-N'/<DATE>/g;
	s/'N-N-N N:N:N'/<DATETIME>/g;
	# Collapse string literals
	s/'[^']+'/'S'/g;
	s/"[^"]+"/"S"/g;
	# Collapse IN (...) and VALUES (...) lists
	s/\b(IN|VALUES)\s*\(\s*((<DATE(TIME)?>|NULL|N|'[NS]'),\s*)+(<DATE(TIME)?>|NULL|N|'[NS]')\s*\)/${1} (...)/sg;
	# Collapse _tmp_xxx<N> table references
	s/\b(_tmp_\w+)\d/${1}<N>/g;
	# Collapse absolute database name references
	# NOTE(review): \b before a backtick only matches after a word character,
	# so "FROM `db`.`tbl`" is not collapsed; left unchanged to keep existing
	# signatures stable.
	s/\b`[^`]{1,64}`\./`<DB>`./g;

	return $_;
}

#
# Render a binary string as a base-32 number using GMP's digit alphabet
# (0-9 then a-v).  NOTE: this is GMP radix-32, not RFC 4648 Base32; because
# the bytes are converted to an integer first, leading zero bytes do not
# show up in the result.
#
sub base32 {
	my ($bytes) = @_;
	my $hex = unpack('H*', $bytes);
	my $num = Math::GMP->new($hex, 16);
	return $num->get_str_gmp(32);
}

#
# Turn a leading "YYYY-MM-DD HH:MM:SS" (or "YYYY-MM-DDTHH:MM:SS") stamp into
# epoch seconds, interpreted in local time via mktime().  Anything after the
# seconds field is ignored; -1 is returned when no stamp is recognised.
#
sub parse_timestamp {
	my ($stamp) = @_;
	my @f = $stamp =~ m/\A(\d{4})-(\d{2})-(\d{2})[ T](\d{2}):(\d{2}):(\d{2})\b/
		or return -1;
	my ($year, $mon, $mday, $hour, $min, $sec) = @f;
	return mktime($sec, $min, $hour, $mday, $mon - 1, $year - 1900);
}

#
# Render epoch seconds as a local-time "YYYY-MM-DDTHH:MM:SS" string.
#
sub format_timestamp {
	my ($epoch) = @_;
	my @tm = localtime $epoch;
	return POSIX::strftime('%Y-%m-%dT%H:%M:%S', @tm);
}


############################################################################
# Log entry handling functions
############################################################################

#
# Build the multi-line "detailed" rendering of an unsafe statement.  The output
# is a header line carrying the signature, then the problem text, the statement
# as logged, and its generalised form.  It ends with the counters held in
# $STATS for $sig.  $entry is accepted for interface compatibility but unused.
#
sub format_unsafe_detail {
	my ($entry, $preamble, $sig, $reason, $SQL, $gSQL) = @_;

	# Indent continuation lines of multi-line statements so they line up
	# beneath the text that follows the "Statement: " / "Abstract : " labels
	my $continuation = "\n\t\t\t            ";
	(my $sql_text  = $SQL)  =~ s/\n/$continuation/g;
	(my $gsql_text = $gSQL) =~ s/\n/$continuation/g;

	my $seen = sprintf(
			"\t\t\t Stats    : Seen %d times since %s totalling %d bytes", 
			$STATS->{$sig}{count},
			format_timestamp($STATS->{$sig}{firstseen}),
			$STATS->{$sig}{bytes},
	);
	# Only mention the last full logging once it has actually happened
	if ($STATS->{$sig}{lastlogged} > 0) {
		$seen .= sprintf(
			", %d times since last logged at %s",
			$STATS->{$sig}{sincelogged},
			format_timestamp($STATS->{$sig}{lastlogged}),
		);
	}

	return join('',
		"${preamble} Unsafe statement. signature ${sig}\n",
		"\t\t\t Problem  : ${reason}\n",
		"\t\t\t Statement: ${sql_text}\n",
		"\t\t\t Abstract : ${gsql_text}\n",
		$seen,
		"\n",
	);
}

#
# Keep $TOP_N->{entries} holding (up to) $top_n_count signatures with the
# highest counts, each mapped to the first full log entry seen for it.
# $TOP_N->{min} is kept equal to the smallest tracked count once the table
# is full (0 while there is still room).  Must be called after
# $STATS->{$sig}{count} has been incremented.
#
sub _track_top_n {
	my ($sig, $entry) = @_;
	return if $top_n_count < 1;

	my $top   = $TOP_N->{entries};
	my $count = $STATS->{$sig}{count};

	if (!exists $top->{$sig}) {
		if (keys(%$top) < $top_n_count) {
			$top->{$sig} = $entry;
		} elsif ($count > $TOP_N->{min}) {
			# Evict the least frequently seen tracked signature; its count is
			# $TOP_N->{min}, which is strictly smaller than $count
			my ($smallest) = sort { $STATS->{$a}{count} <=> $STATS->{$b}{count} } keys %$top;
			delete $top->{$smallest};
			$top->{$sig} = $entry;
		}
	}

	if (keys(%$top) < $top_n_count) {
		# Still room: any signature may enter
		$TOP_N->{min} = 0;
	} elsif (exists $top->{$sig}) {
		# $sig's count rose (or $sig just entered), so the minimum may have
		# moved; untracked signatures never affect it
		$TOP_N->{min} = List::Util::min(map { $STATS->{$_}{count} } keys %$top);
	}
	return;
}

#
# A sub-handler that is called for "Unsafe for binlog" type queries only.
#
# Computes the query signature and discards entries whose signature is
# configured as "ignore" (returning nothing).  Otherwise it updates the
# per-signature statistics and the top-N table, then returns the text to log.
# That is the full detail on first sight and again once more than
# $relog_interval seconds have passed; otherwise it is a one-line signature
# reference.
#
sub process_unsafe_for_binlog_entry {
	my ($entry, $preamble, $reason, $SQL) = @_;

	$processed++;

	# Fingerprint the problem / query to see if it's a dupe
	my $gSQL = generalise_sql($SQL);
	my $sig  = Math::GMP->new(sha1_hex($reason . '|' . $gSQL), 16)->get_str_gmp(32);

	if ($queries{$sig} eq 'ignore') {
		$ignored++;
		return;
	}

	my $seen_at = parse_timestamp($preamble);

	# Generate a stats entry for this hash if it doesn't exist yet
	if (!exists $STATS->{$sig}) {
		$STATS->{$sig} = {
			bytes       => 0,        # Total bytes seen for this query sig
			count       => 0,        # Total times this query sig has been seen
			lastlogged  => 0,        # When we last logged this query sig *IN* full form
			firstseen   => $seen_at, # Timestamp of the first log entry seen for this query sig
			lastseen    => 0,        # Timestamp of the most recent log entry seen for this query sig
			sincelogged => 0,        # Times seen since last logged in full form
		};
	}
	my $stat = $STATS->{$sig};

	$stat->{lastseen} = $seen_at;
	$stat->{count}++;
	$stat->{bytes} += length($entry);

	# Maintain full queries for the TOP N variants
	_track_top_n($sig, $entry);

	if ($stat->{lastlogged} < $stat->{lastseen} - $relog_interval) {
		my $str = format_unsafe_detail($entry, $preamble, $sig, $reason, $SQL, $gSQL);
		$stat->{lastlogged}  = $stat->{lastseen};
		$stat->{sincelogged} = 0;
		return $str;
	}

	$stat->{sincelogged}++;
	return "${preamble} Unsafe statement. signature ${sig}\n";
}

#
# Handle one complete log entry from the input stream and return the text to
# write to standard output.  "Unsafe statement ... BINLOG_FORMAT = STATEMENT"
# warnings go to process_unsafe_for_binlog_entry, which may compress or
# suppress them.  Every other entry is passed through unchanged.  The
# $total_entries and $bytes_out counters are updated as a side effect.
#
sub process_log_entry {
	my $entry = shift;
	chomp $entry;
	$total_entries++;

	# Captures: (timestamp + thread id, problem description, statement)
	my @unsafe = $entry =~ m/^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} \d+) \[Warning\] Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT\. (.*?)\. Statement: \s*((?:.|[\r\n])*?)\s*$/i;
	my $output = @unsafe
		? process_unsafe_for_binlog_entry($entry, @unsafe)
		: "$entry\n";

	$bytes_out += length($output);
	return $output;
}

#
# Divide $num by $den, yielding 0 instead of dying with "Illegal division by
# zero" when $den is 0.  That happens when stats are requested via SIGUSR1
# before any input has been read, or when the run ends on empty input.
#
sub _safe_ratio {
	my ($num, $den) = @_;
	return $den ? $num / $den : 0;
}

#
# Generate summary stats to dump to the log or the stats file: overall
# counters and rates, followed by the full detail of each tracked top-N
# unsafe query, most frequent first.
#
sub generate_stats {
	my $now     = time();
	my $elapsed = $now - $start_time;
	my $cpu     = cputime();

	my $str = sprintf(
		"********************************************************\n"
		. "        Filter started:\t%s\n"
		. "     Statistics dumped:\t%s\n"
		. "          Elapsed time:\t%0.2f seconds\n"
		. "           Elapsed CPU:\t%0.2f seconds\n"
		. "    Estimated Capacity:\t%d%% (of %d bytes/s, %d lines/sec, %d entries/sec)\n"
		. "  Bytes processed (in):\t%d (%d bytes/s)\n"
		. "   Bytes written (out):\t%d (%d bytes/s)\n"
		. "       Lines processed:\t%d (%d lines/s)\n"
		. "     Entries processed:\t%d (%d entries/s)\n"
		. "        Unsafe queries:\t%d\n"
		. " Unique unsafe queries:\t%d\n"
		. "Ignored unsafe queries:\t%d\n"
		. " Total log size(bytes):\t%d%%\n"
		. "********************************************************\n",
		format_timestamp($start_time),
		format_timestamp($now),
		$elapsed,
		$cpu,
		_safe_ratio($cpu, $elapsed) * 100,
		_safe_ratio($bytes, $cpu),
		_safe_ratio($lines, $cpu),
		_safe_ratio($total_entries, $cpu),
		$bytes,         _safe_ratio($bytes, $elapsed),
		$bytes_out,     _safe_ratio($bytes_out, $elapsed),
		$lines,         _safe_ratio($lines, $elapsed),
		$total_entries, _safe_ratio($total_entries, $elapsed),
		$processed,
		scalar(keys %$STATS),
		$ignored,
		# Size change of the output relative to the input (negative = smaller)
		$bytes ? -100 * (1.0 - $bytes_out / $bytes) : 0,
	);

	# The keys of $TOP_N->{entries} are the query signatures themselves, so
	# there is no need to recompute them from the stored entries.
	my $rank    = 0;
	my $entries = $TOP_N->{entries};
	for my $sig (sort { $STATS->{$b}{count} <=> $STATS->{$a}{count} } keys %$entries) {
		$rank++;
		my $entry = $entries->{$sig};
		next unless $entry =~ m/^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} \d+) \[Warning\] Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT\. (.*?)\. Statement: \s*((?:.|[\r\n])*?)\s*$/i;
		my ($reason, $SQL) = ($2, $3);
		my $preamble = sprintf("Top Unsafe Query: #%02d   ", $rank);
		my $gSQL     = generalise_sql($SQL);
		$str .= "\n" . format_unsafe_detail($entry, $preamble, $sig, $reason, $SQL, $gSQL);
	}

	return $str;
}

#
# Dump current stats to the configured file and return the stats text so the
# caller can also log it.  Writing the file is best effort: failures are
# reported on STDERR but never stop the filter (this also runs as the
# SIGUSR1 handler).
#
sub dump_stats {
	my $stats = generate_stats();
	if (open(my $fh, '>', $stats_file)) {
		print {$fh} $stats;
		close($fh) or warn "Failed to write stats file '$stats_file': $!\n";
	} else {
		warn "Failed to open stats file '$stats_file': $!\n";
	}
	return $stats;
}

#
# Main loop, read from STDIN until it runs out
#
# Because we don't want to buffer entry X until entry X+1 arrives, entries
# are emitted as soon as they are known to be complete, i.e. when either
#   (a) the timestamp of a new entry appears at the beginning of a line, or
#   (b) $io_timeout seconds pass with no new input while the buffered entry
#       ends exactly at a line boundary (just after a "\n").
# A buffered entry that ends part-way through a line is held until more input
# (or EOF) arrives.
#
sub main_loop {
	my $IO = IO::Select->new();
	my $STDIN = IO::Handle->new_from_fd(fileno(STDIN), 'r');
	$IO->add($STDIN);
	STDOUT->autoflush(1);

	my $pending  = '';      # Buffered, possibly incomplete, entry text
	my $buf      = '';
	my $deadline = undef;   # When $pending is assumed complete (undef = wait indefinitely)

	while (1) {
		my $wait  = defined $deadline ? max(0, $deadline - time()) : undef;
		my @ready = $IO->can_read($wait);

		if (!@ready) {
			# Either the timeout expired or select() was interrupted by a
			# signal (e.g. SIGUSR1); only an expired timeout completes $pending
			if (defined $deadline && time() >= $deadline) {
				print process_log_entry($pending);
				$pending  = '';
				$deadline = undef;
			}
			next;
		}

		my $len = $ready[0]->sysread($buf, 256*1024);
		if (!defined $len) {
			next if $!{EINTR};   # interrupted by a signal, just retry
			last;                # any other read error ends input like EOF
		}
		# Empty read indicates EOF, we're done
		last if $len == 0;

		# Count the lines and bytes (just for statistics)
		$lines += ($buf =~ tr/\n//);
		$bytes += $len;

		# A timestamp only starts an entry at the beginning of a line, and it
		# may straddle two reads.  Re-scan the unterminated last line of
		# $pending together with the new data; earlier lines are kept as-is.
		my $cut   = rindex($pending, "\n") + 1;   # 0 when there is no newline
		my $carry = substr($pending, $cut);
		$pending  = substr($pending, 0, $cut);

		my @chunks = split(/^((?:\d{4}-\d{2}-\d{2}|\d{6}) \d{2}:\d{2}:\d{2})/m, $carry . $buf);

		# Text before the first timestamp continues the pending entry, which is
		# complete as soon as another entry follows it
		my $lead = shift @chunks;
		$pending .= $lead if defined $lead;
		if (length($pending) && @chunks) {
			print process_log_entry($pending);
			$pending = '';
		}

		# Reassemble (timestamp, text) pairs into entries.  All but the last are
		# complete; the last may not be, so it becomes the new pending entry
		my @entries;
		my $it = natatime(2, @chunks);
		while (my @pair = $it->()) {
			push @entries, join('', @pair);
		}
		$pending .= pop @entries if @entries;
		for my $complete (@entries) {
			print process_log_entry($complete);
		}

		# Arm the timeout only when the pending entry ends at a line boundary;
		# otherwise it cannot be complete yet, so just wait for more input
		if (length($pending) && substr($pending, -1) eq "\n") {
			$deadline = time() + $io_timeout;
		} else {
			$deadline = undef;
		}
	}

	# emit any pending / incomplete entries
	if (length($pending)) {
		print process_log_entry($pending);
	}
}


# SIGUSR1 refreshes the stats file only (nothing is written to the log)
$SIG{USR1} = \&dump_stats;

# Filter STDIN to STDOUT until end of input
main_loop();

# On shutdown, emit the final stats to the log as well as to the stats file
my $final_stats = dump_stats();
print $final_stats;

