# ==================================================================
# Gossamer Threads Module Library - http://gossamer-threads.com/
#
#   GT::SQL::Driver::PG
#   CVS Info : 087,071,086,086,085
#   $Id: PG.pm,v 2.3 2005/10/06 00:05:51 jagerman Exp $
#
# Copyright (c) 2004 Gossamer Threads Inc.  All Rights Reserved.
# ==================================================================
#
# Description: PostgreSQL driver for GT::SQL
#

package GT::SQL::Driver::PG;
# ====================================================================
use strict;
use vars qw/@ISA $ERROR_MESSAGE/;
use GT::SQL::Driver;
use GT::AutoLoader;
use DBI();

$ERROR_MESSAGE = 'GT::SQL';
@ISA = qw/GT::SQL::Driver/;

sub protocol_version { 2 }

sub connect {
    my $self = shift;
    my $dbh = $self->SUPER::connect(@_) or return;

    # This is really a hack to get things working somewhat accurately - ideally
    # all data should be in UTF8, but GT::SQL and our products do not yet have
    # any provision for such, and inserting iso8859-1 data into a unicode table
    # causes fatal errors about invalid utf8 sequences.  So, we set it to
    # latin1 here in the hopes that it won't break too much, and let the
    # application deal with it.  There are still inherent problems here,
    # however - if the database is latin5, for example, setting this to latin1
    # would make postgresql attempt to convert from latin1 -> latin5 on input
    # and convert back on output, which is a potentially lossy conversion.
    $dbh->do("SET NAMES 'LATIN1'");

    return $dbh;
}

sub dsn {
# -----------------------------------------------------------------------------
# Creates a postgres-specific DSN, such as:
#   DBI:Pg:dbname=database;host=some_hostname
# host is omitted if set to 'localhost', so that 'localhost' can be used for a
# non-network connection.  If you really want to connect to localhost, use
# 127.0.0.1.
#
    my ($self, $connect) = @_;

    $connect->{driver} ||= 'Pg';
    $connect->{host}   ||= 'localhost';

    $self->{driver} = $connect->{driver};

    my $dsn = "DBI:$connect->{driver}:";
    $dsn .= "dbname=$connect->{database}";
    $dsn .= ";host=$connect->{host}" unless $connect->{host} eq 'localhost';
    $dsn .= ";port=$connect->{port}" if $connect->{port};

    return $dsn;
}

sub hints {
    prefix_indexes => 1,
    fix_index_dbprefix => 1,
    case_map => 1,
    ai => sub {
        my ($table, $column) = @_;
        my $seq = "${table}_seq";
        my @q;
        push @q, \"DROP SEQUENCE $seq";
        push @q, "CREATE SEQUENCE $seq INCREMENT 1 START 1";
        \@q;
    },
    drop_pk_constraint => 1
}
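# Illustrative note (not part of the original module): given a hypothetical
# connect hash, dsn() above builds a DSN along these lines:
#
#     # e.g. { database => 'links', host => 'db.example.com', port => 5433 }
#     #   -> "DBI:Pg:dbname=links;host=db.example.com;port=5433"
#     # e.g. { database => 'links' }   (host defaults to 'localhost')
#     #   -> "DBI:Pg:dbname=links"     (no host clause, so DBD::Pg connects locally)
#
# Use 127.0.0.1 instead of 'localhost' if a TCP connection to the local machine
# is really wanted, as the comment above explains.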
$COMPILE{_version} = __LINE__ . <<'END_OF_SUB';
sub _version {
    my $self = shift;
    return $self->{pg_version} if $self->{pg_version};
    my $ver = $self->{dbh}->get_info(18); # SQL_DBMS_VERSION
    if ($ver) {
        local $^W;
        $ver = sprintf "%.2f", $ver;
    }
    return $self->{pg_version} = $ver;
}
END_OF_SUB

sub _prepare_select {
# -----------------------------------------------------------------------------
# Rewrite MySQL-style LIMIT y,x into PG's nicer LIMIT x OFFSET y format
#
    my ($self, $query) = @_;
    $query =~ s/\bLIMIT\s+(\d+)\s*,\s*(\d+)/LIMIT $2 OFFSET $1/i;
    $query;
}

sub _prepare_describe {
# ------------------------------------------------------------------
# Postgres-specific describe code
#
    my ($self, $query) = @_;
    $query =~ /DESCRIBE\s*(\w+)/i
        or return $self->warn(CANTPREPARE => $query, "Invalid describe query: $query");

    # atttypmod contains the scale and precision, but has to be extracted using bit operations:
    my $prec_bits  = 2**26-2**15-1; # bits 16 through 26 give the precision (given a max prec of 1000)
    my $scale_bits = 2**10-1;       # bits 1 through 10 give the scale + 4 (given a max scale of 1000)

    <<"    QUERY";
    SELECT
        a.attname AS "Field",
        CASE
            WHEN t.typname = 'numeric' THEN 'decimal(' || ((a.atttypmod & $prec_bits)>>16) || ',' || ((a.atttypmod & $scale_bits)-4) || ')'
            ELSE t.typname
        END AS "Type",
        CASE WHEN a.attnotnull = 't' THEN '' ELSE 'YES' END AS "Null",
        (
            SELECT CASE
                WHEN adsrc SIMILAR TO '''%''::[a-zA-Z0-9]+' THEN substring(adsrc from '''#"%#"''::[a-zA-Z0-9]+' for '#')
                WHEN adsrc SIMILAR TO '[0-9.e+-]+' THEN adsrc
                ELSE NULL
            END
            FROM pg_attrdef
            WHERE adrelid = c.relfilenode AND adnum = a.attnum
        ) AS "Default",
        (
            SELECT CASE WHEN d.adsrc LIKE 'nextval(%)' THEN 'auto_increment' ELSE '' END
            FROM pg_attrdef d
            WHERE d.adrelid = c.relfilenode AND adnum = a.attnum
        ) AS "Extra"
    FROM pg_class c, pg_attribute a, pg_type t
    WHERE a.atttypid = t.oid AND a.attrelid = c.oid AND relkind = 'r' AND a.attnum > 0 AND c.relname = '\L$1\E'
    ORDER BY a.attnum
    QUERY

# The following could be used above for Key - but it's left off because SHOW
# INDEX is much more useful:
#        (
#            SELECT CASE WHEN COUNT(*) >= 1 THEN 'PRI' ELSE '' END
#            FROM pg_index keyi, pg_class keyc, pg_attribute keya
#            WHERE keyi.indexrelid = keyc.oid AND keya.attrelid = keyc.oid and keyi.indrelid = c.oid
#                and indisprimary = 't' and keya.attname = a.attname
#        ) AS "Key",
}
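# Worked example (added for illustration, assuming PostgreSQL's usual atttypmod
# encoding of ((precision << 16) | scale) + 4 for NUMERIC columns): a column
# declared NUMERIC(10,2) stores atttypmod = (10 << 16) + 2 + 4 = 655366, so the
# masks used above recover
#     (655366 & $prec_bits)  >> 16   # == 10, the precision
#     (655366 & $scale_bits) -  4    # ==  2, the scale
# and the DESCRIBE emulation reports the column type as "decimal(10,2)".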
sub column_exists {
    my ($self, $table, $column) = @_;
    my $sth = $self->{dbh}->prepare(<<EXISTS);
    SELECT 1
    FROM pg_class c, pg_attribute a
    WHERE a.attrelid = c.oid AND c.relkind = 'r' AND a.attnum > 0 AND c.relname = ? AND a.attname = ?
EXISTS
    $sth->execute(lc $table, lc $column);
    return scalar $sth->fetchrow;
}

sub _prepare_show_tables {
# -----------------------------------------------------------------------------
# pg-specific 'SHOW TABLES' equivalent
#
    <<'    QUERY';
    SELECT relname AS tables
    FROM pg_class
    WHERE relkind = 'r' AND NOT (relname LIKE 'pg_%' OR relname LIKE 'sql_%')
    ORDER BY relname
    QUERY
}

sub _prepare_show_index {
# -----------------------------------------------------------------------------
# Get index list
#
    my ($self, $query) = @_;
    unless ($query =~ /^\s*SHOW\s+INDEX\s+FROM\s+(\w+)\s*$/i) {
        return $self->warn(CANTPREPARE => $query, "Invalid/unsupported SHOW INDEX query: $query");
    }

    <<"    QUERY";
    SELECT
        c.relname AS index_name,
        attname AS index_column,
        CASE WHEN indisunique = 't' THEN 1 ELSE 0 END AS index_unique,
        CASE WHEN indisprimary = 't' THEN 1 ELSE 0 END AS index_primary
    FROM pg_index i, pg_class c, pg_class t, pg_attribute a
    WHERE i.indexrelid = c.oid AND a.attrelid = c.oid AND i.indrelid = t.oid AND t.relname = '\L$1\E'
    ORDER BY i.indexrelid, a.attnum
    QUERY
}

sub drop_table {
# -----------------------------------------------------------------------------
# Drops the table passed in, dropping the table's sequence as well if one
# exists.
#
    my ($self, $table) = @_;

    my $sth = $self->{dbh}->prepare("SELECT relname FROM pg_class WHERE relkind = 'S' AND relname = '\L$table\E_seq'");
    $sth->execute();
    if (my $seq_name = $sth->fetchrow) {
        $self->do("DROP SEQUENCE $seq_name")
            or $self->warn(CANTEXECUTE => "DROP SEQUENCE $seq_name", $GT::SQL::error);
    }

    return $self->SUPER::drop_table($table);
}

sub drop_column {
# -------------------------------------------------------------------
# Drops a column from a table.
#
    my ($self, $table, $column) = @_;
    my $ver = $self->_version();

    # PostgreSQL 7.3 and above support ALTER TABLE $table DROP $column
    return $self->SUPER::drop_column($table, $column) if $ver and $ver >= 7.03;

    $self->_recreate_table();
}
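# Illustrative note (hypothetical table/index names): a driver-level
#     $self->prepare("SHOW INDEX FROM Links")
# is rewritten by _prepare_show_index() above into the pg_index/pg_class/
# pg_attribute query with t.relname = 'links', and each fetched row is a hash
# roughly of the form
#     { index_name => 'links_title', index_column => 'title',
#       index_unique => 0, index_primary => 0 }
# which is the shape drop_pk() below relies on to locate the primary key.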
$COMPILE{_recreate_table} = __LINE__ . <<'END_OF_SUB';
sub _recreate_table {
# -----------------------------------------------------------------------------
# Adds/removes/changes a column, but very expensively as it involves recreating
# and copying the entire table.  Takes argument pairs, currently:
#
#   with => 'adding_this_column' # optional
#
# Keep in mind that the various columns depend on the {cols} hash of the table
# having been updated to reflect the change.
#
# We absolutely require DBI 1.20 in this subroutine for transaction support.
# However, we won't get here if using PG >= 7.3, so you can have either an
# outdated PG, or an outdated DBI, but not both.
#
    my ($self, %opts) = @_;
    DBI->require_version(1.20);

    my $ver = $self->_version;

    my $table = $self->{name} or $self->fatal(BADARGS => 'No table specified');
    my $cols = $self->{schema}->{cols};
    my %pos = map { $_ => $cols->{$_}->{pos} } keys %$cols;

    my (@copy_cols, @select_cols);
    for (keys %$cols) {
        push @copy_cols, "$_ " . $self->column_sql($cols->{$_});
        push @select_cols, $_;
    }
    if ($opts{with}) {
        # a column was added, so we can't select it from the old table
        @select_cols = grep $_ ne $opts{with}, @select_cols;
    }

    $self->{dbh}->begin_work;

    my $temptable = "GTTemp" . substr(time, -4) . int rand 10000;
    my $select_cols = join ', ', @select_cols;

    my $lock       = "LOCK TABLE $table";
    my $createtemp = "CREATE TABLE $temptable AS SELECT * FROM $table";
    my $insert     = "INSERT INTO $table ( $select_cols ) SELECT $select_cols FROM $temptable";
    my $drop_temp  = "DROP TABLE $temptable";

    for my $precreate ($lock, $createtemp) {
        unless ($self->{dbh}->do($precreate)) {
            $self->warn(CANTEXECUTE => $precreate => $DBI::errstr);
            $self->{dbh}->rollback;
            return undef;
        }
    }

    unless ($self->drop_table($table)) {
        $self->{dbh}->rollback;
        return undef;
    }

    unless ($self->create_table) {
        $self->{dbh}->rollback;
        return undef;
    }

    for my $postcreate ($insert, $drop_temp) {
        unless ($self->{dbh}->do($postcreate)) {
            $self->warn(CANTEXECUTE => $postcreate => $DBI::errstr);
            $self->{dbh}->rollback;
            return undef;
        }
    }

    $self->{dbh}->commit;

    return 1;
}
END_OF_SUB
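# Illustrative note (hypothetical table name; the temp table name is generated
# from time/rand at run time): for a table "Users", _recreate_table() above
# ends up running a statement sequence like
#     LOCK TABLE Users
#     CREATE TABLE GTTemp12345678 AS SELECT * FROM Users
#     DROP TABLE Users                  -- plus DROP SEQUENCE Users_seq, if one exists
#     CREATE TABLE Users ...            -- via $self->create_table and the updated {cols}
#     INSERT INTO Users ( ... ) SELECT ... FROM GTTemp12345678
#     DROP TABLE GTTemp12345678
# inside a single begin_work/commit block, rolling back if any step fails.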
sub alter_column {
# -----------------------------------------------------------------------------
# Changes a column in a table.  The actual path taken depends on several
# things, including your version of postgres.  The following are supported
# _without_ recreating the table; anything more complicated requires the table
# be recreated via _recreate_table().
#
# - changing/dropping a default, with >= 7.0 (doesn't require DBI >= 1.20,
#   everything else does)
# - adding/dropping a not null constraint, with >= 7.3
# - any other changes, with >= 7.3, by adding a new column, copying the data
#   into it, dropping the old column, and renaming the new one
#
# Anything else calls _recreate_table(), which also requires DBI 1.20, but is
# much more involved as the table has to be dropped and recreated.
#
    my ($self, $table, $column, $new_def, $old_col) = @_;
    my $ver = $self->_version;

    return $self->_recreate_table() if $ver < 7;

    my $cols = $self->{schema}->{cols};
    my $new_col = $cols->{$column};

    my @onoff = qw/not_null/; # true/false attributes
    my @changeable = qw/default size scale precision/; # changeable attributes

    my %add = map { ($new_col->{$_} and not $old_col->{$_}) ? ($_ => 1) : () } @onoff;
    my %rem = map { ($old_col->{$_} and not $new_col->{$_}) ? ($_ => 1) : () } @onoff;
    my %change = map {
        (
            exists $new_col->{$_} and exists $old_col->{$_} # exists in both old and new
            and (
                defined($new_col->{$_}) ne defined($old_col->{$_}) # one is undef, the other isn't
                or defined $new_col->{$_} and defined $old_col->{$_} and $new_col->{$_} ne $old_col->{$_} # both are defined, but !=
            )
        ) ? ($_ => 1) : ()
    } @changeable;
    {
        my %add_changeable = map { (exists $new_col->{$_} and not exists $old_col->{$_}) ? ($_ => 1) : () } @changeable;
        my %rem_changeable = map { (exists $old_col->{$_} and not exists $new_col->{$_}) ? ($_ => 1) : () } @changeable;
        %add = (%add, %add_changeable);
        %rem = (%rem, %rem_changeable);
    }

    if ($ver < 7.03) {
        # In 7.0 - 7.2, defaults can be added/dropped/changed, but anything
        # more complicated needs a table recreation
        if (
            keys %change == 1 and exists $change{default} and not keys %add and not keys %rem # Changed a default
            or keys %add == 1 and exists $add{default} and not keys %change and not keys %rem # Added a default
            or keys %rem == 1 and exists $rem{default} and not keys %change and not keys %add # Dropped a default
        ) {
            my $query = "ALTER TABLE $table ALTER COLUMN $column ";
            my $ph;
            if ($add{default} or $change{default}) {
                $query .= "SET DEFAULT ?";
                $ph = $new_col->{default};
            }
            else {
                $query .= "DROP DEFAULT";
            }
            $self->{dbh}->do($query, defined $ph ? (undef, $ph) : ())
                or return $self->warn(CANTEXECUTE => $query => $DBI::errstr);
            return 1;
        }
        return $self->_recreate_table();
    }

    # PG 7.3 or later
    if (
        keys %rem == 1 and $rem{not_null} and not keys %add and not keys %change # DROP NOT NULL
        or keys %add == 1 and $add{not_null} and not keys %rem and not keys %change # SET NOT NULL
    ) {
        # All we're doing is changing a not_null constraint
        my $query = "ALTER TABLE $table ALTER COLUMN $column ";
        $query .= $rem{not_null} ? 'DROP' : 'SET';
        $query .= ' NOT NULL';
        $self->{dbh}->do($query)
            or return $self->warn(CANTEXECUTE => $query => $DBI::errstr);
        return 1;
    }

    if (keys(%change) - ($change{default} ? 1 : 0) - (($ver >= 8 and $change{type}) ? 1 : 0) == 0 # No changes other than 'default' (and type, for PG >= 8)
        and keys(%add) - ($add{default} ? 1 : 0) - ($add{not_null} ? 1 : 0) == 0 # No adds other than default or not_null
        and keys(%rem) - ($rem{default} ? 1 : 0) - ($rem{not_null} ? 1 : 0) == 0 # No rems other than default or not_null
    ) {
        my @query;

        # Change type (PG 8+ only)
        if ($ver >= 8 and $change{type}) {
            push @query, "ALTER TABLE $table ALTER COLUMN $column TYPE $new_col->{type}";
        }

        # Change default
        if ($add{default} or $change{default}) {
            push @query, ["ALTER TABLE $table ALTER COLUMN $column SET DEFAULT ?", $new_col->{default}];
        }
        elsif ($rem{default}) {
            push @query, "ALTER TABLE $table ALTER COLUMN $column DROP DEFAULT";
        }

        # Change not_null
        if ($rem{not_null}) {
            push @query, "ALTER TABLE $table ALTER COLUMN $column DROP NOT NULL";
        }
        elsif ($add{not_null}) {
            if ($add{default}) {
                push @query, ["UPDATE $table SET $column = ? WHERE $column IS NULL", $new_col->{default}];
            }
            push @query, "ALTER TABLE $table ALTER COLUMN $column SET NOT NULL";
        }

        return $self->do_raw_transaction(@query);
    }

    # We've got more complex changes than PG's ALTER COLUMN can handle; we need
    # to add a new column, copy the data, drop the old column, and rename the
    # new one to the old name.
    my (@queries, %index, %unique);
    push @queries, "LOCK TABLE $table";

    my %add_def = %$new_col;
    my $not_null = delete $add_def{not_null};
    my $default  = delete $add_def{default};
    my $add_def = $self->column_sql(\%add_def);
    my $tmpcol = 'GTTemp' . substr(time, -4) . int(rand 10000);
    push @queries, "ALTER TABLE $table ADD COLUMN $tmpcol $add_def";
    push @queries, "UPDATE $table SET $tmpcol = $column";
    push @queries, ["UPDATE $table SET $tmpcol = ? WHERE $tmpcol IS NULL", $default] if $add{not_null} and defined $default;
    push @queries, ["ALTER TABLE $table ALTER COLUMN $tmpcol SET DEFAULT ?", $default] if defined $default;
    push @queries, "ALTER TABLE $table ALTER COLUMN $tmpcol SET NOT NULL" if $not_null;
    push @queries, "ALTER TABLE $table DROP COLUMN $column";
    push @queries, "ALTER TABLE $table RENAME COLUMN $tmpcol TO $column";

    for my $type (qw/index unique/) {
        while (my ($index, $columns) = each %{$new_col->{$type}}) {
            my $recreate;
            for (@$columns) {
                if ($_ eq $column) {
                    $recreate = 1;
                    last;
                }
            }
            next unless $recreate;
            if ($type eq 'index') { $index{$index} = $columns; }
            else                  { $unique{$index} = $columns; }
        }
    }

    $self->do_raw_transaction(@queries);

    while (my ($index, $columns) = each %index) {
        $self->create_index($table, $index, @$columns);
    }
    while (my ($index, $columns) = each %unique) {
        $self->create_unique($table, $index, @$columns);
    }

    1;
}
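# Illustrative note (hypothetical table/column names): on PG >= 7.3, a change
# that only adds a default and a NOT NULL constraint is handled by
# alter_column() above with a statement list such as
#     ALTER TABLE Users ALTER COLUMN Status SET DEFAULT ?
#     UPDATE Users SET Status = ? WHERE Status IS NULL
#     ALTER TABLE Users ALTER COLUMN Status SET NOT NULL
# passed to do_raw_transaction(); changes the ALTER COLUMN forms can't express
# fall through to the add/copy/drop/rename path or to _recreate_table().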
sub add_column {
# -----------------------------------------------------------------------------
# Adds a new column to the table.
#
    my ($self, $table, $column, $def) = @_;

    # make a copy so the original reference doesn't get clobbered
    my %col = %{$self->{schema}->{cols}->{$column}};

    # Defaults and not_null have to be set _after_ adding the column.
    my $default  = delete $col{default};
    my $not_null = delete $col{not_null};

    my $ver = $self->_version;
    return $self->_recreate_table(with => $column)
        if $ver < 7 and defined $default
        or $ver < 7.03 and $not_null;

    my @queries;

    if (defined $default or $not_null) {
        $def = $self->column_sql(\%col);
    }

    push @queries, ["ALTER TABLE $table ADD $column $def"];
    push @queries, ["ALTER TABLE $table ALTER COLUMN $column SET DEFAULT ?", $default] if defined $default;
    push @queries, ["UPDATE $table SET $column = ?", $default] if defined $default and $not_null;
    push @queries, ["ALTER TABLE $table ALTER COLUMN $column SET NOT NULL"] if $not_null;

    $self->do_raw_transaction(@queries);
}

sub create_pk {
    my ($self, $table, @cols) = @_;
    my $ver = $self->_version;
    if ($ver and $ver >= 7.02) {
        return $self->do("ALTER TABLE $table ADD PRIMARY KEY (" . join(',', @cols) . ")");
    }
    else {
        # ALTER TABLE ... ADD PRIMARY KEY (...) was added in PG 7.2 - on prior
        # versions we have to recreate the entire table.
        return $self->_recreate_table();
    }
}

sub drop_pk {
# -----------------------------------------------------------------------------
# Drop a primary key.  Look up the primary key's index name via SHOW INDEX,
# then drop that constraint.
#
    my ($self, $table) = @_;

    my $sth = $self->prepare("SHOW INDEX FROM $table") or return;
    $sth->execute or return;
    my $pk_name;
    while (my $index = $sth->fetchrow_hashref) {
        if ($index->{index_primary}) {
            $pk_name = $index->{index_name};
            last;
        }
    }
    $pk_name or return $self->warn(CANTEXECUTE => "ALTER TABLE $table DROP PRIMARY KEY" => "No primary key found for $table");

    $self->do("ALTER TABLE $table DROP CONSTRAINT $pk_name");
}

sub ai_insert {
    my ($self, $ai) = @_;
    return $ai, "NEXTVAL('$self->{name}_seq')";
}

sub insert_multiple {
# -----------------------------------------------------------------------------
# Performs multiple insertions in a single transaction, for much better speed.
#
    my $self = shift;

    # ->begin_work and ->commit were not added until DBI 1.20
    return $self->SUPER::insert_multiple(@_) if $DBI::VERSION < 1.20;

    $self->{dbh}->begin_work;

    my ($cols, $args) = @_;
    my $names = join ",", @$cols, $self->{schema}->{ai} || ();
    my $ret;
    my $ai_insert = $self->{schema}->{ai} ? "NEXTVAL('$self->{name}_seq')" : undef;
    my $query = "INSERT INTO $self->{name} ($names) VALUES (" . join(',', ('?') x @$cols, $ai_insert || ()) . ')';
    my $sth = $self->{dbh}->prepare($query) or return $self->warn(CANTPREPARE => $query);
    for (@$args) {
        if ($sth->execute(@$_)) {
            ++$ret;
        }
        else {
            $self->warn(CANTEXECUTE => $query);
        }
    }

    $self->{dbh}->commit;

    $ret;
}
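# Illustrative note (hypothetical table/columns): for a table with an
# auto-increment column, insert_multiple() above prepares one statement of the
# form
#     INSERT INTO Links (Title,URL,ID) VALUES (?,?,NEXTVAL('Links_seq'))
# and executes it once per row inside a single transaction, which is where the
# speed gain over row-by-row autocommitted inserts comes from.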
sub quote {
# -----------------------------------------------------------------------------
# This subroutine quotes (or not) a value.  Postgres can't handle any text
# fields containing null characters, so this has to go beyond the ordinary
# quote() in GT::SQL::Driver by stripping out null characters.
#
    my $val = pop;
    return 'NULL' if not defined $val;
    return $$val if ref $val eq 'SCALAR' or ref $val eq 'LVALUE';

    $val =~ y/\x00//d;

    (values %GT::SQL::Driver::CONN)[0]->quote($val);
}

package GT::SQL::Driver::PG::sth;
# ====================================================================
use strict;
use vars qw/@ISA $ERROR_MESSAGE/;
use GT::SQL::Driver;
use GT::AutoLoader;

$ERROR_MESSAGE = 'GT::SQL';
@ISA = qw/GT::SQL::Driver::sth/;

sub insert_id {
# -------------------------------------------------------------------
# Retrieves the current value of the table's sequence.
#
    my $self = shift;
    my ($table) = $self->{query} =~ /\s*insert\s*into\s*(\w+)/i;
    $table ||= $self->{name};
    my $query = "SELECT CURRVAL('${table}_seq')";
    my $sth = $self->{dbh}->prepare($query) or return $self->fatal(CANTPREPARE => $query => $DBI::errstr);
    $sth->execute or return $self->fatal(CANTEXECUTE => $query => $DBI::errstr);
    my $id = $sth->fetchrow;
    return $id;
}

# ------------------------------------------------------------------------------------------------ #
#                                      DATA TYPE MAPPINGS
# ------------------------------------------------------------------------------------------------ #

package GT::SQL::Driver::PG::Types;
# ===============================================================
use strict;
use GT::SQL::Driver::Types;
use Carp qw/croak/;
use vars qw/@ISA/;
@ISA = 'GT::SQL::Driver::Types';

sub DATETIME  { $_[0]->base($_[1], 'TIMESTAMP WITHOUT TIME ZONE') }
sub TIMESTAMP { $_[0]->base($_[1], 'TIMESTAMP WITHOUT TIME ZONE') }
sub TIME      { $_[0]->base($_[1], 'TIME WITHOUT TIME ZONE') }
sub YEAR      { croak "PostgreSQL does not support 'YEAR' columns" }

# Postgres doesn't have BLOBs, but has a binary 'BYTEA' type - the one (big)
# caveat to this type, however, is that it requires escaping for any input, and
# unescaping for any output.

1;
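# Illustrative note (not part of the original module; assumes an active
# connection is registered in %GT::SQL::Driver::CONN, which quote() above uses
# for the actual DBI quoting):
#     GT::SQL::Driver::PG->quote(undef);       # 'NULL'
#     GT::SQL::Driver::PG->quote(\'NOW()');    # NOW()   (scalar ref passes through unquoted)
#     GT::SQL::Driver::PG->quote("a\0b");      # 'ab'    (NUL bytes stripped, then DBI-quoted)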