#!/usr/bin/perl

=pod

=head1 NAME

fifo-filter

=head1 SYNOPSIS

Usage

fifo-filter /path/to/fifo /path/to/logfile process-filter-output --someoption=1

=head1 DESCRIPTION

Provide a reliable wrapper to handle filtering of content through a fifo
which must not block or lose content

The filter will be bypassed in the following situations
  1. SIGUSR1 is received (until SIGUSR2 is received)
  2. The filter backlog exceeds qsoftmax entries for more than qsoftdelay seconds
  3. The filter backlog exceeds qhardmax entries
  4. The filter lag exceeds qsoftlag seconds for more than qsoftdelay seconds
  5. The filter lag exceeds qhardlag seconds

In all cases except #1 an attempt will be made to re-start the filter and return
from bypass mode every 5 minutes. In all cases a SIGHUP resets the error counters,
closes and re-opens the log file, and tries to restart the filter.

=head1 OPTIONS

=over 4

=item B</path/to/fifo>

The location of the fifo that this process will read from

=item B</path/to/logfile>

The path to the output (filtered) logfile

=item B<...>

The rest of the arguments are the command and arguments for the filter command itself,
the command will be executed with the default shell performing the usual $PATH search

=back

=head1 SIGNALS

=over 4

=item B<SIGHUP>

Re-open the log and respawn the filter process

=item B<SIGUSR1>

Bypass the filter (write direct to the log file)

=item B<SIGUSR2>

Re-enable the filter (write through the filter process)

=back

=cut

use strict;
use warnings;

use POSIX;
use Fcntl qw(:DEFAULT :flock);
use IO::Handle;
use IO::File;
use IO::Select;
use Time::HiRes qw(gettimeofday sleep alarm time);
use List::Util qw(min max);
use IPC::Open2;

# Provide a fallback definition of F_SETPIPE_SZ when the Fcntl import above
# did not give us one.  This has to happen in a BEGIN block with an explicit
# import() call: "use constant" is processed at compile time, so wrapping it
# in a run-time if() does not make the definition conditional.
BEGIN {
	if (!defined &F_SETPIPE_SZ) {
		require constant;
		# fcntl(2) command number used on Linux to resize a pipe buffer
		constant->import(F_SETPIPE_SZ => 1031);
	}
}

# We need at least a fifo, a log file and one word of filter command
if (@ARGV < 3) {
	print "Usage: $0 /path/to/fifo /path/to/logfile <filter-command>\n";
	exit 1;
}

# Show our full command line in ps output (set_output() appends the mode later)
my $PSNAME = join(' ', $0, @ARGV);
$0 = $PSNAME;

# Positional arguments, gathered in one place:
#   fifo    - the fifo we read from
#   logfile - the log file we (or the filter) write to
#   cmd     - the filter command and its arguments
my ($fifo_path, $logfile_path, @filter_cmd) = @ARGV;
my $ARGS = {
	'fifo'     => $fifo_path,
	'logfile'  => $logfile_path,
	'cmd'      => [@filter_cmd]
};

# Configuration tunables
my $CFG = {
	chunksize    =>   16,   # Size of each read from the fifo [kiB]
	pipebuf      =>  256,   # Requested size of the filter pipe buffer [kiB: 4..1024]
	retrycount   =>    5,   # Filter (re)start attempts allowed before the filter is suspended [count]
	runtime      =>   10,   # How long the filter must stay running to be considered 'alive' [seconds]
	holdtime     =>   30,   # How long the filter stays suspended for [fractional seconds]
	sigdelay     =>   10,   # Delay between filter shutdown signals (SIGTERM, SIGKILL) [fractional seconds]
	qsoftmax     => 1000,   # Soft limit on the I/O queue size [entries]
	qhardmax     => 3000,   # Hard limit on the I/O queue size [entries]
	qsoftdelay   =>   90,   # Grace period for the soft I/O queue limits [fractional seconds]
	qsoftlag     =>   30,   # Soft limit on the I/O queue lag [fractional seconds]
	qhardlag     =>  180,   # Hard limit on the I/O queue lag [fractional seconds]
	maxwritetime =>    1    # Longest the writeout loop may run per call before we read again [fractional seconds]
};

# The filehandles shared by the whole script
my $FH = {
	FIFO    => undef, # The fifo we read input from
	LOGFILE => undef, # The log file we write output to
	FILTER  => undef, # The pipe to the filter subprocess
	OUTPUT  => undef  # undef/LOGFILE/FILTER => queue/bypass/filter mode
};

