# ==================================================================
# Gossamer Threads Module Library - http://gossamer-threads.com/
#
#   GT::IPC::Run
#   Author  : Scott Beck
#   CVS Info : 087,071,086,086,085
#   $Id: Run.pm,v 1.22 2006/05/26 21:56:30 brewt Exp $
#
# Copyright (c) 2004 Gossamer Threads Inc.  All Rights Reserved.
# ==================================================================
#
# Description:
#   Runs programs or code references in parallel
#

package GT::IPC::Run;

use strict;
use base 'GT::Base';
use vars qw/@EXPORT_OK $SYSTEM $DEBUG $ERRORS/;

use Exporter();
use Socket;
use Symbol qw/gensym/;
use POSIX qw(fcntl_h errno_h :sys_wait_h);

use GT::IPC::Filter::Line;
use GT::IPC::Run::Select;
use GT::IPC::Run::Child;

# Conduit cache shared by oneway() and twoway():
#   undef - undecided; try socketpair, then pipe, then inet sockets
#   1     - an inet socket pair worked; only inet is tried from now on
#   0     - inet sockets failed in oneway(); no conduit is tried again
my $can_run_socket = undef;

*import = \&Exporter::import;
@EXPORT_OK = qw/run/;
$DEBUG = 0;

sub READ_BLOCK () { 512 }
sub IS_WIN32 () { $^O eq 'MSWin32' }

$ERRORS = {
    SEMAPHORE => "Could not create semephore socket; Reason: %s",
    FORK      => "Could not fork; Reason: %s"
};

BEGIN {
    # Winsock error codes, see
    # http://support.microsoft.com/support/kb/articles/Q150/5/37.asp :
    # WSAEWOULDBLOCK is 10035 and WSAEINPROGRESS is 10036.  We provide
    # them here because some Win32 users report that POSIX::EINPROGRESS
    # is not vendor-supported.  F_GETFL/F_SETFL are stubbed to 0 on Win32.
    if (IS_WIN32) {
        eval '*EINPROGRESS = sub { 10036 };';
        eval '*EWOULDBLOCK = sub { 10035 };';
        eval '*F_GETFL = sub { 0 };';
        eval '*F_SETFL = sub { 0 };';
        require GT::IPC::Run::Win32;
        GT::IPC::Run::Win32->import;
        $SYSTEM = 'GT::IPC::Run::Win32';
    }
    else {
        require GT::IPC::Run::Unix;
        GT::IPC::Run::Unix->import;
        $SYSTEM = 'GT::IPC::Run::Unix';
    }
}

sub new {
# ------------------------------------------------------------------------
# Returns a new runner.  The object is blessed into the platform class
# picked in BEGIN ($SYSTEM) and carries its own GT::IPC::Run::Select.
#
    my $self = bless {}, $SYSTEM;
    $self->{select} = GT::IPC::Run::Select->new;
    return $self;
}

sub run {
# ------------------------------------------------------------------------
# Functional interface: run($program, $stdout, $stderr, $stdin).
# Starts the program, loops until do_one_loop() reports nothing left to
# do, and returns whatever exit_code() has recorded for the pid.
#
    my ($program, $out, $err, $in) = @_;

    my $self = GT::IPC::Run->new;

    # start() validates every argument and raises the same BADARGS
    # errors this function used to check for itself.
    my $pid = $self->start(
        program => $program,
        stdout  => $out,
        stderr  => $err,
        stdin   => $in,
        debug   => $DEBUG
    );

    1 while $self->do_one_loop;

    my $exit_code = $self->exit_code($pid);
    return $exit_code;
}

