# Federated Computer, Inc. # David Sainty 2024 A.D. # Gossamer Threads to Discourse -- Migration-Import Script # v0.48.2 batch size 1000, moving to local instance of MariaDB for slowtwitch db require 'mysql2' require 'open-uri' require 'net/http' require 'tempfile' require 'sqlite3' require 'digest' require 'fileutils' require 'csv' require 'time' require 'concurrent' require 'sys/proctable' require 'active_record' require 'connection_pool' require File.expand_path("../../../../config/environment", __FILE__) # require_relative '../base' require File.expand_path("../../../../script/import_scripts/base", __FILE__) class GossamerForumsImporter < ImportScripts::Base def initialize super begin # Database configuration for ActiveRecord ActiveRecord::Base.establish_connection( adapter: 'postgresql', database: 'discourse', username: 'discourse', password: "nhB5FWhQkjdvaD2ViRNO63dQagDnzaTn", host: '10.0.0.2', pool: 20, # Adjust based on concurrency needs timeout: 5000 ) # Initialize MySQL client to connect to Gossamer Forums database @mysql_client = Mysql2::Client.new( host: "slowtwitch.northend.network", username: "admin", password: "yxnh93Ybbz2Nm8#mp28zCVv", database: "slowtwitch" ) # # Initialize MySQL client to connect to Gossamer Forums database # @mysql_client = Mysql2::Client.new( # host: "172.99.0.10", # username: "admin", # password: "x0YGLA9252iiTFQuqaM0ROX8FmQzZuUu", # database: "slowtwitch" # ) rescue Mysql2::Error => e puts "Error connecting to MySQL: #{e.message}" exit 1 end # # Create a mapping of old Gossamer user IDs to new Discourse user IDs # @user_id_map = {} initialize_sqlite_id_name_url_db end def initialize_sqlite_id_name_url_db @db = SQLite3::Database.new '/bitnami/discourse/sqlite/id_name_url_map.db' ###### ONLY when we need to clear the url_map and topic_import_status .... e.g. 
if reimporting topics-posts from scratch # @db.execute <<-SQL # DROP TABLE IF EXISTS url_map; # SQL # @db.execute <<-SQL # DROP TABLE IF EXISTS topic_last_post_time; # SQL # @db.execute <<-SQL # DROP TABLE IF EXISTS topic_post_count; # SQL # @db.execute <<-SQL # DROP TABLE IF EXISTS user_topic_count; # SQL # @db.execute <<-SQL # DROP TABLE IF EXISTS user_post_count; # SQL # @db.execute <<-SQL # DROP TABLE IF EXISTS topic_last_post_user; # SQL # @db.execute <<-SQL # DROP TABLE IF EXISTS topic_post_numbers; # SQL # @db.execute <<-SQL # DROP TABLE IF EXISTS highest_processed_post_id; # SQL # @db.execute <<-SQL # DROP TABLE IF EXISTS topic_import_status; # SQL ####### # USER IMPORT - map of old to new user ids, used for topic-post import. @db.execute <<-SQL CREATE TABLE IF NOT EXISTS user_id_map ( old_user_id INTEGER PRIMARY KEY, new_user_id INTEGER ); SQL # CATEGORY IMPORT - map of old to new category ids, used for topic-post import. @db.execute <<-SQL CREATE TABLE IF NOT EXISTS category_id_map ( old_category_id INTEGER PRIMARY KEY, new_category_id INTEGER ); SQL # USER IMPORT - map of old to new usernames for SENDING MIGRATION EMAIL @db.execute <<-SQL CREATE TABLE IF NOT EXISTS username_map ( id INTEGER PRIMARY KEY, old_username TEXT, new_username TEXT, email TEXT, real_name TEXT ); SQL # POST IMPORT - Generate a map of old_post_id (Gossamer) and the new URL for a WEB SERVER REDIRECT FILE @db.execute <<-SQL CREATE TABLE IF NOT EXISTS url_map ( old_post_id INTEGER PRIMARY KEY, new_url TEXT, title TEXT ); SQL # POST IMPORT - For each topic, the time of the last / most recent post/reply @db.execute <<-SQL CREATE TABLE IF NOT EXISTS topic_last_post_time ( topic_id INTEGER PRIMARY KEY, last_post_time INTEGER ); SQL # POST IMPORT - For each topic, increment post_count as we add posts. 
@db.execute <<-SQL CREATE TABLE IF NOT EXISTS topic_post_count ( topic_id INTEGER PRIMARY KEY, post_count INTEGER DEFAULT 0 ); SQL # POST IMPORT - For each user (_id), increment topic_count as we add topics (to see total topics per user) @db.execute <<-SQL CREATE TABLE IF NOT EXISTS user_topic_count ( user_id INTEGER PRIMARY KEY, topic_count INTEGER DEFAULT 0 ); SQL # POST IMPORT - For each user (_id), increment post_count as we add posts (to see total posts per user) @db.execute <<-SQL CREATE TABLE IF NOT EXISTS user_post_count ( user_id INTEGER PRIMARY KEY, post_count INTEGER DEFAULT 0 ); SQL # POST IMPORT - For each topic, the user_id for the last poster / replier @db.execute <<-SQL CREATE TABLE IF NOT EXISTS topic_last_post_user ( topic_id INTEGER PRIMARY KEY, user_id INTEGER ); SQL # POST IMPORT - The number of posts in a given topic, incremented as we add a new reply post to a topic. @db.execute <<-SQL CREATE TABLE IF NOT EXISTS topic_post_numbers ( topic_id INTEGER PRIMARY KEY, post_number INTEGER DEFAULT 0 ); SQL # POST IMPORT - Record perssitent integer value for highest processed post id -- not used @db.execute <<-SQL CREATE TABLE IF NOT EXISTS highest_processed_post_id ( id INTEGER PRIMARY KEY CHECK (id = 1), post_id INTEGER ); SQL # PERSONAL MESSAGE IMPORT - Record perssitent integer value for highest processed personal id @db.execute <<-SQL CREATE TABLE IF NOT EXISTS highest_processed_personal_id ( id INTEGER PRIMARY KEY CHECK (id = 1), personal_id INTEGER ); SQL # POST IMPORT - For each topic (topic post ID) record status 0 fail or 1 success @db.execute <<-SQL CREATE TABLE IF NOT EXISTS topic_import_status ( post_id INTEGER PRIMARY KEY, status INTEGER DEFAULT 0 ); SQL end def insert_user_id_mapping(old_user_id, new_user_id) @db.execute "INSERT OR REPLACE INTO user_id_map (old_user_id, new_user_id) VALUES (?, ?)", old_user_id, new_user_id end def fetch_user_id_mapping(old_user_id) @db.get_first_value "SELECT new_user_id FROM user_id_map WHERE 
old_user_id = ?", old_user_id end def insert_category_id_mapping(old_category_id, new_category_id) @db.execute "INSERT OR REPLACE INTO category_id_map (old_category_id, new_category_id) VALUES (?, ?)", old_category_id, new_category_id end def fetch_category_id_mapping(old_category_id) @db.get_first_value "SELECT new_category_id FROM category_id_map WHERE old_category_id = ?", old_category_id end def insert_username_mapping(old_username, new_username, email, real_name) @db.execute "INSERT OR REPLACE INTO username_map (old_username, new_username, email, real_name) VALUES (?, ?, ?, ?)", old_username, new_username, email, real_name end # Define a method to export the username mapping table to a CSV file def export_username_mapping_to_csv(filename) CSV.open(filename, 'w') do |csv| # Add headers csv << ['Old Username', 'New Username', 'Email', 'Full Name'] # Fetch data from the database @db.execute("SELECT old_username, new_username, email, real_name FROM username_map") do |row| csv << row end end puts "Exported changed username mappings to #{filename}" end # Insert a URL mapping into the SQLite database def insert_url_mapping(old_post_id, new_url, title) @db.execute "INSERT OR REPLACE INTO url_map (old_post_id, new_url, title) VALUES (?, ?, ?)", [old_post_id, new_url, title] end # Export the URL mappings to a CSV file def export_url_mapping_to_csv(filename) CSV.open(filename, "w") do |csv| # Add headers csv << ["Old Post ID", "New URL", "Title"] @db.execute("SELECT old_post_id, new_url, title FROM url_map") do |row| csv << row end end puts "Exported URL mappings to #{filename}" end # Method to create Nginx rewrite rules file def create_nginx_rewrite_rules(filename) File.open(filename, "w") do |file| @db.execute("SELECT old_post_id, new_url FROM url_map") do |row| old_post_id, new_url = row file.puts "rewrite ^/forum/.*P#{old_post_id}/$ #{new_url} permanent;" end end end # Fetch the highest old_post_id from the url_map table def fetch_highest_old_post_id 
@db.get_first_value "SELECT MAX(old_post_id) FROM url_map" end # Helper methods to interact with the SQLite database for persisting topic-post related values def fetch_db(table, key) @db.get_first_value "SELECT value FROM #{table} WHERE key = ?", key end def update_db(table, key, value) @db.execute "INSERT OR REPLACE INTO #{table} (key, value) VALUES (?, ?)", key, value end def fetch_db_topic_last_post_time(topic_id) @db.get_first_value "SELECT last_post_time FROM topic_last_post_time WHERE topic_id = ?", topic_id end def fetch_db_topic_last_post_user(topic_id) @db.get_first_value "SELECT user_id FROM topic_last_post_user WHERE topic_id = ?", topic_id end def fetch_db_topic_post_count(topic_id) @db.get_first_value "SELECT post_count FROM topic_post_count WHERE topic_id = ?", topic_id end def fetch_db_user_topic_count(user_id) @db.get_first_value "SELECT topic_count FROM user_topic_count WHERE user_id = ?", user_id end def fetch_db_user_post_count(user_id) @db.get_first_value "SELECT post_count FROM user_post_count WHERE user_id = ?", user_id end def fetch_db_topic_post_numbers(topic_id) @db.get_first_value "SELECT post_number FROM topic_post_numbers WHERE topic_id = ?", topic_id end def update_db_topic_last_post_time(topic_id, last_post_time) @db.execute "INSERT OR REPLACE INTO topic_last_post_time (topic_id, last_post_time) VALUES (?, ?)", topic_id, last_post_time end def update_db_topic_last_post_user(topic_id, user_id) @db.execute "INSERT OR REPLACE INTO topic_last_post_user (topic_id, user_id) VALUES (?, ?)", topic_id, user_id end def update_db_topic_post_count(topic_id, post_count) @db.execute "INSERT OR REPLACE INTO topic_post_count (topic_id, post_count) VALUES (?, ?)", topic_id, post_count end def update_db_user_topic_count(user_id, topic_count) @db.execute "INSERT OR REPLACE INTO user_topic_count (user_id, topic_count) VALUES (?, ?)", user_id, topic_count end def update_db_user_post_count(user_id, post_count) @db.execute "INSERT OR REPLACE INTO 
user_post_count (user_id, post_count) VALUES (?, ?)", user_id, post_count end def update_db_topic_post_numbers(topic_id, post_number) @db.execute "INSERT OR REPLACE INTO topic_post_numbers (topic_id, post_number) VALUES (?, ?)", topic_id, post_number end # Fetch the highest processed post_id from the highest_processed_post_id table def fetch_highest_processed_post_id @db.get_first_value "SELECT post_id FROM highest_processed_post_id WHERE id = 1" end # Update the highest processed post_id in the highest_processed_post_id table def update_highest_processed_post_id(post_id) @db.execute "INSERT OR REPLACE INTO highest_processed_post_id (id, post_id) VALUES (1, ?)", post_id end # Fetch the highest processed personal_id from the highest_processed_personal_id table def fetch_highest_processed_personal_id @db.get_first_value "SELECT personal_id FROM highest_processed_personal_id WHERE id = 1" end # Update the highest processed personal_id in the highest_processed_personal_id table def update_highest_processed_personal_id(personal_id) @db.execute "INSERT OR REPLACE INTO highest_processed_personal_id (id, personal_id) VALUES (1, ?)", personal_id end # Check if post_id exists and its status def fetch_post_status(post_id) result = @db.execute("SELECT status FROM topic_import_status WHERE post_id = ?", post_id).flatten.first result.nil? ? 
nil : result.to_i end # Mark post_id as complete def mark_post_as_complete(post_id) @db.execute("INSERT OR REPLACE INTO topic_import_status (post_id, status) VALUES (?, 1)", post_id) end # Mark post_id as failed def mark_post_as_failed(post_id) @db.execute("INSERT OR REPLACE INTO topic_import_status (post_id, status) VALUES (?, 0)", post_id) end # Execute an SQL query on the Gossamer Forums database def execute_query(query) @mysql_client.query(query, as: :hash) end # Execute an SQL query on the Gossamer Forums database def execute_query_concurrent(query, mysql_client) mysql_client.query(query, as: :hash) end # Sanitize the username to meet Discourse's requirements def sanitize_username(original_username, email, real_name) # original_username = username sanitized_username = original_username.gsub(/[^a-zA-Z0-9._-]/, '_') sanitized_username = "#{sanitized_username}." if sanitized_username.length < 2 # Allow two-character usernames sanitized_username = sanitized_username[0, 20] if sanitized_username.length > 20 firststep_sanitized = sanitized_username existing_user = User.find_by(username: sanitized_username) if existing_user if existing_user.email.downcase == email.downcase && existing_user.name == real_name # The existing user with the username the same as the current proposed sanitised name _is_ the same person... 
return sanitized_username else # We cannot clobber another person with the same proposed username, so we resolve the conflict counter = 1 while User.exists?(username: sanitized_username) sanitized_username = "#{firststep_sanitized}_#{counter}" sanitized_username = sanitized_username[0, 20] if sanitized_username.length > 20 counter += 1 end end end if original_username != sanitized_username # The Discourse username is not the same as the Gossamer Forums username puts "Sanitized username: '#{original_username}' --> '#{sanitized_username}'" insert_username_mapping(original_username, sanitized_username, email, real_name) # else # puts "UNsanitized username: '#{original_username}' --> '#{sanitized_username}'" end sanitized_username end # Sanitize email to replace restricted domains def sanitize_email(email) restricted_domains = ['mailinator.com', 'example.com'] # Add more restricted domains as needed domain = email.split('@').last if restricted_domains.include?(domain) sanitized_email = email.gsub(domain, 'example.org') # Change to a permissible domain puts "Sanitized email: '#{email}' --> '#{sanitized_email}'" return sanitized_email end email end def generate_unique_title(base_title, user_id, timestamp) unique_title = base_title suffix = 1 puts "Generating unique title for base title: #{base_title}" while Topic.find_by(title: unique_title) # while Topic.where(title: unique_title).exists? # while Topic.exists?(title: unique_title) puts "Title '#{unique_title}' already exists, generating a new one." 
unique_title = "#{base_title} (#{suffix})" puts "NEW unique_title #{unique_title}" suffix += 1 end puts "Final unique title: #{unique_title}" unique_title end # Helper method to download an attachment / image from a URL def download_attachment(url) begin puts "URL: '#{url}'" URI.open(url, ssl_verify_mode: OpenSSL::SSL::VERIFY_NONE).read rescue OpenURI::HTTPError => e puts "Failed to download attachment from #{url}: #{e.message}" nil rescue URI::InvalidURIError => e puts "Failed to handle invalid URL/URI for #{url}: #{e.message}" nil end end # Helper method to upload an attachment / image to Discourse def upload_attachment(user_id, file, filename, gossamer_url) begin # BAD # upload = Upload.create!( # user_id: user_id, # original_filename: filename, # filesize: file.size, # # filesize: File.size(file.path), # # content_type: `file --brief --mime-type #{file.path}`.strip, # # sha1: Digest::SHA1.file(file.path).hexdigest, # # origin: 'user_avatar', # # retain_hours: nil, # url: gossamer_url # ) # # Error -- non-existent method upload.ensure_consistency! # Use the correct content type and SHA1 hash for the upload # content_type = `file --brief --mime-type #{file.path}`.strip # Generate SHA1 hash for the file sha1 = Digest::SHA1.file(file.path).hexdigest # Ensure the file size is correctly handled file_size = File.size(file.path) puts "Attempting to upload attachment. filename #{filename} user_id #{user_id} file.size #{file.size} file_size #{file_size} sha1 #{sha1} gossamer_url #{gossamer_url}" # Create the upload record, adjusting fields as needed for 3.2.5 compatibility # upload = Upload.create!( upload = Upload.new( user_id: user_id, original_filename: filename, filesize: file.size, sha1: sha1, url: gossamer_url # content_type: content_type, # origin: 'composer', # retain_hours: nil ) # Save the upload object before generating the file path upload.save! 
# Use FileStore::LocalStore to store the file store = FileStore::LocalStore.new upload.url = store.store_file(file, store.get_path_for('original', upload.id, upload.sha1, File.extname(file.path))) # # Use Discourse's internal method to upload the file # file.rewind # Discourse.store.upload(file, upload.sha1) # # Move the file to the correct location # upload_path = Upload.get_path_for_file(upload.sha1) # FileUtils.mkdir_p(File.dirname(upload_path)) # FileUtils.mv(file.path, upload.path) # Save the upload object upload.save! # # Check if further processing or compatibility handling is needed # if Discourse.respond_to?(:store) # store = Discourse.store # upload.url = store.store_file(file, store.get_path_for('original', upload.id, upload.sha1, File.extname(file.path))) # else # # Fallback for earlier file store methods # upload_path = Upload.get_path_for_file(upload.sha1) # FileUtils.mkdir_p(File.dirname(upload_path)) # FileUtils.mv(file.path, upload_path) # end # Return the upload object upload rescue => e puts "FAILURE: Failed to upload attachment #{filename} for user_id #{user_id}: #{e.message}" puts e.backtrace.join("\n") # Print the full stack trace nil end end # Helper method to upload an attachment / image to Discourse def upload_attachment_two(user_id, file, filename) begin upload = UploadCreator.new(file, filename, origin: 'import').create_for(user_id) # Raise an erorr if the upload fails to catch issues early raise "Upload failed" unless upload upload rescue => e puts "Failed to upload attachment #{filename} for user_id #{user_id}: #{e.message}" puts e.backtrace.join("\n") # Print the full stack trace nil end end # Helper method to handle post attachments def handle_post_attachments(gossamer_post_id, post, user_id, mysql_client) execute_query_concurrent("SELECT * FROM gforum_PostAttachment WHERE post_id_fk = #{gossamer_post_id}", mysql_client).each do |att_row| attachment_url = 
"https://forum.slowtwitch.com/forum/?do=post_attachment;postatt_id=#{att_row['postatt_id']}" puts "Handling attachment: #{attachment_url}" attachment_data = download_attachment(attachment_url) next unless attachment_data mime_type = att_row['postatt_content'] # Create a Tempfile to pass to the UploadCreator temp_file = Tempfile.new(['attachment', File.extname(att_row['postatt_filename'])]) temp_file.binmode temp_file.write(attachment_data) temp_file.rewind puts "Attempting upload..." upload = upload_attachment_two(user_id, temp_file, att_row['postatt_filename']) next unless upload # Get the URL of the uploaded file from Discourse # upload_url = Upload.get_from_url(upload.url).url upload_url = upload.url puts "Appending to post.raw... #{upload_url} MIME type: #{mime_type}" if mime_type.start_with?('image/') post.raw += "\n![#{att_row['postatt_filename']}](#{upload_url})" else post.raw += "\n[#{att_row['postatt_filename']}](#{upload_url})" end post.save! temp_file.close temp_file.unlink end end # def download_file(url) # require 'open-uri' # begin # file = Tempfile.new # file.binmode # file.write(URI.open(url).read) # file.rewind # file # rescue => e # puts "Failed to download file from #{url}: #{e.message}" # puts e.backtrace.join("\n") # Print the full stack trace # nil # end # end # Helper method to upload an image to Discourse # def upload_image(user, image_data, filename) # return if image_data.nil? # # upload = Upload.create_for(user.id, File.open(image_data.path), filename, 'image/jpeg') # if upload.nil? || !upload.persisted? # puts "Failed to upload image for user #{user.username}" # return # end # # upload # end # Add 'Former_User' account if not already present def add_former_user puts "Adding 'Former User' account if it does not exist..." former_user = User.find_by(username: 'Former_User') if former_user.nil? 
former_user = User.create!( username: 'Former_User', name: 'Former User', email: 'former_user@example.com', password: SecureRandom.hex(16), active: true ) puts "'Former User' account created with user ID: #{former_user.id}" # Store the user ID mapping puts "for insert_user_id_mapping: 0 (former_user) discourse_user.id #{former_user.id}" insert_user_id_mapping(0, former_user.id) else puts "'Former User' account already exists with user ID: #{former_user.id}" end end # Import users from Gossamer Forums to Discourse def import_users puts "Importing Users..." users = [] # Fetch all users from Gossamer Forums execute_query("SELECT * FROM gforum_User").each do |row| users << { id: row['user_id'], username: sanitize_username(row['user_username'], row['user_email'], row['user_real_name']), email: row['user_email'], created_at: Time.at(row['user_registered']), updated_at: Time.at(row['user_last_seen']), name: row['user_real_name'], title: row['user_title'], bio_raw: row['user_about'] || "", website: row['user_homepage'], location: row['user_location'], custom_fields: { md5_password: row['user_password'], original_username: row['user_username'], original_gossamer_id: row['user_id'] } } end # Create or update users in Discourse create_users(users) do |user| # insert_user_id_mapping(user[:id], user.id) user end end # Generate SQLite user ID mapping between Discourse and Gossamer def generate_user_id_mapping puts "Generating User ID Mapping..." 
users = [] # Fetch all users from Gossamer Forums execute_query("SELECT * FROM gforum_User").each do |row| users << { id: row['user_id'], username: sanitize_username(row['user_username'], row['user_email'], row['user_real_name']), email: row['user_email'], created_at: Time.at(row['user_registered']), updated_at: Time.at(row['user_last_seen']), name: row['user_real_name'], title: row['user_title'], bio_raw: row['user_about'] || "", website: row['user_homepage'], location: row['user_location'], custom_fields: { md5_password: row['user_password'], original_username: row['user_username'], original_gossamer_id: row['user_id'] } } end # For each user, add user ID mapping to SQLite now that we know what the Discourse user ID is users.each do |user| # discourse_username = sanitize_username(user[:username], user[:email], user[:name]) discourse_username = user[:username] discourse_user = User.find_by(username: discourse_username) if discourse_user.nil? puts "User #{user[:username]} --> #{discourse_username} not found in Discourse. Skipping user mapping addition." next end # Store the user ID mapping # @user_id_map[user[:id]] = discourse_user.id puts "for insert_user_id_mapping: user[:id] #{user[:id]} discourse_user.id #{discourse_user.id}" insert_user_id_mapping(user[:id], discourse_user.id) end end # Import and set user Bio and Images def set_user_bio_images puts "Setting User Bio and Images..." 
users = [] # Fetch all users from Gossamer Forums execute_query("SELECT * FROM gforum_User").each do |row| users << { id: row['user_id'], username: sanitize_username(row['user_username'], row['user_email'], row['user_real_name']), email: row['user_email'], created_at: Time.at(row['user_registered']), updated_at: Time.at(row['user_last_seen']), name: row['user_real_name'], title: row['user_title'], bio_raw: row['user_about'] || "", website: row['user_homepage'], location: row['user_location'], custom_fields: { md5_password: row['user_password'], original_username: row['user_username'], original_gossamer_id: row['user_id'] } } end # For each user, append user bio and import user files users.each do |user| # discourse_username = sanitize_username(user[:username], user[:email], user[:name]) discourse_username = user[:username] discourse_user = User.find_by(username: discourse_username) if discourse_user.nil? puts "User #{user[:username]} --> #{discourse_username} not found in Discourse. Skipping bio-image setting." next end # Ensure user profile exists and bio_raw is a string discourse_user.user_profile ||= UserProfile.new(user_id: discourse_user.id) discourse_user.user_profile.bio_raw ||= "" # Append bio if it exists, otherwise set it to empty string to avoid nil errors if discourse_user.user_profile.bio_raw.empty? discourse_user.user_profile.bio_raw = user[:bio_raw] else discourse_user.user_profile.bio_raw += "\n\n" + user[:bio_raw] end # Ensure the bio does not exceed 3000 characters if discourse_user.user_profile.bio_raw.length > 3000 puts "Warning: About Me for user #{discourse_user.username} (ID: #{discourse_user.id}) exceeds 3000 characters. Truncating." discourse_user.user_profile.bio_raw = discourse_user.user_profile.bio_raw[0, 3000] end discourse_user.user_profile.save! # Import user files # TEMPORARY!! -- Use if script is interrupted mid-user-importing. 
# if user[:id] > 353 import_user_files(discourse_user) # end end end # # Import user files from Gossamer Forums to Discourse # def import_user_files(user) # print "\rImporting files for user #{user.username}..." # # original_gossamer_id = user.custom_fields['original_gossamer_id'] # if original_gossamer_id.nil? || original_gossamer_id.empty? # puts "User #{user.username} does not have a valid original_gossamer_id. Skipping file import." # return # end # # # puts "Original Gossamer ID for user #{user.username}: #{original_gossamer_id}" # # # Fetch and import user files # execute_query("SELECT * FROM gforum_User_Files WHERE ForeignColKey = #{original_gossamer_id}").each do |file| # # Construct the file URL # file_url = "https://forum.slowtwitch.com/images/users/images/#{file['ID'] % 10}/#{file['ID']}-#{file['File_Name']}" # puts "User #{user.username} User ID: #{user.id} original_gossamer_id: #{original_gossamer_id} file_url: #{file_url}" # # new_bio = user.user_profile.bio_raw + "\n\n![#{file['File_Name']}](#{file_url})" # if new_bio.length > 3000 # puts "Warning: About Me for user #{user.username} (ID: #{user.id}) exceeds 3000 characters after adding file link. Truncating." # new_bio = new_bio[0, 3000] # end # user.user_profile.bio_raw = new_bio # user.user_profile.save! # end # print "Importing files for user #{user.username}... 
Done.\n" # end # Helper method to convert TIFF to PNG def convert_tiff_to_png(file_path) png_path = file_path.sub('.tiff', '.png').sub('.tif', '.png') begin system("convert #{file_path} #{png_path}") png_path if File.exist?(png_path) rescue => e puts "Failed to convert image #{file_path}: #{e.message}" puts e.backtrace.join("\n") # Print the full stack trace nil end end # Helper method to resize an image to specified dimensions def resize_image(file_path, max_width, max_height) resized_path = file_path.sub(File.extname(file_path), "_resized#{File.extname(file_path)}") begin system("convert #{file_path} -resize #{max_width}x#{max_height} #{resized_path}") resized_path if File.exist?(resized_path) rescue => e puts "Failed to resize image #{file_path}: #{e.message}" puts e.backtrace.join("\n") # Print the full stack trace nil end end # Create avatar for the user using the provided image path def create_avatar(user, avatar_path) tempfile = Tempfile.new(['avatar', File.extname(avatar_path)]) FileUtils.copy_file(avatar_path, tempfile.path) filename = "avatar#{File.extname(avatar_path)}" upload = UploadCreator.new(tempfile, filename, type: "avatar").create_for(user.id) if upload.present? && upload.persisted? user.create_user_avatar user.user_avatar.update(custom_upload_id: upload.id) user.update(uploaded_avatar_id: upload.id) else puts "Failed to upload avatar for user #{user.username}: #{avatar_path}" puts upload.errors.inspect if upload end rescue StandardError => e puts "Failed to create avatar for user #{user.username}: #{avatar_path}. Error: #{e.message}" ensure tempfile.close! if tempfile end # Import user files (profile images) from Gossamer Forums to Discourse def import_user_files(user) print "\rImporting files for user #{user.username}..." original_gossamer_id = user.custom_fields['original_gossamer_id'] if original_gossamer_id.nil? || original_gossamer_id.empty? puts "User #{user.username} does not have a valid original_gossamer_id. Skipping file import." 
return end puts "Original Gossamer ID for user #{user.username}: #{original_gossamer_id}" images_imported = 0 execute_query("SELECT * FROM gforum_User_Files WHERE ForeignColKey = #{original_gossamer_id}").each do |file| # Encode the filename twice to match how it is stored on the server double_encoded_filename = file['File_Name'].gsub('%', '%25') encoded_filename = URI.encode_www_form_component(double_encoded_filename) # Construct the file URL file_url = "https://forum.slowtwitch.com/images/users/images/#{file['ID'] % 10}/#{file['ID']}-#{encoded_filename}" puts "User #{user.username} User ID: #{user.id} original_gossamer_id: #{original_gossamer_id} file_url: #{file_url}" # Ensure the file is a user image and has a supported MIME type next unless file['ForeignColName'] =~ /^user_image\d+$/ puts "#A" next unless ['image/jpeg', 'image/png', 'image/gif', 'image/webp', 'image/avif', 'image/heif', 'image/heic'].include?(file['File_MimeType']) puts "#B" # Download the attachment image_data = download_attachment(file_url) next if image_data.nil? puts "#C" # Write the image data to a temporary file temp_file = Tempfile.new(['user_image', File.extname(file['File_Name'])]) temp_file.binmode temp_file.write(image_data) temp_file.rewind # Convert TIFF to PNG if necessary # if file['File_MimeType'] == 'image/tiff' # begin # converted_tiff_to_png_path = convert_tiff_to_png(temp_file.path) # raise "Conversion of TIFF failed" if converted_tiff_to_png_path.nil? # temp_file.close # temp_file.unlink # temp_file = File.open(converted_tiff_to_png_path) # rescue => e # puts "Skipping image due to convert failure: #{temp_file.path}" # temp_file.close # temp_file.unlink # next # end # end # Resize the image for the avatar / profile picture (200x200) begin resized_image_path = resize_image(temp_file.path, 200, 200) raise "Image resize failed" if resized_image_path.nil? 
resized_temp_file = Tempfile.new(['user_image_resized', '.png']) FileUtils.copy_file(resized_image_path, resized_temp_file.path) rescue => e puts "Skipping image due to resize failure: #{temp_file.path}" puts e.backtrace.join("\n") # Print the full stack trace temp_file.close temp_file.unlink next end # Only handle the first image for avator and profile settings if images_imported == 0 puts "#D" # Upload the original image for profile header and user card background upload = upload_attachment(user.id, temp_file, file['File_Name'], file_url) next if upload.nil? # Upload the resized image for the avatar resized_upload = upload_attachment(user.id, resized_temp_file, file['File_Name'], file_url) next if resized_upload.nil? # Ensure the upload ID is valid and exists if resized_upload # && UserAvatar.exists?(custom_upload_id: resized_upload.id) # Set the avatar using the resized image # user.user_avatar = UserAvatar.create!(user_id: user.id, custom_upload_id: resized_upload.id) # user.save! # user.create_user_avatar # user.user_avatar.update(custom_upload_id: resized_upload.id) # user.update(uploaded_avatar_id: resized_upload.id) create_avatar(user, resized_temp_file.path) puts "Avatar set for user #{user.username} with upload ID #{resized_upload.id}" else puts "Failed to set avatar for user #{user.username}" end # Set the Profile Header UserProfile.find_by(user_id: user.id).update!(profile_background_upload_id: upload.id) # Set the User Card Background UserProfile.find_by(user_id: user.id).update!(card_background_upload_id: upload.id) images_imported += 1 end puts "#E" # Append the image to the user's bio_raw only if it does not already exist image_markdown = "\n\n![#{file['File_Name']}](#{file_url})" user.user_profile.bio_raw ||= "" unless user.user_profile.bio_raw.include?(image_markdown) user.user_profile.bio_raw += image_markdown user.user_profile.save! 
end # Clean up temporary files temp_file.close temp_file.unlink if temp_file.is_a?(Tempfile) resized_temp_file.close resized_temp_file.unlink end # Check for and remove exact duplicate image markdown strings user.user_profile.bio_raw = user.user_profile.bio_raw.split("\n\n").uniq.join("\n\n") user.user_profile.save! print "Importing files for user #{user.username}... Done.\n" end # Import categories from Gossamer Forums to Discourse def import_categories puts "Importing categories (forums)..." execute_query("SELECT * FROM gforum_Forum").each do |row| # Only create category if it does not exist unless CategoryCustomField.exists?(name: 'original_gossamer_id', value: row['forum_id']) category_name = row['forum_name'] category_description = row['forum_desc'] || "No description provided" puts "id #{row['forum_id']} name #{category_name} description #{category_description}" # Create category in Discourse category = create_category( { # id: row['forum_id'] + 10, name: category_name, description: category_description, created_at: row['forum_last'] ? Time.at(row['forum_last']) : Time.now, updated_at: row['forum_last'] ? Time.at(row['forum_last']) : Time.now }, row['forum_id'] # import_id argument ) # # Map Gossamer forum ID to Discourse category ID for future reference # @forum_id_map[row['forum_id']] = category.id # category.custom_fields.create!(name: 'original_gossamer_id', value: row['forum_id']) category.custom_fields['original_gossamer_id'] = row['forum_id'] category.save! # Store the user ID mapping puts "for insert_category_id_mapping: category[:id] #{category[:id]} row['forum_id'] #{row['forum_id']}" insert_category_id_mapping(row['forum_id'], category[:id]) end end puts "Importing categories... Done." end # Helper function to ensure title meets the minimum length requirement def ensure_valid_title(title, min_length = 5) if title.length < min_length title += "." 
* (min_length - title.length) # Append dots to make it longer end title end # # Convert Gossamer tags to Discourse markdown # def convert_gossamer_tags_to_markdown(text) # text.gsub!(/\[b\](.*?)\[\/b\]/, '**\1**') # text.gsub!(/\[i\](.*?)\[\/i\]/, '*\1*') # text.gsub!(/\[img\](.*?)\[\/img\]/, '![image](\1)') # text.gsub!(/\[quote\](.*?)\[\/quote\]/m, '[quote]\1[/quote]') # text.gsub!(/\[quote (.*?)\](.*?)\[\/quote\]/m, '[quote=\1]\2[/quote]') # text.gsub!(/\[font "(.*?)"\](.*?)\[\/font\]/m, '\2') # Ignoring font changes # text.gsub!(/\[size (\d+)\](.*?)\[\/size\]/m, '\2') # Ignoring size changes # text # end # Sanitize post message to remove Gossamer-specific tags and convert to Discourse-friendly format def sanitize_post_message(message) # Ensure the raw post string contents itself is acceptable to Discourse sanitized_message = message&.tr("\0", '') || "" # Ensure minimum length if sanitized_message.strip.empty? || sanitized_message.length < 3 sanitized_message = "Empty post contents." end # Ensure sentence structure unless sanitized_message.match?(/[.!?]\s|[.!?]$/) sanitized_message += "." 
end # Remove the [signature] as we don't support this in Discourse # sanitized_message.sub!(/\n?\[signature\]\n?\z/, '') sanitized_message.gsub(/\n?\[signature\]\n?/, '') # Convert Gossamer tags to Discourse markdown sanitized_message.gsub!(/\[b\](.*?)\[\/b\]/, '**\1**') sanitized_message.gsub!(/\[i\](.*?)\[\/i\]/, '*\1*') sanitized_message.gsub!(/\[u\](.*?)\[\/u\]/, '\1') sanitized_message.gsub!(/\[quote\](.*?)\[\/quote\]/m, '[quote]\1[/quote]') sanitized_message.gsub!(/\[quote\s+user=(.*?)\](.*?)\[\/quote\]/m, '[quote \1]\2[/quote]') sanitized_message.gsub!(/\[img\](.*?)\[\/img\]/, '![image](\1)') sanitized_message.gsub!(/\[url=(.*?)\](.*?)\[\/url\]/, '[\2](\1)') sanitized_message.gsub!(/\[size=(.*?)\](.*?)\[\/size\]/, '\2') sanitized_message.gsub!(/\[color=(.*?)\](.*?)\[\/color\]/, '\2') sanitized_message.gsub!(/\[font=(.*?)\](.*?)\[\/font\]/, '\2') # Remove unsupported tags sanitized_message.gsub!(/\[.*?\]/, '') # Convert inline image syntax from `!(url)` to `![url](url)` sanitized_message.gsub!(/!\((http[s]?:\/\/[^\)]+)\)/, '![\1](\1)') sanitized_message end # Fetch post views from the gforum_PostView table def fetch_post_views(post_id) result = execute_query("SELECT post_views FROM gforum_PostView WHERE post_id_fk = #{post_id} LIMIT 1").first result ? 
result['post_views'] : 0 end ########## THREADING START -------------------------------------------- # Method to dynamically calculate the optimal thread pool size based on system load def calculate_dynamic_pool_size # Fetch current CPU load average using Sys::ProcTable.loadavg # load_avg = Sys::ProcTable.loadavg.last # Get the 15-minute load average # load_avg = Sys::ProcTable.loadavg load_avg = File.read('/proc/loadavg').split # Calculate the pool size based on the load average # Adjust the multiplier and threshold as needed # pool_size = [(Concurrent.processor_count / (load_avg + 0.1)).to_i, 1].max # Extract the 1-minute load average from the fetched data one_minute_load_avg = load_avg[0].to_f # Determine how many logical CPU cores are available on the system cpu_count = Concurrent.processor_count # Log the current load and CPU information for debugging and monitoring purposes puts "1-minute Load Average: #{one_minute_load_avg}, CPU Count: #{cpu_count}" # Calculate the initial pool size based on the ratio of the 1-minute load average to the number of CPUs # This ratio gives an idea of how many threads should be running to efficiently utilize the CPU resources initial_pool_size = (cpu_count / one_minute_load_avg).ceil # Ensure the pool size is at least 1 to avoid creating a pool with zero threads initial_pool_size = 1 if initial_pool_size < 1 # Cap the maximum pool size to twice the number of CPUs # This prevents overloading the system with too many threads, which could lead to diminishing returns max_pool_size = cpu_count * 2 # Adjust the final pool size to be within the valid range (1 to max_pool_size) pool_size = [[initial_pool_size, max_pool_size].min, 1].max puts "Calculated and adjusted dynamic pool size: #{pool_size}" # Log the dynamically adjusted pool size pool_size end # Get list of TOPICS / OP posts, i.e. 
# post ids that have no parent / root id - SELECT post_id FROM gforum_Post WHERE post_root_id = 0;

  # Import every Gossamer topic (OP post, post_root_id = 0) and its replies in parallel.
  #
  # Each OP post_id is submitted to a 12-thread Concurrent::FixedThreadPool. A job checks out
  # its own Mysql2 client from a ConnectionPool (a Mysql2::Client must not be used by two
  # threads at once) plus an ActiveRecord connection, then runs
  # gossamer_import_topic_with_retries. All access to the shared SQLite bookkeeping goes
  # through one Mutex. Blocks until every job has finished, then closes the pooled
  # MariaDB connections.
  def threaded_topic_import
    pool = Concurrent::FixedThreadPool.new(12)

    # Twice the thread count, as before.
    mariadb_pool = ConnectionPool.new(size: 24, timeout: 100) do
      Mysql2::Client.new(
        host: "slowtwitch.northend.network",
        username: "admin",
        password: "yxnh93Ybbz2Nm8#mp28zCVv",
        database: "slowtwitch",
        # Let the client re-establish a dropped connection (e.g. on #ping) instead of
        # leaving a dead client in the pool.
        reconnect: true
      )
    end
    sqlite_mutex = Mutex.new

    # Runs on the calling thread before any job starts, so the shared client is safe here.
    parent_post_ids = execute_query("SELECT post_id FROM gforum_Post WHERE post_root_id = 0 ORDER BY post_id ASC")
                      .map { |row| row['post_id'] }

    parent_post_ids.each do |post_id|
      pool.post { gossamer_import_topic_with_retries(post_id, mariadb_pool, sqlite_mutex) }
    end

    puts "PP 33 -- Ready for shutdown"
    pool.shutdown # No more jobs will be submitted.
    puts "PP 44 -- Now wait for termination"
    pool.wait_for_termination

    # Close every MariaDB client the pool created; they were previously leaked.
    mariadb_pool.shutdown(&:close)
  end

  # Worker body for threaded_topic_import: import one OP post_id unless SQLite records it
  # as already processed (fetch_post_status neither nil nor 0), then mark it complete.
  # On any error the post is marked failed; lost/busy-connection errors are retried at
  # most +max_retries+ times (the old code retried forever).
  private def gossamer_import_topic_with_retries(post_id, mariadb_pool, sqlite_mutex, max_retries: 5)
    attempts = 0
    begin
      mariadb_pool.with do |mysql_client|
        puts "PP 11 -- #{post_id} -- Checking MySQL connections status.."
        mysql_client.ping

        ActiveRecord::Base.connection_pool.with_connection do
          post_status = sqlite_mutex.synchronize { fetch_post_status(post_id) }
          if post_status.nil? || post_status == 0
            puts "Starting import for post_id #{post_id}"
            topic_import_job(post_id, sqlite_mutex, mysql_client) # Import topic and its replies
            sqlite_mutex.synchronize { mark_post_as_complete(post_id) }
          else
            puts "Skipping post_id #{post_id}, already processed."
          end
        end
      end
    rescue => e
      puts "Error processing post ID #{post_id}: #{e.message}"
      puts Array(e.backtrace).join("\n") # Print the full stack trace
      sqlite_mutex.synchronize { mark_post_as_failed(post_id) }

      transient = e.message.match?(/MySQL client is not connected|This connection is in use by/)
      if transient && (attempts += 1) <= max_retries
        sleep(1)
        puts "Reconnecting to MySQL for post ID #{post_id} due to connection loss..."
        retry
      end
    end
  end

  # Import one Gossamer topic: the opening post +post_id+ and every reply (gforum_Post rows
  # whose post_root_id = post_id, ordered by post_time).
  #
  # Designed to run on a pool worker:
  # * every MariaDB query uses +mysql_client+ (the thread's own client). The previous version
  #   read view counts via fetch_post_views, i.e. the shared client, from many threads.
  # * every read and write of the SQLite bookkeeping is done under +sqlite_mutex+. The
  #   user/category mapping helpers are assumed to be SQLite-backed, like the
  #   user_id_map/category_id_map tables; TODO confirm.
  #
  # Returns without importing when the post row is missing, when its user or category has
  # no mapping, or when a topic with original_gossamer_id = post_id already exists. The
  # topic and all its posts are created inside one ActiveRecord transaction.
  #
  # @param post_id [Integer] Gossamer post_id of the topic's opening post
  # @param sqlite_mutex [Mutex] guards the shared SQLite handle
  # @param mysql_client [Mysql2::Client] connection owned by the calling thread
  # @raise [ActiveRecord::RecordInvalid] when the topic or a post fails validation. The
  #   transaction is rolled back and the error re-raised so the caller marks the topic
  #   failed; it used to be swallowed via ActiveRecord::Rollback and then marked complete.
  def topic_import_job(post_id, sqlite_mutex, mysql_client)
    row = execute_query_concurrent(
      "SELECT post_id, user_id_fk, forum_id_fk, post_root_id, post_subject, post_time, post_message, post_father_id, post_replies " \
      "FROM gforum_Post WHERE post_id = #{post_id.to_i}",
      mysql_client
    ).first
    return unless row

    post_id = row['post_id'].to_i
    puts "Processing post_id #{row['post_id']} post_root_id #{row['post_root_id']} post_subject/title #{row['post_subject']} forum_id_fk/category_id #{row['forum_id_fk']}"

    discourse_user_id, discourse_category_id = sqlite_mutex.synchronize do
      [fetch_user_id_mapping(row['user_id_fk']), fetch_category_id_mapping(row['forum_id_fk'])]
    end
    puts "discourse_user_id #{discourse_user_id} discourse_category_id #{discourse_category_id}"
    return unless discourse_user_id && discourse_category_id

    title = ensure_valid_title(row['post_subject'])
    post_views = gossamer_post_views(row['post_id'], mysql_client)

    if TopicCustomField.exists?(name: 'original_gossamer_id', value: row['post_id'])
      puts "Topic for post_id #{row['post_id']} already exists, skipping creation."
      return
    end

    ActiveRecord::Base.transaction do
      post_time = Time.at(row['post_time'])
      topic, unique_title = gossamer_create_topic(
        title,
        { user_id: discourse_user_id, category_id: discourse_category_id, created_at: post_time,
          updated_at: post_time, views: post_views || 0, posts_count: 0 },
        row['post_id']
      )

      sqlite_mutex.synchronize do
        # Last post time/user for the topic, and the author's topic count.
        update_db_topic_last_post_time(topic.id, post_time.to_i)
        update_db_topic_last_post_user(topic.id, discourse_user_id)
        update_db_user_topic_count(discourse_user_id, fetch_db_user_topic_count(discourse_user_id).to_i + 1)
      end

      puts "CREATE TOPIC POST for current_topic_id #{topic.id} discourse_user_id #{discourse_user_id}"
      post = gossamer_create_numbered_post(
        topic.id, row['post_id'], sqlite_mutex,
        user_id: discourse_user_id,
        raw: sanitize_post_message(row['post_message']),
        created_at: post_time,
        updated_at: post_time,
        reads: post_views || 0
      )
      handle_post_attachments(row['post_id'], post, discourse_user_id, mysql_client)

      # URL mapping for the web-server redirect file.
      new_url = "https://new/t/#{topic.slug}/#{topic.id}"
      sqlite_mutex.synchronize { insert_url_mapping(row['post_id'], new_url, unique_title) }

      replies = execute_query_concurrent(
        "SELECT post_id, user_id_fk, post_message, post_time FROM gforum_Post WHERE post_root_id = #{post_id} ORDER BY post_time ASC",
        mysql_client
      )
      replies.each { |reply_row| gossamer_import_reply(topic, reply_row, sqlite_mutex, mysql_client) }
    end
  rescue ActiveRecord::RecordInvalid => e
    puts "Error importing topic with post_id #{post_id}: #{e.message}"
    raise
  end

  # Create one reply from +reply_row+ in +topic+ (threaded path). Authors with no user
  # mapping (nil or 0) are attributed to the 'Former_User' account.
  private def gossamer_import_reply(topic, reply_row, sqlite_mutex, mysql_client)
    reply_user_id = sqlite_mutex.synchronize { fetch_user_id_mapping(reply_row['user_id_fk']) }
    if reply_user_id.nil? || reply_user_id == 0
      puts "reply_user_id is NIL/ZERO for reply post_id #{reply_row['post_id']}"
      reply_user_id = gossamer_former_user_id
      puts "reply_user_id is NOW Former_User id #{reply_user_id} for reply post_id #{reply_row['post_id']}"
    end

    reply_time = Time.at(reply_row['post_time'])
    reply_post_views = gossamer_post_views(reply_row['post_id'], mysql_client)
    puts "CREATE REPLY in current_topic_id #{topic.id} for reply post_id #{reply_row['post_id']}"

    post = gossamer_create_numbered_post(
      topic.id, reply_row['post_id'], sqlite_mutex,
      user_id: reply_user_id,
      raw: sanitize_post_message(reply_row['post_message']),
      created_at: reply_time,
      updated_at: reply_time,
      reads: reply_post_views || 0
    )

    sqlite_mutex.synchronize { gossamer_bump_last_post(topic.id, reply_time, reply_user_id) }
    handle_post_attachments(reply_row['post_id'], post, reply_user_id, mysql_client)
  end

  # Discourse id of the placeholder 'Former_User' account, looked up once and cached.
  # Raises ActiveRecord::RecordNotFound if that account does not exist.
  private def gossamer_former_user_id
    @gossamer_former_user_id ||= User.find_by!(username: 'Former_User').id
  end

  # Same lookup as fetch_post_views, but on the caller's (thread-owned) Mysql2 client.
  private def gossamer_post_views(post_id, mysql_client)
    result = execute_query_concurrent(
      "SELECT post_views FROM gforum_PostView WHERE post_id_fk = #{post_id.to_i} LIMIT 1",
      mysql_client
    ).first
    result ? result['post_views'] : 0
  end

  # Create a topic titled +title+, and tag it with original_gossamer_id.
  # If Discourse rejects the title as already used, retry with " (1)", " (2)", ... appended.
  # Any other validation error is re-raised.
  # @return [Array(Topic, String)] the topic and the title that was accepted
  private def gossamer_create_topic(title, attrs, gossamer_post_id)
    unique_title = title
    suffix = 1
    begin
      puts "CREATE TOPIC unique_title #{unique_title} title #{title} discourse_user_id #{attrs[:user_id]} category_id #{attrs[:category_id]}"
      topic = Topic.create!(attrs.merge(title: unique_title))
    rescue ActiveRecord::RecordInvalid => e
      raise unless e.message.include?("Title has already been used")

      unique_title = "#{title} (#{suffix})"
      suffix += 1
      retry
    end

    topic.custom_fields['original_gossamer_id'] = gossamer_post_id
    topic.save!
    [topic, unique_title]
  end

  # Create a post in +topic_id+ at the next post_number tracked in SQLite, and tag it with
  # original_gossamer_id.
  # Then bump the SQLite topic and author post counters. +attrs+ go to Post.create!
  # unchanged; SQLite access happens under +sqlite_mutex+.
  # @return [Post] the created post
  private def gossamer_create_numbered_post(topic_id, gossamer_post_id, sqlite_mutex, **attrs)
    post_number = sqlite_mutex.synchronize do
      next_number = fetch_db_topic_post_numbers(topic_id).to_i + 1
      update_db_topic_post_numbers(topic_id, next_number)
      next_number
    end

    post = Post.create!(attrs.merge(topic_id: topic_id, post_number: post_number))
    post.custom_fields['original_gossamer_id'] = gossamer_post_id
    post.save!

    sqlite_mutex.synchronize do
      update_db_topic_post_count(topic_id, fetch_db_topic_post_count(topic_id).to_i + 1)
      update_db_user_post_count(attrs[:user_id], fetch_db_user_post_count(attrs[:user_id]).to_i + 1)
    end

    post
  end

  # Record +post_time+ / +user_id+ as the topic's last post in SQLite if it is newer than
  # what is stored. The caller provides any locking.
  private def gossamer_bump_last_post(topic_id, post_time, user_id)
    last_post_time = fetch_db_topic_last_post_time(topic_id)
    return unless last_post_time.nil? || post_time.to_i > last_post_time.to_i

    update_db_topic_last_post_time(topic_id, post_time.to_i)
    update_db_topic_last_post_user(topic_id, user_id)
  end

  ########## THREADING END --------------------------------------------

  # Single-threaded import of every gforum_Post row in post_id order.
  #
  # OP posts (post_root_id = 0) become topics. Other posts become replies in the topic whose
  # original_gossamer_id equals their post_root_id. The import is resumable: post_ids
  # <= fetch_highest_processed_post_id are skipped, and the marker advances after each
  # imported post. Validation failures are logged and that post is skipped.
  def import_topics_and_posts_with_attachments
    puts "Importing topics and posts with attachments..."

    highest_old_post_id = fetch_highest_old_post_id.to_i
    puts "Highest (OP) old_post_id in url_map: #{highest_old_post_id}"

    highest_processed_post_id = fetch_highest_processed_post_id.to_i
    puts "Highest processed post_id: #{highest_processed_post_id}"

    # Nothing runs concurrently here; the mutex only satisfies the shared post helper.
    sqlite_mutex = Mutex.new

    execute_query("SELECT post_id, user_id_fk, forum_id_fk, post_root_id, post_subject, post_time, post_message, post_father_id, post_likes, post_replies FROM gforum_Post ORDER BY post_id").each do |row|
      post_id = row['post_id'].to_i
      next if post_id <= highest_processed_post_id

      puts "Processing post_id #{row['post_id']} post_root_id #{row['post_root_id']} post_subject/title #{row['post_subject']} forum_id_fk/category_id #{row['forum_id_fk']}"

      discourse_user_id = fetch_user_id_mapping(row['user_id_fk'])
      discourse_category_id = fetch_category_id_mapping(row['forum_id_fk'])
      puts "discourse_user_id #{discourse_user_id} discourse_category_id #{discourse_category_id}"
      next unless discourse_user_id && discourse_category_id

      if row['post_root_id'] == 0
        gossamer_import_sequential_topic(row, discourse_user_id, discourse_category_id, sqlite_mutex)
      else
        gossamer_import_sequential_reply(row, discourse_user_id, sqlite_mutex)
      end
    end

    puts "Importing topics and posts with attachments... Done."
  end

  # Sequential path, OP row: create the topic and its first post unless a topic with this
  # original_gossamer_id already exists.
  private def gossamer_import_sequential_topic(row, discourse_user_id, discourse_category_id, sqlite_mutex)
    post_id = row['post_id'].to_i
    title = ensure_valid_title(row['post_subject'])
    post_views = fetch_post_views(row['post_id'])
    return if TopicCustomField.exists?(name: 'original_gossamer_id', value: row['post_id'])

    post_time = Time.at(row['post_time'])
    topic, unique_title = gossamer_create_topic(
      title,
      { user_id: discourse_user_id, category_id: discourse_category_id, created_at: post_time,
        updated_at: post_time, views: post_views || 0, posts_count: 0 },
      row['post_id']
    )

    update_db_topic_last_post_time(topic.id, post_time.to_i)
    update_db_topic_last_post_user(topic.id, discourse_user_id)
    update_db_user_topic_count(discourse_user_id, fetch_db_user_topic_count(discourse_user_id).to_i + 1)

    puts "CREATE POST topic.id #{topic.id} discourse_user_id #{discourse_user_id}"
    post = gossamer_create_numbered_post(
      topic.id, row['post_id'], sqlite_mutex,
      user_id: discourse_user_id,
      raw: sanitize_post_message(row['post_message']),
      created_at: post_time,
      updated_at: post_time,
      like_count: row['post_likes'] || 0,
      reads: post_views || 0
    )
    handle_post_attachments(row['post_id'], post, discourse_user_id)
    insert_url_mapping(row['post_id'], "https://new/t/#{topic.slug}/#{topic.id}", unique_title)

    puts "Updated highest processed post_id #{post_id}"
    update_highest_processed_post_id(post_id)
  rescue ActiveRecord::RecordInvalid => e
    puts "Error importing topic with post_id #{row['post_id']}: #{e.message}"
  end

  # Sequential path, reply row: append it to the topic imported from its post_root_id.
  # When the post_father_id post was imported, the reply is threaded under it.
  private def gossamer_import_sequential_reply(row, discourse_user_id, sqlite_mutex)
    post_id = row['post_id'].to_i
    post_views = fetch_post_views(row['post_id'])

    root_topic_field = TopicCustomField.find_by(name: 'original_gossamer_id', value: row['post_root_id'])
    unless root_topic_field
      puts "Warning: Root topic not found for post_id #{row['post_root_id']} with post_root_id #{row['post_root_id']}".sub(
        "post_id #{row['post_root_id']} with", "post_id #{row['post_id']} with"
      )
      return
    end

    topic_id = root_topic_field.topic_id
    parent_post_field = PostCustomField.find_by(name: 'original_gossamer_id', value: row['post_father_id'])
    # pick returns nil (instead of Post.find raising and aborting the run) if the parent is gone.
    reply_to_post_number = parent_post_field && Post.where(id: parent_post_field.post_id).pick(:post_number)

    post_time = Time.at(row['post_time'])
    post = gossamer_create_numbered_post(
      topic_id, row['post_id'], sqlite_mutex,
      user_id: discourse_user_id,
      raw: sanitize_post_message(row['post_message']),
      created_at: post_time,
      updated_at: post_time,
      reply_to_post_number: reply_to_post_number,
      # Was row['post_replies'] (the reply count); the OP path already used post_likes.
      like_count: row['post_likes'] || 0,
      reads: post_views || fetch_db_topic_post_count(topic_id).to_i
    )
    gossamer_bump_last_post(topic_id, post_time, discourse_user_id)
    handle_post_attachments(row['post_id'], post, discourse_user_id)

    puts "Updated highest processed post_id #{post_id}"
    update_highest_processed_post_id(post_id)
  rescue ActiveRecord::RecordInvalid => e
    puts "Error importing post with post_id #{row['post_id']}: #{e.message}"
  end

  # Write the SQLite-tracked last post time, post count and last post user onto each topic.
  # update_columns writes directly, skipping validations and callbacks; an earlier update!
  # call hit a validation error. Topics that no longer exist, e.g. rolled back by the
  # threaded import, are skipped instead of aborting the run.
  def update_topic_stats
    puts "Update topics with the correct last post time, post count, and last post user"

    @db.execute("SELECT * FROM topic_last_post_time").each do |topic_id, last_post_time|
      topic = Topic.find_by(id: topic_id)
      unless topic
        puts "update_topic_stats: topic #{topic_id} not found, skipping"
        next
      end

      last_time = Time.at(last_post_time)
      topic.update_columns(
        updated_at: last_time,
        posts_count: fetch_db_topic_post_count(topic_id).to_i,
        last_posted_at: last_time,
        bumped_at: last_time,
        last_post_user_id: fetch_db_topic_last_post_user(topic_id).to_i
      )
    end
  end

  # Copy per-user topic and post counts from SQLite into UserStat.
  # Then raise each user's trust level from their post count: 0-2 posts -> 1, 3-50 -> 2,
  # more -> 3. Trust levels are only ever raised, never lowered.
  def update_user_stats
    puts "Update user profiles with the number of topics and posts created"

    @db.execute("SELECT * FROM user_topic_count").each do |user_id, count|
      puts "update_user_stats user_id #{user_id} topic_count #{count}"
      gossamer_write_user_stat(user_id, topic_count: count)
    end

    @db.execute("SELECT * FROM user_post_count").each do |user_id, count|
      puts "update_user_stats user_id #{user_id} post_count #{count}"
      gossamer_write_user_stat(user_id, post_count: count)

      new_trust_level =
        case count
        when 0..2 then 1  # basic user
        when 3..50 then 2 # member
        else 3            # regular or above
        end

      user = User.find(user_id)
      current_trust_level = user.trust_level || 1 # default to 1 if not set
      if new_trust_level > current_trust_level
        user.update!(trust_level: new_trust_level)
        puts "update_user_stats user_id #{user_id} trust_level updated to #{new_trust_level}"
      else
        puts "update_user_stats user_id #{user_id} trust_level remains at #{current_trust_level}"
      end
    end
  end

  # Write +counters+ onto a user's UserStat row. Existing rows keep using update_columns.
  # A missing row is created with save!, because update_columns raises on unsaved records.
  private def gossamer_write_user_stat(user_id, **counters)
    user_stat = UserStat.find_or_initialize_by(user_id: user_id)
    if user_stat.new_record?
      user_stat.assign_attributes(counters)
      user_stat.save!
    else
      user_stat.update_columns(counters)
    end
  end

  # Import Gossamer private messages (gforum_Message: inbox and sent mail) as Discourse
  # private-message topics.
  # Messages are read in msg_id order so the resume marker
  # (fetch/update_highest_processed_personal_id) cannot skip older, unprocessed messages.
  # Each message is appended to an existing PM topic between the same two users whose title
  # matches the subject, or the subject without a leading "Re: "; otherwise a new PM topic
  # is created. Errors are logged per message and that message is skipped.
  def import_personal_messages
    puts "Importing personal (inbox and sentmail) messages..."

    highest_processed_personal_id = fetch_highest_processed_personal_id.to_i
    puts "Highest processed personal_id: #{highest_processed_personal_id}"

    execute_query("SELECT * FROM gforum_Message ORDER BY msg_id").each do |row|
      msg_id = row['msg_id'].to_i
      puts "msg_id #{msg_id}"
      next if msg_id <= highest_processed_personal_id

      from_user_id = fetch_user_id_mapping(row['from_user_id_fk'])
      to_user_id = fetch_user_id_mapping(row['to_user_id_fk'])
      next unless from_user_id && to_user_id
      next if TopicCustomField.exists?(name: 'original_gossamer_msg_id', value: row['msg_id'])

      gossamer_import_personal_message(row, from_user_id, to_user_id)
      update_highest_processed_personal_id(msg_id)
    rescue StandardError => e
      puts "Error importing message #{row['msg_id']}: #{e.message}"
    end
  end

  # Create one private-message post, and its topic when needed, from a gforum_Message row.
  private def gossamer_import_personal_message(row, from_user_id, to_user_id)
    # Keep the sanitized text and only pad it to the 10 characters this import has always
    # enforced. Previously anything under three words was replaced by "..........".
    sanitized_message = sanitize_post_message(row['msg_body']).ljust(10, '.')

    # An empty title can never pass Topic validation, so subject-less messages were
    # always dropped; give them a placeholder subject instead.
    sanitized_title = row['msg_subject'].to_s.strip
    sanitized_title = "No subject" if sanitized_title.empty?

    message_time = Time.at(row['msg_time'])

    topic = Topic.joins(:topic_allowed_users)
                 .where(archetype: Archetype.private_message)
                 .where("topics.title = ? OR topics.title = ?", sanitized_title, sanitized_title.gsub(/^Re: /, ""))
                 .where("topic_allowed_users.user_id IN (?)", [from_user_id, to_user_id])
                 .group("topics.id")
                 .having("COUNT(topic_allowed_users.user_id) = 2")
                 .first

    if topic.nil?
      puts "IMPORTING new message topic sanitized: #{sanitized_title} user_id #{from_user_id} to_user_id #{to_user_id}"
      topic = Topic.create!(
        title: sanitized_title,
        user_id: from_user_id,
        archetype: Archetype.private_message,
        created_at: message_time,
        updated_at: message_time,
        last_posted_at: message_time,
        last_post_user_id: from_user_id
      )
      topic.custom_fields['original_gossamer_msg_id'] = row['msg_id']
      topic.save!

      # Recipient, then sender; uniq avoids a duplicate row for a message sent to oneself.
      [to_user_id, from_user_id].uniq.each { |user_id| topic.topic_allowed_users.create!(user_id: user_id) }
    else
      puts "APPENDING to existing message topic sanitized: #{sanitized_title} user_id #{from_user_id} to_user_id #{to_user_id}"
      topic.increment!(:posts_count)
    end

    post = Post.create!(
      topic_id: topic.id,
      user_id: from_user_id,
      raw: sanitized_message,
      created_at: message_time,
      updated_at: message_time
    )
    post.custom_fields['original_gossamer_msg_id'] = row['msg_id']
    post.save!

    # Update the topic's last reply information.
    topic.update!(last_posted_at: post.updated_at, last_post_user_id: post.user_id)
  end

