diff --git a/gossamer_forums.rb b/gossamer_forums.rb index 8ff602f..72e43fd 100644 --- a/gossamer_forums.rb +++ b/gossamer_forums.rb @@ -1,7 +1,7 @@ # Federated Computer, Inc. # David Sainty 2024 A.D. # Gossamer Threads to Discourse -- Migration-Import Script -# v0.42 Mutex addition for SQLite (which may be very important) +# v0.42.1 Rewrite and simplication of concurrent-ruby support require 'mysql2' require 'open-uri' @@ -1077,7 +1077,9 @@ class GossamerForumsImporter < ImportScripts::Base parent_post_count = parent_post_ids.count batch_size = 100 # Set our batch size for number of posts to import in a single batch - current_post_batch = 0 # Set our current batch number. This tracks the current batch of posts being processed. + +#### current_post_batch = 0 # Set our current batch number. This tracks the current batch of posts being processed. + is_complete = false # Flag to indicate whether the import process is complete. # Mutex to control access to shared resources @@ -1086,72 +1088,82 @@ class GossamerForumsImporter < ImportScripts::Base # Run until all posts have been processed. until is_complete - # Query in batches, create pool, wait for termination, do it again - current_post_batch_max = current_post_batch + batch_size + +#### # Query in batches, create pool, wait for termination, do it again +#### current_post_batch_max = current_post_batch + batch_size + + # Get the next batch of posts + current_post_batch = parent_post_ids.shift(batch_size) + + break if current_post_batch.empty? + + # Process each post in the current batch + current_post_batch.each do |post_id| + ####### # Static pool size based on number of CPUs # # pool = Concurrent::FixedThreadPool.new(Concurrent.processor_count) # Create a thread pool that is bounded by processors avaialable # # pool = Concurrent::FixedThreadPool.new(8) # Create a thread pool of 8 pool members - # Dynamically calculate the pool size based on system load to optimise performance - pool_size = calculate_dynamic_pool_size # Dynamically calculate what the pool size "ought" to be. - pool = Concurrent::FixedThreadPool.new(pool_size) # Create a thread pool with the calculated size +#### # Dynamically calculate the pool size based on system load to optimise performance +#### pool_size = calculate_dynamic_pool_size # Dynamically calculate what the pool size "ought" to be. +#### pool = Concurrent::FixedThreadPool.new(pool_size) # Create a thread pool with the calculated size - # Process each post in the current batch - while current_post_batch < current_post_batch_max - post_id = parent_post_ids[current_post_batch] # Fetch the post_id for the current post +#### # Process each post in the current batch +#### while current_post_batch < current_post_batch_max +#### post_id = parent_post_ids[current_post_batch] # Fetch the post_id for the current post - # Check if the post has already been processed or is incomplete - post_status = post_status(post_id) - if post_status.nil? || post_status == 0 - puts "Starting import for post_id #{post_id} in batch #{current_post_batch / batch_size + 1} with #{pool_size} threads" +#### # Check if the post has already been processed or is incomplete +#### post_status = post_status(post_id) - # Submit the import job for the current post_id to the thread pool - pool.post do + # Submit the import job for the current post_id to the thread pool + pool.post do puts "PP 11 -- #{post_id}" - # Initialise a new MariaDB / Mysql2 client inside of each thread - mysql_client = Mysql2::Client.new( - host: "slowtwitch.northend.network", - username: "admin", - password: "yxnh93Ybbz2Nm8#mp28zCVv", - database: "slowtwitch" - ) - + # Initialise a new MariaDB / Mysql2 client inside of each thread + mysql_client = Mysql2::Client.new( + host: "slowtwitch.northend.network", + username: "admin", + password: "yxnh93Ybbz2Nm8#mp28zCVv", + database: "slowtwitch" + ) puts "PP 22 -- ${post_id}" + + begin # Use connection pooling for PostgreSQL and synchronize access to shared resources ActiveRecord::Base.connection_pool.with_connection do -### mutex.synchronize do - begin - puts "Processing post ID: #{post_id}" - topic_import_job(post_id, mysql_client, sqlite_mutex) # Import topic and its replies - sqlite_mutex.sychronize do - mark_post_as_complete(post_id) # Mark as complete in SQLite table - end - rescue => e - puts "Error processing post ID #{post_id}: #{e.message}" - sqlite_mutex.sychronize do - mark_post_as_failed(post_id) - end - ensure - # Ensure the MariaDB connection is closed after processing - mysql_client.close if mysql_client + post_status = post_status(post_id) + if post_status.nil? || post_status == 0 + puts "Starting import for post_id #{post_id}" + topic_import_job(post_id, mysql_client, sqlite_mutex) # Import topic and its replies + sqlite_mutex.sychronize do + mark_post_as_complete(post_id) # Mark as complete in SQLite table end -### end + else + puts "Skipping post_id #{post_id}, already processed." + end end + rescue => e + puts "Error processing post ID #{post_id}: #{e.message}" + sqlite_mutex.sychronize do + mark_post_as_failed(post_id) + end + ensure + # Ensure the MariaDB connection is closed after processing + mysql_client.close if mysql_client end - else - puts "Skipping post_id #{post_id}, already processed." end - - current_post_batch += 1 # Increment, moving to next post in the batch - break if current_post_batch >= parent_post_count end - + +#### current_post_batch += 1 # Increment, moving to next post in the batch +#### break if current_post_batch >= parent_post_count + + # Wait for all jobs in the current batch to finish before proceeding pool.shutdown # Initiate thread pool shutdown after all jobs submitted pool.wait_for_termination # Wait for all threads to finish exec - # Set when all posts have been processed - is_complete = true if current_post_batch >= parent_post_count + # Check if all posts have been processed +#### is_complete = true if current_post_batch >= parent_post_count + is_complete = parent_post_ids.empty? end end