v0.38 Attempt to reimplement with mutex support for interrupting and resuming during post import. This will have to be tested / validated
This commit is contained in:
		@@ -1,7 +1,7 @@
 | 
				
			|||||||
# Federated Computer, Inc.
 | 
					# Federated Computer, Inc.
 | 
				
			||||||
# David Sainty <saint@federated.computer>  2024 A.D.
 | 
					# David Sainty <saint@federated.computer>  2024 A.D.
 | 
				
			||||||
# Gossamer Threads to Discourse -- Migration-Import Script
 | 
					# Gossamer Threads to Discourse -- Migration-Import Script
 | 
				
			||||||
# v0.37.1 Huge move to threaded support for topic-post import; disable topic-post import for current user re-import 202408162130, persistence for exported files
 | 
					# v0.38 Attempt to reimplement with mutex support for interrupting and resuming during post import. This will have to be tested / validated
 | 
				
			||||||
 | 
					
 | 
				
			||||||
require 'mysql2'
 | 
					require 'mysql2'
 | 
				
			||||||
require 'open-uri'
 | 
					require 'open-uri'
 | 
				
			||||||
@@ -965,46 +965,61 @@ class GossamerForumsImporter < ImportScripts::Base
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
  def threaded_topic_import
 | 
					  def threaded_topic_import
 | 
				
			||||||
    # Get list of TOPICS / OP posts, i.e. post ids that have no parent / root id - SELECT post_id FROM gforum_Post WHERE post_root_id = 0;
 | 
					    # Get list of TOPICS / OP posts, i.e. post ids that have no parent / root id - SELECT post_id FROM gforum_Post WHERE post_root_id = 0;
 | 
				
			||||||
    parent_post_ids = execute_query("SELECT post_id FROM gforum_Post WHERE post_root_id = 0")
 | 
					    # The query selects post_ids from gforum_Post where post_root_id is 0, meaning these posts are the topic starters (OPs).
 | 
				
			||||||
 | 
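					    # It also restricts the result to post_ids greater than the stored highest processed post_id, which is what allows an interrupted run to resume.
					    # NOTE(review): the stored value is also raised by reply post_ids, so a topic starter whose post_id is lower than an already-imported reply may be skipped on resume -- confirm this is acceptable.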
 | 
				
			||||||
 | 
					    parent_post_ids = execute_query("SELECT post_id FROM gforum_Post WHERE post_root_id = 0 AND post_id > #{fetch_highest_processed_post_id} ORDER BY post_id ASC")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    parent_post_count = parent_post_ids.count
 | 
					    parent_post_count = parent_post_ids.count
 | 
				
			||||||
    batch_size = 100 # Set our batch size for number of posts to import in a single batch
 | 
					    batch_size = 100 # Set our batch size for number of posts to import in a single batch
 | 
				
			||||||
    current_post_batch = 0 # Set our current batch number. This tracks the current batch of posts being processed.
 | 
					    current_post_batch = 0 # Set our current batch number. This tracks the current batch of posts being processed.
 | 
				
			||||||
    is_complete = false # Flag to indicate whether the import process is complete.
 | 
					    is_complete = false # Flag to indicate whether the import process is complete.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Run until all posts have been processed.
 | 
				
			||||||
    until is_complete
 | 
					    until is_complete
 | 
				
			||||||
      # Query in batches, create pool, wait for termination, do it again
 | 
					      # Query in batches, create pool, wait for termination, do it again
 | 
				
			||||||
      # SELECT post_id FROM gforum_Post WHERE post_root_id = 0 ORDER BY post_id  
 | 
					 | 
				
			||||||
      current_post_batch_max = current_post_batch + batch_size
 | 
					      current_post_batch_max = current_post_batch + batch_size
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      ## Static pool size based on number of CPUs
 | 
				
			||||||
      # pool = Concurrent::FixedThreadPool.new(Concurrent.processor_count) # create a thread pool bounded by the number of processors available; experiment with the size to see what works best
 | 
					      # pool = Concurrent::FixedThreadPool.new(Concurrent.processor_count) # create a thread pool bounded by the number of processors available; experiment with the size to see what works best
 | 
				
			||||||
 | 
					
 | 
				
			||||||
      # Dynamically calculate the pool size based on system load
 | 
					      # Dynamically calculate the pool size based on system load to optimise performance
 | 
				
			||||||
      pool_size = calculate_dynamic_pool_size
 | 
					      pool_size = calculate_dynamic_pool_size
 | 
				
			||||||
 | 
					      pool = Concurrent::FixedThreadPool.new(pool_size)  # Create a thread pool with the calculated size.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
      pool = Concurrent::FixedThreadPool.new(pool_size)
 | 
					      # Process each post in the current batch
 | 
				
			||||||
 | 
					 | 
				
			||||||
      while current_post_batch < current_post_batch_max
 | 
					      while current_post_batch < current_post_batch_max
 | 
				
			||||||
        post_id = parent_post_ids[current_post_batch]
 | 
					        post_id = parent_post_ids[current_post_batch]  # Fetch the post_id for the current post
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        puts "Starting import for post_id #{post_id} in batch #{current_post_batch / batch_size + 1} with #{thread_count} threads"
 | 
					        puts "Starting import for post_id #{post_id} in batch #{current_post_batch / batch_size + 1} with #{thread_count} threads"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # Submit the import job for the current post_id to the thread pool
 | 
				
			||||||
        pool.post do
 | 
					        pool.post do
 | 
				
			||||||
          puts "Processing post ID: #{post_id}"
 | 
					          puts "Processing post ID: #{post_id}"
 | 
				
			||||||
          topic_import_job(post_id)
 | 
					          topic_import_job(post_id)  # Import topic and its replies
 | 
				
			||||||
        end
 | 
					        end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        current_post_batch += 1
 | 
					        current_post_batch += 1  # Increment, moving to next post in the batch
 | 
				
			||||||
        break if current_post_batch >= parent_post_count
 | 
					        break if current_post_batch >= parent_post_count
 | 
				
			||||||
      end
 | 
					      end
 | 
				
			||||||
      
 | 
					      
 | 
				
			||||||
      pool.shutdown
 | 
					      pool.shutdown  # Initiate thread pool shutdown after all jobs submitted
 | 
				
			||||||
      pool.wait_for_termination
 | 
					      pool.wait_for_termination  # Wait for all threads to finish exec
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      # Set when all posts have been processed
 | 
				
			||||||
      is_complete = true if current_post_batch >= parent_post_count
 | 
					      is_complete = true if current_post_batch >= parent_post_count
 | 
				
			||||||
    end
 | 
					    end
 | 
				
			||||||
  end
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
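					  # Method to serialise updates to highest_processed_post_id across worker threads.
					  # The stored value is only ever raised: a post_id that is not greater than the current value is ignored.
					  # NOTE(review): the mutex itself is created lazily on the first call, so two threads making that first call at the same time could each create their own Mutex -- consider creating it before the thread pool starts.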
 | 
				
			||||||
 | 
					  def update_highest_processed_post_id_thread_safe(post_id)
 | 
				
			||||||
 | 
					    @highest_processed_mutex ||= Mutex.new
 | 
				
			||||||
 | 
					    @highest_processed_mutex.synchronize do
 | 
				
			||||||
 | 
					      if post_id > fetch_highest_processed_post_id
 | 
				
			||||||
 | 
					        update_highest_processed_post_id(post_id)
 | 
				
			||||||
 | 
					      end
 | 
				
			||||||
 | 
					    end
 | 
				
			||||||
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  # Method to import an entire topic, including its first post and all subsequent replies
 | 
					  # Method to import an entire topic, including its first post and all subsequent replies
 | 
				
			||||||
  def topic_import_job(post_id)
 | 
					  def topic_import_job(post_id)
 | 
				
			||||||
    #Here is where you can import the entire topic
 | 
					    #Here is where you can import the entire topic
 | 
				
			||||||
@@ -1110,10 +1125,6 @@ class GossamerForumsImporter < ImportScripts::Base
 | 
				
			|||||||
        new_url = "https://new/t/#{topic.slug}/#{topic.id}"
 | 
					        new_url = "https://new/t/#{topic.slug}/#{topic.id}"
 | 
				
			||||||
        insert_url_mapping(row['post_id'], new_url, unique_title)
 | 
					        insert_url_mapping(row['post_id'], new_url, unique_title)
 | 
				
			||||||
  
 | 
					  
 | 
				
			||||||
        # Update the highest processed post_id in the database
 | 
					 | 
				
			||||||
        puts "Updated highest processed post_id #{post_id}"
 | 
					 | 
				
			||||||
        update_highest_processed_post_id(post_id)
 | 
					 | 
				
			||||||
  
 | 
					 | 
				
			||||||
        # Now fetch and import all replies to this topic
 | 
					        # Now fetch and import all replies to this topic
 | 
				
			||||||
        replies = execute_query("SELECT post_id, user_id_fk, post_message, post_time, post_likes FROM gforum_Post WHERE post_root_id = #{post_id} ORDER BY post_time ASC")
 | 
					        replies = execute_query("SELECT post_id, user_id_fk, post_message, post_time, post_likes FROM gforum_Post WHERE post_root_id = #{post_id} ORDER BY post_time ASC")
 | 
				
			||||||
  
 | 
					  
 | 
				
			||||||
@@ -1158,15 +1169,17 @@ class GossamerForumsImporter < ImportScripts::Base
 | 
				
			|||||||
            # Handle any attachments associated with the reply
 | 
					            # Handle any attachments associated with the reply
 | 
				
			||||||
            handle_post_attachments(reply_row['post_id'], post, reply_user_id)
 | 
					            handle_post_attachments(reply_row['post_id'], post, reply_user_id)
 | 
				
			||||||
  
 | 
					  
 | 
				
			||||||
            # Update the highest processed post_id in the database
 | 
					            # Update the highest processed post_id in the database (thread-safe)
 | 
				
			||||||
            puts "Updated highest processed post_id #{reply_row['post_id']}"
 | 
					            update_highest_processed_post_id_thread_safe(reply_row['post_id'])
 | 
				
			||||||
            update_highest_processed_post_id(reply_row['post_id'])
 | 
					 | 
				
			||||||
  
 | 
					  
 | 
				
			||||||
          rescue ActiveRecord::RecordInvalid => e
 | 
					          rescue ActiveRecord::RecordInvalid => e
 | 
				
			||||||
            puts "Error importing reply with post_id #{reply_row['post_id']}: #{e.message}"
 | 
					            puts "Error importing reply with post_id #{reply_row['post_id']}: #{e.message}"
 | 
				
			||||||
          end
 | 
					          end
 | 
				
			||||||
        end
 | 
					        end
 | 
				
			||||||
  
 | 
					  
 | 
				
			||||||
 | 
					        # After processing the entire topic, report the topic's own post_id as processed (thread-safe); the stored highest_processed_post_id only changes if this post_id is greater than the current value
 | 
				
			||||||
 | 
					        update_highest_processed_post_id_thread_safe(post_id)
 | 
				
			||||||
 | 
					  
 | 
				
			||||||
      rescue ActiveRecord::RecordInvalid => e
 | 
					      rescue ActiveRecord::RecordInvalid => e
 | 
				
			||||||
        puts "Error importing topic with post_id #{row['post_id']}: #{e.message}"
 | 
					        puts "Error importing topic with post_id #{row['post_id']}: #{e.message}"
 | 
				
			||||||
      end
 | 
					      end
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user