# Names of the signals that have been received but not yet processed
my $SIGQUEUE = [];

# What we know about the filter subprocess
my $FILTER = {
	pid      => undef, # pid of the filter process
	retries  => 0,     # How many (re)start attempts have been made this round
	started  => 0,     # When we last began trying to start the filter process
	signaled => 0      # The last signal sent to the filter process (SIGINT ==> close())
};


# Try to acquire any existing fifo exclusively, creating it if required.
# We also take out an exclusive flock() to make sure we're the only reader.
# Every step is checked so that a failure is reported against the call that
# actually failed.
{
	my $path = $ARGS->{'fifo'};
	if (! -p $path) {
		my $created = POSIX::mkfifo($path, 0600);
		my $err = $!; # Save this now, the -p test below may overwrite $!
		# Somebody else may have created the fifo between our test and the
		# mkfifo() call, so only give up if there is still no fifo there
		if (!$created && ! -p $path) {
			die "Failed to open or create fifo '" . $path . "' ($err)\n";
		}
	}

	# We need to open this RDWR so we block our own EOF
	my $fd;
	sysopen($fd, $path, O_RDWR|O_NONBLOCK) or
		die "Failed to open or create fifo '" . $path . "' ($!)\n";
	if (!flock($fd, LOCK_EX | LOCK_NB)) {
		die "Failed to lock fifo '" . $path . "' ($!)\n";
	}
	$FH->{'FIFO'} = IO::Handle->new_from_fd(fileno($fd), 'r') ||
		die "Failed to open FIFO filehandle ($!)\n";
}

# Open the log file in append mode, creating it if required.  Strictly we
# don't need to do this yet, but it gives us the opportunity to fail up front.
{
	my $log = IO::File->new($ARGS->{'logfile'}, O_CREAT|O_WRONLY|O_APPEND, 0600);
	if (!$log) {
		die "Failed to open or create log file '" . $ARGS->{'logfile'} . "' ($!)\n";
	}
	$FH->{'LOGFILE'} = $log;
}


#############################################################################
# Apply a sigprocmask() action to a list of signals
#   $1   - the action to take (SIG_BLOCK, SIG_UNBLOCK, ...)
#   rest - the signal numbers to apply the action to
# Returns whatever sigprocmask() returned
#############################################################################
sub sigstate($@) {
	my ($how, @signals) = @_;
	return sigprocmask($how, POSIX::SigSet->new(@signals));
}


#############################################################################
# Write a diagnostic message to STDERR
#   $1 - a short prefix identifying the kind of message (eg '--', '!!', '**')
#   $2 - the message text; CR, LF and TAB are shown as \r, \n and \t so that
#        control characters are visible (each LF is kept as a real line
#        break after its visible \n)
#############################################################################
sub debug($$) {
	# Work on a named copy of the message: a lexical $_ ("my $_") is not
	# accepted by current perls
	my ($prefix, $text) = @_;
	$text =~ s/\r/\\r/g;
	$text =~ s/\n/\\n\n/g;
	$text =~ s/\t/\\t/g;
	print STDERR "$prefix $text\n";
}


#############################################################################
# Make the filehandle given in $1 the current output, logging the new mode
# together with the reason optionally given in $2
#   undef          => queue mode
#   the log file   => bypass mode
#   the filter     => filter mode
#############################################################################
sub set_output($;$) {
	my ($target, $why) = @_;
	$why ||= 'UNKNOWN reason';

	# Work out which mode the new output handle corresponds to
	my $label;
	if (!defined $target) {
		$label = 'queue';
	} elsif ($target == $FH->{'LOGFILE'}) {
		$label = 'bypass';
	} elsif (defined $FH->{'FILTER'} && $target == $FH->{'FILTER'}) {
		$label = 'filter';
	} else {
		$label = 'UNKNOWN';
	}

	debug('**', "Entered $label mode because: $why");
	$0 = "$PSNAME [$label]"; # Make the current mode visible in ps
	$FH->{'OUTPUT'} = $target;
}

