2031 lines
81 KiB
Ruby
2031 lines
81 KiB
Ruby
# Federated Computer, Inc.
|
|
# David Sainty <saint@federated.computer> 2024 A.D.
|
|
# Gossamer Threads to Discourse -- Migration-Import Script
|
|
# v0.48.6 48-48-48, add handling for deleted-former user as OP poster
|
|
|
|
require 'mysql2'
|
|
require 'open-uri'
|
|
require 'net/http'
|
|
require 'tempfile'
|
|
require 'sqlite3'
|
|
|
|
require 'digest'
|
|
require 'fileutils'
|
|
require 'csv'
|
|
require 'time'
|
|
|
|
require 'concurrent'
|
|
|
|
require 'sys/proctable'
|
|
|
|
require 'active_record'
|
|
require 'connection_pool'
|
|
|
|
require File.expand_path("../../../../config/environment", __FILE__)
|
|
# require_relative '../base'
|
|
require File.expand_path("../../../../script/import_scripts/base", __FILE__)
|
|
|
|
class GossamerForumsImporter < ImportScripts::Base
|
|
# Sets up the importer: runs the ImportScripts::Base initializer, configures
# the ActiveRecord connection, opens the MySQL connection to the Gossamer
# Forums database, and opens/creates the SQLite bookkeeping database.
#
# Connection settings may be overridden with environment variables
# (GOSSAMER_PG_* for PostgreSQL, GOSSAMER_MYSQL_* for MySQL). The previous
# hard-coded values are kept as fallbacks so existing runs behave the same.
#
# SECURITY: the fallback credentials below live in source control -- rotate
# them and supply the real values through the environment.
#
# Exits the process with status 1 when the MySQL connection cannot be opened.
def initialize
  super

  begin
    # Database configuration for ActiveRecord.
    # This is not used, except for pool size... issue with our Bitnami Discourse?
    ActiveRecord::Base.establish_connection(
      adapter: 'postgresql',
      database: ENV.fetch('GOSSAMER_PG_DATABASE', 'slowtwitch'),
      username: ENV.fetch('GOSSAMER_PG_USERNAME', 'admin'),
      password: ENV.fetch('GOSSAMER_PG_PASSWORD', "yxnh93Ybbz2Nm8#mp28zCVv"),
      host: ENV.fetch('GOSSAMER_PG_HOST', 'slowtwitch.northend.network'),
      pool: Integer(ENV.fetch('GOSSAMER_PG_POOL', '40')), # Adjust based on concurrency needs
      timeout: 5000
    )

    # Initialize MySQL client to connect to Gossamer Forums database
    @mysql_client = Mysql2::Client.new(
      host: ENV.fetch('GOSSAMER_MYSQL_HOST', "172.99.0.10"),
      username: ENV.fetch('GOSSAMER_MYSQL_USERNAME', "admin"),
      password: ENV.fetch('GOSSAMER_MYSQL_PASSWORD', "x0YGLA9252iiTFQuqaM0ROX8FmQzZuUu"),
      database: ENV.fetch('GOSSAMER_MYSQL_DATABASE', "slowtwitch")
    )
  rescue Mysql2::Error => e
    puts "Error connecting to MySQL: #{e.message}"
    exit 1
  end

  # Old-to-new ID maps and import progress are persisted in SQLite.
  initialize_sqlite_id_name_url_db
end
|
|
|
|
# Opens the SQLite bookkeeping database (stored in @db) and creates every
# table the importer relies on, if it does not exist yet.
#
# To re-import topics/posts from scratch, drop these tables first:
#   url_map, topic_last_post_time, topic_post_count, user_topic_count,
#   user_post_count, topic_last_post_user, topic_post_numbers,
#   highest_processed_post_id, topic_import_status
# (e.g. `DROP TABLE IF EXISTS url_map;`) before running this method.
def initialize_sqlite_id_name_url_db
  @db = SQLite3::Database.new('/bitnami/discourse/sqlite/id_name_url_map.db')

  schema_statements = [
    # USER IMPORT - map of old to new user ids, used for topic-post import.
    <<-SQL,
      CREATE TABLE IF NOT EXISTS user_id_map (
        old_user_id INTEGER PRIMARY KEY,
        new_user_id INTEGER
      );
    SQL
    # CATEGORY IMPORT - map of old to new category ids, used for topic-post import.
    <<-SQL,
      CREATE TABLE IF NOT EXISTS category_id_map (
        old_category_id INTEGER PRIMARY KEY,
        new_category_id INTEGER
      );
    SQL
    # USER IMPORT - map of old to new usernames for SENDING MIGRATION EMAIL
    <<-SQL,
      CREATE TABLE IF NOT EXISTS username_map (
        id INTEGER PRIMARY KEY,
        old_username TEXT,
        new_username TEXT,
        email TEXT,
        real_name TEXT
      );
    SQL
    # POST IMPORT - old_post_id (Gossamer) to new URL, for a WEB SERVER REDIRECT FILE
    <<-SQL,
      CREATE TABLE IF NOT EXISTS url_map (
        old_post_id INTEGER PRIMARY KEY,
        new_url TEXT,
        title TEXT
      );
    SQL
    # POST IMPORT - For each topic, the time of the last / most recent post/reply
    <<-SQL,
      CREATE TABLE IF NOT EXISTS topic_last_post_time (
        topic_id INTEGER PRIMARY KEY,
        last_post_time INTEGER
      );
    SQL
    # POST IMPORT - For each topic, increment post_count as we add posts.
    <<-SQL,
      CREATE TABLE IF NOT EXISTS topic_post_count (
        topic_id INTEGER PRIMARY KEY,
        post_count INTEGER DEFAULT 0
      );
    SQL
    # POST IMPORT - For each user (_id), total topics created so far
    <<-SQL,
      CREATE TABLE IF NOT EXISTS user_topic_count (
        user_id INTEGER PRIMARY KEY,
        topic_count INTEGER DEFAULT 0
      );
    SQL
    # POST IMPORT - For each user (_id), total posts created so far
    <<-SQL,
      CREATE TABLE IF NOT EXISTS user_post_count (
        user_id INTEGER PRIMARY KEY,
        post_count INTEGER DEFAULT 0
      );
    SQL
    # POST IMPORT - For each topic, the user_id for the last poster / replier
    <<-SQL,
      CREATE TABLE IF NOT EXISTS topic_last_post_user (
        topic_id INTEGER PRIMARY KEY,
        user_id INTEGER
      );
    SQL
    # POST IMPORT - Post number within a topic, incremented as replies are added.
    <<-SQL,
      CREATE TABLE IF NOT EXISTS topic_post_numbers (
        topic_id INTEGER PRIMARY KEY,
        post_number INTEGER DEFAULT 0
      );
    SQL
    # POST IMPORT - Persistent integer value for highest processed post id -- not used
    <<-SQL,
      CREATE TABLE IF NOT EXISTS highest_processed_post_id (
        id INTEGER PRIMARY KEY CHECK (id = 1),
        post_id INTEGER
      );
    SQL
    # PERSONAL MESSAGE IMPORT - Persistent integer value for highest processed personal id
    <<-SQL,
      CREATE TABLE IF NOT EXISTS highest_processed_personal_id (
        id INTEGER PRIMARY KEY CHECK (id = 1),
        personal_id INTEGER
      );
    SQL
    # POST IMPORT - For each topic (topic post ID) record status 0 fail or 1 success
    <<-SQL
      CREATE TABLE IF NOT EXISTS topic_import_status (
        post_id INTEGER PRIMARY KEY,
        status INTEGER DEFAULT 0
      );
    SQL
  ]

  schema_statements.each { |ddl| @db.execute(ddl) }
end
|
|
|
|
# Stores (or overwrites) the Gossamer -> Discourse user id mapping.
#
# Bind values are passed as one Array: the sqlite3 gem deprecates splatted
# bind parameters for Database#execute.
#
# @param old_user_id [Integer] Gossamer user id
# @param new_user_id [Integer] Discourse user id
def insert_user_id_mapping(old_user_id, new_user_id)
  @db.execute(
    "INSERT OR REPLACE INTO user_id_map (old_user_id, new_user_id) VALUES (?, ?)",
    [old_user_id, new_user_id]
  )
end
|
|
|
|
# Looks up the Discourse user id recorded for a Gossamer user id.
#
# @param old_user_id [Integer] Gossamer user id
# @return the stored new_user_id, or nil when no row matches
def fetch_user_id_mapping(old_user_id)
  lookup_sql = "SELECT new_user_id FROM user_id_map WHERE old_user_id = ?"
  @db.get_first_value(lookup_sql, old_user_id)
end
|
|
|
|
# Stores (or overwrites) the Gossamer -> Discourse category id mapping.
#
# Bind values are passed as one Array: the sqlite3 gem deprecates splatted
# bind parameters for Database#execute.
#
# @param old_category_id [Integer] Gossamer category id
# @param new_category_id [Integer] Discourse category id
def insert_category_id_mapping(old_category_id, new_category_id)
  @db.execute(
    "INSERT OR REPLACE INTO category_id_map (old_category_id, new_category_id) VALUES (?, ?)",
    [old_category_id, new_category_id]
  )
end
|
|
|
|
# Looks up the Discourse category id recorded for a Gossamer category id.
#
# @param old_category_id [Integer] Gossamer category id
# @return the stored new_category_id, or nil when no row matches
def fetch_category_id_mapping(old_category_id)
  lookup_sql = "SELECT new_category_id FROM category_id_map WHERE old_category_id = ?"
  @db.get_first_value(lookup_sql, old_category_id)
end
|
|
|
|
# Records that a Gossamer username was changed for Discourse, together with
# the contact details used for the migration e-mail.
#
# Bind values are passed as one Array: the sqlite3 gem deprecates splatted
# bind parameters for Database#execute.
#
# @param old_username [String] original Gossamer username
# @param new_username [String] username used in Discourse
# @param email [String]
# @param real_name [String]
def insert_username_mapping(old_username, new_username, email, real_name)
  @db.execute(
    "INSERT OR REPLACE INTO username_map (old_username, new_username, email, real_name) VALUES (?, ?, ?, ?)",
    [old_username, new_username, email, real_name]
  )
end
|
|
|
|
# Define a method to export the username mapping table to a CSV file
|
|
# Writes every row of username_map to +filename+ as CSV, preceded by a
# header row, then prints a confirmation line.
#
# @param filename [String] path of the CSV file to (over)write
def export_username_mapping_to_csv(filename)
  header = ['Old Username', 'New Username', 'Email', 'Full Name']

  CSV.open(filename, 'w') do |out|
    out << header
    # Stream rows straight from SQLite into the CSV.
    @db.execute("SELECT old_username, new_username, email, real_name FROM username_map") { |record| out << record }
  end

  puts "Exported changed username mappings to #{filename}"
end
|
|
|
|
# Insert a URL mapping into the SQLite database
|
|
# Stores (or overwrites) the new Discourse URL and title for a Gossamer post.
#
# @param old_post_id [Integer] Gossamer post id
# @param new_url [String] URL of the imported post/topic
# @param title [String] topic title
def insert_url_mapping(old_post_id, new_url, title)
  bind_values = [old_post_id, new_url, title]
  @db.execute("INSERT OR REPLACE INTO url_map (old_post_id, new_url, title) VALUES (?, ?, ?)", bind_values)
