# ================================================================== # Gossamer Links - enhanced directory management system # # Website : http://gossamer-threads.com/ # Support : http://gossamer-threads.com/scripts/support/ # CVS Info : 087,071,086,086,085 # Revision : $Id: Parallel.pm,v 1.8 2005/03/05 01:29:09 brewt Exp $ # # Copyright (c) 2001 Gossamer Threads Inc. All Rights Reserved. # Redistribution in part or in whole strictly prohibited. Please # see LICENSE file for full details. # ================================================================== package Links::Parallel; # ================================================================== # A way to get parallel work for ceartain tasks (not thread based). # use strict; sub new { #------------------------------------------------------------ # creats a new class, be sure to take a look at how it can # be configured # my $class = shift; my %p; ref $_[0] ? (%p = %{$_[0]} ) : (%p = @_); my $self = {}; bless $self, $class; $self->{max_workunit} = defined ( $p{max_workunit} ) ? $p{max_workunit} : 10; $self->{min_workunit} = defined ( $p{min_workunit} ) ? $p{min_workunit} : 3; $self->{max_children} = defined ( $p{max_children} ) ? $p{max_children} : 3; $self->{child_path} = defined ( $p{child_path} ) ? $p{child_path} : "./child.pl"; $self->{path_to_perl} = defined ( $p{path_to_perl} ) ? $p{path_to_perl} : "/usr/local/bin/perl"; $self->{max_children} = defined ( $p{max_children} ) ? $p{max_children} : 3; $self->{spawn_delay} = defined ( $p{spawn_delay} ) ? $p{spawn_delay} : 2; $self->{to_check} = defined ( $p{to_check} ) ? $p{to_check} : []; $self->{on_response} = defined ( $p{on_response} ) ? $p{on_response} : sub { }; # for statistics $self->{start_time} = 0; $self->{end_time} = 0; $self->{threads_spawned}= 0; $self->{threads_stats} = {}; return $self; } sub wait { #------------------------------------------------------------ # the main loop that waits until the subset is checked. # my $self = shift; my $max_children = $self->{max_children}; my @active_units = (); $self->{start_time} = time; my $temp = 0; # while there's stuff to check while ( ( @{$self->{to_check}} ) or ( @active_units) ) { # create work units my $spawned = 0; while ( ( ($#active_units+1) < $max_children ) and ( @{$self->{to_check}} ) ) { # if we've already spawned a child, wait one so we don't # spike the load sleep $self->{spawn_delay} if $spawned; $spawned++; push @active_units, $self->new_work_unit (); }; # wait for any connections, blocking call. my $rin = fhbits ( \@active_units ); select ( $rin, undef, undef, undef ); # find out who is has input my ( $i, $wild_protect ); $i = 0; $wild_protect = 0; while ( $i <= $#active_units ) { # if a unit requires attention, get input or kill it if ( $active_units[$i]->has_input () ) { $wild_protect++; my ( $id, $code, $message ) = $active_units[$i]->get_input (); if ( defined $id ) { &{$self->{on_response}}( $id, $code, $message ); } else { my ( $unit_id, $start_time, $number_checked, $time_taken ) = $active_units[$i]->get_stats(); ${$self->{thread_stats}}{$unit_id} = [$start_time, $number_checked, $time_taken]; $active_units[$i]->end_unit(); # in case the child aborted abnormally, push the remaining # urls to be checked onto the stack push @{$self->{to_check}}, @{$active_units[$i]->{to_check}}; splice ( @active_units, $i, 1 ); next; }; }; $i++; }; # protect against wild looping $wild_protect || die "Error in verifier, looping wildly"; }; $self->{end_time} = time; } sub get_stats { #------------------------------------------------------------ # Return stats for the thread information. # my $self = shift; return [ $self->{threads_spawned}, $self->{end_time} - $self->{start_time}, $self->{thread_stats} ]; } sub new_work_unit { #------------------------------------------------------------ # allocates a new work unit for the chilren # there are some optimization routines that should at some # point be implemented (for better allocation of # work units # my $self = shift; my $num_units = $#{$self->{to_check}}+1; my $max_children= $self->{max_children}; my $unit_size = int ( $num_units / ( $max_children + 1 ) ); ($unit_size > $self->{max_workunit}) and $unit_size = $self->{max_workunit}; ($unit_size < $self->{min_workunit}) and $unit_size = $self->{min_workunit}; ($unit_size > $num_units) and $unit_size = $num_units; my @to_check = @{$self->{to_check}}[0..$unit_size-1]; splice ( @{$self->{to_check}}, 0, $unit_size ); $self->{threads_spawned}++; return Links::Parallel::WorkUnit->new ( $self->{path_to_perl}, $self->{child_path}, $self->{child_args}, \@to_check ); } sub fhbits { #------------------------------------------------------------ # to set the fhandle bits for the impending select call # my ($work_units, $bits) = @_; defined $bits or ($bits = ''); foreach (@$work_units) { vec($bits,$_->fno(),1) = 1; }; return $bits; } ##################################################### package Links::Parallel::WorkUnit; use FileHandle; use strict; my $clwork_units = 0; sub new { #------------------------------------------------------------ # creates a new work unit, starts up the child process # and encapsulats all the required data..., # my ($class, $perlpath, $child, $cmdline, $to_check, $verbosity) = @_; my $self = {}; $self->{verbosity} = $verbosity || 1; ($self->{istream}, $self->{pid}) = new_handle ( $perlpath, $child, $cmdline, $to_check ); @{$self->{to_check}} = @$to_check; @{$self->{checked}} = (); $self->{unitid} = $clwork_units++; $self->{start_time} = time; $self->{number_checked} = 0; bless $self, $class; return $self; } sub new_handle { #------------------------------------------------------------ # the function that actually creates a new child process # my ($perlpath, $child, $cmdline, $to_check, $verbosity) = @_; $verbosity ||= 1; my $newfh = new FileHandle; my $pid = 0; $cmdline ||= ''; $, = "|"; if ($verbosity) { print "Launching new child ... "; } if (-e $child) { $pid = $newfh->open ( "$perlpath $child $cmdline @$to_check |" ); if ((!$pid)or($?)) { die "Error launching child '$perlpath $child $cmdline'. Status: $?"; } } else { die "Child ($child) must exist"; } print "ok ($pid)\n"; return ( $newfh, $pid ); } sub fno { #------------------------------------------------------------ # returns the file handle, useful when using the # "select" call # my $self = shift; return fileno ( $self->{istream} ); } sub has_input { #------------------------------------------------------------ # returns whether or not this workunit has anything to # report to the parent # my $self = shift; my $rin = ''; vec ( $rin, $self->fno(), 1 ) = 1; my $s = select ( $rin, undef, undef, 0 ); return $s; } sub get_input { #------------------------------------------------------------ # process the local input. # this is only here because we want to make sure that # the work unit keeps track of it's own work pool # this frees the task administrator to do it's real # work and helps with crash recovery # my $self = shift; my $fh = $self->{istream}; $fh || die "not defined!"; my $str = <$fh>; if ( defined ( $str ) ) { chop $str; $str =~ /\s*([0-9]+)\t([-0-9]*)\t(.*)/; push @{$self->{checked}}, $1; splice @{$self->{to_check}}, 0, 1; $self->{number_checked}++; return ( $1, $2, $3 ); } else { $self->end_unit (); return; }; } sub get_stats { #------------------------------------------------------------ # Display statistic information. # my $self = shift; return ( $self->{unitid}, $self->{start_time}, $self->{number_checked}, time-$self->{start_time} ); } sub end_unit { #------------------------------------------------------------ # prepares the WorkUnit for deallocation. Note how # there is a force -9 kill, without that, perl will wait # until the child finishes on it's own, which might be # soon, later or in a 100 years # my $self = shift; my $fh = $self->{istream}; kill 9, ( $self->{pid} ); $self->{istream}->close (); } sub DESTROY { #------------------------------------------------------------ # deallocs the object # we want perl to force kill the child so we can ensure we exit # quickly my $self = shift; $self->end_unit; } 1;