#############################################################################
# Start shutting the filter process down.  This only closes our end of the
# pipe and arms a timer; the shutdown is async and is *NOT* guaranteed to be
# complete when this function returns
#   $1 - (optional) the reason for the shutdown, used for logging
#############################################################################
sub close_filter(;$) {
	my $why = shift || 'close_filter() called without reason';
	my $out = $FH->{'OUTPUT'};
	my $pipe = $FH->{'FILTER'};

	# If we're currently writing to the filter, go into queue (not necessarily
	# bypass) mode
	if (defined $out && defined $pipe && $out == $pipe) {
		set_output(undef, $why);
	}

	# Nothing more to do unless there's a live filter process...
	return unless $FILTER->{'pid'} > 0 && kill(0, $FILTER->{'pid'}) > 0;
	# ...that we haven't already begun shutting down
	return unless $FILTER->{'signaled'} == 0;

	debug('--', 'Initiated shutdown of filter process with pid ' . $FILTER->{'pid'} . ' because: ' . $why);
	$FH->{'FILTER'}->close();
	$FH->{'FILTER'} = undef;
	$FILTER->{'started'}  = 0;
	$FILTER->{'signaled'} = SIGINT; # Marks the close() stage, no signal is actually sent here
	alarm($CFG->{'sigdelay'});
}


#############################################################################
# Begin the process of spawning a new filter process, note that this process
# is async and *NOT* guaranteed to be complete when this function returns
#
# Returns undef when no spawn was attempted (a filter is already running or
# the retry limit was hit) and when the spawn attempt itself failed; in the
# latter two cases a timer is armed so that we try again after the hold time
#############################################################################
sub spawn_filter {
	# Check for existing filter process (TODO: kill(0?)
	if (defined $FILTER->{'pid'} && $FILTER->{'pid'} > 0) {
		debug('!!', "Filter with pid " . $FILTER->{'pid'} . " is already running");
		return undef;
	}
	# If the previous filter stayed up for longer than 'runtime' seconds it
	# counts as having been alive, so begin a fresh round of retries
	if (time() - $FILTER->{'started'} > $CFG->{'runtime'}) {
		$FILTER->{'retries'} = 0;
	}
	$FILTER->{'retries'}++;
	if ($FILTER->{'retries'} > $CFG->{'retrycount'}) {
		debug('!!', "Too many retries to start filter process, suspending filter for " . $CFG->{'holdtime'} . " seconds");
		alarm($CFG->{'holdtime'});
		return undef;
	}

	# Hold off SIGCHLD/SIGPIPE until the bookkeeping for the new child is done
	sigstate(SIG_BLOCK, SIGCHLD, SIGPIPE);
	my $pipe;
	my $parent = $$;
	# open2() raises an exception instead of returning when it fails, which
	# would otherwise terminate us (with signals still blocked)
	my $pid = eval {
		open2('>&' . $FH->{'LOGFILE'}->fileno(), $pipe, @{$ARGS->{'cmd'}});
	};
	if (!defined $pid || $pid <= 0) {
		my $err = $@ || 'unknown error';
		# Should the exception have been raised on the child side of the fork
		# we must not carry on as a second copy of ourself
		POSIX::_exit(255) if $$ != $parent;
		$err =~ s/\s+\z//;
		$FILTER->{'pid'} = 0;
		debug('!!', "Failed to spawn filter process ($err), retrying in " . $CFG->{'holdtime'} . " seconds");
		alarm($CFG->{'holdtime'});
		sigstate(SIG_UNBLOCK, SIGCHLD, SIGPIPE);
		return undef;
	}

	$FILTER->{'pid'} = $pid;
	$FILTER->{'started'} = time();
	# Best effort only, we carry on with the default pipe size if this fails
	fcntl($pipe, F_SETPIPE_SZ, $CFG->{'pipebuf'}*1024);
	$FH->{'FILTER'} = IO::Handle->new_from_fd(fileno($pipe), 'w');
	$FH->{'FILTER'}->blocking(0);
	$FH->{'FILTER'}->autoflush(1);
	if (!defined $FH->{'OUTPUT'} || $FH->{'OUTPUT'} != $FH->{'LOGFILE'}) {
		# Not in bypass mode, so start using the new filter straight away
		set_output($FH->{'FILTER'}, "spawned new filter process with pid " . $FILTER->{'pid'});
	} else {
		# In bypass mode, leave the output alone
		debug('--', "Spawned new filter process with pid " . $FILTER->{'pid'});
	}
	sigstate(SIG_UNBLOCK, SIGCHLD, SIGPIPE);
}