end
|
|
|
|
# Export the URL mappings to a CSV file
|
|
# Writes every row of url_map to +filename+ as CSV, preceded by a header
# row, then prints a confirmation line.
#
# @param filename [String] path of the CSV file to (over)write
def export_url_mapping_to_csv(filename)
  header = ["Old Post ID", "New URL", "Title"]

  CSV.open(filename, "w") do |out|
    out << header
    @db.execute("SELECT old_post_id, new_url, title FROM url_map") { |record| out << record }
  end

  puts "Exported URL mappings to #{filename}"
end
|
|
|
|
# Method to create Nginx rewrite rules file
|
|
# Writes one permanent Nginx `rewrite` rule per url_map row to +filename+,
# redirecting old Gossamer post URLs (ending in "P<old_post_id>/") to the
# recorded new URL.
#
# @param filename [String] path of the rules file to (over)write
def create_nginx_rewrite_rules(filename)
  File.open(filename, "w") do |rules|
    @db.execute("SELECT old_post_id, new_url FROM url_map") do |(post_id, target)|
      rules.puts "rewrite ^/forum/.*P#{post_id}/$ #{target} permanent;"
    end
  end
end
|
|
|
|
# Fetch the highest old_post_id from the url_map table
|
|
# @return the largest old_post_id recorded in url_map (nil when the query
#   yields no value)
def fetch_highest_old_post_id
  max_sql = "SELECT MAX(old_post_id) FROM url_map"
  @db.get_first_value(max_sql)
end
|
|
|
|
# Helper methods to interact with the SQLite database for persisting topic-post related values
|
|
# Generic lookup of `value` by `key` in +table+.
#
# NOTE(review): +table+ is interpolated into the SQL text, so it must be a
# trusted identifier. None of the tables created in this file have
# key/value columns -- confirm whether this helper is still used.
#
# @param table [String] table name
# @param key value matched against the `key` column
def fetch_db(table, key)
  lookup_sql = "SELECT value FROM #{table} WHERE key = ?"
  @db.get_first_value(lookup_sql, key)
end
|
|
|
|
# Generic upsert of a key/value pair into +table+.
#
# Bind values are passed as one Array: the sqlite3 gem deprecates splatted
# bind parameters for Database#execute.
#
# NOTE(review): +table+ is interpolated into the SQL text, so it must be a
# trusted identifier. None of the tables created in this file have
# key/value columns -- confirm whether this helper is still used.
#
# @param table [String] table name
# @param key value for the `key` column
# @param value value for the `value` column
def update_db(table, key, value)
  @db.execute("INSERT OR REPLACE INTO #{table} (key, value) VALUES (?, ?)", [key, value])
end
|
|
|
|
# @param topic_id [Integer]
# @return the stored last_post_time for the topic, or nil when absent
def fetch_db_topic_last_post_time(topic_id)
  lookup_sql = "SELECT last_post_time FROM topic_last_post_time WHERE topic_id = ?"
  @db.get_first_value(lookup_sql, topic_id)
end
|
|
|
|
# @param topic_id [Integer]
# @return the stored user_id of the topic's last poster, or nil when absent
def fetch_db_topic_last_post_user(topic_id)
  lookup_sql = "SELECT user_id FROM topic_last_post_user WHERE topic_id = ?"
  @db.get_first_value(lookup_sql, topic_id)
end
|
|
|
|
# @param topic_id [Integer]
# @return the stored post_count for the topic, or nil when absent
def fetch_db_topic_post_count(topic_id)
  lookup_sql = "SELECT post_count FROM topic_post_count WHERE topic_id = ?"
  @db.get_first_value(lookup_sql, topic_id)
end
|
|
|
|
# @param user_id [Integer]
# @return the stored topic_count for the user, or nil when absent
def fetch_db_user_topic_count(user_id)
  lookup_sql = "SELECT topic_count FROM user_topic_count WHERE user_id = ?"
  @db.get_first_value(lookup_sql, user_id)
end
|
|
|
|
# @param user_id [Integer]
# @return the stored post_count for the user, or nil when absent
def fetch_db_user_post_count(user_id)
  lookup_sql = "SELECT post_count FROM user_post_count WHERE user_id = ?"
  @db.get_first_value(lookup_sql, user_id)
end
|
|
|
|
# @param topic_id [Integer]
# @return the stored post_number for the topic, or nil when absent
def fetch_db_topic_post_numbers(topic_id)
  lookup_sql = "SELECT post_number FROM topic_post_numbers WHERE topic_id = ?"
  @db.get_first_value(lookup_sql, topic_id)
end
|
|
|
|
# Stores (or overwrites) the time of the most recent post in a topic.
# Bind values are passed as one Array (the sqlite3 gem deprecates splatted
# bind parameters for Database#execute).
#
# @param topic_id [Integer]
# @param last_post_time [Integer] stored in an INTEGER column
def update_db_topic_last_post_time(topic_id, last_post_time)
  @db.execute(
    "INSERT OR REPLACE INTO topic_last_post_time (topic_id, last_post_time) VALUES (?, ?)",
    [topic_id, last_post_time]
  )
end
|
|
|
|
# Stores (or overwrites) the user id of the last poster in a topic.
# Bind values are passed as one Array (the sqlite3 gem deprecates splatted
# bind parameters for Database#execute).
#
# @param topic_id [Integer]
# @param user_id [Integer]
def update_db_topic_last_post_user(topic_id, user_id)
  @db.execute(
    "INSERT OR REPLACE INTO topic_last_post_user (topic_id, user_id) VALUES (?, ?)",
    [topic_id, user_id]
  )
end
|
|
|
|
# Stores (or overwrites) the running post count of a topic.
# Bind values are passed as one Array (the sqlite3 gem deprecates splatted
# bind parameters for Database#execute).
#
# @param topic_id [Integer]
# @param post_count [Integer]
def update_db_topic_post_count(topic_id, post_count)
  @db.execute(
    "INSERT OR REPLACE INTO topic_post_count (topic_id, post_count) VALUES (?, ?)",
    [topic_id, post_count]
  )
end
|
|
|
|
# Stores (or overwrites) the running topic count of a user.
# Bind values are passed as one Array (the sqlite3 gem deprecates splatted
# bind parameters for Database#execute).
#
# @param user_id [Integer]
# @param topic_count [Integer]
def update_db_user_topic_count(user_id, topic_count)
  @db.execute(
    "INSERT OR REPLACE INTO user_topic_count (user_id, topic_count) VALUES (?, ?)",
    [user_id, topic_count]
  )
end
|
|
|
|
# Stores (or overwrites) the running post count of a user.
# Bind values are passed as one Array (the sqlite3 gem deprecates splatted
# bind parameters for Database#execute).
#
# @param user_id [Integer]
# @param post_count [Integer]
def update_db_user_post_count(user_id, post_count)
  @db.execute(
    "INSERT OR REPLACE INTO user_post_count (user_id, post_count) VALUES (?, ?)",
    [user_id, post_count]
  )
end
|
|
|
|
# Stores (or overwrites) the latest post number used within a topic.
# Bind values are passed as one Array (the sqlite3 gem deprecates splatted
# bind parameters for Database#execute).
#
# @param topic_id [Integer]
# @param post_number [Integer]
def update_db_topic_post_numbers(topic_id, post_number)
  @db.execute(
    "INSERT OR REPLACE INTO topic_post_numbers (topic_id, post_number) VALUES (?, ?)",
    [topic_id, post_number]
  )
end
|
|
|
|
# Fetch the highest processed post_id from the highest_processed_post_id table
|
|
# @return the post_id stored in the single-row highest_processed_post_id
#   table (row id 1), or nil when nothing has been stored
def fetch_highest_processed_post_id
  lookup_sql = "SELECT post_id FROM highest_processed_post_id WHERE id = 1"
  @db.get_first_value(lookup_sql)
end
|
|
|
|
# Update the highest processed post_id in the highest_processed_post_id table
|
|
# Overwrites the single stored "highest processed post id" (row id 1).
#
# @param post_id [Integer]
def update_highest_processed_post_id(post_id)
  upsert_sql = "INSERT OR REPLACE INTO highest_processed_post_id (id, post_id) VALUES (1, ?)"
  @db.execute(upsert_sql, post_id)
end
|
|
|
|
# Fetch the highest processed personal_id from the highest_processed_personal_id table
|
|
# @return the personal_id stored in the single-row
#   highest_processed_personal_id table (row id 1), or nil when nothing has
#   been stored
def fetch_highest_processed_personal_id
  lookup_sql = "SELECT personal_id FROM highest_processed_personal_id WHERE id = 1"
  @db.get_first_value(lookup_sql)
end
|
|
|
|
# Update the highest processed personal_id in the highest_processed_personal_id table
|
|
# Overwrites the single stored "highest processed personal id" (row id 1).
#
# @param personal_id [Integer]
def update_highest_processed_personal_id(personal_id)
  upsert_sql = "INSERT OR REPLACE INTO highest_processed_personal_id (id, personal_id) VALUES (1, ?)"
  @db.execute(upsert_sql, personal_id)
end
|
|
|
|
# Check if post_id exists and its status
|
|
# Reads the import status recorded for a topic's post id.
#
# @param post_id [Integer]
# @return [Integer, nil] the stored status converted with to_i (the table is
#   documented as 0 = fail, 1 = success), or nil when no row exists
def fetch_post_status(post_id)
  rows = @db.execute("SELECT status FROM topic_import_status WHERE post_id = ?", post_id)
  status = rows.flatten.first
  status&.to_i
end
|
|
|
|
# Mark post_id as complete
|
|
# Records status 1 (success) for the given topic post id, replacing any
# earlier status.
#
# @param post_id [Integer]
def mark_post_as_complete(post_id)
  upsert_sql = "INSERT OR REPLACE INTO topic_import_status (post_id, status) VALUES (?, 1)"
  @db.execute(upsert_sql, post_id)
end
|
|
|
|
# Mark post_id as failed
|
|
# Records status 0 (failure) for the given topic post id, replacing any
# earlier status.
#
# @param post_id [Integer]
def mark_post_as_failed(post_id)
  upsert_sql = "INSERT OR REPLACE INTO topic_import_status (post_id, status) VALUES (?, 0)"
  @db.execute(upsert_sql, post_id)
end
|
|
|
|
# Execute an SQL query on the Gossamer Forums database
|
|
# Runs +query+ on the importer's own MySQL connection (@mysql_client),
# requesting rows as hashes.
#
# @param query [String] SQL text
# @return whatever the client's #query returns
def execute_query(query)
  result_options = { as: :hash }
  @mysql_client.query(query, **result_options)
end
|
|
|
|
# Execute an SQL query on the Gossamer Forums database
|
|
# Runs +query+ on a caller-supplied MySQL connection, requesting rows as
# hashes.
#
# @param query [String] SQL text
# @param mysql_client the connection to run the query on
# @return whatever the client's #query returns
def execute_query_concurrent(query, mysql_client)
  result_options = { as: :hash }
  mysql_client.query(query, **result_options)
