# ==================================================================
# Gossamer Threads Module Library - http://gossamer-threads.com/
#
#   GT::IPC::Run::Unix
#   Author  : Scott Beck
#   CVS Info : 087,071,086,086,085
#   $Id: Unix.pm,v 1.24 2004/02/17 01:33:07 jagerman Exp $
#
# Copyright (c) 2004 Gossamer Threads Inc.  All Rights Reserved.
# ==================================================================
#
# Unix (fork/exec based) back end for GT::IPC::Run.  Children are forked
# by execute(), fed through put(), and their output/exit status is pumped
# by repeated calls to do_one_loop().
#

package GT::IPC::Run::Unix;

use strict;
use vars qw/$EVENTS $ERROR_MESSAGE/;
use base 'GT::Base';

use IO::Select;
use POSIX qw(fcntl_h errno_h :sys_wait_h);

# Maximum number of bytes pulled from a child's stdout/stderr per sysread.
sub READ_BLOCK () { 512 }

@GT::IPC::Run::Unix::ISA = qw(GT::IPC::Run);
$ERROR_MESSAGE = 'GT::IPC::Run';

sub execute {
# ------------------------------------------------------------------------
# Forks the child stored in $self->{current_child}.
#
# In the parent: registers the child's stdout/stderr read handles with
# $self->{select}, stores the child under $self->{children}{$pid}, writes a
# SCALAR-ref stdin (if any) to the child, waits until the child reports that
# its stdio is set up, and returns the child's pid.
#
# In the child: redirects STDIN/STDOUT/STDERR to the child's handles and
# then execs the program (ARRAY ref or string) or calls it (CODE ref).  The
# child never returns from this method.
#
# Dies if the semaphore pipe cannot be created; calls $self->fatal('FORK')
# if fork fails.
#
    my ($self) = @_;

# The SIGCHLD based reaper below is disabled; children are reaped by the
# waitpid loop in do_one_loop instead.
#    unless ($self->{sigchld_installed}) {
#        $self->{chld_signal} = sub {
#            my $child;
#            while (($child = waitpid -1, WNOHANG) > 0) {
#                $self->{goners}{$child} = $?;
#                $self->debug(
#                    "forked child $child exited with exit status (".
#                    ($self->{goners}{$child} >> 8).
#                    ")\n"
#                ) if $self->{_debug};
#            }
#            $SIG{CHLD} = $self->{chld_signal};
#        };
#        $SIG{CHLD} = $self->{chld_signal};
#        $self->{sigchld_installed} = 1;
#    }

    # Create a semaphore pipe.  This is used so that the parent doesn't
    # begin listening until the child's stdio has been set up.
    my ($child_pipe_read, $child_pipe_write) = $self->oneway;
    die "Could not create semaphore socket: $!"
        unless defined $child_pipe_read;

    my $pid;
    if ($pid = fork) {
        # Parent
        my $child = delete $self->{current_child};
        $self->{select}->add_stdout($pid => $child->stdout_read);
        $self->{select}->add_stderr($pid => $child->stderr_read);
        $self->{children}{$pid} = $child;
        $child->pid($pid);

        # A SCALAR ref stdin is written out in full and then closed so the
        # child sees EOF on its STDIN.
        if ($child->stdin and ref($child->stdin) eq 'SCALAR') {
            print {$child->stdin_write} ${$child->stdin};
            close $child->stdin_write;
        }

        # Kludge to stop speed forking
        select undef, undef, undef, 0.001;

        # Close what the parent will not need
#        close $child->stdout_write if $child->stdout_write;
#        close $child->stderr_write if $child->stderr_write;
#        close $child->stdin_read   if $child->stdin_read;

        # Drop our copy of the write end *before* waiting on the read end.
        # If the child dies before it prints "go\n", the read below can only
        # see EOF once no process holds the write end open; keeping our own
        # copy open would block the parent forever.
        close $child_pipe_write;

        # Wait for the child to tell us its stdio is ready (or for EOF).
        <$child_pipe_read>;
        close $child_pipe_read;

        return $pid;
    }
    else {
        # Either fork failed ($pid undefined) or we are the child ($pid == 0).
        $self->fatal(FORK => "$!") unless defined $pid;
        $self->debug("Forked: $$\n") if $self->{_debug} > 1;

        # Get our child object and our filenos.  From here on, debug output
        # and the _debug flag are taken from the child object, not from the
        # runner.
        my $child = delete $self->{current_child};
        my ($stdout_fn, $stderr_fn, $stdin_fn);
        $stdout_fn = fileno($child->stdout_write) if $child->stdout_write;
        $stderr_fn = fileno($child->stderr_write) if $child->stderr_write;
        $stdin_fn  = fileno($child->stdin_read)   if $child->stdin_read;

        # Close what the child won't need.
#        close $child->stdin_write if $child->stdin_write;
#        close $child->stderr_read if $child->stderr_read;
#        close $child->stdout_read if $child->stdout_read;

        # Tied handles break this
        untie *STDOUT if tied *STDOUT;
        untie *STDERR if tied *STDERR;
        untie *STDIN  if tied *STDIN;

        # Redirect STDOUT to the write end of the stdout pipe.
        if (defined $stdout_fn) {
            $child->debug("Opening stdout to fileno $stdout_fn") if $child->{_debug};
            open( STDOUT, ">&$stdout_fn" )
                or die "can't redirect stdout in child pid $$: $!";
            $child->debug("stdout opened") if $child->{_debug};
        }

        # Redirect STDIN from the read end of the stdin pipe.
        if (defined $stdin_fn) {
            $child->debug("Opening stdin to fileno $stdin_fn") if $child->{_debug};
            open( STDIN, "<&$stdin_fn" )
                or die "can't redirect STDIN in child pid $$: $!";
            $child->debug("stdin opened") if $child->{_debug};
        }

        # Redirect STDERR to the write end of the stderr pipe.
        if (defined $stderr_fn) {
            $child->debug("Opening stderr to fileno $stderr_fn") if $child->{_debug};
            open( STDERR, ">&$stderr_fn" )
                or die "can't redirect stderr in child: $!";
        }

        # Unbuffer both output handles, leaving STDOUT selected.
        select STDERR; $| = 1;
        select STDOUT; $| = 1;

        # Tell the parent that the stdio has been set up.
        close $child_pipe_read;
        print $child_pipe_write "go\n";
        close $child_pipe_write;

        # Program code here
        my $program = $child->program;
        if (ref($program) eq 'ARRAY') {
            exec(@$program) or do {
                # exec does not set $?, so exiting with $? would report a
                # stale (often zero) status inherited from the parent.
                # Capture errno first and exit with it, as die would.
                my $errno = $! + 0;
                print STDERR "can't exec (@$program) in child pid $$:$!\n";
                eval { POSIX::_exit($errno || 255); };
                eval { kill KILL => $$; };
            };
        }
        elsif (ref($program) eq 'CODE') {
            # The code ref may set $? itself to choose the exit status.
            $? = 0;
            $program->();

            # In case flushing them wasn't good enough.
            close STDOUT if defined fileno(STDOUT);
            close STDERR if defined fileno(STDERR);

            eval { POSIX::_exit($?); };
            eval { kill KILL => $$; };
        }
        else {
            exec($program) or do {
                # See the ARRAY branch: exit with errno, not a stale $?.
                my $errno = $! + 0;
                print STDERR "can't exec ($program) in child pid $$:$!\n";
                eval { POSIX::_exit($errno || 255); };
                eval { kill KILL => $$; };
            };
        }
        die "How did I get here!";
    }
}

