v0.37 Huge move to threaded support for topic-post import; disable topic-post import for current user re-import 202408162130
This commit is contained in:
parent
3d7e5701ef
commit
84162b1b78
@ -1,7 +1,7 @@
|
|||||||
# Federated Computer, Inc.
|
# Federated Computer, Inc.
|
||||||
# David Sainty <saint@federated.computer> 2024 A.D.
|
# David Sainty <saint@federated.computer> 2024 A.D.
|
||||||
# Gossamer Threads to Discourse -- Migration-Import Script
|
# Gossamer Threads to Discourse -- Migration-Import Script
|
||||||
# v0.36.8 Move SQLite3 DB to persistent storage /bitnami/discourse/sqlite
|
# v0.37 Huge move to threaded support for topic-post import; disable topic-post import for current user re-import 202408162130
|
||||||
|
|
||||||
require 'mysql2'
|
require 'mysql2'
|
||||||
require 'open-uri'
|
require 'open-uri'
|
||||||
@ -929,26 +929,71 @@ class GossamerForumsImporter < ImportScripts::Base
|
|||||||
|
|
||||||
#THREADING OUTLINE HERE --------------------------------------------
|
#THREADING OUTLINE HERE --------------------------------------------
|
||||||
|
|
||||||
|
# Method to dynamically calculate the thread pool size based on system load.
#
# The size is ceil(cpu_count / one_minute_load), clamped to the range
# 1..(cpu_count * 2): a lightly loaded machine gets more threads, a heavily
# loaded one gets fewer, and the pool never has zero threads.
#
# NOTE(review): Sys::ProcTable.loadavg is kept from the original code; confirm
# that the installed gem really provides it (load averages are often exposed
# through a different API, e.g. a CPU-information gem) -- TODO confirm.
#
# @return [Integer] pool size between 1 and twice the processor count
def calculate_dynamic_pool_size
  # The processor count comes from concurrent-ruby, which this importer
  # already relies on for its thread pools.
  cpu_count = Concurrent.processor_count

  # Cap the maximum pool size to twice the number of CPUs so the system is not
  # overloaded with threads that only add contention.
  max_pool_size = cpu_count * 2

  # Fetch the load averages and extract the first entry (the 1-minute average).
  load_avg = Sys::ProcTable.loadavg
  raw_one_minute_load_avg = load_avg[0]
  one_minute_load_avg = raw_one_minute_load_avg.to_f

  # Log the current load and CPU information for debugging and monitoring purposes
  puts "1-minute Load Average: #{raw_one_minute_load_avg}, CPU Count: #{cpu_count}"

  # An idle machine reports a load of 0.0; dividing by it yields Infinity and
  # Float#ceil would raise FloatDomainError, so treat "no load" as "use the cap".
  initial_pool_size =
    if one_minute_load_avg.positive?
      (cpu_count / one_minute_load_avg).ceil
    else
      max_pool_size
    end

  # Keep the final pool size within the valid range (1 to max_pool_size).
  pool_size = initial_pool_size.clamp(1, max_pool_size)

  puts "Calculated and adjusted dynamic pool size: #{pool_size}" # Log the dynamically adjusted pool size
  pool_size