end
|
|
|
|
# Sanitize the username to meet Discourse's requirements
|
|
# Derives a Discourse-compatible username from a Gossamer username.
#
# Steps: replace every character outside [a-zA-Z0-9._-] with "_", pad
# one-character names with ".", cap the length at 20. If a Discourse user
# already owns the result and has the same e-mail (case-insensitive) and
# real name, it is treated as the same person and the name is returned as-is.
# Otherwise a numeric suffix ("_1", "_2", ...) is appended until the name is
# free. Whenever the final name differs from the original, the change is
# recorded via insert_username_mapping.
#
# @param original_username [String] Gossamer username
# @param email [String, nil] Gossamer e-mail, used to recognise the same person
# @param real_name [String, nil] Gossamer real name, used likewise
# @return [String] the username to use in Discourse
def sanitize_username(original_username, email, real_name)
  max_length = 20

  sanitized_username = original_username.gsub(/[^a-zA-Z0-9._-]/, '_')
  sanitized_username = "#{sanitized_username}." if sanitized_username.length < 2 # Allow two-character usernames
  sanitized_username = sanitized_username[0, max_length] if sanitized_username.length > max_length
  firststep_sanitized = sanitized_username

  existing_user = User.find_by(username: sanitized_username)

  if existing_user
    same_email = existing_user.email.to_s.downcase == email.to_s.downcase
    # The existing user with the proposed name _is_ the same person.
    return sanitized_username if same_email && existing_user.name == real_name

    # We cannot clobber another person with the same proposed username, so we
    # resolve the conflict. The base is shortened to make room for the suffix:
    # truncating the combined string would cut the suffix off a 20-character
    # base and the loop would never find a free name.
    counter = 1
    while User.exists?(username: sanitized_username)
      suffix = "_#{counter}"
      sanitized_username = "#{firststep_sanitized[0, max_length - suffix.length]}#{suffix}"
      counter += 1
    end
  end

  if original_username != sanitized_username
    # The Discourse username is not the same as the Gossamer Forums username
    puts "Sanitized username: '#{original_username}' --> '#{sanitized_username}'"
    insert_username_mapping(original_username, sanitized_username, email, real_name)
  end

  sanitized_username
end
|
|
|
|
# Sanitize email to replace restricted domains
|
|
# Replaces a restricted e-mail domain with example.org.
#
# Only the part after the last "@" is inspected and replaced, so a local
# part that happens to contain the domain text is left alone, and the
# domain comparison ignores case. Values without an "@" (including nil) are
# returned unchanged.
#
# @param email [String, nil]
# @return [String, nil] the original value, or the rewritten address
def sanitize_email(email)
  restricted_domains = ['mailinator.com', 'example.com'] # Add more restricted domains as needed
  local_part, separator, domain = email.to_s.rpartition('@')

  return email if separator.empty? || !restricted_domains.include?(domain.downcase)

  sanitized_email = "#{local_part}@example.org" # Change to a permissible domain
  puts "Sanitized email: '#{email}' --> '#{sanitized_email}'"
  sanitized_email
end
|
|
|
|
# Returns +base_title+, or "<base_title> (<n>)" with the smallest n >= 1
# for which no Topic with that title exists. Progress is logged with puts.
#
# @param base_title [String]
# @param user_id unused; kept for call compatibility
# @param timestamp unused; kept for call compatibility
# @return [String] a title not currently used by any Topic
def generate_unique_title(base_title, user_id, timestamp)
  puts "Generating unique title for base title: #{base_title}"

  candidate = base_title
  attempt = 1

  loop do
    break unless Topic.find_by(title: candidate)

    puts "Title '#{candidate}' already exists, generating a new one."
    candidate = "#{base_title} (#{attempt})"
    puts "NEW unique_title #{candidate}"
    attempt += 1
  end

  puts "Final unique title: #{candidate}"
  candidate
end
|
|
|
|
# Helper method to download an attachment / image from a URL
|
|
# Downloads +url+ and returns the response body.
#
# Every expected failure is logged and turned into nil so one bad
# attachment does not abort the whole import: HTTP error statuses, invalid
# URLs, and connection-level problems (DNS, refused/reset connections,
# timeouts, TLS errors, truncated reads).
#
# NOTE(review): TLS certificate verification is disabled (VERIFY_NONE).
# This looks deliberate for the legacy forum host -- confirm it is still
# required.
#
# @param url [String]
# @return [String, nil] the downloaded data, or nil on failure
def download_attachment(url)
  puts "URL: '#{url}'"
  URI.open(url, ssl_verify_mode: OpenSSL::SSL::VERIFY_NONE).read
rescue OpenURI::HTTPError => e
  puts "Failed to download attachment from #{url}: #{e.message}"
  nil
rescue URI::InvalidURIError => e
  puts "Failed to handle invalid URL/URI for #{url}: #{e.message}"
  nil
rescue SocketError, SystemCallError, Timeout::Error, OpenSSL::SSL::SSLError, IOError => e
  puts "Failed to download attachment from #{url}: #{e.class}: #{e.message}"
  nil
end
|
|
|
|
# Helper method to upload an attachment / image to Discourse
|
|
# Creates an Upload record for +file+ and stores the file through
# FileStore::LocalStore (legacy path; a sibling helper uses UploadCreator
# instead).
#
# Sequence: compute SHA1 and size from file.path, build and save the Upload
# (initially pointing at +gossamer_url+), store the file locally, replace
# the URL with the stored location, save again.
#
# @param user_id [Integer] owner of the upload
# @param file object responding to #path and #size (e.g. a Tempfile)
# @param filename [String] original filename
# @param gossamer_url [String] source URL, used as the initial Upload url
# @return the saved Upload, or nil when anything raises (the error and
#   backtrace are printed)
def upload_attachment(user_id, file, filename, gossamer_url)
  source_path = file.path

  # Generate SHA1 hash for the file
  sha1 = Digest::SHA1.file(source_path).hexdigest

  # Ensure the file size is correctly handled
  file_size = File.size(source_path)

  puts "Attempting to upload attachment. filename #{filename} user_id #{user_id} file.size #{file.size} file_size #{file_size} sha1 #{sha1} gossamer_url #{gossamer_url}"

  # Create the upload record, adjusting fields as needed for 3.2.5 compatibility
  upload_attributes = {
    user_id: user_id,
    original_filename: filename,
    filesize: file.size,
    sha1: sha1,
    url: gossamer_url
  }
  upload = Upload.new(**upload_attributes)

  # The record must be saved first: its id is part of the storage path.
  upload.save!

  # Use FileStore::LocalStore to store the file
  store = FileStore::LocalStore.new
  storage_path = store.get_path_for('original', upload.id, upload.sha1, File.extname(file.path))
  upload.url = store.store_file(file, storage_path)

  # Persist the final URL
  upload.save!

  upload
rescue => e
  puts "FAILURE: Failed to upload attachment #{filename} for user_id #{user_id}: #{e.message}"
  puts e.backtrace.join("\n") # Print the full stack trace
  nil
end
|
|
|
|
# Helper method to upload an attachment / image to Discourse
|
|
# Uploads +file+ through UploadCreator (origin 'import') on behalf of
# +user_id+.
#
# @param user_id [Integer] owner of the upload
# @param file the IO/Tempfile handed to UploadCreator
# @param filename [String] original filename
# @return the created upload, or nil when creation raises or yields nothing
#   (the error and backtrace are printed)
def upload_attachment_two(user_id, file, filename)
  creator = UploadCreator.new(file, filename, origin: 'import')
  upload = creator.create_for(user_id)

  # Raise an error if the upload fails, so it is reported like any other failure
  raise "Upload failed" if !upload

  upload
rescue => e
  puts "Failed to upload attachment #{filename} for user_id #{user_id}: #{e.message}"
  puts e.backtrace.join("\n") # Print the full stack trace
  nil
end
|
|
|
|
|
|
# Helper method to handle post attachments
|
|
# Imports every attachment of a Gossamer post: downloads it from the old
# forum, uploads it to Discourse and appends a Markdown link (an image
# embed for image/* MIME types) to post.raw, saving the post after each
# attachment.
#
# Attachments that cannot be downloaded or uploaded are skipped. The
# temporary file is always closed and removed, including when the upload
# fails, and a missing MIME type is treated as "not an image".
#
# @param gossamer_post_id [Integer] Gossamer post id (interpolated into the
#   SQL text, so it must come from a trusted source)
# @param post the Discourse post to amend (needs #raw, #raw= and #save!)
# @param user_id [Integer] Discourse user that will own the uploads
# @param mysql_client connection used for the attachment query
def handle_post_attachments(gossamer_post_id, post, user_id, mysql_client)
  execute_query_concurrent("SELECT * FROM gforum_PostAttachment WHERE post_id_fk = #{gossamer_post_id}", mysql_client).each do |att_row|
    attachment_url = "https://forum.slowtwitch.com/forum/?do=post_attachment;postatt_id=#{att_row['postatt_id']}"
    puts "Handling attachment: #{attachment_url}"
    attachment_data = download_attachment(attachment_url)
    next unless attachment_data

    filename = att_row['postatt_filename']
    mime_type = att_row['postatt_content'].to_s

    # Create a Tempfile to pass to the UploadCreator
    temp_file = Tempfile.new(['attachment', File.extname(filename)])
    begin
      temp_file.binmode
      temp_file.write(attachment_data)
      temp_file.rewind

      puts "Attempting upload..."
      upload = upload_attachment_two(user_id, temp_file, filename)
      next unless upload

      # Get the URL of the uploaded file from Discourse
      upload_url = upload.url

      puts "Appending to post.raw... #{upload_url} MIME type: #{mime_type}"
      if mime_type.start_with?('image/')
        post.raw += "\n![#{filename}](#{upload_url})"
      else
        post.raw += "\n[#{filename}](#{upload_url})"
      end
      post.save!
    ensure
      # Runs on success, on `next` and on exceptions, so no temp file leaks.
      temp_file.close
      temp_file.unlink
    end
  end
end
|
|
|
|
|
|
# def download_file(url)
|
|
# require 'open-uri'
|
|
# begin
|
|
# file = Tempfile.new
|
|
# file.binmode
|
|
# file.write(URI.open(url).read)
|
|
# file.rewind
|
|
# file
|
|
# rescue => e
|
|
# puts "Failed to download file from #{url}: #{e.message}"
|
|
# puts e.backtrace.join("\n") # Print the full stack trace
|
|
# nil
|
|
# end
|
|
# end
|
|
|
|
# Helper method to upload an image to Discourse
|
|
# def upload_image(user, image_data, filename)
|
|
# return if image_data.nil?
|
|
#
|
|
# upload = Upload.create_for(user.id, File.open(image_data.path), filename, 'image/jpeg')
|
|
# if upload.nil? || !upload.persisted?
|
|
# puts "Failed to upload image for user #{user.username}"
|
|
# return
|
|
# end
|
|
#
|
|
# upload
|
|
# end
|
|
|
|
# Add 'Former_User' account if not already present
|
|
# Ensures the placeholder 'Former_User' Discourse account exists and that
# Gossamer user id 0 is mapped to it in the SQLite user_id_map.
#
# The mapping is written on every call, not only when the account is newly
# created, so it is restored if the SQLite map was reset while the Discourse
# account survived. The write is an INSERT OR REPLACE, so repeating it is
# harmless.
def add_former_user
  puts "Adding 'Former User' account if it does not exist..."

  former_user = User.find_by(username: 'Former_User')

  if former_user.nil?
    former_user = User.create!(
      username: 'Former_User',
      name: 'Former User',
      email: 'former_user@example.com',
      password: SecureRandom.hex(16),
      active: true
    )
    puts "'Former User' account created with user ID: #{former_user.id}"
  else
    puts "'Former User' account already exists with user ID: #{former_user.id}"
  end

  # Store the user ID mapping
  puts "for insert_user_id_mapping: 0 (former_user) discourse_user.id #{former_user.id}"
  insert_user_id_mapping(0, former_user.id)
