285 lines
9.1 KiB
Perl
285 lines
9.1 KiB
Perl
|
# ==================================================================
|
||
|
# Gossamer Links - enhanced directory management system
|
||
|
#
|
||
|
# Website : http://gossamer-threads.com/
|
||
|
# Support : http://gossamer-threads.com/scripts/support/
|
||
|
# CVS Info : 087,071,086,086,085
|
||
|
# Revision : $Id: Parallel.pm,v 1.8 2005/03/05 01:29:09 brewt Exp $
|
||
|
#
|
||
|
# Copyright (c) 2001 Gossamer Threads Inc. All Rights Reserved.
|
||
|
# Redistribution in part or in whole strictly prohibited. Please
|
||
|
# see LICENSE file for full details.
|
||
|
# ==================================================================
|
||
|
|
||
|
package Links::Parallel;
|
||
|
# ==================================================================
|
||
|
# A way to get parallel work for certain tasks (process based, not thread based).
|
||
|
#
|
||
|
use strict;
|
||
|
|
||
|
sub new {
#------------------------------------------------------------
# Creates a new task administrator.
#
# Arguments: either a hash reference or a flat list of key => value
# pairs. Recognised keys (defaults in brackets):
#   max_workunit  [10]    largest number of items given to one child
#   min_workunit  [3]     smallest number of items given to one child
#   max_children  [3]     number of children running at the same time
#   child_path    ["./child.pl"]           script run by each child
#   path_to_perl  ["/usr/local/bin/perl"]  interpreter used to run it
#   child_args    [undef] extra command line passed to each child
#   spawn_delay   [2]     seconds slept between two spawns in one pass
#   to_check      [[]]    array ref of items still to be processed
#   on_response   [no-op] code ref called as ($id, $code, $message)
#
# Returns the blessed object.
#
    my $class = shift;
    my %p     = ref $_[0] ? %{ $_[0] } : @_;

    my $self = bless {}, $class;

    # Plain scalar settings and their defaults.
    my %defaults = (
        max_workunit => 10,
        min_workunit => 3,
        max_children => 3,
        child_path   => "./child.pl",
        path_to_perl => "/usr/local/bin/perl",
        spawn_delay  => 2,
    );
    foreach my $key ( keys %defaults ) {
        $self->{$key} = defined $p{$key} ? $p{$key} : $defaults{$key};
    }

    # Reference defaults are built per object so instances never share them.
    $self->{to_check}    = defined $p{to_check}    ? $p{to_check}    : [];
    $self->{on_response} = defined $p{on_response} ? $p{on_response} : sub { };

    # new_work_unit() hands this to every child; leave it unset when the
    # caller did not supply it.
    $self->{child_args} = $p{child_args} if defined $p{child_args};

    # for statistics
    $self->{start_time}      = 0;
    $self->{end_time}        = 0;
    $self->{threads_spawned} = 0;

    # wait() and get_stats() use the key "thread_stats". The historical
    # "threads_stats" key is kept pointing at the same hash for any
    # code that reads it directly.
    $self->{thread_stats}  = {};
    $self->{threads_stats} = $self->{thread_stats};

    return $self;
}
|
||
|
|
||
|
sub wait {
#------------------------------------------------------------
# The main loop: keeps up to max_children work units running and
# blocks until every item in to_check has been handed out and every
# active unit has finished.
#
# For each line a unit reports, on_response is called with
# ($id, $code, $message). When a unit stops reporting, its stats are
# stored in thread_stats, the unit is ended and whatever it still had
# in its own to_check list is put back on the main list.
#
# Dies if select() reports activity but no unit has input.
# Returns the end time (also stored in end_time).
#
    my $self         = shift;
    my $max_children = $self->{max_children};
    my @active_units = ();
    $self->{start_time} = time;

    # while there's stuff to check
    while ( @{ $self->{to_check} } or @active_units ) {

        # create work units
        my $spawned = 0;
        while ( @active_units < $max_children and @{ $self->{to_check} } ) {

            # if we've already spawned a child in this pass, wait a bit
            # so we don't spike the load
            sleep $self->{spawn_delay} if $spawned;
            $spawned++;
            push @active_units, $self->new_work_unit();
        }

        # wait for any connections, blocking call.
        my $rin    = fhbits( \@active_units );
        my $nfound = select( $rin, undef, undef, undef );

        # A signal arriving during the blocking select makes it return -1
        # with EINTR. Nothing is wrong in that case, so just wait again
        # instead of tripping the wild-loop protection below.
        next if $nfound == -1 and $!{EINTR};

        # find out who has input
        my $i            = 0;
        my $wild_protect = 0;
        while ( $i <= $#active_units ) {
            my $unit = $active_units[$i];

            # if a unit requires attention, get input or kill it
            if ( $unit->has_input() ) {
                $wild_protect++;
                my ( $id, $code, $message ) = $unit->get_input();
                if ( defined $id ) {
                    $self->{on_response}->( $id, $code, $message );
                }
                else {
                    my ( $unit_id, $start_time, $number_checked, $time_taken ) = $unit->get_stats();
                    $self->{thread_stats}{$unit_id} = [ $start_time, $number_checked, $time_taken ];
                    $unit->end_unit();

                    # in case the child aborted abnormally, push the
                    # remaining urls to be checked back onto the stack
                    push @{ $self->{to_check} }, @{ $unit->{to_check} };

                    # the next unit slides into slot $i, so do not advance
                    splice( @active_units, $i, 1 );
                    next;
                }
            }
            $i++;
        }

        # protect against wild looping
        $wild_protect || die "Error in verifier, looping wildly";
    }
    return $self->{end_time} = time;
}
|
||
|
|
||
|
sub get_stats {
#------------------------------------------------------------
# Returns an array ref of run statistics:
#   [ number of units spawned, end_time - start_time, thread_stats ]
#
    my ($self) = @_;
    my $elapsed = $self->{end_time} - $self->{start_time};
    return [ $self->{threads_spawned}, $elapsed, $self->{thread_stats} ];
}
|
||
|
|
||
|
sub new_work_unit {
#------------------------------------------------------------
# Allocates a new work unit for the children.
#
# The unit size is the number of pending items divided by
# (max_children + 1), clamped to [min_workunit, max_workunit], never
# less than one item while work is pending, and never more than what
# is left. The chosen items are removed from the front of to_check
# and handed to a new Links::Parallel::WorkUnit, which is returned.
#
# There are some optimization routines that should at some point be
# implemented (for better allocation of work units).
#
    my $self      = shift;
    my $queue     = $self->{to_check};
    my $num_units = scalar @{$queue};
    my $unit_size = int( $num_units / ( $self->{max_children} + 1 ) );

    $unit_size = $self->{max_workunit} if $unit_size > $self->{max_workunit};
    $unit_size = $self->{min_workunit} if $unit_size < $self->{min_workunit};

    # A unit of size zero would start a child with nothing to do and
    # leave the queue untouched, so the caller would spawn forever.
    $unit_size = 1          if $unit_size < 1;
    $unit_size = $num_units if $unit_size > $num_units;

    # Take the items off the front of the shared queue in one step.
    my @to_check = splice( @{$queue}, 0, $unit_size );

    $self->{threads_spawned}++;

    return Links::Parallel::WorkUnit->new( $self->{path_to_perl}, $self->{child_path}, $self->{child_args}, \@to_check );
}
|
||
|
|
||
|
sub fhbits {
#------------------------------------------------------------
# Builds the bit vector for the impending select call: one bit is
# set for the file number (fno) of every work unit in the given
# array ref. An existing bit string may be passed as second argument
# to be extended; otherwise an empty one is started.
#
    my ( $work_units, $bits ) = @_;
    $bits = '' unless defined $bits;

    for my $unit ( @{$work_units} ) {
        vec( $bits, $unit->fno(), 1 ) = 1;
    }

    return $bits;
}
|
||
|
|
||
|
#####################################################
|
||
|
package Links::Parallel::WorkUnit;
|
||
|
|
||
|
use FileHandle;
|
||
|
use strict;
|
||
|
# Counter shared by all work units in this file: new() uses the current
# value as the unit id and then increments it.
my $clwork_units = 0;
|
||
|
|
||
|
sub new {
#------------------------------------------------------------
# Creates a new work unit, starts up the child process and
# encapsulates all the required data.
#
# Arguments:
#   $perlpath   path to the perl interpreter
#   $child      path to the child script
#   $cmdline    extra command line for the child (may be undef)
#   $to_check   array ref of items for this unit (copied)
#   $verbosity  optional; defaults to 1, pass 0 to silence the
#               launch messages
#
# Returns the blessed work unit. Propagates any die from new_handle.
#
    my ( $class, $perlpath, $child, $cmdline, $to_check, $verbosity ) = @_;
    my $self = bless {}, $class;

    # Only fall back to the default when nothing was passed, so an
    # explicit 0 is honoured.
    $self->{verbosity} = defined $verbosity ? $verbosity : 1;

    ( $self->{istream}, $self->{pid} ) = new_handle( $perlpath, $child, $cmdline, $to_check, $self->{verbosity} );

    # Keep a private copy so the unit can track what is still pending.
    $self->{to_check}       = [ @{$to_check} ];
    $self->{checked}        = [];
    $self->{unitid}         = $clwork_units++;
    $self->{start_time}     = time;
    $self->{number_checked} = 0;

    return $self;
}
|
||
|
|
||
|
sub new_handle {
#------------------------------------------------------------
# The function that actually creates a new child process.
#
# Arguments:
#   $perlpath   path to the perl interpreter
#   $child      path to the child script; must exist
#   $cmdline    extra command line for the child (undef means none)
#   $to_check   array ref of items, appended to the command line
#   $verbosity  optional; defaults to 1, 0 suppresses the progress
#               messages printed to the selected output handle
#
# Returns ( $filehandle, $pid ) where $filehandle reads the child's
# standard output.
# Dies if the child script does not exist or the pipe cannot be opened.
#
# NOTE(review): the command is a single string, so it is interpreted
# by the shell whenever it contains shell metacharacters. Callers
# must not put untrusted text into $cmdline or @$to_check.
#
    my ( $perlpath, $child, $cmdline, $to_check, $verbosity ) = @_;
    $verbosity = 1  unless defined $verbosity;
    $cmdline   = '' unless defined $cmdline;

    print "Launching new child ... " if $verbosity;

    -e $child or die "Child ($child) must exist";

    my $newfh = FileHandle->new;
    my $pid   = $newfh->open("$perlpath $child $cmdline @$to_check |");

    # Only the return value of open tells whether the launch worked.
    # $? is not set by open; it still holds the status of whichever
    # child was reaped last, so it must not be consulted here.
    $pid or die "Error launching child '$perlpath $child $cmdline': $!";

    print "ok ($pid)\n" if $verbosity;
    return ( $newfh, $pid );
}
|
||
|
|
||
|
sub fno {
#------------------------------------------------------------
# Returns the file number of this unit's input stream, as needed
# to build the bit vectors for the "select" call.
#
    my ($self) = @_;
    my $stream = $self->{istream};
    return fileno($stream);
}
|
||
|
|
||
|
sub has_input {
#------------------------------------------------------------
# Polls (zero timeout, never blocks) whether this work unit's input
# stream is readable, i.e. whether it has anything to report to the
# parent. Returns the result of the select call.
#
    my ($self) = @_;

    my $read_bits = '';
    vec( $read_bits, $self->fno(), 1 ) = 1;

    my $ready = select( $read_bits, undef, undef, 0 );
    return $ready;
}
|
||
|
|
||
|
sub get_input {
#------------------------------------------------------------
# Reads one line from the child and processes it locally.
# This lives here because the work unit keeps track of its own work
# pool; that frees the task administrator to do its real work and
# helps with crash recovery.
#
# Expected line format: optional whitespace, a numeric id, a tab, a
# (possibly negative or empty) numeric code, a tab, then the message.
#
# Returns ( $id, $code, $message ). All three are undef when the line
# does not have that format. At end of input the unit is ended and an
# empty list is returned.
# Dies if the unit has no input stream.
#
    my $self = shift;
    my $fh   = $self->{istream};
    $fh || die "not defined!";

    my $str = <$fh>;
    unless ( defined $str ) {
        $self->end_unit();
        return;
    }

    # chomp, not chop: the last line may arrive without a newline and
    # must not lose its final character.
    chomp $str;

    # Take the captures from this match only. On a failed match the list
    # is empty, so no values from an earlier, unrelated match leak in.
    my ( $id, $code, $message ) = $str =~ /\s*([0-9]+)\t([-0-9]*)\t(.*)/;

    if ( defined $id ) {
        push @{ $self->{checked} }, $id;
        $self->{number_checked}++;
    }

    # One pending item is consumed per line read, parsable or not, so
    # a unit that keeps emitting noise still makes progress.
    splice @{ $self->{to_check} }, 0, 1;

    return ( $id, $code, $message );
}
|
||
|
|
||
|
sub get_stats {
#------------------------------------------------------------
# Returns statistic information for this unit as a list:
#   ( unit id, start time, number of items checked,
#     seconds elapsed since the unit was started )
#
    my ($self) = @_;
    my $started = $self->{start_time};
    my $elapsed = time - $started;
    return ( $self->{unitid}, $started, $self->{number_checked}, $elapsed );
}
|
||
|
|
||
|
sub end_unit {
#------------------------------------------------------------
# Prepares the WorkUnit for deallocation: force-kills the child
# (signal 9) and closes its pipe. Without the kill, perl would wait
# on close until the child finishes on its own, which might be soon,
# later or in a hundred years.
#
# Safe to call more than once; only the first call does anything.
# That matters because a unit can be ended from get_input, from the
# task administrator and again from DESTROY, and repeating the kill
# after the child has been reaped could hit an unrelated process
# that has been given the same pid.
#
# Returns the result of closing the pipe on the first call, and
# nothing on later calls.
#
    my $self = shift;

    return if $self->{ended};
    $self->{ended} = 1;

    kill 9, $self->{pid} if $self->{pid};

    return unless $self->{istream};
    return $self->{istream}->close();
}
|
||
|
|
||
|
sub DESTROY {
#------------------------------------------------------------
# Deallocates the object.
# We want perl to force kill the child so we can ensure we exit
# quickly.
#
    my $self = shift;

    # A destructor can run at any point, including program exit.
    # Ending the unit closes a pipe, which overwrites the global status
    # variables; keep that from leaking into the surrounding code or
    # into the exit status of the program.
    local ( $?, $!, $@ );

    $self->end_unit;
    return;
}
|
||
|
|
||
|
1;
|