sub start {
# ------------------------------------------------------------------------
# Starts a program or code reference and returns its pid.  Takes a hash
# of options: program (required), stdout, stderr, stdin, reaper,
# done_callback and debug.  Any other key is fatal.
#
    my $self = shift;
    $self->fatal(BADARGS => "Arguments to start() must be a hash") if @_ & 1;
    my %opts = @_;
    my $ref;

    $self->{_debug} = delete $opts{debug};
    $self->{_debug} = $DEBUG unless defined $self->{_debug};

    my $program = delete $opts{program};
    $self->fatal("No program specified to start") unless defined $program;
    $ref = ref $program;
    $self->fatal("Invalid program passed to start $program")
        if $ref ne 'CODE' and $ref ne 'ARRAY' and $ref;

    my $out = delete $opts{stdout};
    my $actual_out;
    $ref = defined($out) ? ref($out) : undef;
    my $out_is_handle = _is_handle($out);

    # Code and scalar refs for stdout are wrapped in a line filter;
    # filter objects are used as-is; filehandles are handled below.
    if ($ref and $ref eq 'CODE') {
        $actual_out = GT::IPC::Filter::Line->new($out);
    }
    elsif ($ref and $ref eq 'SCALAR') {
        $actual_out = GT::IPC::Filter::Line->new(sub { $$out .= "$_[0]\n" });
    }
    elsif ($ref and $ref =~ /\AGT::IPC::Filter::/) {
        $actual_out = $out;
    }
    elsif (defined($out) and !$out_is_handle) {
        $self->fatal(BADARGS => "stdout handler is not a code ref or scalar ref");
    }

    my $err = delete $opts{stderr};
    my $actual_err;
    my $err_is_handle = _is_handle($err);
    $ref = defined($err) ? ref($err) : undef;

    # Same treatment for stderr.
    if ($ref and $ref eq 'CODE') {
        $actual_err = GT::IPC::Filter::Line->new($err);
    }
    elsif ($ref and $ref eq 'SCALAR') {
        $actual_err = GT::IPC::Filter::Line->new(sub { $$err .= "$_[0]\n" });
    }
    elsif ($ref and $ref =~ /\AGT::IPC::Filter::/) {
        $actual_err = $err;
    }
    elsif (defined($err) and !$err_is_handle) {
        $self->fatal(BADARGS => "stderr handler is not a code ref or scalar ref");
    }

    # stdin may only be a scalar ref or a filehandle.
    my $in = delete $opts{stdin};
    my $in_is_handle = _is_handle($in);
    $ref = ref $in;
    $self->fatal(BADARGS => "stdin handler is not a scalar ref or filehandle")
        if $ref ne 'SCALAR' and !$in_is_handle and defined $in;

    my $exit_callback = delete $opts{reaper};
    $self->fatal(BADARGS => "The exit callback specified is not a code reference")
        if defined $exit_callback and ref($exit_callback) ne 'CODE';

    my $done_callback = delete $opts{done_callback};
    $self->fatal(BADARGS => "The done callback specified is not a code reference")
        if defined $done_callback and ref($done_callback) ne 'CODE';

    $self->fatal(BADARGS => "Unknown arguments ", join(", ", keys %opts))
        if keys %opts;

    # Get the conduits we need for stdin/stdout/stderr communication.
    my ($stderr_read, $stderr_write) = $self->oneway;
    $self->fatal("could not make stderr pipe: $!")
        unless defined $stderr_read and defined $stderr_write;

    my ($stdout_read, $stdout_write) = $self->twoway;
    $self->fatal("could not make stdout pipe: $!")
        unless defined $stdout_read and defined $stdout_write;

    my ($stdin_read, $stdin_write) = $self->oneway;
    $self->fatal("could not make stdin pipes: $!")
        unless defined $stdin_read and defined $stdin_write;

    # The conduits come back in blocking mode; switch the stdout and
    # stderr ends to non-blocking.  The stdin ends are left as they are.
    $self->stop_blocking($stdout_read);
    $self->stop_blocking($stdout_write);
    $self->stop_blocking($stderr_read);
    $self->stop_blocking($stderr_write);

    # Replace or drop the conduits the caller has overridden.
    if ($in_is_handle) {
        # The caller's filehandle is handed to the child as its stdin.
        $stdin_read = $in;
        undef $stdin_write;
        undef $in;
    }
    elsif (!$in) {
        undef $stdin_write;
        undef $stdin_read;
    }

    if ($out_is_handle) {
        $stdout_write = $out;
        undef $stdout_read;
        undef $out;
    }
    elsif (!$out) {
        undef $stdout_write;
        undef $stdout_read;
    }

    if ($err_is_handle) {
        $stderr_write = $err;
        undef $stderr_read;
    }
    elsif (!$err) {
        undef $stderr_write;
        undef $stderr_read;
    }

    # Temporary location for these; execute() picks them up.
    $self->{current_child} = GT::IPC::Run::Child->new(
        program        => $program,
        stderr_read    => $stderr_read,
        stderr_write   => $stderr_write,
        stdout_read    => $stdout_read,
        stdout_write   => $stdout_write,
        stdin_write    => $stdin_write,
        stdin_read     => $stdin_read,
        stdin          => $in,
        handler_stdout => $actual_out,
        handler_stderr => $actual_err,
        exit_callback  => $exit_callback,
        done_callback  => $done_callback,
        exit_status    => 0,
        pid            => 0
    );

    # Run the program/code ref.
    my $pid = $self->execute;
    return $pid;
}