end
|
|
|
|
# Import users from Gossamer Forums to Discourse
|
|
# Reads every row of gforum_User, converts it to the attribute hash expected
# by create_users (sanitizing the username on the way) and hands the list to
# create_users.
#
# @return whatever create_users returns
def import_users
  puts "Importing Users..."

  # Fetch all users from Gossamer Forums
  users = execute_query("SELECT * FROM gforum_User").map do |row|
    {
      id: row['user_id'],
      username: sanitize_username(row['user_username'], row['user_email'], row['user_real_name']),
      email: row['user_email'],
      created_at: Time.at(row['user_registered']),
      updated_at: Time.at(row['user_last_seen']),
      name: row['user_real_name'],
      title: row['user_title'],
      bio_raw: row['user_about'] || "",
      website: row['user_homepage'],
      location: row['user_location'],
      custom_fields: {
        md5_password: row['user_password'],
        original_username: row['user_username'],
        original_gossamer_id: row['user_id']
      }
    }
  end

  # Create or update users in Discourse; each hash is passed through unchanged
  create_users(users) { |user| user }
end
|
|
|
|
# Generate SQLite user ID mapping between Discourse and Gossamer
|
|
# Rebuilds the SQLite user_id_map: for every gforum_User row the sanitized
# username is looked up in Discourse and, when found, the Gossamer id ->
# Discourse id pair is stored. Users missing from Discourse are reported and
# skipped.
#
# All rows are converted (and usernames sanitized) first; the lookups and
# inserts happen in a second pass.
def generate_user_id_mapping
  puts "Generating User ID Mapping..."

  # Fetch all users from Gossamer Forums
  users = execute_query("SELECT * FROM gforum_User").map do |row|
    {
      id: row['user_id'],
      username: sanitize_username(row['user_username'], row['user_email'], row['user_real_name']),
      email: row['user_email'],
      created_at: Time.at(row['user_registered']),
      updated_at: Time.at(row['user_last_seen']),
      name: row['user_real_name'],
      title: row['user_title'],
      bio_raw: row['user_about'] || "",
      website: row['user_homepage'],
      location: row['user_location'],
      custom_fields: {
        md5_password: row['user_password'],
        original_username: row['user_username'],
        original_gossamer_id: row['user_id']
      }
    }
  end

  # Now that the Discourse user ids are known, record the mapping in SQLite
  users.each do |user|
    discourse_username = user[:username]
    discourse_user = User.find_by(username: discourse_username)

    unless discourse_user
      puts "User #{user[:username]} --> #{discourse_username} not found in Discourse. Skipping user mapping addition."
      next
    end

    # Store the user ID mapping
    puts "for insert_user_id_mapping: user[:id] #{user[:id]} discourse_user.id #{discourse_user.id}"
    insert_user_id_mapping(user[:id], discourse_user.id)
  end
end
|
|
|
|
# Import and set user Bio and Images
|
|
# For every gforum_User row whose sanitized username exists in Discourse:
# sets the profile bio from the Gossamer "about" text (appended after a blank
# line when a bio is already present), truncates it to 3000 characters,
# saves the profile, then imports the user's files via import_user_files.
# Users missing from Discourse are reported and skipped.
def set_user_bio_images
  puts "Setting User Bio and Images..."

  # Fetch all users from Gossamer Forums
  users = execute_query("SELECT * FROM gforum_User").map do |row|
    {
      id: row['user_id'],
      username: sanitize_username(row['user_username'], row['user_email'], row['user_real_name']),
      email: row['user_email'],
      created_at: Time.at(row['user_registered']),
      updated_at: Time.at(row['user_last_seen']),
      name: row['user_real_name'],
      title: row['user_title'],
      bio_raw: row['user_about'] || "",
      website: row['user_homepage'],
      location: row['user_location'],
      custom_fields: {
        md5_password: row['user_password'],
        original_username: row['user_username'],
        original_gossamer_id: row['user_id']
      }
    }
  end

  # For each user, append user bio and import user files
  users.each do |user|
    discourse_username = user[:username]
    discourse_user = User.find_by(username: discourse_username)

    unless discourse_user
      puts "User #{user[:username]} --> #{discourse_username} not found in Discourse. Skipping bio-image setting."
      next
    end

    # Ensure user profile exists and bio_raw is a string
    discourse_user.user_profile ||= UserProfile.new(user_id: discourse_user.id)
    profile = discourse_user.user_profile
    profile.bio_raw ||= ""

    # Use the imported bio as-is when none exists yet, otherwise append it
    profile.bio_raw =
      if profile.bio_raw.empty?
        user[:bio_raw]
      else
        profile.bio_raw + "\n\n" + user[:bio_raw]
      end

    # Ensure the bio does not exceed 3000 characters
    if profile.bio_raw.length > 3000
      puts "Warning: About Me for user #{discourse_user.username} (ID: #{discourse_user.id}) exceeds 3000 characters. Truncating."
      profile.bio_raw = profile.bio_raw[0, 3000]
    end
    profile.save!

    # Import user files.
    # TEMPORARY: to resume after an interruption, guard this call with
    # `if user[:id] > <last imported id>`.
    import_user_files(discourse_user)
  end
end
|
|
|
|
# # Import user files from Gossamer Forums to Discourse
|
|
# def import_user_files(user)
|
|
# print "\rImporting files for user #{user.username}..."
|
|
#
|
|
# original_gossamer_id = user.custom_fields['original_gossamer_id']
|
|
# if original_gossamer_id.nil? || original_gossamer_id.empty?
|
|
# puts "User #{user.username} does not have a valid original_gossamer_id. Skipping file import."
|
|
# return
|
|
# end
|
|
#
|
|
# # puts "Original Gossamer ID for user #{user.username}: #{original_gossamer_id}"
|
|
#
|
|
# # Fetch and import user files
|
|
# execute_query("SELECT * FROM gforum_User_Files WHERE ForeignColKey = #{original_gossamer_id}").each do |file|
|
|
# # Construct the file URL
|
|
# file_url = "https://forum.slowtwitch.com/images/users/images/#{file['ID'] % 10}/#{file['ID']}-#{file['File_Name']}"
|
|
# puts "User #{user.username} User ID: #{user.id} original_gossamer_id: #{original_gossamer_id} file_url: #{file_url}"
|
|
#
|
|
# new_bio = user.user_profile.bio_raw + "\n\n![#{file['File_Name']}](#{file_url})"
|
|
# if new_bio.length > 3000
|
|
# puts "Warning: About Me for user #{user.username} (ID: #{user.id}) exceeds 3000 characters after adding file link. Truncating."
|
|
# new_bio = new_bio[0, 3000]
|
|
# end
|
|
# user.user_profile.bio_raw = new_bio
|
|
# user.user_profile.save!
|
|
# end
|
|
# print "Importing files for user #{user.username}... Done.\n"
|
|
# end
|
|
|
|
# Helper method to convert TIFF to PNG
|
|
# Converts a TIFF file to PNG with ImageMagick's `convert`.
#
# The output path replaces a trailing ".tif"/".tiff" extension
# (case-insensitive) with ".png". If the input has no such extension,
# ".png" is appended, so the source file is never overwritten. `convert` is
# run without a shell, so paths with spaces or shell metacharacters are
# passed through literally.
#
# @param file_path [String] path of the TIFF file
# @return [String, nil] the PNG path when the output file exists, else nil
def convert_tiff_to_png(file_path)
  png_path = file_path.sub(/\.tiff?\z/i, '.png')
  png_path = "#{file_path}.png" if png_path == file_path

  system('convert', file_path, png_path)
  png_path if File.exist?(png_path)
rescue => e
  puts "Failed to convert image #{file_path}: #{e.message}"
  puts e.backtrace.join("\n") # Print the full stack trace
  nil
end
|
|
|
|
# Helper method to resize an image to specified dimensions
|
|
# Resizes an image with ImageMagick's `convert -resize WxH`, writing the
# result next to the original as "<name>_resized<ext>".
#
# The output name is built from the trailing extension only, so an
# identical ".ext" earlier in the path is left alone and extension-less
# files get "_resized" appended. `convert` is run without a shell, so paths
# with spaces or shell metacharacters are passed through literally.
#
# @param file_path [String] path of the source image
# @param max_width [Integer]
# @param max_height [Integer]
# @return [String, nil] the resized file's path when it exists, else nil
def resize_image(file_path, max_width, max_height)
  extension = File.extname(file_path)
  resized_path = "#{file_path.delete_suffix(extension)}_resized#{extension}"

  system('convert', file_path, '-resize', "#{max_width}x#{max_height}", resized_path)
  resized_path if File.exist?(resized_path)
rescue => e
  puts "Failed to resize image #{file_path}: #{e.message}"
  puts e.backtrace.join("\n") # Print the full stack trace
  nil
end
|
|
|
|
# Create avatar for the user using the provided image path
|
|
# Uploads the image at +avatar_path+ as an avatar for +user+ and, when the
# upload is persisted, attaches it as the user's custom and current avatar.
#
# The image is copied into a Tempfile first; the Tempfile is always removed.
# Upload failures and any StandardError are reported with puts instead of
# being raised.
#
# @param user the Discourse user to update
# @param avatar_path [String] path of the image file
def create_avatar(user, avatar_path)
  tempfile = nil

  begin
    extension = File.extname(avatar_path)
    tempfile = Tempfile.new(['avatar', extension])
    FileUtils.copy_file(avatar_path, tempfile.path)

    creator = UploadCreator.new(tempfile, "avatar#{extension}", type: "avatar")
    upload = creator.create_for(user.id)

    unless upload.present? && upload.persisted?
      puts "Failed to upload avatar for user #{user.username}: #{avatar_path}"
      puts upload.errors.inspect if upload
      return nil
    end

    user.create_user_avatar
    user.user_avatar.update(custom_upload_id: upload.id)
    user.update(uploaded_avatar_id: upload.id)
  rescue StandardError => e
    puts "Failed to create avatar for user #{user.username}: #{avatar_path}. Error: #{e.message}"
  ensure
    tempfile.close! if tempfile
  end
