v0.39 Add SQLite table for tracking successful post importation; Split out user import into three separate callable methods; require sys/proctable
This commit is contained in:
parent
7b0a45cb89
commit
5002b90712
@ -1,7 +1,7 @@
|
|||||||
# Federated Computer, Inc.
|
# Federated Computer, Inc.
|
||||||
# David Sainty <saint@federated.computer> 2024 A.D.
|
# David Sainty <saint@federated.computer> 2024 A.D.
|
||||||
# Gossamer Threads to Discourse -- Migration-Import Script
|
# Gossamer Threads to Discourse -- Migration-Import Script
|
||||||
# v0.38 Attempt to reimplement with mutex support for interrupting and resuming during post import. This will have to be tested / validated
|
# v0.39 Add SQLite table for tracking successful post importation; Split out user import into three separate callable methods; require sys/proctable
|
||||||
|
|
||||||
require 'mysql2'
|
require 'mysql2'
|
||||||
require 'open-uri'
|
require 'open-uri'
|
||||||
@ -16,6 +16,8 @@ require 'time'
|
|||||||
|
|
||||||
require 'concurrent'
|
require 'concurrent'
|
||||||
|
|
||||||
|
require 'sys/proctable'
|
||||||
|
|
||||||
require File.expand_path("../../../../config/environment", __FILE__)
|
require File.expand_path("../../../../config/environment", __FILE__)
|
||||||
# require_relative '../base'
|
# require_relative '../base'
|
||||||
require File.expand_path("../../../../script/import_scripts/base", __FILE__)
|
require File.expand_path("../../../../script/import_scripts/base", __FILE__)
|
||||||
@ -88,7 +90,6 @@ class GossamerForumsImporter < ImportScripts::Base
|
|||||||
topic_id INTEGER PRIMARY KEY,
|
topic_id INTEGER PRIMARY KEY,
|
||||||
post_count INTEGER DEFAULT 0
|
post_count INTEGER DEFAULT 0
|
||||||
);
|
);
|
||||||
|
|
||||||
SQL
|
SQL
|
||||||
@db.execute <<-SQL
|
@db.execute <<-SQL
|
||||||
CREATE TABLE IF NOT EXISTS user_topic_count (
|
CREATE TABLE IF NOT EXISTS user_topic_count (
|
||||||
@ -126,6 +127,12 @@ class GossamerForumsImporter < ImportScripts::Base
|
|||||||
personal_id INTEGER
|
personal_id INTEGER
|
||||||
);
|
);
|
||||||
SQL
|
SQL
|
||||||
|
@db.execute <<-SQL
|
||||||
|
CREATE TABLE IF NOT EXISTS topic_import_status (
|
||||||
|
post_id INTEGER PRIMARY KEY,
|
||||||
|
status INTEGER DEFAULT 0
|
||||||
|
);
|
||||||
|
SQL
|
||||||
end
|
end
|
||||||
|
|
||||||
def insert_user_id_mapping(old_user_id, new_user_id)
|
def insert_user_id_mapping(old_user_id, new_user_id)
|
||||||
@ -271,6 +278,22 @@ class GossamerForumsImporter < ImportScripts::Base
|
|||||||
@db.execute "INSERT OR REPLACE INTO highest_processed_personal_id (id, personal_id) VALUES (1, ?)", personal_id
|
@db.execute "INSERT OR REPLACE INTO highest_processed_personal_id (id, personal_id) VALUES (1, ?)", personal_id
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# Check if post_id exists and its status
#
# Looks the post up in the SQLite topic_import_status table and reports the
# status stored for it. The sibling writers in this file store 1 for a
# completed import and 0 for a failed one.
#
# @param post_id the Gossamer post_id to look up (bound as a query parameter)
# @return [Integer, nil] the stored status, or nil when no row exists for post_id
def post_status(post_id)
  # execute returns an array of rows; flatten.first yields the single status
  # value, or nil when the result set is empty.
  status = @db.execute("SELECT status FROM topic_import_status WHERE post_id = ?", post_id).flatten.first
  # Safe navigation keeps "no row" as nil instead of coercing it to 0,
  # so callers can tell "never attempted" apart from "failed".
  status&.to_i
