diff --git a/gossamer_forums.rb b/gossamer_forums.rb index 2cb5b68..6f76f07 100644 --- a/gossamer_forums.rb +++ b/gossamer_forums.rb @@ -1,7 +1,7 @@ # Federated Computer, Inc. # David Sainty 2024 A.D. # Gossamer Threads to Discourse -- Migration-Import Script -# v0.37.1 Huge move to threaded support for topic-post import; disable topic-post import for current user re-import 202408162130, persistence for exported files +# v0.38 Attempt to reimplement with mutex support for interrupting and resuming during post import. This will have to be tested / validated require 'mysql2' require 'open-uri' @@ -965,46 +965,61 @@ class GossamerForumsImporter < ImportScripts::Base def threaded_topic_import # Get list of TOPICS / OP posts, i.e. post ids that have no parent / root id - SELECT post_id FROM gforum_Post WHERE post_root_id = 0; - parent_post_ids = execute_query("SELECT post_id FROM gforum_Post WHERE post_root_id = 0") + # The query selects post_ids from gforum_Post where post_root_id is 0, meaning these posts are the topic starters (OPs). + # It also ensures that we only process posts with a post_id greater than the last processed one, allowing for resumption. + parent_post_ids = execute_query("SELECT post_id FROM gforum_Post WHERE post_root_id = 0 AND post_id > #{fetch_highest_processed_post_id} ORDER BY post_id ASC") parent_post_count = parent_post_ids.count batch_size = 100 # Set our batch size for number of posts to import in a single batch current_post_batch = 0 # Set our current batch number. This tracks the current batch of posts being processed. is_complete = false # Flag to indicate whether the import process is complete. + # Run until all posts have been processed. until is_complete # Query in batches, create pool, wait for termination, do it again - # SELECT post_id FROM gforum_Post WHERE post_root_id = 0 ORDER BY post_id current_post_batch_max = current_post_batch + batch_size + ## Static pool size based on number of CPUs # pool = Concurrent::FixedThreadPool.new(Concurrent.processor_count) #create thread pool that is bounded by processors avaialable, however play with the number to see what works best - # Dynamically calculate the pool size based on system load + # Dynamically calculate the pool size based on system load to optimise performance pool_size = calculate_dynamic_pool_size - - pool = Concurrent::FixedThreadPool.new(pool_size) + pool = Concurrent::FixedThreadPool.new(pool_size) # Create a thread pool with the calculated size. + # Process each post in the current batch while current_post_batch < current_post_batch_max - post_id = parent_post_ids[current_post_batch] + post_id = parent_post_ids[current_post_batch] # Fetch the post_id for the current post puts "Starting import for post_id #{post_id} in batch #{current_post_batch / batch_size + 1} with #{thread_count} threads" + # Submit the import job for the current post_id to the thread pool pool.post do puts "Processing post ID: #{post_id}" - topic_import_job(post_id) + topic_import_job(post_id) # Import topic and its replies end - current_post_batch += 1 + current_post_batch += 1 # Increment, moving to next post in the batch break if current_post_batch >= parent_post_count end - pool.shutdown - pool.wait_for_termination + pool.shutdown # Initiate thread pool shutdown after all jobs submitted + pool.wait_for_termination # Wait for all threads to finish exec + # Set when all posts have been processed is_complete = true if current_post_batch >= parent_post_count end end + # Method to ensure thread-safe updates to highest_processed_post_id + def update_highest_processed_post_id_thread_safe(post_id) + @highest_processed_mutex ||= Mutex.new + @highest_processed_mutex.synchronize do + if post_id > fetch_highest_processed_post_id + update_highest_processed_post_id(post_id) + end + end + end + # Method to import an entire topic, including its first post and all subsequent replies def topic_import_job(post_id) #Here is where you can import the entire topic @@ -1110,10 +1125,6 @@ class GossamerForumsImporter < ImportScripts::Base new_url = "https://new/t/#{topic.slug}/#{topic.id}" insert_url_mapping(row['post_id'], new_url, unique_title) - # Update the highest processed post_id in the database - puts "Updated highest processed post_id #{post_id}" - update_highest_processed_post_id(post_id) - # Now fetch and import all replies to this topic replies = execute_query("SELECT post_id, user_id_fk, post_message, post_time, post_likes FROM gforum_Post WHERE post_root_id = #{post_id} ORDER BY post_time ASC") @@ -1158,15 +1169,17 @@ class GossamerForumsImporter < ImportScripts::Base # Handle any attachments associated with the reply handle_post_attachments(reply_row['post_id'], post, reply_user_id) - # Update the highest processed post_id in the database - puts "Updated highest processed post_id #{reply_row['post_id']}" - update_highest_processed_post_id(reply_row['post_id']) + # Update the highest processed post_id in the database (thread-safe) + update_highest_processed_post_id_thread_safe(reply_row['post_id']) rescue ActiveRecord::RecordInvalid => e puts "Error importing reply with post_id #{reply_row['post_id']}: #{e.message}" end end + # After processing the entire topic, update the highest_processed_post_id to the current topic's post_id (thread-safe) + update_highest_processed_post_id_thread_safe(post_id) + rescue ActiveRecord::RecordInvalid => e puts "Error importing topic with post_id #{row['post_id']}: #{e.message}" end