end
|
|
|
|
# Import user files (profile images) from Gossamer Forums to Discourse
|
|
# Imports the profile images of one Discourse user from Gossamer Forums.
#
# For every gforum_User_Files row belonging to the user's original Gossamer id
# that is a supported "user_imageN" image, the file is downloaded and appended
# to the user's bio as Markdown. The first successfully imported image is also
# uploaded as profile header / user card background and, in a 200x200 resized
# form, as the avatar.
#
# user - Discourse user; must respond to custom_fields, user_profile, id and
#        username.
#
# Returns nil.
def import_user_files(user)
  print "\rImporting files for user #{user.username}..."

  original_gossamer_id = user.custom_fields['original_gossamer_id']
  # to_s covers nil, "" and non-String values (an Integer has no #empty?)
  if original_gossamer_id.to_s.empty?
    puts "User #{user.username} does not have a valid original_gossamer_id. Skipping file import."
    return
  end

  puts "Original Gossamer ID for user #{user.username}: #{original_gossamer_id}"

  images_imported = 0

  execute_query("SELECT * FROM gforum_User_Files WHERE ForeignColKey = #{original_gossamer_id}").each do |file|
    temp_file = nil
    resized_temp_file = nil
    resized_image_path = nil

    begin
      # Encode the filename twice to match how it is stored on the server
      double_encoded_filename = file['File_Name'].gsub('%', '%25')
      encoded_filename = URI.encode_www_form_component(double_encoded_filename)

      # Construct the file URL
      file_url = "https://forum.slowtwitch.com/images/users/images/#{file['ID'] % 10}/#{file['ID']}-#{encoded_filename}"
      puts "User #{user.username} User ID: #{user.id} original_gossamer_id: #{original_gossamer_id} file_url: #{file_url}"

      # Ensure the file is a user image and has a supported MIME type
      next unless file['ForeignColName'] =~ /^user_image\d+$/
      puts "#A"
      next unless ['image/jpeg', 'image/png', 'image/gif', 'image/webp', 'image/avif', 'image/heif', 'image/heic'].include?(file['File_MimeType'])
      puts "#B"

      # Download the attachment
      image_data = download_attachment(file_url)
      next if image_data.nil?
      puts "#C"

      # Write the image data to a temporary file
      temp_file = Tempfile.new(['user_image', File.extname(file['File_Name'])])
      temp_file.binmode
      temp_file.write(image_data)
      temp_file.rewind

      # Resize the image for the avatar / profile picture (200x200)
      begin
        resized_image_path = resize_image(temp_file.path, 200, 200)
        raise "Image resize failed" if resized_image_path.nil?
        resized_temp_file = Tempfile.new(['user_image_resized', '.png'])
        FileUtils.copy_file(resized_image_path, resized_temp_file.path)
      rescue => e
        puts "Skipping image due to resize failure: #{temp_file.path}"
        puts e.backtrace.join("\n") # Print the full stack trace
        next
      end

      # Only handle the first image for avatar and profile settings
      if images_imported == 0
        puts "#D"

        # Upload the original image for profile header and user card background
        upload = upload_attachment(user.id, temp_file, file['File_Name'], file_url)
        next if upload.nil?

        # Upload the resized image for the avatar
        resized_upload = upload_attachment(user.id, resized_temp_file, file['File_Name'], file_url)
        next if resized_upload.nil?

        # Set the avatar using the resized image
        create_avatar(user, resized_temp_file.path)
        puts "Avatar set for user #{user.username} with upload ID #{resized_upload.id}"

        # Set the Profile Header and the User Card Background
        UserProfile.find_by(user_id: user.id).update!(
          profile_background_upload_id: upload.id,
          card_background_upload_id: upload.id
        )

        images_imported += 1
      end

      puts "#E"

      # Append the image to the user's bio_raw only if it does not already exist
      image_markdown = "\n\n![#{file['File_Name']}](#{file_url})"
      user.user_profile.bio_raw ||= ""
      unless user.user_profile.bio_raw.include?(image_markdown)
        user.user_profile.bio_raw += image_markdown
        user.user_profile.save!
      end
    ensure
      # Runs on every exit from this iteration (including `next`), so neither
      # Tempfile nor the file written by resize_image is left behind.
      [temp_file, resized_temp_file].compact.each(&:close!)
      FileUtils.rm_f(resized_image_path) if resized_image_path
    end
  end

  # Remove exact duplicate paragraphs (e.g. repeated image markdown) from the
  # bio. A user without any bio has nothing to deduplicate.
  profile = user.user_profile
  unless profile.bio_raw.nil?
    profile.bio_raw = profile.bio_raw.split("\n\n").uniq.join("\n\n")
    profile.save!
  end

  print "Importing files for user #{user.username}... Done.\n"
end
|
|
|
|
|
|
# Import categories from Gossamer Forums to Discourse
|
|
# Creates one Discourse category per gforum_Forum row that has not been
# imported yet, tags it with the custom field 'original_gossamer_id' and
# records the forum-id -> category-id mapping.
def import_categories
  puts "Importing categories (forums)..."

  execute_query("SELECT * FROM gforum_Forum").each do |row|
    forum_id = row['forum_id']

    # Skip forums whose category was already created on an earlier run
    next if CategoryCustomField.exists?(name: 'original_gossamer_id', value: forum_id)

    category_name = row['forum_name']
    category_description = row['forum_desc'] || "No description provided"
    puts "id #{forum_id} name #{category_name} description #{category_description}"

    # forum_last serves as both timestamps; fall back to "now" when absent
    created_at = row['forum_last'] ? Time.at(row['forum_last']) : Time.now
    updated_at = row['forum_last'] ? Time.at(row['forum_last']) : Time.now
    attributes = {
      name: category_name,
      description: category_description,
      created_at: created_at,
      updated_at: updated_at
    }

    # Second argument is the import_id
    category = create_category(attributes, forum_id)

    category.custom_fields['original_gossamer_id'] = forum_id
    category.save!

    # Store the category ID mapping
    puts "for insert_category_id_mapping: category[:id] #{category[:id]} row['forum_id'] #{forum_id}"
    insert_category_id_mapping(forum_id, category[:id])
  end

  puts "Importing categories... Done."
end
|
|
|
|
# Helper function to ensure title meets the minimum length requirement
|
|
# Pads a title with trailing dots until it is at least min_length characters.
#
# title      - the title; nil (e.g. a NULL subject column) is treated as "".
# min_length - minimum number of characters required (default 5).
#
# Returns a String of at least min_length characters; titles that are already
# long enough are returned with unchanged content.
def ensure_valid_title(title, min_length = 5)
  # to_s keeps a nil title from raising NoMethodError on #length
  title.to_s.ljust(min_length, '.')
end
|
|
|
|
# # Convert Gossamer tags to Discourse markdown
|
|
# def convert_gossamer_tags_to_markdown(text)
|
|
# text.gsub!(/\[b\](.*?)\[\/b\]/, '**\1**')
|
|
# text.gsub!(/\[i\](.*?)\[\/i\]/, '*\1*')
|
|
# text.gsub!(/\[img\](.*?)\[\/img\]/, '![image](\1)')
|
|
# text.gsub!(/\[quote\](.*?)\[\/quote\]/m, '[quote]\1[/quote]')
|
|
# text.gsub!(/\[quote (.*?)\](.*?)\[\/quote\]/m, '[quote=\1]\2[/quote]')
|
|
# text.gsub!(/\[font "(.*?)"\](.*?)\[\/font\]/m, '\2') # Ignoring font changes
|
|
# text.gsub!(/\[size (\d+)\](.*?)\[\/size\]/m, '\2') # Ignoring size changes
|
|
# text
|
|
# end
|
|
|
|
# Sanitize post message to remove Gossamer-specific tags and convert to Discourse-friendly format
|
|
# Turns a raw Gossamer post body into text suitable for a Discourse post.
#
# Steps, in order: strip NUL characters, drop the [signature] marker, convert
# the known Gossamer tags to Markdown/HTML, remove every other [tag], normalise
# bare `!(url)` images, and finally guarantee a non-trivial body that contains
# sentence punctuation.
#
# message - String post body, or nil.
#
# Returns a new String; the argument is never modified.
def sanitize_post_message(message)
  # Ensure the raw post string contents itself is acceptable to Discourse.
  # to_s + tr always yields a fresh, unfrozen String (also for nil).
  sanitized_message = message.to_s.tr("\0", '')

  # Remove the [signature] marker (and the newlines around it) as we don't
  # support this in Discourse. Must be the in-place gsub!; the non-bang form
  # throws its result away.
  sanitized_message.gsub!(/\n?\[signature\]\n?/, '')

  # Convert Gossamer tags to Discourse markdown
  sanitized_message.gsub!(/\[b\](.*?)\[\/b\]/, '**\1**')
  sanitized_message.gsub!(/\[i\](.*?)\[\/i\]/, '*\1*')
  sanitized_message.gsub!(/\[u\](.*?)\[\/u\]/, '<u>\1</u>')
  # NOTE(review): the [quote] tags produced by the next two lines are still
  # removed by the generic tag stripper below, so quoted text survives without
  # quote markup -- confirm whether quotes are meant to be kept.
  sanitized_message.gsub!(/\[quote\](.*?)\[\/quote\]/m, '[quote]\1[/quote]')
  sanitized_message.gsub!(/\[quote\s+user=(.*?)\](.*?)\[\/quote\]/m, '[quote \1]\2[/quote]')
  sanitized_message.gsub!(/\[img\](.*?)\[\/img\]/, '![image](\1)')
  sanitized_message.gsub!(/\[url=(.*?)\](.*?)\[\/url\]/, '[\2](\1)')
  sanitized_message.gsub!(/\[size=(.*?)\](.*?)\[\/size\]/, '\2')
  sanitized_message.gsub!(/\[color=(.*?)\](.*?)\[\/color\]/, '\2')
  sanitized_message.gsub!(/\[font=(.*?)\](.*?)\[\/font\]/, '\2')

  # Remove unsupported tags. A bracket group directly followed by "(" is the
  # text part of a Markdown link/image generated above and must be kept;
  # stripping it would reduce "[text](url)" to "(url)".
  sanitized_message.gsub!(/\[[^\[\]\n]*\](?!\()/, '')

  # Convert inline image syntax from `!(url)` to `![url](url)`
  sanitized_message.gsub!(/!\((http[s]?:\/\/[^\)]+)\)/, '![\1](\1)')

  # Ensure minimum length. Checked after the tag handling so that a post made
  # up only of removed tags does not end up empty.
  if sanitized_message.strip.empty? || sanitized_message.length < 3
    sanitized_message = "Empty post contents."
  end

  # Ensure sentence structure
  unless sanitized_message.match?(/[.!?]\s|[.!?]$/)
    sanitized_message += "."
  end

  sanitized_message
end
|
|
|
|
# Fetch post views from the gforum_PostView table
|
|
# Looks up the view counter stored in gforum_PostView for post_id.
#
# Returns the row's 'post_views' value, or 0 when the post has no row.
def fetch_post_views(post_id)
  view_row = execute_query("SELECT post_views FROM gforum_PostView WHERE post_id_fk = #{post_id} LIMIT 1").first
  return 0 unless view_row

  view_row['post_views']
end
|
|
|
|
########## THREADING START --------------------------------------------
|
|
|
|
# Method to dynamically calculate the optimal thread pool size based on system load
|
|
# Derives a thread pool size from the current 1-minute load average.
#
# The size is ceil(cpu_count / load), limited to the range
# 1..(2 * cpu_count). An idle machine (load 0.0) gets the upper limit.
#
# loadavg_path - file to read the load averages from; its first
#                whitespace-separated field is used as the 1-minute average
#                (default '/proc/loadavg').
#
# Returns the pool size as an Integer.
def calculate_dynamic_pool_size(loadavg_path = '/proc/loadavg')
  # Extract the 1-minute load average (first field of the file)
  one_minute_load_avg = File.read(loadavg_path).split.first.to_f

  # Determine how many logical CPU cores are available on the system
  cpu_count = Concurrent.processor_count

  # Log the current load and CPU information for debugging and monitoring purposes
  puts "1-minute Load Average: #{one_minute_load_avg}, CPU Count: #{cpu_count}"

  # Cap the maximum pool size to twice the number of CPUs
  max_pool_size = cpu_count * 2

  # With a load of 0.0 the division yields Infinity and Float#ceil raises
  # FloatDomainError, so an idle system goes straight to the cap.
  initial_pool_size =
    if one_minute_load_avg.positive?
      (cpu_count / one_minute_load_avg).ceil
    else
      max_pool_size
    end

  # Adjust the final pool size to be within the valid range (1 to max_pool_size)
  pool_size = [[initial_pool_size, max_pool_size].min, 1].max

  puts "Calculated and adjusted dynamic pool size: #{pool_size}" # Log the dynamically adjusted pool size
  pool_size