end
|
||||||
|
|
||||||
# Import every topic (a gforum_Post row with post_root_id = 0) through a
# thread pool, one batch of topics at a time.
#
# For each batch of up to 100 topics a fresh pool is created, sized by
# calculate_dynamic_pool_size, and one topic_import_job is queued per topic.
# The pool is then shut down and drained before the next batch starts, so the
# pool size is re-evaluated against the system load between batches.
#
# NOTE(review): execute_query is invoked from the worker threads through
# topic_import_job; confirm that its underlying client tolerates concurrent
# use -- TODO confirm.
def threaded_topic_import
  # Get list of TOPICS / OP posts, i.e. post ids that have no parent / root id.
  # to_a materializes the result so it can be sliced into batches.
  parent_post_rows = execute_query("SELECT post_id FROM gforum_Post WHERE post_root_id = 0").to_a

  batch_size = 100 # Number of topics queued on a single pool before it is drained

  # each_slice yields nothing for an empty result, so no job is queued when
  # there are no topics to import.
  parent_post_rows.each_slice(batch_size).with_index(1) do |batch_rows, batch_number|
    # Dynamically calculate the pool size based on system load
    pool_size = calculate_dynamic_pool_size
    pool = Concurrent::FixedThreadPool.new(pool_size)

    batch_rows.each do |row|
      # Rows are keyed by column name; the job expects the bare integer id.
      # Because post_id is local to this block, every queued task captures its
      # own id rather than a variable that later iterations overwrite.
      post_id = row['post_id'].to_i

      puts "Starting import for post_id #{post_id} in batch #{batch_number} with #{pool_size} threads"

      pool.post do
        begin
          puts "Processing post ID: #{post_id}"
          # Check a database connection out for this worker thread and hand it
          # back when the job finishes, so short-lived pool threads do not
          # exhaust the ActiveRecord connection pool.
          ActiveRecord::Base.connection_pool.with_connection do
            topic_import_job(post_id)
          end
        rescue StandardError => e
          # The thread pool discards exceptions raised by posted tasks; log
          # them so a failed topic is visible and the batch keeps going.
          puts "Error importing topic with post_id #{post_id}: #{e.class}: #{e.message}"
        end
      end
    end

    # Wait for the whole batch to finish before building the next pool.
    pool.shutdown
    pool.wait_for_termination
  end
end
|
||||||
|
|
||||||
|
# Serializes the read-modify-write updates of per-user counters. Topic jobs run
# concurrently on pool threads and different topics can belong to the same
# user, so "fetch count, add one, store" must not interleave between threads.
USER_COUNTER_MUTEX ||= Mutex.new