sub put {
# ------------------------------------------------------------------------
# $obj->put($pid, @data)
# Prints @data to the stdin write handle of the child registered under
# $pid.  Returns whatever print returns.
#
    my $self = shift;
    my $pid  = shift;
    print {$self->{children}{$pid}->stdin_write} @_;
}

sub do_one_loop {
# ------------------------------------------------------------------------
# $obj->do_one_loop([$wait])
# Runs one iteration of the event loop: reaps exited children, fires their
# exit/done callbacks, and reads at most READ_BLOCK bytes from every child
# handle that $self->{select} reports as readable.  $wait is the timeout
# handed to can_read and defaults to 0.05.
#
# Returns 1 while there is (or may be) more to do, and 0 once nothing is
# pending and every known child has been reaped; in that case the output
# filters have been flushed and the children forgotten.
#
    my ($self, $wait) = @_;
    $wait = 0.05 unless defined $wait;

    # See if any children have exited
    my $child;
    while (($child = waitpid -1, WNOHANG) > 0) {
        # Ignore processes we did not start through this object.
        next unless exists $self->{children}{$child};

        $self->{goners}{$child} = $?;
        $self->{children}{$child}->exit_status($?);
        $self->debug(
            "forked child $child exited with exit status (".
            ($self->{goners}{$child} >> 8).
            ")\n"
        ) if $self->{_debug};
    }

    # Fire the exit callback once for every newly reaped child.
    for my $pid (keys %{$self->{goners}} ) {
        my $child = $self->{children}{$pid} or next;
        if (!$child->called_reaper) {
            $child->exit_callback->($pid, $self->{goners}{$pid})
                if $child->exit_callback;
            $child->called_reaper(1);
        }
    }

    my ($stdout_pending, $stderr_pending) = $self->{select}->can_read($wait);

    # Children with nothing readable on either handle.
    my %not_pending = %{$self->{children}};
    for my $pid (@$stdout_pending, @$stderr_pending) {
        delete $not_pending{$pid};
    }

    # A child is "done" once it has exited and has no output left to read.
    for my $pid (keys %{$self->{goners}}) {
        my $child = $self->{children}{$pid} or next;
        if ($not_pending{$pid} and not $child->called_done) {
            $child->done_callback->($pid, $self->{goners}{$pid})
                if $child->done_callback;
            $child->called_done(1);
        }
    }

    if (!@$stdout_pending and !@$stderr_pending) {
        $self->debug("Nothing else to do, flushing buffers")
            if $self->{_debug};

        $self->debug(
            "Children: ".
            keys(%{$self->{children}}).
            "; goners: ".
            keys(%{$self->{goners}})
        ) if $self->{_debug};

        # We still have children out there.  Look for children that have no
        # goner entry instead of comparing the two hash sizes: flush_filters
        # removes children but leaves their goner entries behind, so on a
        # reused object the sizes can match while a child is still running.
        return 1
            if grep { !exists $self->{goners}{$_} } keys %{$self->{children}};

        # Flush output filters and delete children to free memory and FDs
        $self->flush_filters;

        # Nothing left to do
        return 0;
    }

    # else we have stuff to do
    for my $pid (@$stdout_pending) {
        my $child = $self->{children}{$pid};
        $self->debug("STDOUT pending for $pid") if $self->{_debug};

        my $ret = sysread($child->stdout_read, my $buff, READ_BLOCK);
        if (!$ret) {
            # Nothing read (error or EOF); only real errors are reported.
            if ($! != EAGAIN and $! != 0) {
                # Socket error
                $self->debug(
                    "$pid: Socket Read: $!: $^E; Errno: ", 0+$!
                ) if $self->{_debug};
            }
        }
        else {
            # Process callbacks
            $self->debug("[$pid STDOUT]: `$buff'\n")
                if $self->{_debug} > 1;
            if ($child->handler_stdout) {
                $child->handler_stdout->put(\$buff);
            }
        }
    }

    for my $pid (@$stderr_pending) {
        my $child = $self->{children}{$pid};
        $self->debug("STDERR pending for $pid") if $self->{_debug};

        my $ret = sysread($child->stderr_read, my $buff, READ_BLOCK);
        if (!$ret) {
            # Nothing read (error or EOF); only real errors are reported.
            if ($! != EAGAIN and $! != 0) {
                # Socket error
                $self->debug(
                    "$pid: Socket Read: $!: $^E; Errno: ", 0+$!
                ) if $self->{_debug};
            }
        }
        else {
            # Process callbacks
            $self->debug("[$pid STDERR]: `$buff'\n")
                if $self->{_debug} > 1;
            if ($child->handler_stderr) {
                $child->handler_stderr->put(\$buff);
            }
        }
    }

    return 1;
}