end
|
|
|
|
# Get list of TOPICS / OP posts, i.e. post ids that have no parent / root id - SELECT post_id FROM gforum_Post WHERE post_root_id = 0;
|
|
# Imports every topic (gforum_Post rows with post_root_id = 0) concurrently.
#
# One job per topic-starter post is queued on a fixed pool of 40 threads. Each
# job borrows its own MariaDB client from a ConnectionPool and an ActiveRecord
# connection, skips posts whose stored status is neither nil nor 0, and
# otherwise runs topic_import_job and records completion. Failures are logged
# and recorded via mark_post_as_failed; connection-type failures are retried a
# bounded number of times.
#
# Blocks until every queued job has finished.
def threaded_topic_import
  # Upper bound for reconnect attempts per post, so a permanently broken
  # connection cannot keep a worker retrying forever.
  max_connection_retries = 5

  pool = Concurrent::FixedThreadPool.new(40)

  # One MariaDB / Mysql2 client per concurrent job.
  # FIXME: credentials are hard-coded in the source; move them to configuration.
  mariadb_pool = ConnectionPool.new(size: 40, timeout: 100) do
    Mysql2::Client.new(
      host: "172.99.0.10",
      username: "admin",
      password: "x0YGLA9252iiTFQuqaM0ROX8FmQzZuUu",
      database: "slowtwitch"
    )
  end

  # The query selects post_ids from gforum_Post where post_root_id is 0, meaning these posts are the topic starters (OPs).
  result = execute_query("SELECT post_id FROM gforum_Post WHERE post_root_id = 0 ORDER BY post_id ASC")
  parent_post_ids = result.map { |row| row['post_id'] }

  sqlite_mutex = Mutex.new # Mutex for SQLite operations

  # All jobs go to the same pool, so they are queued in one pass; the pool
  # itself limits how many run at a time.
  parent_post_ids.each do |post_id|
    # Submit the import job for the current post_id to the thread pool
    pool.post do
      connection_retries = 0

      begin
        mariadb_pool.with do |mysql_client|
          # Ensure the connection is active, otherwise reconnect
          puts "PP 11 -- #{post_id} -- Checking MySQL connections status.."
          mysql_client.ping

          # Use connection pooling for PostgreSQL and synchronize access to shared resources
          ActiveRecord::Base.connection_pool.with_connection do
            # Status lookup guarded like the other SQLite accesses in this job
            # (presumably it reads the shared SQLite database -- verify).
            post_status = sqlite_mutex.synchronize { fetch_post_status(post_id) }

            if post_status.nil? || post_status == 0
              puts "Starting import for post_id #{post_id}"
              topic_import_job(post_id, sqlite_mutex, mysql_client) # Import topic and its replies
              sqlite_mutex.synchronize do
                mark_post_as_complete(post_id) # Mark as complete in SQLite table
              end
            else
              puts "Skipping post_id #{post_id}, already processed."
            end
          end
        end
      rescue => e
        puts "Error processing post ID #{post_id}: #{e.message}"
        puts e.backtrace.join("\n") # Print the full stack trace
        # NOTE(review): the post is marked failed before a retry; if the failed
        # status is non-zero the retry will take the "already processed"
        # branch -- confirm that this is intended.
        sqlite_mutex.synchronize do
          mark_post_as_failed(post_id)
        end

        if e.message =~ /MySQL client is not connected/ || e.message =~ /This connection is in use by/
          connection_retries += 1
          if connection_retries <= max_connection_retries
            sleep(1)
            puts "Reconnecting to MySQL for post ID #{post_id} due to connection loss..."
            retry
          end
          puts "Giving up on post ID #{post_id} after #{max_connection_retries} reconnect attempts."
        end
      end
    end
  end

  # Every job has been submitted: stop accepting work and wait for the
  # workers to drain the queue.
  puts "PP 33 -- Ready for shutdown"
  pool.shutdown # Initiate thread pool shutdown after all jobs submitted
  puts "PP 44 -- Now wait for termination"
  pool.wait_for_termination # Wait for all threads to finish exec
end
|
|
|
|
# # Method to ensure thread-safe updates to highest_processed_post_id
|
|
# def update_highest_processed_post_id_thread_safe(post_id)
|
|
# @highest_processed_mutex ||= Mutex.new
|
|
# @highest_processed_mutex.synchronize do
|
|
# if post_id > fetch_highest_processed_post_id
|
|
# update_highest_processed_post_id(post_id)
|
|
# end
|
|
# end
|
|
# end
|
|
|
|
# Method to import an entire topic, including its first post and all subsequent replies
|
|
# Imports one complete topic: the topic-starter post identified by post_id
# and all of its replies (gforum_Post rows whose post_root_id is post_id).
#
# post_id      - Gossamer post id of the topic starter.
# sqlite_mutex - Mutex used around the SQLite bookkeeping helpers.
# mysql_client - Mysql2 client reserved for the calling thread; every Gossamer
#                query issued here goes through it.
#
# Nothing is created when the source row is missing, when no Discourse
# user/category mapping exists, or when a topic carrying this
# 'original_gossamer_id' already exists. Topic, posts and attachments are
# created inside one ActiveRecord transaction; an ActiveRecord::RecordInvalid
# is logged and rolls that transaction back without propagating.
# NOTE(review): after such a rollback the caller still sees a normal return,
# and the SQLite counters updated so far are not reverted -- confirm intended.
#
# Returns nil.
def topic_import_job(post_id, sqlite_mutex, mysql_client)
  puts "TIJ ZZ post_id #{post_id}"
  puts " FIRST Checking MySQL connection status..."
  if mysql_client.query('SELECT 1').nil?
    puts " MySQL connection is not valid, TRY TO RECONNECT II"
    mysql_client.ping
  else
    puts " MySQL connection is valid"
  end
  puts "TIJ AA post_id #{post_id}"

  # Fetch the post data for the given post_id (this is the first post in the topic)
  row = execute_query_concurrent("SELECT post_id, user_id_fk, forum_id_fk, post_root_id, post_subject, post_time, post_message, post_father_id, post_replies FROM gforum_Post WHERE post_id = #{post_id}", mysql_client).first

  puts "TIJ BB post_id #{post_id}"
  # Early return if the post data is not found
  return unless row

  puts "TIJ CC post_id #{post_id}"
  # Extract key values from the fetched row
  post_id = row['post_id'].to_i
  puts "Processing post_id #{row['post_id']} post_root_id #{row['post_root_id']} post_subject/title #{row['post_subject']} forum_id_fk/category_id #{row['forum_id_fk']}"

  # Fetch the mapped Discourse user ID based on Gossamer data
  discourse_user_id = fetch_user_id_mapping(row['user_id_fk'])

  # Check to be certain user has not been deleted, etc.
  if discourse_user_id.nil? || discourse_user_id == 0
    puts "discourse_user_id is NIL/ZERO for post_id #{row['post_id']}"
    discourse_user_id = former_user_id_for_import
    puts "discourse_user_id is NOW Former_User id #{discourse_user_id} for post_id #{row['post_id']}"
  end

  # Fetch the mapped Discourse category ID based on Gossamer data
  discourse_category_id = fetch_category_id_mapping(row['forum_id_fk'])

  puts "discourse_user_id #{discourse_user_id} discourse_category_id #{discourse_category_id}"
  return unless discourse_user_id && discourse_category_id

  puts "TIJ DD post_id #{post_id}"
  # Ensure the topic title is valid; to_s guards against a NULL subject
  title = ensure_valid_title(row['post_subject'].to_s)
  unique_title = title

  # Fetch the number of views the post has had
  post_views = fetch_post_views_concurrent(row['post_id'], mysql_client)

  # Check if the topic has already been imported using the custom field 'original_gossamer_id'
  if TopicCustomField.exists?(name: 'original_gossamer_id', value: row['post_id'])
    puts "Topic for post_id #{row['post_id']} already exists, skipping creation."
  else
    puts "TIJ EE post_id #{post_id}"
    ActiveRecord::Base.transaction do
      begin
        # Create the new topic in Discourse, appending " (n)" to the title for
        # as long as Discourse reports the title as already used.
        suffix = 1
        topic = nil
        topic_created = false

        until topic_created
          begin
            puts "TIJ FF post_id #{post_id}"
            puts "CREATE TOPIC unique_title #{unique_title} title #{title} discourse_user_id #{discourse_user_id} category_id #{discourse_category_id}"
            topic = Topic.create!(
              title: unique_title,
              user_id: discourse_user_id,
              created_at: Time.at(row['post_time']),
              updated_at: Time.at(row['post_time']),
              category_id: discourse_category_id,
              views: post_views || 0,
              posts_count: 0
            )
            topic.custom_fields['original_gossamer_id'] = row['post_id']
            topic.save!

            topic_created = true
          rescue => e
            raise e unless e.message.include?("Title has already been used")

            unique_title = "#{title} (#{suffix})"
            suffix += 1
          end
        end

        # Workaround... take a copy of topic.id
        current_topic_id = topic.id

        sqlite_mutex.synchronize do
          # Update the database with the last post time and user for the topic
          update_db_topic_last_post_time(current_topic_id, Time.at(row['post_time']).to_i)
          update_db_topic_last_post_user(current_topic_id, discourse_user_id)

          # Increment the topic count for the user
          update_db_user_topic_count(discourse_user_id, fetch_db_user_topic_count(discourse_user_id).to_i + 1)
        end

        # Sanitize and prepare the post message for Discourse
        sanitized_post_message = sanitize_post_message(row['post_message'])

        puts "CREATE TOPIC POST for current_topic_id #{current_topic_id} discourse_user_id #{discourse_user_id}"

        # This is the first post of the topic
        post_number = 1
        sqlite_mutex.synchronize do
          update_db_topic_post_numbers(current_topic_id, post_number)
        end

        puts "TIJ GG post_id #{post_id}"

        # Create the initial post in the new topic
        post = Post.create!(
          topic_id: current_topic_id,
          user_id: discourse_user_id,
          raw: sanitized_post_message,
          created_at: Time.at(row['post_time']),
          updated_at: Time.at(row['post_time']),
          reads: post_views || 0,
          post_number: post_number
        )
        post.custom_fields['original_gossamer_id'] = row['post_id']
        post.save!

        sqlite_mutex.synchronize do
          # Increment the post count for the topic and user
          update_db_topic_post_count(current_topic_id, fetch_db_topic_post_count(current_topic_id).to_i + 1)
          update_db_user_post_count(discourse_user_id, fetch_db_user_post_count(discourse_user_id).to_i + 1)
        end

        puts "TIJ HH post_id #{post_id}"
        # Handle any attachments associated with the post
        handle_post_attachments(row['post_id'], post, discourse_user_id, mysql_client)

        # Create URL mappings for the new topic
        new_url = "https://new/t/#{topic.slug}/#{current_topic_id}"
        sqlite_mutex.synchronize do
          insert_url_mapping(row['post_id'], new_url, unique_title)
        end

        # Fetch and import all replies to this topic
        replies = execute_query_concurrent("SELECT post_id, user_id_fk, post_message, post_time FROM gforum_Post WHERE post_root_id = #{post_id} ORDER BY post_time ASC", mysql_client)

        # Import each reply sequentially
        replies.each do |reply_row|
          # Fetch the discourse user ID for the reply
          reply_user_id = fetch_user_id_mapping(reply_row['user_id_fk'])

          if reply_user_id.nil? || reply_user_id == 0
            puts "reply_user_id is NIL/ZERO for reply post_id #{reply_row['post_id']}"
            reply_user_id = former_user_id_for_import
            puts "reply_user_id is NOW Former_User id #{reply_user_id} for reply post_id #{reply_row['post_id']}"
          end

          puts "TIJ II post_id #{post_id}"
          # Sanitize and prepare the reply message for Discourse
          sanitized_reply_message = sanitize_post_message(reply_row['post_message'])

          puts "CREATE REPLY in current_topic_id #{current_topic_id} for reply post_id #{reply_row['post_id']}"

          # Allocate the next post number; the read and the write happen under
          # the same lock so the SQLite handle is never used unguarded.
          post_number = sqlite_mutex.synchronize do
            next_number = fetch_db_topic_post_numbers(current_topic_id).to_i + 1
            update_db_topic_post_numbers(current_topic_id, next_number)
            next_number
          end

          # Fetch the number of views the post has had
          reply_post_views = fetch_post_views_concurrent(reply_row['post_id'], mysql_client)

          # crazy sanity check
          if topic.nil?
            puts "ERROR: Topic is nil for reply post_id #{reply_row['post_id']}, attempting to BYPASS anyway"
          end
          puts "TIJ JJ post_id #{post_id} reply post_id #{reply_row['post_id']} reply_post_views #{reply_post_views || 0} post_number #{post_number} current_topic_id #{current_topic_id} reply_post_views #{reply_post_views || 0}"

          # Create the reply post in the existing topic
          post = Post.create!(
            topic_id: current_topic_id,
            user_id: reply_user_id,
            raw: sanitized_reply_message,
            created_at: Time.at(reply_row['post_time']),
            updated_at: Time.at(reply_row['post_time']),
            reads: reply_post_views || 0,
            post_number: post_number
          )
          post.custom_fields['original_gossamer_id'] = reply_row['post_id']
          post.save!

          puts "TIJ KK post_id #{post_id}"
          # Increment the post count for the topic and user
          sqlite_mutex.synchronize do
            update_db_topic_post_count(current_topic_id, fetch_db_topic_post_count(current_topic_id).to_i + 1)
            update_db_user_post_count(reply_user_id, fetch_db_user_post_count(reply_user_id).to_i + 1)
          end

          # Update last post time and user for the topic when this reply is newer
          reply_time = Time.at(reply_row['post_time']).to_i
          sqlite_mutex.synchronize do
            last_post_time = fetch_db_topic_last_post_time(current_topic_id)
            if last_post_time.nil? || reply_time > last_post_time.to_i
              update_db_topic_last_post_time(current_topic_id, reply_time)
              update_db_topic_last_post_user(current_topic_id, reply_user_id)
            end
          end

          # Handle any attachments associated with the reply
          handle_post_attachments(reply_row['post_id'], post, reply_user_id, mysql_client)
        end
      rescue ActiveRecord::RecordInvalid => e
        puts "Error importing topic with post_id #{row['post_id']}: #{e.message}"
        raise ActiveRecord::Rollback
      end
    end
  end

  puts " LAST Removing MySQL connection"