# Method to import an entire topic, including its first post and all subsequent replies.
#
# Steps:
#   1. Fetch the opening post row for post_id; return if it does not exist.
#   2. Map the Gossamer user and forum to a Discourse user and category;
#      return if either mapping is missing.
#   3. Return if a topic carrying this original_gossamer_id already exists.
#   4. Create the topic (retrying with a " (n)" title suffix while Discourse
#      reports the title as already used), its first post, its attachments and
#      its URL mapping.
#   5. Import all replies (rows with post_root_id = post_id) in post_time order.
#
# Per-topic bookkeeping (post numbers, post count, last post time/user) is only
# touched by the job that owns the topic. Per-user counters are shared between
# jobs and are therefore updated under USER_COUNTER_MUTEX.
#
# NOTE(review): update_highest_processed_post_id is called from concurrent jobs
# in no particular id order; confirm the helper keeps a maximum rather than the
# last value written -- TODO confirm.
#
# @param post_id [Integer, Hash] Gossamer post id of the topic's opening post;
#   a result row keyed by 'post_id' is also accepted
# @return [void]
def topic_import_job(post_id)
  # Tolerate callers that pass the whole result row, and force an integer so
  # nothing but a number is ever interpolated into the SQL below.
  post_id = post_id['post_id'] if post_id.is_a?(Hash)
  post_id = post_id.to_i

  # Fetch the post data for the given post_id (this is the first post in the topic)
  row = execute_query("SELECT post_id, user_id_fk, forum_id_fk, post_root_id, post_subject, post_time, post_message, post_father_id, post_likes, post_replies FROM gforum_Post WHERE post_id = #{post_id}").first

  # Early return if the post data is not found
  return unless row

  # Extract key values from the fetched row
  post_id = row['post_id'].to_i
  puts "Processing post_id #{row['post_id']} post_root_id #{row['post_root_id']} post_subject/title #{row['post_subject']} forum_id_fk/category_id #{row['forum_id_fk']}"

  # Fetch the mapped Discourse user and category ID based on Gossamer data
  discourse_user_id = fetch_user_id_mapping(row['user_id_fk'])
  discourse_category_id = fetch_category_id_mapping(row['forum_id_fk'])
  puts "discourse_user_id #{discourse_user_id} discourse_category_id #{discourse_category_id}"
  return unless discourse_user_id && discourse_category_id

  # Ensure the topic title is valid; a suffix is appended below if it is taken
  title = ensure_valid_title(row['post_subject'])
  unique_title = title

  # Fetch the number of views the post has had
  post_views = fetch_post_views(row['post_id'])

  # Nothing to do if the topic has already been imported (tracked through the
  # custom field 'original_gossamer_id')
  return if TopicCustomField.exists?(name: 'original_gossamer_id', value: row['post_id'])

  begin
    topic_time = Time.at(row['post_time'])
    suffix = 1
    topic = nil

    # Create the new topic in Discourse, retrying with "title (n)" for as long
    # as the only complaint is a duplicate title.
    until topic
      begin
        puts "CREATE TOPIC unique_title #{unique_title} title #{title} discourse_user_id #{discourse_user_id} category_id #{discourse_category_id}"
        created_topic = Topic.create!(
          title: unique_title,
          user_id: discourse_user_id,
          created_at: topic_time,
          updated_at: topic_time,
          category_id: discourse_category_id,
          views: post_views || 0,
          posts_count: 0
        )
        created_topic.custom_fields['original_gossamer_id'] = row['post_id']
        created_topic.save!

        topic = created_topic
      rescue ActiveRecord::RecordInvalid => e
        # Any other validation failure is handled by the outer rescue.
        raise unless e.message.include?("Title has already been used")

        unique_title = "#{title} (#{suffix})"
        suffix += 1
      end
    end

    # Update the database with the last post time and user for the topic
    update_db_topic_last_post_time(topic.id, topic_time.to_i)
    update_db_topic_last_post_user(topic.id, discourse_user_id)

    # Increment the topic count for the user
    USER_COUNTER_MUTEX.synchronize do
      update_db_user_topic_count(discourse_user_id, fetch_db_user_topic_count(discourse_user_id).to_i + 1)
    end

    # Sanitize and prepare the post message for Discourse
    sanitized_post_message = sanitize_post_message(row['post_message'])

    puts "CREATE POST topic.id #{topic.id} discourse_user_id #{discourse_user_id}"

    # Reserve the next post number for the topic
    post_number = fetch_db_topic_post_numbers(topic.id).to_i + 1
    update_db_topic_post_numbers(topic.id, post_number)

    # Create the initial post in the new topic
    post = Post.create!(
      topic_id: topic.id,
      user_id: discourse_user_id,
      raw: sanitized_post_message,
      created_at: topic_time,
      updated_at: topic_time,
      like_count: row['post_likes'] || 0,
      reads: post_views || 0,
      post_number: post_number
    )
    post.custom_fields['original_gossamer_id'] = row['post_id']
    post.save!

    # Increment the post count for the topic and user
    update_db_topic_post_count(topic.id, fetch_db_topic_post_count(topic.id).to_i + 1)
    USER_COUNTER_MUTEX.synchronize do
      update_db_user_post_count(discourse_user_id, fetch_db_user_post_count(discourse_user_id).to_i + 1)
    end

    # Handle any attachments associated with the post
    handle_post_attachments(row['post_id'], post, discourse_user_id)

    # Create URL mappings for the new topic
    new_url = "https://new/t/#{topic.slug}/#{topic.id}"
    insert_url_mapping(row['post_id'], new_url, unique_title)

    # Update the highest processed post_id in the database
    puts "Updated highest processed post_id #{post_id}"
    update_highest_processed_post_id(post_id)

    # Now fetch and import all replies to this topic, oldest first
    replies = execute_query("SELECT post_id, user_id_fk, post_message, post_time, post_likes FROM gforum_Post WHERE post_root_id = #{post_id} ORDER BY post_time ASC")

    # Import each reply sequentially so post numbers follow post_time order
    replies.each { |reply_row| import_topic_reply_job(topic, reply_row) }
  rescue ActiveRecord::RecordInvalid => e
    puts "Error importing topic with post_id #{row['post_id']}: #{e.message}"
  end
end