sub flush_filters {
# ------------------------------------------------------------------------
# Forgets every known child: removes it from $self->{children} and from the
# select object, then flushes its stdout and stderr handlers if it has any.
#
    my $self = shift;
    for my $pid (keys %{$self->{children}}) {
        my $child = delete $self->{children}{$pid};
        $self->select->remove_stdout($pid);
        $self->select->remove_stderr($pid);
        if ($child->handler_stdout) {
            $child->handler_stdout->flush;
        }
        if ($child->handler_stderr) {
            $child->handler_stderr->flush;
        }
    }
}

sub stop_blocking {
# ------------------------------------------------------------------------
# $obj->stop_blocking($handle)
# Sets O_NONBLOCK on $handle.  Dies with "getfl: ..." or "setfl: ..." if
# the corresponding fcntl call fails.
#
    my ($self, $socket_handle) = @_;
    my $flags = fcntl($socket_handle, F_GETFL, 0) or die "getfl: $!";
    $flags = fcntl($socket_handle, F_SETFL, $flags | O_NONBLOCK)
        or die "setfl: $!";
}

sub start_blocking {
# ------------------------------------------------------------------------
# $obj->start_blocking($handle)
# Clears O_NONBLOCK on $handle.  Dies with "getfl: ..." or "setfl: ..." if
# the corresponding fcntl call fails.
#
    my ($self, $socket_handle) = @_;
    my $flags = fcntl($socket_handle, F_GETFL, 0) or die "getfl: $!";
    $flags = fcntl($socket_handle, F_SETFL, $flags & ~O_NONBLOCK)
        or die "setfl: $!";
}

1;