end

# Id of the Discourse account named 'Former_User', which takes ownership of
# posts whose original author has no mapping. Looked up once and remembered.
#
# Raises RuntimeError when that account does not exist.
def former_user_id_for_import
  @former_user_id_for_import ||= begin
    former_user = User.find_by(username: 'Former_User')
    raise "Former_User account not found in Discourse; cannot attribute orphaned posts" if former_user.nil?

    former_user.id
  end
end

# Reads the gforum_PostView counter for post_id through the given per-thread
# client (fetch_post_views goes through execute_query, which presumably uses
# the importer's single shared client -- verify).
#
# Returns the row's 'post_views' value, or 0 when the post has no row.
def fetch_post_views_concurrent(post_id, mysql_client)
  view_row = execute_query_concurrent("SELECT post_views FROM gforum_PostView WHERE post_id_fk = #{post_id} LIMIT 1", mysql_client).first
  view_row ? view_row['post_views'] : 0
end
|
|
|
|
|
|
########## THREADING END --------------------------------------------
|
|
|
|
|
|
# Import topics and posts from Gossamer Forums to Discourse
|
|
# Sequential (non-threaded) import of every gforum_Post row into Discourse.
#
# Rows are read in ascending post_id order. A row whose post_root_id is 0 is
# imported as a new topic plus its first post; any other row is imported as a
# reply inside the topic previously tagged with its post_root_id. Topic/user
# counters, last-post details and the resume watermark are recorded through
# the update_db_* and update_highest_processed_post_id helpers as each row is
# imported.
#
# A row is skipped when its post_id is at or below the stored watermark, or
# when its author or forum has no Discourse mapping.
def import_topics_and_posts_with_attachments
  puts "Importing topics and posts with attachments..."

  # Highest original-post id already present in url_map (reported for information only)
  highest_old_post_id = fetch_highest_old_post_id.to_i
  puts "Highest (OP) old_post_id in url_map: #{highest_old_post_id}"

  # Resume watermark: everything at or below this post_id is skipped
  highest_processed_post_id = fetch_highest_processed_post_id.to_i
  puts "Highest processed post_id: #{highest_processed_post_id}"

  # OVERRIDE........
  # Attachment example: highest_processed_post_id = 1359862

  # Execute the query to get all posts ordered by post_id
  execute_query("SELECT post_id, user_id_fk, forum_id_fk, post_root_id, post_subject, post_time, post_message, post_father_id, post_likes, post_replies FROM gforum_Post ORDER BY post_id").each do |row|
    post_id = row['post_id'].to_i

    # Skip posts that have already been processed
    next if post_id <= highest_processed_post_id

    puts "Processing post_id #{row['post_id']} post_root_id #{row['post_root_id']} post_subject/title #{row['post_subject']} forum_id_fk/category_id #{row['forum_id_fk']}"

    # Fetch the Discourse user and category ID mappings
    discourse_user_id = fetch_user_id_mapping(row['user_id_fk'])
    discourse_category_id = fetch_category_id_mapping(row['forum_id_fk'])
    puts "discourse_user_id #{discourse_user_id} discourse_category_id #{discourse_category_id}"
    next unless discourse_user_id && discourse_category_id

    post_time = Time.at(row['post_time'])

    # Check if the post is a topic (post_root_id == 0)
    if row['post_root_id'] == 0
      puts "#1"
      # Ensure the title is valid
      title = ensure_valid_title(row['post_subject'])
      unique_title = title

      # Confirm the number of views the post has had
      post_views = fetch_post_views(row['post_id'])

      # Skip if the topic already exists
      unless TopicCustomField.exists?(name: 'original_gossamer_id', value: row['post_id'])
        begin
          suffix = 1
          topic_created = false

          # Retry with "title (n)" until Discourse accepts the title as unique
          until topic_created
            begin
              puts "#2"
              puts "CREATE TOPIC unique_title #{unique_title} title #{title} discourse_user_id #{discourse_user_id} category_id #{discourse_category_id}"
              topic = Topic.create!(
                title: unique_title,
                user_id: discourse_user_id,
                created_at: post_time,
                updated_at: post_time,
                category_id: discourse_category_id,
                views: post_views || 0,
                posts_count: 0
              )
              topic.custom_fields['original_gossamer_id'] = row['post_id']
              topic.save!

              topic_created = true
            rescue ActiveRecord::RecordInvalid => e
              # Any other validation failure is handled by the outer rescue
              raise unless e.message.include?("Title has already been used")

              unique_title = "#{title} (#{suffix})"
              suffix += 1
            end
          end

          # Track last post time and user for the topic
          update_db_topic_last_post_time(topic.id, post_time.to_i)
          update_db_topic_last_post_user(topic.id, discourse_user_id)

          # Increment the count of the number of topics created by each user
          update_db_user_topic_count(discourse_user_id, fetch_db_user_topic_count(discourse_user_id).to_i + 1)

          # Sanitize the post message
          sanitized_post_message = sanitize_post_message(row['post_message'])

          puts "CREATE POST topic.id #{topic.id} discourse_user_id #{discourse_user_id}"

          # Reserve the next post number within the topic
          post_number = fetch_db_topic_post_numbers(topic.id).to_i + 1
          update_db_topic_post_numbers(topic.id, post_number)

          # Create the initial post in the topic
          post = Post.create!(
            topic_id: topic.id,
            user_id: discourse_user_id,
            raw: sanitized_post_message,
            created_at: post_time,
            updated_at: post_time,
            like_count: row['post_likes'] || 0,
            reads: post_views || 0,
            post_number: post_number
          )
          post.custom_fields['original_gossamer_id'] = row['post_id']
          post.save!

          # Track the number of posts in the topic and by the user
          update_db_topic_post_count(topic.id, fetch_db_topic_post_count(topic.id).to_i + 1)
          update_db_user_post_count(discourse_user_id, fetch_db_user_post_count(discourse_user_id).to_i + 1)

          # Handle attachments for the post
          handle_post_attachments(row['post_id'], post, discourse_user_id)

          # Create URL mappings
          new_url = "https://new/t/#{topic.slug}/#{topic.id}"
          insert_url_mapping(row['post_id'], new_url, unique_title)

          # Update the highest processed post_id
          puts "Updated highest processed post_id #{post_id}"
          update_highest_processed_post_id(post_id)
        rescue ActiveRecord::RecordInvalid => e
          puts "Error importing topic with post_id #{row['post_id']}: #{e.message}"
        end
      end
    else
      puts "#3"

      # Confirm the number of views the post has had
      post_views = fetch_post_views(row['post_id'])

      # Find the root topic for the post
      root_topic_field = TopicCustomField.find_by(name: 'original_gossamer_id', value: row['post_root_id'])
      unless root_topic_field
        puts "Warning: Root topic not found for post_id #{row['post_id']} with post_root_id #{row['post_root_id']}"
        next
      end

      topic_id = root_topic_field.topic_id

      # Find the parent post for the reply
      parent_post_field = PostCustomField.find_by(name: 'original_gossamer_id', value: row['post_father_id'])
      reply_to_post_number = parent_post_field ? Post.find(parent_post_field.post_id).post_number : nil

      begin
        puts "#4"

        # Sanitize the post message
        sanitized_post_message = sanitize_post_message(row['post_message'])

        # Reserve the next post number within the topic
        post_number = fetch_db_topic_post_numbers(topic_id).to_i + 1
        update_db_topic_post_numbers(topic_id, post_number)

        # Create the post in the existing topic.
        # like_count comes from post_likes, matching the topic branch above
        # (it previously read post_replies, which is the reply count).
        post = Post.create!(
          topic_id: topic_id,
          user_id: discourse_user_id,
          raw: sanitized_post_message,
          created_at: post_time,
          updated_at: post_time,
          reply_to_post_number: reply_to_post_number,
          like_count: row['post_likes'] || 0,
          reads: post_views || fetch_db_topic_post_count(topic_id).to_i,
          post_number: post_number
        )
        post.custom_fields['original_gossamer_id'] = row['post_id']
        post.save!

        # Track the number of posts in the topic and by the user
        update_db_topic_post_count(topic_id, fetch_db_topic_post_count(topic_id).to_i + 1)
        update_db_user_post_count(discourse_user_id, fetch_db_user_post_count(discourse_user_id).to_i + 1)

        # Update last post time and user for the topic when this reply is newer
        if fetch_db_topic_last_post_time(topic_id).nil? || post_time.to_i > fetch_db_topic_last_post_time(topic_id).to_i
          update_db_topic_last_post_time(topic_id, post_time.to_i)
          update_db_topic_last_post_user(topic_id, discourse_user_id)
        end

        # Handle attachments for the post
        handle_post_attachments(row['post_id'], post, discourse_user_id)

        # Update the highest processed post_id
        puts "Updated highest processed post_id #{post_id}"
        update_highest_processed_post_id(post_id)
      rescue ActiveRecord::RecordInvalid => e
        puts "Error importing post with post_id #{row['post_id']}: #{e.message}"
      end
    end
  end

  puts "Importing topics and posts with attachments... Done."