# Internal helper of topic_import_job: import one reply row into an existing topic.
#
# A validation failure is logged and swallowed so that one bad reply does not
# abort the remaining replies of the topic.
#
# @param topic [Topic] the Discourse topic the reply belongs to
# @param reply_row [Hash] gforum_Post row with post_id, user_id_fk,
#   post_message, post_time and post_likes
# @return [void]
def import_topic_reply_job(topic, reply_row)
  # Fetch the discourse user ID for the reply
  reply_user_id = fetch_user_id_mapping(reply_row['user_id_fk'])

  # Mirror the topic-level guard: without a mapped author the reply cannot be
  # attributed, so skip it before a post number is reserved for it.
  unless reply_user_id
    puts "Skipping reply with post_id #{reply_row['post_id']}: no Discourse user mapping for user_id_fk #{reply_row['user_id_fk']}"
    return
  end

  # Sanitize and prepare the reply message for Discourse
  sanitized_reply_message = sanitize_post_message(reply_row['post_message'])

  puts "CREATE REPLY in topic_id #{topic.id}"

  # Reserve the next post number for the topic
  post_number = fetch_db_topic_post_numbers(topic.id).to_i + 1
  update_db_topic_post_numbers(topic.id, post_number)

  reply_time = Time.at(reply_row['post_time'])

  # Create the reply post in the existing topic
  post = Post.create!(
    topic_id: topic.id,
    user_id: reply_user_id,
    raw: sanitized_reply_message,
    created_at: reply_time,
    updated_at: reply_time,
    like_count: reply_row['post_likes'] || 0,
    post_number: post_number
  )
  post.custom_fields['original_gossamer_id'] = reply_row['post_id']
  post.save!

  # Increment the post count for the topic and user
  update_db_topic_post_count(topic.id, fetch_db_topic_post_count(topic.id).to_i + 1)
  USER_COUNTER_MUTEX.synchronize do
    update_db_user_post_count(reply_user_id, fetch_db_user_post_count(reply_user_id).to_i + 1)
  end

  # Update last post time and user for the topic when this reply is newer
  last_post_time = fetch_db_topic_last_post_time(topic.id)
  if last_post_time.nil? || reply_time.to_i > last_post_time.to_i
    update_db_topic_last_post_time(topic.id, reply_time.to_i)
    update_db_topic_last_post_user(topic.id, reply_user_id)
  end

  # Handle any attachments associated with the reply
  handle_post_attachments(reply_row['post_id'], post, reply_user_id)

  # Update the highest processed post_id in the database
  puts "Updated highest processed post_id #{reply_row['post_id']}"
  update_highest_processed_post_id(reply_row['post_id'])
rescue ActiveRecord::RecordInvalid => e
  puts "Error importing reply with post_id #{reply_row['post_id']}: #{e.message}"
end
|
||||||
|
|
||||||
|
|
||||||
#-------------------------------------------------------------------
|
#-------------------------------------------------------------------
|
||||||
|
|
||||||
@ -993,7 +1197,7 @@ class GossamerForumsImporter < ImportScripts::Base
|
|||||||
# Attachment example: highest_processed_post_id = 1359862
|
# Attachment example: highest_processed_post_id = 1359862
|
||||||
|
|
||||||
# Execute the query to get all posts ordered by post_id
|
# Execute the query to get all posts ordered by post_id
|
||||||
execute_query("SELECT post_id, user_id_fk, forum_id_fk, post_root_id, post_subject, post_time, post_message, post_father_id, post_replies FROM gforum_Post ORDER BY post_id").each do |row|
|
execute_query("SELECT post_id, user_id_fk, forum_id_fk, post_root_id, post_subject, post_time, post_message, post_father_id, post_likes, post_replies FROM gforum_Post ORDER BY post_id").each do |row|
|
||||||
post_id = row['post_id'].to_i
|
post_id = row['post_id'].to_i
|
||||||
|
|
||||||
# Skip posts that have already been processed
|
# Skip posts that have already been processed
|
||||||
@ -1433,7 +1637,8 @@ class GossamerForumsImporter < ImportScripts::Base
|
|||||||
|
|
||||||
import_categories
|
import_categories
|
||||||
|
|
||||||
import_topics_and_posts_with_attachments
|
# import_topics_and_posts_with_attachments
|
||||||
|
threaded_topic_import
|
||||||
update_topic_stats
|
update_topic_stats
|
||||||
update_user_stats
|
update_user_stats
|
||||||
export_url_mapping_to_csv("gossamer-migration-url-mapping#{timestamp}")
|
export_url_mapping_to_csv("gossamer-migration-url-mapping#{timestamp}")
|
||||||
|
Loading…
Reference in New Issue
Block a user