#############################################################################
# Write out any queued I/O to the current output target
#   $1 - reference to the queue, an array of [timestamp, data] entries
# Returns the timeout for the next select() call:
#   undef - nothing left to write, wait for input
#   1.0   - there is no output at the moment (queue mode), check back later
#   0.5   - a write failed or was incomplete, retry shortly
#   0     - more to write, but we should check for input first
#############################################################################
sub process_write_queue {
	my $queue = shift;
	my $started = time();

	# If there's nothing queued we don't need to retry until we get input
	if (scalar @{$queue} == 0) {
		return undef;
	}

	# If the OUTPUT handle is undef we're biding our time, come back in a second or so
	if (!defined $FH->{'OUTPUT'}) {
		return 1.0;
	}

	# Otherwise process queued chunks until we finish or get a write failure
	while (my $entry = shift @{$queue}) {
		my ($ts, $chunk) = @{$entry};
		my $rv = $FH->{'OUTPUT'}->syswrite($chunk);
		if (!$rv || $rv != length($chunk)) { # Failed write
			# TODO: Should N consecutive failed writes cause a filter restart?
			if (defined($rv)) {
				# Partial write, put the unwritten remainder back at the head
				# of the queue (keeping its original timestamp)
				unshift(@{$queue}, [$ts, substr($chunk, $rv)]);
			} else {
				# Nothing was written, put the whole chunk back
				unshift(@{$queue}, [$ts, $chunk]);
			}
			return 0.5;
		}
		# Test the number of entries left in the queue here, the queue
		# reference itself would always compare as greater than zero
		if (scalar @{$queue} > 0 && time() - $started > $CFG->{'maxwritetime'}) {
			# We've been here for longer than maxwritetime, don't let writes
			# starve reads for too long, just peek in select() by setting an
			# immediate timeout
			return 0;
		}
	}
	# I/O queue is empty now, no timeout for the next select()
	return undef;
}

#############################################################################
# Process any signal events that have been added to the queue by signaled()
# We do this outside the signal handler to avoid delays and race conditions
# eg when trying to block signals
#   $1 - reference to the array of pending signal names
#############################################################################
sub process_signal_queue($) {
	my $signals = shift;
	while (my $sig = shift @{$signals}) {
		if ($sig eq 'CHLD') { # A child died
			# Reap everything that's ready, several exits may be reported by one signal
			while ((my $pid = waitpid(-1, WNOHANG)) > 0) {
				if ($pid == $FILTER->{'pid'}) {	# Our filter died, try to restart it
					my $msg = 'filter process with pid ' . $pid . ' exited ';
					if (WIFEXITED($?)) {
						$msg .= ' with status ' . WEXITSTATUS($?);
					} elsif (WIFSIGNALED($?)) {
						$msg .= ' due to signal ' . WTERMSIG($?);
					} else {
						$msg .= ' with $? of ' . $?;
					}
					$FILTER->{'pid'} = 0;
					$FILTER->{'signaled'} = 0;
					alarm(0); # Cancel any pending shutdown escalation timer
					if (defined($FH->{'OUTPUT'}) && defined($FH->{'FILTER'}) && $FH->{'OUTPUT'} == $FH->{'FILTER'}) {
						# We were writing to the filter, go into queue mode
						set_output(undef, $msg);
					} else {
						debug('--', $msg);
					}
					$FH->{'FILTER'} = undef;
					spawn_filter();
				}
			}
		} elsif ($sig eq 'PIPE') { # A write failed
			# Restart the filter if it's currently still running
			close_filter('received SIGPIPE'); # Will get restarted on SIGCHLD
		} elsif ($sig eq 'ALRM') { # A timer expired
			if ($FILTER->{'pid'} > 0 && kill(0, $FILTER->{'pid'}) > 0 ) {
				# We're trying to kill a filter, escalate: close() -> SIGTERM -> SIGKILL
				if ($FILTER->{'signaled'} == SIGINT) {
					$FILTER->{'signaled'} = SIGTERM;
				} elsif ($FILTER->{'signaled'} == SIGTERM) {
					$FILTER->{'signaled'} = SIGKILL;
				} else {
					# Didn't respond to SIGKILL!!!
					# TODO: Should we just ignore it and start a new filter process regardless?
					debug('**', 'Filter process with pid ' . $FILTER->{'pid'} . ' is apparently invincible...');
					# Nothing more we can send, move on to the next queued signal
					next;
				}
				debug('--', 'Filter process with pid ' . $FILTER->{'pid'} . ' failed to shutdown, sending sig ' . $FILTER->{'signaled'});
				kill($FILTER->{'signaled'}, $FILTER->{'pid'});
				alarm($CFG->{'sigdelay'});
			} else {
				# We're waiting to try a respawn
				debug('**', 'Hold time expired, trying to restart filter');
				$FILTER->{'retries'} = 0;
				spawn_filter();
			}
		} elsif ($sig eq 'HUP') { # we've been asked to re-open the log (and re-spawn the filter)
			# Re-open the log file, only dropping the old handle once we have a new one
			my $fh = IO::File->new($ARGS->{'logfile'}, O_CREAT|O_WRONLY|O_APPEND, 0600);
			if ($fh) {
				debug('--', 'Re-opened log file ' . $ARGS->{'logfile'} . ' due to SIGUP');
				my $tmp = $FH->{'LOGFILE'};
				$FH->{'LOGFILE'} = $fh;
				if (defined $FH->{'OUTPUT'} && $FH->{'OUTPUT'} == $tmp) {
					# We were in bypass mode, stay there but on the new handle
					set_output($FH->{'LOGFILE'}, 'recieved SIGHUP');
				}
				$tmp->close();
				close_filter('received SIGHUP'); # Will get restarted on SIGCHLD
			} else {
				debug('**', "Failed to open or re-create log file '" . $ARGS->{'logfile'} . "' ($!)");
			}
		} elsif ($sig eq 'USR1') { # Go into bypass mode
			set_output($FH->{'LOGFILE'}, 'received SIGUSR1');
		} elsif ($sig eq 'USR2') { # Go into queue/filter mode
			set_output($FH->{'FILTER'}, 'received SIGUSR2');
		} else {
			print STDERR "Unhandled signal '$sig' encounted in queue\n";
		}
	}
}



