#!/usr/bin/perl -w
use strict;
use Time::HiRes qw(time);
use Errno qw(EAGAIN EINTR);
use POSIX qw(F_GETFL F_SETFL O_NONBLOCK);
# blocking(HANDLE [, BOOL])
#
# Query or change the blocking mode of HANDLE.
#
# With HANDLE only: returns 1 if the handle is blocking, 0 if it is
# non-blocking, and undef on MSWin32/VMS where the state cannot be queried.
# With BOOL: a true value makes HANDLE blocking, a false value makes it
# non-blocking. A true value is returned on success, whether or not the mode
# actually had to be changed.
# Dies if the underlying fcntl/ioctl call fails.
sub blocking(*;$) {
    my $handle = shift;
    # Warnings are off for the rest of the sub; failures are reported by die.
    no warnings;
    if ($^O eq 'MSWin32' || $^O eq 'VMS') {
        # There seems to be no way to query the state
        return undef unless @_;
        # FIONBIO enables non-blocking sockets on windows and vms.
        # FIONBIO is (0x80000000|(4<<16)|(ord('f')<<8)|126),
        # as per winsock.h, ioctl.h
        my $fionbio = 0x8004667e;
        # The ioctl argument means "non-blocking on", the inverse of BOOL.
        my $val = pack("L!", shift() ? 0 : 1);
        return ioctl($handle, $fionbio, $val) ||
            die "Can't set ioctl flags: $!";
    }

    # fcntl returns "0 but true" for a flag word of 0, so || only fires on
    # a real failure.
    my $flags = fcntl($handle, F_GETFL, 0) ||
        die "Can't get fcntl flags: $!\n";
    my $is_blocking = ($flags & O_NONBLOCK()) ? 0 : 1;
    return $is_blocking unless @_;

    my $want_blocking = shift() ? 1 : 0;
    # Already in the requested mode: nothing to change, but still a success.
    return 1 if $want_blocking == $is_blocking;

    my $new_flags = $want_blocking
        ? ($flags & ~O_NONBLOCK())
        : ($flags | O_NONBLOCK());
    return fcntl($handle, F_SETFL, $new_flags) ||
        die "Can't set fcntl flags: $!";
}
# Seconds between two reported totals.
my $step = 2;
# Reading STDIN is suspended while more than this many characters are still
# waiting to be written to STDOUT.
my $max_pending = 1e5;
# We will set $stdin_fd to undef on EOF
defined(my $stdin_fd = fileno(STDIN)) || die "STDIN not connected";
defined(my $stdout_fd = fileno(STDOUT)) || die "STDOUT not connected";
# We will set $read_mask to undef on EOF or throttle
my $read_mask = "";
vec($read_mask, $stdin_fd, 1) = 1;
my $write_mask = "";
vec($write_mask, $stdout_fd, 1) = 1;
# Time at which the next total is due.
my $next = time() + $step;
# Input read so far that is not yet terminated by a newline.
my $stdin_buffer = "";
# Output that still has to be written to STDOUT.
my $stdout_buffer = "";
# Sum of the lines read since the last reported total.
my $accumulator = 0;
# Make filehandles non blocking
blocking(STDIN, 0);
blocking(STDOUT, 0);
# Run until STDIN has hit EOF and all pending output has been written.
while (defined $stdin_fd || $stdout_buffer ne "") {
    # Time left until the next total is due, measured before waiting.
    my $wait = $next - time();
    # Only ask for writability while there is something to write. Without a
    # read mask (EOF or throttled) there is no timeout: we just wait until
    # STDOUT accepts more data.
    my $nfd = select(my $r = $read_mask,
                     my $w = $stdout_buffer eq "" ? undef : $write_mask,
                     undef,
                     defined $read_mask ? $wait > 0 ? $wait : 0 : undef);
    if (!defined $nfd || $nfd < 0) {
        # select reports failure as -1 with $! set. The masks are not
        # meaningful then, so skip all I/O; interruptions are simply retried.
        die "Select error: $!" if $! != EAGAIN && $! != EINTR;
    } elsif ($nfd) {
        # $r is undef while reading is throttled, so check it before vec.
        if (defined $stdin_fd && defined $r && vec($r, $stdin_fd, 1)) {
            # Append to whatever partial line is left from the last read.
            my $rc = sysread(STDIN, $stdin_buffer, 4096, length $stdin_buffer);
            if ($rc) {
                # Consume every complete line, keep the unterminated rest.
                $accumulator += $1 while $stdin_buffer =~ s/^(.*)\n//;
            } elsif (defined $rc) {
                # EOF
                # A last line without newline still counts.
                $accumulator += $stdin_buffer if $stdin_buffer ne "";
                $stdout_buffer .= "$accumulator\n";
                $stdin_fd = undef;
                $read_mask = undef;
            } elsif ($! != EAGAIN && $! != EINTR) {
                die "Error reading from STDIN: $!";
            }
        }
        if (defined $w && vec($w, $stdout_fd, 1)) {
            my $rc = syswrite(STDOUT, $stdout_buffer, 4096);
            if (defined $rc) {
                # Drop what was actually written; the rest stays queued.
                substr($stdout_buffer, 0, $rc, "");
                if (length $stdout_buffer < $max_pending &&
                    defined $stdin_fd && !defined $read_mask) {
                    # Unthrottle
                    $read_mask = "";
                    vec($read_mask, $stdin_fd, 1) = 1;
                }
            } elsif ($! != EAGAIN && $! != EINTR) {
                die "Error writing to STDOUT: $!";
            }
        }
    }
    # The interval had already elapsed when this pass started: queue the
    # total and start a new interval.
    if (defined $stdin_fd && $wait <= 0) {
        $stdout_buffer .= "$accumulator\n";
        $accumulator = 0;
        $next = time() + $step;
        # Throttle
        $read_mask = undef if length $stdout_buffer > $max_pending;
    }
}