sub do_loop {
# ----------------------------------------------------------------------------
# Calls do_one_loop($wait) repeatedly until it returns false.
#
    my ($self, $wait) = @_;
    1 while $self->do_one_loop($wait);
}

sub exit_code {
# ----------------------------------------------------------------------------
# Returns the exit status recorded for $pid in $self->{goners}, or undef
# if nothing has been recorded for it.
#
    my ($self, $pid) = @_;
    $self->fatal(BADARGS => "No pid passed to exit_code") unless defined $pid;
    return $self->{goners}{$pid};
}

sub twoway {
# ------------------------------------------------------------------------
# Creates a bidirectional conduit and returns four handles
# ($read1, $write1, $read2, $write2).  For socket-based conduits each
# side's read and write handle is the same handle.  Tries a UNIX-domain
# socketpair, then a pair of pipes, then a loopback inet socket pair,
# unless $conduit_type ('socketpair', 'pipe' or 'inet') restricts it to
# one.  Returns four undefs when nothing worked.
#
    my ($self, $conduit_type) = @_;

    # UNIX-domain socketpair: only while $can_run_socket is undecided.
    if ((not defined $conduit_type or $conduit_type eq 'socketpair')
        and not defined $can_run_socket)
    {
        my ($rw1, $rw2) = (gensym, gensym);
        eval {
            socketpair($rw1, $rw2, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
                or die "socketpair 1 failed: $!";
        };

        if (!length $@) {
            $self->debug("Using socketpair for twoway") if $self->{_debug};

            # It's two-way, so each reader is also a writer; unbuffer both.
            select((select($rw1), $| = 1)[0]);
            select((select($rw2), $| = 1)[0]);
            return ($rw1, $rw2, $rw1, $rw2);
        }
        elsif ($self->{_debug}) {
            $self->debug("Error with socketpair: $@\n");
        }
    }

    # A pair of pipes: only while $can_run_socket is undecided.
    if ((not defined $conduit_type or $conduit_type eq 'pipe')
        and not defined $can_run_socket)
    {
        my ($read1, $write1, $read2, $write2) = (gensym, gensym, gensym, gensym);
        eval {
            pipe($read1, $write1) or die "pipe 1 failed: $!";
            pipe($read2, $write2) or die "pipe 2 failed: $!";
        };

        if (!length $@) {
            $self->debug("Using pipe for twoway") if $self->{_debug};

            # Turn off buffering on the write ends.
            select((select($write1), $| = 1)[0]);
            select((select($write2), $| = 1)[0]);
            return ($read1, $write1, $read2, $write2);
        }
        elsif ($self->{_debug}) {
            $self->debug("Error with pipe(): $@");
        }
    }

    # A pair of plain INET sockets: unless oneway() has disabled them.
    if ((not defined $conduit_type or $conduit_type eq 'inet')
        and ($can_run_socket or not defined $can_run_socket))
    {
        my ($rw1, $rw2) = (gensym, gensym);

        # make_socket() turns blocking back on before returning.
        eval { ($rw1, $rw2) = $self->make_socket };

        if (!length $@) {
            $self->debug("Using inet socket for twoway") if $self->{_debug};

            # Go straight to inet sockets on later calls.
            $can_run_socket = 1;

            select((select($rw1), $| = 1)[0]);
            select((select($rw2), $| = 1)[0]);
            return ($rw1, $rw2, $rw1, $rw2);
        }
        elsif ($self->{_debug}) {
            $self->debug("Error with socket: $@");
        }
        # NOTE: unlike oneway(), a failure here leaves $can_run_socket
        # untouched, so later calls try again.
    }

    $self->debug("Nothing worked") if $self->{_debug};

    # There's nothing left to try.
    return (undef, undef, undef, undef);
}

sub oneway {
# ------------------------------------------------------------------------
# Creates a one-directional conduit and returns ($read, $write).  Tries
# a UNIX-domain socketpair, then a pipe, then a loopback inet socket
# pair, unless $conduit_type ('socketpair', 'pipe' or 'inet') restricts
# it to one.  If inet sockets fail, $can_run_socket is set to 0 and no
# conduit is attempted on later calls.  Returns (undef, undef) on
# failure.
#
    my ($self, $conduit_type) = @_;

    # Symbols to be used as filehandles for the conduit's ends.
    my $read  = gensym;
    my $write = gensym;

    # UNIX-domain socketpair: only while $can_run_socket is undecided.
    if ((not defined $conduit_type or $conduit_type eq 'socketpair')
        and not defined $can_run_socket)
    {
        eval {
            socketpair($read, $write, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
                or die "socketpair failed: $!";
        };

        if (!length $@) {
            $self->debug("Using socketpair for oneway") if $self->{_debug};

            # It's one-way, so shut down the unused directions.
            shutdown($read, 1);
            shutdown($write, 0);

            # Turn off buffering on the write end.
            select((select($write), $| = 1)[0]);
            return ($read, $write);
        }
        elsif ($self->{_debug}) {
            $self->debug("Could not make socketpair: $@");
        }
    }

    # A pipe: only while $can_run_socket is undecided.
    if ((not defined $conduit_type or $conduit_type eq 'pipe')
        and not defined $can_run_socket)
    {
        eval { pipe($read, $write) or die "pipe failed: $!" };

        if (!length $@) {
            $self->debug("Using pipe for oneway") if $self->{_debug};

            # Turn off buffering on the write end.
            select((select($write), $| = 1)[0]);
            return ($read, $write);
        }
        elsif ($self->{_debug}) {
            $self->debug("Could not make pipe: $@");
        }
    }

    # A pair of plain INET sockets: unless already found not to work.
    if ((not defined $conduit_type or $conduit_type eq 'inet')
        and ($can_run_socket or not defined $can_run_socket))
    {
        eval { ($read, $write) = $self->make_socket };

        if (!length $@) {
            $self->debug("Using inet socket for oneway") if $self->{_debug};

            # Go straight to inet sockets on later calls.
            $can_run_socket = 1;

            # It's one-way, so shut down the unused directions.
            shutdown($read, 1);
            shutdown($write, 0);

            # Turn off buffering on the write end.
            select((select($write), $| = 1)[0]);
            return ($read, $write);
        }
        else {
            $self->debug("Could not make socket: $@") if $self->{_debug};
            $can_run_socket = 0;
        }
    }

    $self->debug("Nothing worked") if $self->{_debug};
    return (undef, undef);
}

sub make_socket {
# ------------------------------------------------------------------------
# Homebrew socketpair() for systems that don't support it (the things I
# must do to make Windows happy).  Connects two TCP sockets over
# 127.0.0.1 and returns ($accepted, $connector) with blocking turned
# back on.  Dies on any failure.
#
    my ($self) = @_;

    ### Server side: listen on an OS-chosen loopback port.
    my $acceptor = gensym();
    my $accepted = gensym();

    my $tcp = getprotobyname('tcp') or die "getprotobyname: $!";
    socket($acceptor, PF_INET, SOCK_STREAM, $tcp) or die "socket: $!";

    setsockopt($acceptor, SOL_SOCKET, SO_REUSEADDR, pack("l", 1))
        or die "reuse: $!";

    my $server_addr = inet_aton('127.0.0.1') or die "inet_aton: $!";
    $server_addr = pack_sockaddr_in(0, $server_addr) or die "sockaddr_in: $!";

    bind($acceptor, $server_addr) or die "bind: $!";
    $self->stop_blocking($acceptor);

    # Port 0 was requested; read back the port actually bound.
    $server_addr = getsockname($acceptor);

    listen($acceptor, SOMAXCONN) or die "listen: $!";

    ### Client side: start a non-blocking connect to the acceptor.
    my $connector = gensym();
    socket($connector, PF_INET, SOCK_STREAM, $tcp) or die "socket: $!";
    $self->stop_blocking($connector);

    unless (connect($connector, $server_addr)) {
        die "connect: $!"
            if $! and ($! != EINPROGRESS) and ($! != EWOULDBLOCK);
    }

    my $connector_address = getsockname($connector);
    my ($connector_port, $connector_addr) = unpack_sockaddr_in($connector_address);

    ### Loop until the acceptor has accepted our connector (0x10) and the
    ### connector's connect has completed (0x01).
    my $in_read  = '';
    my $in_write = '';
    vec($in_read,  fileno($acceptor),  1) = 1;
    vec($in_write, fileno($connector), 1) = 1;

    my $done = 0;
    while ($done != 0x11) {
        my $hits = select(
            my $out_read  = $in_read,
            my $out_write = $in_write,
            undef,
            5
        );

        if ($hits < 0) {
            # select() interrupted by a signal: nothing happened, retry.
            next if $! == EINTR;
            die "select: $^E";
        }

        # Accept happened.
        if (vec($out_read, fileno($acceptor), 1)) {
            my $peer = accept($accepted, $acceptor) or die "accept: $!";
            my ($peer_port, $peer_addr) = unpack_sockaddr_in($peer);

            # Only our own connector counts; anything else is ignored and
            # the acceptor keeps being watched.
            if ($peer_port == $connector_port and $peer_addr eq $connector_addr) {
                vec($in_read, fileno($acceptor), 1) = 0;
                $done |= 0x10;
            }
        }

        # Connect happened.
        if (vec($out_write, fileno($connector), 1)) {
            $! = unpack('i', getsockopt($connector, SOL_SOCKET, SO_ERROR));
            die "connect: $!" if $!;

            # Stop watching the connector: it stays writable and would
            # otherwise wake select() on every pass.  The acceptor's read
            # bit must stay set until its accept has been seen above.
            vec($in_write, fileno($connector), 1) = 0;
            $done |= 0x01;
        }
    }

    # Turn blocking back on, damnit.
    $self->start_blocking($accepted);
    $self->start_blocking($connector);

    return ($accepted, $connector);
}

sub _is_handle {
# ------------------------------------------------------------------------
# True if the argument is a glob reference: either a plain one (\*FH,
# gensym) or an object blessed into a glob (its stringification contains
# "=GLOB(").
#
    my $ref = ref($_[0]);
    return (($ref and $ref eq 'GLOB') or ($ref and $_[0] =~ /=GLOB\(/));
}

1;

__END__

=head1 NAME

GT::IPC::Run - Run programs or code in parallel

=head1 SYNOPSIS

    use GT::IPC::Run qw/run/;

    # stderr and stdout filters default to a
    # GT::IPC::Filter::Line
    my $exit_code = run
        '/bin/ls',          # Program to run
        \*stdout_handle,    # stdout event
        \&stderr_handler,   # stderr event
        \$stdin;            # stdin

    my $io = GT::IPC::Run->new;

    use GT::IPC::Filter::Line;

    my $pid = $io->start(
        stdout => GT::IPC::Filter::Line->new(
            regex  => "\r?\n",
            output => sub { print "Output: $_[0]\n" }
        ),
        program => sub { print "I got forked\n" },
    );

    while ($io->do_one_loop) {
        if (defined(my $exit = $io->exit_code($pid))) {
            print "$pid exited ", ($exit>>8), "\n";
        }
    }

=head1 DESCRIPTION

Module to simplify running a program or code reference in parallel.
Allows catching the output of the program and filtering it. =head1 FUNCTIONS GT::IPC::Run will import one function C<run> if you request it to. =head2 run Run is a simple interface to running a program or a subroutine in a separate process and catching the output, both stderr and stdout. This function takes four arguments; only the first argument is required. =over 4 =item First Argument The first argument to C<run> is the program to run or the code reference to run. This argument can be one of three things. If a code reference is passed as the first argument to C<run>, GT::IPC::Run will fork off and run the code reference. You SHOULD NOT exit in the code reference if you want your code to work on Windows. Calling C<die> is ok, as your code is evaled. There are some things you CAN NOT do if you want your code to work on Windows. You SHOULD NOT make any calls to C or C. For some reason, on Windows, this breaks filehandle inheritance, so all your output from that moment on (including the C or C call) will go to the real output channel, STDERR or STDOUT. You SHOULD NOT change STDERR or STDOUT. The child process on Windows can affect the filehandles in the parent. This is probably because of the way C<fork> on Windows is emulated with threads. You probably should not C either; though this is not confirmed, I really doubt it will work the way you plan. If an array reference is passed in it will be dereferenced and passed to C. If a scalar is passed in it will be passed to C. On Windows the arguments are passed to Win32::Process::Create as the program you wish to run. See L. =item Second Argument The second argument to C<run> is what you want to happen to STDOUT as it comes in. This argument can be one of three things. If it is a reference to a GT::IPC::Filter:: class, that will be used to call your code. See L for details. If it is a code reference, a new GT::IPC::Filter::Line object will be created and your code reference will be passed in.
Exactly: $out = GT::IPC::Filter::Line->new($out); GT::IPC::Filter::Line will call your code reference for each line of output from the program, with the line ending stripped. See L for details. If the argument is a scalar reference, again, a new GT::IPC::Filter::Line object will be created. Exactly: $out = GT::IPC::Filter::Line->new(sub { $$out .= "$_[0]\n" }); =item Third Argument The third argument to C<run> is used to handle STDERR if and when what you are running produces it. This can be the exact same thing as the second argument, but will work on STDERR. =item Fourth Argument This argument is how to handle STDIN. It may be one of two things. If it is a SCALAR reference, its contents will be printed to the input of what you are running. If it is a filehandle, it is handed to what you are running as its STDIN. =back =head1 METHODS =head2 new This is a simple method that takes no arguments and returns a GT::IPC::Run object. It may take options in the future. =head2 start This is the more complex method to start a program running. When you call this method, the program you specify is started right away and its PID (process ID) is returned to you. After you call this you will need to call either C<do_loop> or C<do_one_loop> to start getting the program's or code reference's output. See L<"do_loop"> and L<"do_one_loop"> elsewhere in this document. This method takes a hash of arguments. The arguments are: =over 4 =item program The name of the program, or code reference you wish to run. This is treated the same way as the first argument to C<run>. See L<"run"> elsewhere in this document for a description of how this argument is treated. =item stdout This is how you want STDOUT treated. It can be the same things as the second argument to C<run>. See L<"run"> elsewhere in this document for a description of how this argument is treated. =item stderr This is how you want STDERR treated. It can be the same things as the third argument to C<run>. See L<"run"> elsewhere in this document for a description of how this argument is treated. =item stdin This argument is how to handle STDIN.
It may be one of two things. It is treated like the fourth argument to C<run>. See L<"run"> elsewhere in this document for a description of how this argument is treated. =item reaper This is a code reference that will be run once a process has exited. Note: the process may not be done sending us STDOUT or STDERR when it exits. The code reference is called with the pid as its first argument and the exit status of the program as its second argument. The exit status is the same as returned by waitpid(). The exit status is somewhat fiddled on Windows to act the way you want it to, e.g. C<$exit_status E<gt>E<gt> 8> will be the number the program exited with. =item done_callback This is a code reference that works similarly to reaper, except that it is only called after the child has died AND all STDOUT/STDERR output has been sent, unlike reaper, which is called on exit regardless of any output that may still be pending. The code reference is called with the pid and exit status of the program as its two arguments. =back =head2 do_one_loop This method takes one argument, the time to wait for C<select> to return something, in milliseconds. This does one select loop on all the processes. You will need to call this after you call C<start>. Typically: my $ipc = new GT::IPC::Run; my $pid = $ipc->start(program => 'ls'); 1 while $ipc->do_one_loop; my $exit_status = $ipc->exit_code($pid); =head2 do_loop This is similar to C<do_one_loop>, except it does not return until all processes are finished. Almost the same as: 1 while $ipc->do_one_loop; You can pass the wait time to C<do_loop> and it will be passed on to C<do_one_loop>. The wait time is in milliseconds. =head2 exit_code This method takes a pid as an argument and returns the exit status of that process. If the process has not exited yet or GT::IPC::Run did not launch the process, returns undefined. The exit code returned by this is the same as returned by waitpid. See L and L.
=head1 MAINTAINER Scott Beck =head1 COPYRIGHT Copyright (c) 2004 Gossamer Threads Inc. All Rights Reserved. http://www.gossamer-threads.com/ =head1 VERSION Revision: $Id: Run.pm,v 1.22 2006/05/26 21:56:30 brewt Exp $ =cut