#############################################################################
# The main event loop: read from the fifo, queue up the complete lines and
# write them to the current output, entering and leaving bypass mode as the
# queue size and lag limits dictate.  Does not return.
#############################################################################
sub main_loop {
	my @queued;                # Complete line(s) ready to be written out (each entry may be *multiple* lines)
	my $pending    = '';       # Incomplete data (ie no EOLs) waiting on further input
	my $overflowed = INT_MAX;  # When the queue went over the soft size limit (INT_MAX when it isn't)
	my $lagged     = INT_MAX;  # When the queue went over the soft lag limit (INT_MAX when it isn't)
	my $bypassed   = INT_MAX;  # When we most recently bypassed the filter
	my $eofsince   = undef;    # When the current set of empty reads started (undef when there isn't one)
	my $eofcount   = 0;        # How many (sequential) times we got an empty read
	my $tmout      = undef;
	my $IO         = IO::Select->new();
	$IO->add($FH->{'FIFO'});

	debug("--", "Starting main event loop");
	while (1) {
		process_signal_queue($SIGQUEUE);
		my @ready = $IO->can_read($tmout);
		process_signal_queue($SIGQUEUE);
		if ($#ready == -1) {
			$tmout = process_write_queue(\@queued)
		} else {
			my ($buf, $len);
			$len = $ready[0]->sysread($buf, $CFG->{'chunksize'}*1024);
			#debug("<<", $buf);
			if ($len && $len > 0) {
				# Reset the empty read counter/timers
				$eofcount = 0;
				$eofsince = undef;
				$buf = $pending . $buf;
				my $eol = rindex($buf, "\n");
				if ($eol >= 0) {
					# We have whole line(s), enqueue them
					push(@queued, [time(), substr($buf, 0, $eol+1)]);
					$pending = substr($buf, $eol+1);
				} else {
					# No end of line yet, hold on to everything we have so
					# far so it isn't lost when the rest of the line arrives
					$pending = $buf;
				}
			} else {
				# An empty read on a fifo when select(2) indicates that the fd is read-ready normally
				# indicates that there are no more writers, ie an end-of-stream condition, but it can
				# also happen when another process is trying to read the fifo and we end up racing
				# against it.  Since we hold our own write handle on the fifo to avoid the
				# end-of-stream condition it's most likely that we're racing against another process
				# that's trying to read from our fifo so we shouldn't just exit, at least not straight
				# away.

				# How many times / how long we've been seeing this condition
				if (not defined $eofsince) {
					$eofsince = time();
					# We only emit this log entry on (each) initial empty read to avoid flooding the log
					debug("!!", "Empty read, check for other processes reading from the fifo!\n");
				}
				$eofcount++;

				$tmout = process_write_queue(\@queued);

				# We don't want to spin too hard on empty reads so back-off after a few times
				# but we deliberately wait until after we've processed any queued events
				# NOTE(review): once $eofsince is set the time comparison below is always true,
				# so in practice the back-off is governed by $eofcount alone
				if ($eofcount > 5 && time() > ($eofsince - 5)) {
					if ($eofcount <= 10 || ($eofcount % 30 == 0)) {
						# Don't log too often, otherwise it'll get noisy really fast
						debug("!!", "Too many/too fast empty reads from fifo, delaying next select() poll by 1 second");
					}
					# Note that a signal may kick us out of this sleep, but since we're just trying
					# to avoid busy-wait polling that's not really a concern here.
					sleep(1);
				}
			}
		}

		# (Try to) write out our queued content
		$tmout = process_write_queue(\@queued);

		# Do we need to go into bypass mode?
		my $now    = time();
		my $reason = undef;
		if (scalar @queued && ($now - $queued[0]->[0]) > $CFG->{'qsoftlag'}) { # Queue not empty, and over soft lag threshold
			$lagged = min($lagged, $now);
			debug('--', 'Queue has exceeded soft lag limit for ' . ($now-$lagged)  . ' seconds');
			if ($now - $queued[0]->[0] > $CFG->{'qhardlag'}) {
				# Over the hard lag limit
				$reason = 'queue exceeded hard lag limit of ' . $CFG->{'qhardlag'} . ' seconds';
			} elsif ($now - $lagged > $CFG->{'qsoftdelay'}) {
				# Over the soft lag limit for more than the threshold time
				$reason = 'queue exceeded soft lag limit of ' . $CFG->{'qsoftlag'} . ' seconds for ' . (int($now-$lagged)) . ' seconds';
			}
		} else {
			# Back under the soft lag limit, the grace period starts afresh
			# the next time we exceed it
			$lagged = INT_MAX;
		}
		if (scalar @queued > $CFG->{'qsoftmax'}) { # Over the soft size limit
			$overflowed = min($overflowed, $now);
			if (scalar @queued > $CFG->{'qhardmax'}) {
				# Over the hard size limit
				$reason = 'queue exceeded hard limit of ' . $CFG->{'qhardmax'} . ' entries';
			} elsif ($now - $overflowed > $CFG->{'qsoftdelay'}) {
				# Over the soft size limit for more than the threshold time
				$reason = 'queue exceeded soft limit of ' . $CFG->{'qsoftmax'} . ' entries for ' . (int($now-$overflowed)) . ' seconds';
			}
		} else {
			# Back under the soft size limit, the grace period starts afresh
			# the next time we exceed it
			$overflowed = INT_MAX;
		}
		if (defined($reason) && (!defined($FH->{'OUTPUT'}) || $FH->{'OUTPUT'} != $FH->{'LOGFILE'})) {
			# Go into bypass mode because $reason
			$bypassed = min($bypassed, $now);
			set_output($FH->{'LOGFILE'}, $reason);
			close_filter('entered bypass mode');
		} elsif (!defined($reason) && $bypassed < INT_MAX) {
			# TODO: Should there be a "settle" time for bypass mode here?
			# We're in bypass mode and have fallen below the soft limit, go back to filter (or queue) mode
			set_output($FH->{'FILTER'}, 'queue size and lag fell back below soft limits');
			$overflowed = INT_MAX;
			$lagged = INT_MAX;
			$bypassed = INT_MAX;
		}

	}
}


#
# We want to handle signals in-band, so just use a stub to
# collect them and push them into a queue for processing
#   $1 - the name of the signal, as passed to the handler by perl
#
sub signaled {
	my $signame = shift;
	#debug('--', "SIG$signame");
	# Dereference explicitly, push() on a bare array reference is not
	# accepted by current perls
	push(@{$SIGQUEUE}, $signame);
}

# Route every signal we act on through the queueing stub
for my $signame (qw(PIPE HUP CHLD ALRM USR1 USR2)) {
	$SIG{$signame} = \&signaled;
}

# Start the filter and enter the event loop
spawn_filter();
main_loop();