end
|
||||||
|
|
||||||
|
# Mark post_id as complete
#
# Writes status 1 for post_id into the SQLite topic_import_status table.
# post_id is that table's primary key, so INSERT OR REPLACE overwrites any
# existing row for the same post (for example an earlier status of 0)
# rather than adding a second one.
#
# @param post_id the Gossamer post_id whose import finished (bound as a query parameter)
# @return whatever @db.execute returns for the statement
def mark_post_as_complete(post_id)
  @db.execute("INSERT OR REPLACE INTO topic_import_status (post_id, status) VALUES (?, 1)", post_id)
end
|
||||||
|
|
||||||
|
# Mark post_id as failed
#
# Writes status 0 for post_id into the SQLite topic_import_status table.
# post_id is that table's primary key, so INSERT OR REPLACE overwrites any
# existing row for the same post rather than adding a second one.
#
# @param post_id the Gossamer post_id whose import raised an error (bound as a query parameter)
# @return whatever @db.execute returns for the statement
def mark_post_as_failed(post_id)
  @db.execute("INSERT OR REPLACE INTO topic_import_status (post_id, status) VALUES (?, 0)", post_id)
end
|
||||||
|
|
||||||
# Execute an SQL query on the Gossamer Forums database
|
# Execute an SQL query on the Gossamer Forums database
|
||||||
def execute_query(query)
|
def execute_query(query)
|
||||||
@mysql_client.query(query, as: :hash)
|
@mysql_client.query(query, as: :hash)
|
||||||
@ -547,7 +570,7 @@ class GossamerForumsImporter < ImportScripts::Base
|
|||||||
|
|
||||||
# Import users from Gossamer Forums to Discourse
|
# Import users from Gossamer Forums to Discourse
|
||||||
def import_users
|
def import_users
|
||||||
puts "Importing users..."
|
puts "Importing Users..."
|
||||||
users = []
|
users = []
|
||||||
|
|
||||||
# Fetch all users from Gossamer Forums
|
# Fetch all users from Gossamer Forums
|
||||||
@ -576,22 +599,44 @@ class GossamerForumsImporter < ImportScripts::Base
|
|||||||
# insert_user_id_mapping(user[:id], user.id)
|
# insert_user_id_mapping(user[:id], user.id)
|
||||||
user
|
user
|
||||||
end
|
end
|
||||||
|
end
|
||||||
|
|
||||||
# For each user, add user ID mapping to SQLite now that we know what the Discourse user ID is, ... and append user bio and import user files
|
# Generate SQLite user ID mapping between Discourse and Gossamer
|
||||||
|
def generate_user_id_mapping
|
||||||
|
puts "Generating User ID Mapping..."
|
||||||
|
|
||||||
|
# For each user, add user ID mapping to SQLite now that we know what the Discourse user ID is
|
||||||
users.each do |user|
|
users.each do |user|
|
||||||
# discourse_username = sanitize_username(user[:username], user[:email], user[:name])
|
# discourse_username = sanitize_username(user[:username], user[:email], user[:name])
|
||||||
discourse_username = user[:username]
|
discourse_username = user[:username]
|
||||||
discourse_user = User.find_by(username: discourse_username)
|
discourse_user = User.find_by(username: discourse_username)
|
||||||
|
|
||||||
if discourse_user.nil?
|
if discourse_user.nil?
|
||||||
puts "User #{user[:username]} --> #{discourse_username} not found in Discourse. Skipping file import."
|
puts "User #{user[:username]} --> #{discourse_username} not found in Discourse. Skipping user mapping addition."
|
||||||
next
|
next
|
||||||
end
|
end
|
||||||
|
|
||||||
# # Store the user ID mapping
|
# Store the user ID mapping
|
||||||
# @user_id_map[user[:id]] = discourse_user.id
|
# @user_id_map[user[:id]] = discourse_user.id
|
||||||
puts "for insert_user_id_mapping: user[:id] #{user[:id]} discourse_user.id #{discourse_user.id}"
|
puts "for insert_user_id_mapping: user[:id] #{user[:id]} discourse_user.id #{discourse_user.id}"
|
||||||
insert_user_id_mapping(user[:id], discourse_user.id)
|
insert_user_id_mapping(user[:id], discourse_user.id)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# Import and set user Bio and Images
|
||||||
|
def set_user_bio_images
|
||||||
|
puts "Setting User Bio and Images..."
|
||||||
|
|
||||||
|
# For each user, append user bio and import user files
|
||||||
|
users.each do |user|
|
||||||
|
# discourse_username = sanitize_username(user[:username], user[:email], user[:name])
|
||||||
|
discourse_username = user[:username]
|
||||||
|
discourse_user = User.find_by(username: discourse_username)
|
||||||
|
|
||||||
|
if discourse_user.nil?
|
||||||
|
puts "User #{user[:username]} --> #{discourse_username} not found in Discourse. Skipping bio-image setting."
|
||||||
|
next
|
||||||
|
end
|
||||||
|
|
||||||
# Ensure user profile exists and bio_raw is a string
|
# Ensure user profile exists and bio_raw is a string
|
||||||
discourse_user.user_profile ||= UserProfile.new(user_id: discourse_user.id)
|
discourse_user.user_profile ||= UserProfile.new(user_id: discourse_user.id)
|
||||||
@ -967,7 +1012,7 @@ class GossamerForumsImporter < ImportScripts::Base
|
|||||||
# Get list of TOPICS / OP posts, i.e. post ids that have no parent / root id - SELECT post_id FROM gforum_Post WHERE post_root_id = 0;
|
# Get list of TOPICS / OP posts, i.e. post ids that have no parent / root id - SELECT post_id FROM gforum_Post WHERE post_root_id = 0;
|
||||||
# The query selects post_ids from gforum_Post where post_root_id is 0, meaning these posts are the topic starters (OPs).
|
# The query selects post_ids from gforum_Post where post_root_id is 0, meaning these posts are the topic starters (OPs).
|
||||||
# It also ensures that we only process posts with a post_id greater than the last processed one, allowing for resumption.
|
# It also ensures that we only process posts with a post_id greater than the last processed one, allowing for resumption.
|
||||||
parent_post_ids = execute_query("SELECT post_id FROM gforum_Post WHERE post_root_id = 0 AND post_id > #{fetch_highest_processed_post_id} ORDER BY post_id ASC")
|
parent_post_ids = execute_query("SELECT post_id FROM gforum_Post WHERE post_root_id = 0 ORDER BY post_id ASC")
|
||||||
|
|
||||||
parent_post_count = parent_post_ids.count
|
parent_post_count = parent_post_ids.count
|
||||||
batch_size = 100 # Set our batch size for number of posts to import in a single batch
|
batch_size = 100 # Set our batch size for number of posts to import in a single batch
|
||||||
@ -984,18 +1029,30 @@ class GossamerForumsImporter < ImportScripts::Base
|
|||||||
|
|
||||||
# Dynamically calculate the pool size based on system load to optimise performance
|
# Dynamically calculate the pool size based on system load to optimise performance
|
||||||
pool_size = calculate_dynamic_pool_size
|
pool_size = calculate_dynamic_pool_size
|
||||||
pool = Concurrent::FixedThreadPool.new(pool_size) # Create a thread pool with the calculated size.
|
pool = Concurrent::FixedThreadPool.new(pool_size) # Create a thread pool with the calculated size
|
||||||
|
|
||||||
# Process each post in the current batch
|
# Process each post in the current batch
|
||||||
while current_post_batch < current_post_batch_max
|
while current_post_batch < current_post_batch_max
|
||||||
post_id = parent_post_ids[current_post_batch] # Fetch the post_id for the current post
|
post_id = parent_post_ids[current_post_batch] # Fetch the post_id for the current post
|
||||||
|
|
||||||
|
# Check if the post has already been processed or is incomplete
|
||||||
|
post_status = post_status(post_id)
|
||||||
|
if post_status.nil? || post_status == 0
|
||||||
puts "Starting import for post_id #{post_id} in batch #{current_post_batch / batch_size + 1} with #{thread_count} threads"
|
puts "Starting import for post_id #{post_id} in batch #{current_post_batch / batch_size + 1} with #{thread_count} threads"
|
||||||
|
|
||||||
# Submit the import job for the current post_id to the thread pool
|
# Submit the import job for the current post_id to the thread pool
|
||||||
pool.post do
|
pool.post do
|
||||||
|
begin
|
||||||
puts "Processing post ID: #{post_id}"
|
puts "Processing post ID: #{post_id}"
|
||||||
topic_import_job(post_id) # Import topic and its replies
|
topic_import_job(post_id) # Import topic and its replies
|
||||||
|
mark_post_as_complete(post_id) # Mark as complete in SQLite table
|
||||||
|
rescue => e
|
||||||
|
puts "Error processing post ID #{post_id}: #{e.message}"
|
||||||
|
mark_post_as_failed(post_id)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
else
|
||||||
|
puts "Skipping post_id #{post_id}, already processed."
|
||||||
end
|
end
|
||||||
|
|
||||||
current_post_batch += 1 # Increment, moving to next post in the batch
|
current_post_batch += 1 # Increment, moving to next post in the batch
|
||||||
@ -1010,15 +1067,15 @@ class GossamerForumsImporter < ImportScripts::Base
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
# Method to ensure thread-safe updates to highest_processed_post_id
|
# # Method to ensure thread-safe updates to highest_processed_post_id
|
||||||
def update_highest_processed_post_id_thread_safe(post_id)
|
# def update_highest_processed_post_id_thread_safe(post_id)
|
||||||
@highest_processed_mutex ||= Mutex.new
|
# @highest_processed_mutex ||= Mutex.new
|
||||||
@highest_processed_mutex.synchronize do
|
# @highest_processed_mutex.synchronize do
|
||||||
if post_id > fetch_highest_processed_post_id
|
# if post_id > fetch_highest_processed_post_id
|
||||||
update_highest_processed_post_id(post_id)
|
# update_highest_processed_post_id(post_id)
|
||||||
end
|
# end
|
||||||
end
|
# end
|
||||||
end
|
# end
|
||||||
|
|
||||||
# Method to import an entire topic, including its first post and all subsequent replies
|
# Method to import an entire topic, including its first post and all subsequent replies
|
||||||
def topic_import_job(post_id)
|
def topic_import_job(post_id)
|
||||||
@ -1169,16 +1226,16 @@ class GossamerForumsImporter < ImportScripts::Base
|
|||||||
# Handle any attachments associated with the reply
|
# Handle any attachments associated with the reply
|
||||||
handle_post_attachments(reply_row['post_id'], post, reply_user_id)
|
handle_post_attachments(reply_row['post_id'], post, reply_user_id)
|
||||||
|
|
||||||
# Update the highest processed post_id in the database (thread-safe)
|
# # Update the highest processed post_id in the database (thread-safe)
|
||||||
update_highest_processed_post_id_thread_safe(reply_row['post_id'])
|
# update_highest_processed_post_id_thread_safe(reply_row['post_id'])
|
||||||
|
|
||||||
rescue ActiveRecord::RecordInvalid => e
|
rescue ActiveRecord::RecordInvalid => e
|
||||||
puts "Error importing reply with post_id #{reply_row['post_id']}: #{e.message}"
|
puts "Error importing reply with post_id #{reply_row['post_id']}: #{e.message}"
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
# After processing the entire topic, update the highest_processed_post_id to the current topic's post_id (thread-safe)
|
# # After processing the entire topic, update the highest_processed_post_id to the current topic's post_id (thread-safe)
|
||||||
update_highest_processed_post_id_thread_safe(post_id)
|
# update_highest_processed_post_id_thread_safe(post_id)
|
||||||
|
|
||||||
rescue ActiveRecord::RecordInvalid => e
|
rescue ActiveRecord::RecordInvalid => e
|
||||||
puts "Error importing topic with post_id #{row['post_id']}: #{e.message}"
|
puts "Error importing topic with post_id #{row['post_id']}: #{e.message}"
|
||||||
@ -1648,6 +1705,9 @@ class GossamerForumsImporter < ImportScripts::Base
|
|||||||
import_users
|
import_users
|
||||||
export_username_mapping_to_csv("/bitnami/discourse/sqlite/gossamer-migration-username-mapping#{timestamp}")
|
export_username_mapping_to_csv("/bitnami/discourse/sqlite/gossamer-migration-username-mapping#{timestamp}")
|
||||||
|
|
||||||
|
generate_user_id_mapping
|
||||||
|
# set_user_bio_images
|
||||||
|
|
||||||
import_categories
|
import_categories
|
||||||
|
|
||||||
#### import_topics_and_posts_with_attachments
|
#### import_topics_and_posts_with_attachments
|
||||||
|
Loading…
Reference in New Issue
Block a user