end
|
|
|
|
# Write the tracked per-topic statistics back onto the Discourse topics.
#
# For every row of the topic_last_post_time tracking table, the matching
# Topic gets its updated_at / last_posted_at / bumped_at set to the recorded
# last post time, its posts_count set from fetch_db_topic_post_count and its
# last_post_user_id set from fetch_db_topic_last_post_user.
#
# update_columns is used so that validations and callbacks are bypassed
# (a validating update! was failing here previously).
def update_topic_stats
  puts "Update topics with the correct last post time, post count, and last post user"

  @db.execute("SELECT * FROM topic_last_post_time").each do |row|
    topic_id, last_post_time = row

    # A tracked topic may no longer exist in Discourse (for example when its
    # creating transaction was rolled back after the tracking row was written).
    # Skip it instead of letting RecordNotFound abort the whole pass.
    topic = Topic.find_by(id: topic_id)
    unless topic
      puts "update_topic_stats topic_id #{topic_id} not found in Discourse, skipping"
      next
    end

    last_posted_at = Time.at(last_post_time)

    # Ensure we are only updating necessary fields
    topic.update_columns(
      updated_at: last_posted_at,
      posts_count: fetch_db_topic_post_count(topic_id).to_i,
      last_posted_at: last_posted_at,
      bumped_at: last_posted_at,
      last_post_user_id: fetch_db_topic_last_post_user(topic_id).to_i
    )
  end
end
|
|
|
|
# Write the tracked per-user topic and post counts into UserStat, then raise
# each user's trust level according to their post count.
#
# Trust level mapping (by post count): 0..2 => 1, 3..50 => 2, otherwise 3.
# A trust level is only ever raised here, never lowered.
#
# Users without a UserStat row, or missing from Discourse altogether, are
# logged and skipped rather than aborting the whole pass.
def update_user_stats
  puts "Update user profiles with the number of topics and posts created"

  @db.execute("SELECT * FROM user_topic_count").each do |row|
    user_id, count = row
    puts "update_user_stats user_id #{user_id} topic_count #{count}"

    # update_columns cannot be called on an unsaved record, so look up the
    # existing row instead of initialising a new one.
    user_stat = UserStat.find_by(user_id: user_id)
    if user_stat
      user_stat.update_columns(topic_count: count)
    else
      puts "update_user_stats user_id #{user_id} has no UserStat row, topic_count not updated"
    end
  end

  @db.execute("SELECT * FROM user_post_count").each do |row|
    user_id, count = row
    puts "update_user_stats user_id #{user_id} post_count #{count}"

    user_stat = UserStat.find_by(user_id: user_id)
    if user_stat
      user_stat.update_columns(post_count: count)
    else
      puts "update_user_stats user_id #{user_id} has no UserStat row, post_count not updated"
    end

    # Fetch the user once; skip the trust level step if the user is gone
    user = User.find_by(id: user_id)
    unless user
      puts "update_user_stats user_id #{user_id} not found in Discourse, trust level not updated"
      next
    end

    # Determine the new Trust Level based on post_count
    new_trust_level = case count
                      when 0..2 then 1  # basic user
                      when 3..50 then 2 # member
                      else 3            # regular or above
                      end

    current_trust_level = user.trust_level || 1 # default to 1 if not set

    # Only update trust level if the new level is higher than the current one
    if new_trust_level > current_trust_level
      user.update!(trust_level: new_trust_level)
      puts "update_user_stats user_id #{user_id} trust_level updated to #{new_trust_level}"
    else
      puts "update_user_stats user_id #{user_id} trust_level remains at #{current_trust_level}"
    end
  end
end
|
|
|
|
|
|
# Import personal messages from gforum_Message table (both inbox and sent messages)
|
|
# Import every gforum_Message row (inbox and sent mail) as Discourse private
# messages.
#
# Messages are processed in ascending msg_id order. Each message becomes a
# post; it is appended to an existing private-message topic between the same
# participants whose title equals the subject (with or without a leading
# "Re: "), otherwise a new private-message topic is created for it. After
# each imported message the resume watermark is advanced through
# update_highest_processed_personal_id.
#
# A message is skipped when it is at or below the watermark, when either
# participant has no Discourse mapping, or when it was already imported.
# Any StandardError raised for one message is logged and the loop continues.
def import_personal_messages
  puts "Importing personal (inbox and sentmail) messages..."

  # Fetch the highest_processed_personal_id
  highest_processed_personal_id = fetch_highest_processed_personal_id.to_i
  puts "Highest processed personal_id: #{highest_processed_personal_id}"

  # OVERRIDE - to speed getting to problem msg
  # highest_processed_personal_id = 1543840
  puts "Highest processed personal_id override: #{highest_processed_personal_id}"

  # ORDER BY msg_id: the resume watermark below is only valid when rows arrive
  # in ascending msg_id order, and replies must follow the message they answer.
  execute_query("SELECT * FROM gforum_Message ORDER BY msg_id").each do |row|
    begin
      msg_id = row['msg_id'].to_i
      puts "msg_id #{msg_id}"

      # Skip messages that have already been processed
      next if msg_id <= highest_processed_personal_id

      from_user_id = fetch_user_id_mapping(row['from_user_id_fk'])
      to_user_id = fetch_user_id_mapping(row['to_user_id_fk'])
      next unless from_user_id && to_user_id

      # Skip if the message already exists. Only the first message of a
      # conversation tags the topic, but every imported message tags its post,
      # so both custom-field tables are checked.
      next if TopicCustomField.exists?(name: 'original_gossamer_msg_id', value: row['msg_id']) ||
              PostCustomField.exists?(name: 'original_gossamer_msg_id', value: row['msg_id'])

      # Sanitize the message, ensuring we have an empty string or the content without any \0
      sanitized_message = sanitize_post_message(row['msg_body'])

      # Set default message body if the sanitized message is blank or shorter than three words
      sanitized_message = "<No message body to migrate.>" if sanitized_message.strip.empty? || sanitized_message.split.size < 3

      # If we do not change the "min personal message post length" to 1, we need this. ... We may need this anyway.
      sanitized_message = sanitized_message.ljust(10, '.') if sanitized_message.length < 10

      # Check and set a default title if the original title is nil or empty
      sanitized_title = row['msg_subject']&.strip
      sanitized_title = "<no subject>" if sanitized_title.nil? || sanitized_title.empty?

      # A message a user sent to themselves has a single participant;
      # de-duplicate so the same allowed-user row is not created twice and the
      # lookup below expects the right participant count.
      participant_ids = [to_user_id, from_user_id].uniq

      # Check for an existing private message topic between the same participants with a matching title
      topic = Topic.joins(:topic_allowed_users)
                   .where(archetype: Archetype.private_message)
                   .where("topics.title = ? OR topics.title = ?", sanitized_title, sanitized_title.sub(/\ARe: /, ""))
                   .where("topic_allowed_users.user_id IN (?)", participant_ids)
                   .group("topics.id")
                   .having("COUNT(topic_allowed_users.user_id) = ?", participant_ids.size)
                   .first

      if topic.nil?
        puts "IMPORTING new message topic sanitized: #{sanitized_title} user_id #{from_user_id} to_user_id #{to_user_id}"

        # Create a private message topic in Discourse
        topic = Topic.create!(
          title: sanitized_title,
          user_id: from_user_id,
          archetype: Archetype.private_message,
          created_at: Time.at(row['msg_time']),
          updated_at: Time.at(row['msg_time']),
          last_posted_at: Time.at(row['msg_time']),
          last_post_user_id: from_user_id
        )
        topic.custom_fields['original_gossamer_msg_id'] = row['msg_id']
        topic.save!

        # Add recipient and sender to the private message topic
        participant_ids.each do |participant_id|
          topic.topic_allowed_users.create!(user_id: participant_id)
        end
      else
        puts "APPENDING to existing message topic sanitized: #{sanitized_title} user_id #{from_user_id} to_user_id #{to_user_id}"

        # Increment the number of replies for the topic
        topic.increment!(:posts_count)
      end

      # Create the message as a post in the private topic
      post = Post.create!(
        topic_id: topic.id,
        user_id: from_user_id,
        raw: sanitized_message,
        created_at: Time.at(row['msg_time']),
        updated_at: Time.at(row['msg_time'])
      )
      post.custom_fields['original_gossamer_msg_id'] = row['msg_id']
      post.save!

      # Update the topic's last reply information
      topic.update!(
        last_posted_at: post.updated_at,
        last_post_user_id: post.user_id
      )

      update_highest_processed_personal_id(msg_id)

      # Attachments are not handled for personal messages (these were never a thing in Slowtwitch).
    rescue StandardError => e
      puts "Error importing message #{row['msg_id']}: #{e.message}"
    end
  end
end
|
|
|
|
|
|
# Main method to perform the import
|
|
# Main driver for one migration run.
#
# Disables Discourse rate limiting, sets the PG* environment variables, then
# runs the currently enabled stages in order: threaded topic/post import,
# topic and user statistics, URL-mapping CSV export, nginx rewrite-rule
# generation and finally the personal-message import. Output files are
# suffixed with a per-run timestamp.
def perform_import
  # Secret trick to disable RateLimiting protection in Discourse
  RateLimiter.disable

  # Connection settings are supplied through the PG* environment variables.
  # NOTE(review): these credentials are hard-coded in the script; consider
  # moving them out of source control.
  {
    'PGHOST' => '10.0.0.2',
    'PGPORT' => '5432',
    'PGDATABASE' => 'discourse',
    'PGUSER' => 'discourse',
    'PGPASSWORD' => 'nhB5FWhQkjdvaD2ViRNO63dQagDnzaTn'
  }.each do |variable, setting|
    ENV[variable] = setting
  end

  # Set our unique timestamp for this migration run
  timestamp = Time.now.strftime("-%Y%m%d%H%M%S")
  url_mapping_csv = "/bitnami/discourse/sqlite/gossamer-migration-url-mapping#{timestamp}"
  nginx_redirects_conf = "/bitnami/discourse/sqlite/gossamer-redirects#{timestamp}.conf"

  puts "Starting Gossamer Forums import... #{timestamp}"

  # Stages currently commented out (re-enable as needed):
  #   add_former_user
  #   import_users
  #   generate_user_id_mapping
  #   export_username_mapping_to_csv("/bitnami/discourse/sqlite/gossamer-migration-username-mapping#{timestamp}")
  #   set_user_bio_images
  #   import_categories
  #   import_topics_and_posts_with_attachments   (the threaded import below is used instead)

  threaded_topic_import

  update_topic_stats
  update_user_stats

  export_url_mapping_to_csv(url_mapping_csv)
  create_nginx_rewrite_rules(nginx_redirects_conf)

  import_personal_messages

  puts "Gossamer Forums import complete! #{timestamp}"
end
|
|
end
|
|
|
|
# Script entry point: build the importer (which opens its database
# connections in initialize) and run the full migration.
importer = GossamerForumsImporter.new
importer.perform_import
|
|
|