# Main method to perform the import def perform_import # Secret trick to disable RateLimiting protection in Discourse RateLimiter.disable # ActiveRecord::Base.connection.disconnect! rescue nil # ActiveRecord::Base.clear_active_connections! rescue nil # ActiveRecord::Base.establish_connection( # adapter: 'postgresql', # host: '10.0.0.2', # Use the external DB host # port: 5432, # Default PostgreSQL port # database: 'discourse', # username: 'discourse', # password: 'nhB5FWhQkjdvaD2ViRNO63dQagDnzaTn', # connect_timeout: 10 # ) # ENV['PGHOST'] = '10.0.0.2' ENV['PGPORT'] = '5432' ENV['PGDATABASE'] = 'discourse' ENV['PGUSER'] = 'discourse' ENV['PGPASSWORD'] = 'nhB5FWhQkjdvaD2ViRNO63dQagDnzaTn' # ActiveRecord::Base.establish # Set our unique timestamp for this migration run timestamp = Time.now.strftime("-%Y%m%d%H%M%S") puts "Starting Gossamer Forums import...
#{timestamp}" # add_former_user # import_users # generate_user_id_mapping # export_username_mapping_to_csv("/bitnami/discourse/sqlite/gossamer-migration-username-mapping#{timestamp}") ## set_user_bio_images # import_categories ####### import_topics_and_posts_with_attachments threaded_topic_import update_topic_stats update_user_stats export_url_mapping_to_csv("/bitnami/discourse/sqlite/gossamer-migration-url-mapping#{timestamp}") create_nginx_rewrite_rules("/bitnami/discourse/sqlite/gossamer-redirects#{timestamp}.conf") import_personal_messages puts "Gossamer Forums import complete! #{timestamp}" end end GossamerForumsImporter.new.perform_import