diff --git a/gossamer_forums.rb b/gossamer_forums.rb index 2fd0996..45d5866 100644 --- a/gossamer_forums.rb +++ b/gossamer_forums.rb @@ -1,7 +1,7 @@ # Federated Computer, Inc. # David Sainty 2024 A.D. # Gossamer Threads to Discourse -- Migration-Import Script -# v0.40 Move to per thread MySQL/MariaDB connection +# v0.41 Further improve FULL concurrency support, for both MySQL/MariaDB _and_ importantly, the PostgreSQL Discourse DB additions and changes with ActiveRecord connection pooling and Mutex require 'mysql2' require 'open-uri' @@ -1072,16 +1072,20 @@ class GossamerForumsImporter < ImportScripts::Base current_post_batch = 0 # Set our current batch number. This tracks the current batch of posts being processed. is_complete = false # Flag to indicate whether the import process is complete. + # Mutex to control access to shared resources + mutex = Mutex.new + # Run until all posts have been processed. until is_complete # Query in batches, create pool, wait for termination, do it again current_post_batch_max = current_post_batch + batch_size - ## Static pool size based on number of CPUs - # pool = Concurrent::FixedThreadPool.new(Concurrent.processor_count) #create thread pool that is bounded by processors avaialable, however play with the number to see what works best +# # Static pool size based on number of CPUs +# # pool = Concurrent::FixedThreadPool.new(Concurrent.processor_count) # Create a thread pool that is bounded by processors available +# # pool = Concurrent::FixedThreadPool.new(8) # Create a thread pool of 8 pool members # Dynamically calculate the pool size based on system load to optimise performance - pool_size = calculate_dynamic_pool_size + pool_size = calculate_dynamic_pool_size # Dynamically calculate what the pool size "ought" to be. 
pool = Concurrent::FixedThreadPool.new(pool_size) # Create a thread pool with the calculated size # Process each post in the current batch @@ -1102,19 +1106,26 @@ class GossamerForumsImporter < ImportScripts::Base password: "yxnh93Ybbz2Nm8#mp28zCVv", database: "slowtwitch" ) - begin - puts "Processing post ID: #{post_id}" - topic_import_job(post_id, mysql_client) # Import topic and its replies - mark_post_as_complete(post_id) # Mark as complete in SQLite table - rescue => e - puts "Error processing post ID #{post_id}: #{e.message}" - mark_post_as_failed(post_id) - ensure - mysql_client.close if mysql_client + + # Use connection pooling for PostgreSQL and synchronize access to shared resources + ActiveRecord::Base.connection_pool.with_connection do + mutex.synchronize do + begin + puts "Processing post ID: #{post_id}" + topic_import_job(post_id, mysql_client) # Import topic and its replies + mark_post_as_complete(post_id) # Mark as complete in SQLite table + rescue => e + puts "Error processing post ID #{post_id}: #{e.message}" + mark_post_as_failed(post_id) + ensure + # Ensure the MariaDB connection is closed after processing + mysql_client.close if mysql_client + end + end end end else - puts "Skipping post_id #{post_id}, already processed." + puts "Skipping post_id #{post_id}, already processed." 
end current_post_batch += 1 # Increment, moving to next post in the batch @@ -1172,134 +1183,140 @@ class GossamerForumsImporter < ImportScripts::Base # Check if the topic has already been imported using the custom field 'original_gossamer_id' unless TopicCustomField.exists?(name: 'original_gossamer_id', value: row['post_id']) - # Create the new topic in Discourse - begin - suffix = 1 - topic_created = false - - while !topic_created - begin - puts "CREATE TOPIC unique_title #{unique_title} title #{title} discourse_user_id #{discourse_user_id} category_id #{discourse_category_id}" - topic = Topic.create!( - title: unique_title, - user_id: discourse_user_id, - created_at: Time.at(row['post_time']), - updated_at: Time.at(row['post_time']), - category_id: discourse_category_id, - views: post_views || 0, - posts_count: 0 - ) - topic.custom_fields['original_gossamer_id'] = row['post_id'] - topic.save! - - topic_created = true - rescue ActiveRecord::RecordInvalid => e - if e.message.include?("Title has already been used") - unique_title = "#{title} (#{suffix})" - suffix += 1 - else - raise e + ActiveRecord::Base.transaction do + # Create the new topic in Discourse + begin + suffix = 1 + topic_created = false + + while !topic_created + begin + puts "CREATE TOPIC unique_title #{unique_title} title #{title} discourse_user_id #{discourse_user_id} category_id #{discourse_category_id}" + topic = Topic.create!( + title: unique_title, + user_id: discourse_user_id, + created_at: Time.at(row['post_time']), + updated_at: Time.at(row['post_time']), + category_id: discourse_category_id, + views: post_views || 0, + posts_count: 0 + ) + topic.custom_fields['original_gossamer_id'] = row['post_id'] + topic.save! 
+ + topic_created = true + rescue ActiveRecord::RecordInvalid => e + if e.message.include?("Title has already been used") + unique_title = "#{title} (#{suffix})" + suffix += 1 + else + raise e + end end end - end + + # Update the database with the last post time and user for the topic + update_db_topic_last_post_time(topic.id, Time.at(row['post_time']).to_i) + update_db_topic_last_post_user(topic.id, discourse_user_id) + + # Increment the topic count for the user + update_db_user_topic_count(discourse_user_id, fetch_db_user_topic_count(discourse_user_id).to_i + 1) + + # Sanitize and prepare the post message for Discourse + sanitized_post_message = sanitize_post_message(row['post_message']) + + puts "CREATE POST topic.id #{topic.id} discourse_user_id #{discourse_user_id}" + + # Increment the post count for the topic + post_number = fetch_db_topic_post_numbers(topic.id).to_i + 1 + update_db_topic_post_numbers(topic.id, post_number) - # Update the database with the last post time and user for the topic - update_db_topic_last_post_time(topic.id, Time.at(row['post_time']).to_i) - update_db_topic_last_post_user(topic.id, discourse_user_id) - - # Increment the topic count for the user - update_db_user_topic_count(discourse_user_id, fetch_db_user_topic_count(discourse_user_id).to_i + 1) - - # Sanitize and prepare the post message for Discourse - sanitized_post_message = sanitize_post_message(row['post_message']) - - puts "CREATE POST topic.id #{topic.id} discourse_user_id #{discourse_user_id}" - - # Increment the post count for the topic - post_number = fetch_db_topic_post_numbers(topic.id).to_i + 1 - update_db_topic_post_numbers(topic.id, post_number) - - # Create the initial post in the new topic - post = Post.create!( - topic_id: topic.id, - user_id: discourse_user_id, - raw: sanitized_post_message, - created_at: Time.at(row['post_time']), - updated_at: Time.at(row['post_time']), - reads: post_views || 0, - post_number: post_number - ) - 
post.custom_fields['original_gossamer_id'] = row['post_id'] - post.save! - - # Increment the post count for the topic and user - update_db_topic_post_count(topic.id, fetch_db_topic_post_count(topic.id).to_i + 1) - update_db_user_post_count(discourse_user_id, fetch_db_user_post_count(discourse_user_id).to_i + 1) - - # Handle any attachments associated with the post - handle_post_attachments(row['post_id'], post, discourse_user_id) - - # Create URL mappings for the new topic - new_url = "https://new/t/#{topic.slug}/#{topic.id}" - insert_url_mapping(row['post_id'], new_url, unique_title) - - # Now fetch and import all replies to this topic - replies = execute_query("SELECT post_id, user_id_fk, post_message, post_time, FROM gforum_Post WHERE post_root_id = #{post_id} ORDER BY post_time ASC") - - # Import each reply sequentially - replies.each do |reply_row| - begin - # Fetch the discourse user ID for the reply - reply_user_id = fetch_user_id_mapping(reply_row['user_id_fk']) - - # Sanitize and prepare the reply message for Discourse - sanitized_reply_message = sanitize_post_message(reply_row['post_message']) - - puts "CREATE REPLY in topic_id #{topic.id}" - - # Increment the post count for the topic - post_number = fetch_db_topic_post_numbers(topic.id).to_i + 1 - update_db_topic_post_numbers(topic.id, post_number) - - # Create the reply post in the existing topic - post = Post.create!( - topic_id: topic.id, - user_id: reply_user_id, - raw: sanitized_reply_message, - created_at: Time.at(reply_row['post_time']), - updated_at: Time.at(reply_row['post_time']), - post_number: post_number - ) - post.custom_fields['original_gossamer_id'] = reply_row['post_id'] - post.save! 
- - # Increment the post count for the topic and user - update_db_topic_post_count(topic.id, fetch_db_topic_post_count(topic.id).to_i + 1) - update_db_user_post_count(reply_user_id, fetch_db_user_post_count(reply_user_id).to_i + 1) - - # Update last post time and user for the topic - if fetch_db_topic_last_post_time(topic.id).nil? || Time.at(reply_row['post_time']).to_i > fetch_db_topic_last_post_time(topic.id).to_i - update_db_topic_last_post_time(topic.id, Time.at(reply_row['post_time']).to_i) - update_db_topic_last_post_user(topic.id, reply_user_id) + # Create the initial post in the new topic + post = Post.create!( + topic_id: topic.id, + user_id: discourse_user_id, + raw: sanitized_post_message, + created_at: Time.at(row['post_time']), + updated_at: Time.at(row['post_time']), + reads: post_views || 0, + post_number: post_number + ) + post.custom_fields['original_gossamer_id'] = row['post_id'] + post.save! + + # Increment the post count for the topic and user + update_db_topic_post_count(topic.id, fetch_db_topic_post_count(topic.id).to_i + 1) + update_db_user_post_count(discourse_user_id, fetch_db_user_post_count(discourse_user_id).to_i + 1) + + # Handle any attachments associated with the post + handle_post_attachments(row['post_id'], post, discourse_user_id) + + # Create URL mappings for the new topic + new_url = "https://new/t/#{topic.slug}/#{topic.id}" + insert_url_mapping(row['post_id'], new_url, unique_title) + + # Fetch and import all replies to this topic + replies = execute_query("SELECT post_id, user_id_fk, post_message, post_time, FROM gforum_Post WHERE post_root_id = #{post_id} ORDER BY post_time ASC") + + # Import each reply sequentially + replies.each do |reply_row| + begin + # Fetch the discourse user ID for the reply + reply_user_id = fetch_user_id_mapping(reply_row['user_id_fk']) + + # Sanitize and prepare the reply message for Discourse + sanitized_reply_message = sanitize_post_message(reply_row['post_message']) + + puts "CREATE REPLY in 
topic_id #{topic.id}" + + # Increment the post count for the topic + post_number = fetch_db_topic_post_numbers(topic.id).to_i + 1 + update_db_topic_post_numbers(topic.id, post_number) + + # Create the reply post in the existing topic + post = Post.create!( + topic_id: topic.id, + user_id: reply_user_id, + raw: sanitized_reply_message, + created_at: Time.at(reply_row['post_time']), + updated_at: Time.at(reply_row['post_time']), + post_number: post_number + ) + post.custom_fields['original_gossamer_id'] = reply_row['post_id'] + post.save! + + # Increment the post count for the topic and user + update_db_topic_post_count(topic.id, fetch_db_topic_post_count(topic.id).to_i + 1) + update_db_user_post_count(reply_user_id, fetch_db_user_post_count(reply_user_id).to_i + 1) + + # Update last post time and user for the topic + if fetch_db_topic_last_post_time(topic.id).nil? || Time.at(reply_row['post_time']).to_i > fetch_db_topic_last_post_time(topic.id).to_i + update_db_topic_last_post_time(topic.id, Time.at(reply_row['post_time']).to_i) + update_db_topic_last_post_user(topic.id, reply_user_id) + end + + # Handle any attachments associated with the reply + handle_post_attachments(reply_row['post_id'], post, reply_user_id) + +# # Update the highest processed post_id in the database (thread-safe) +# update_highest_processed_post_id_thread_safe(reply_row['post_id']) + +# rescue ActiveRecord::RecordInvalid => e + rescue => e + puts "Error importing reply with post_id #{reply_row['post_id']}: #{e.message}" end - - # Handle any attachments associated with the reply - handle_post_attachments(reply_row['post_id'], post, reply_user_id) - -# # Update the highest processed post_id in the database (thread-safe) -# update_highest_processed_post_id_thread_safe(reply_row['post_id']) - - rescue ActiveRecord::RecordInvalid => e - puts "Error importing reply with post_id #{reply_row['post_id']}: #{e.message}" end + +# # After processing the entire topic, update the highest_processed_post_id 
to the current topic's post_id (thread-safe) +# update_highest_processed_post_id_thread_safe(post_id) + + rescue ActiveRecord::RecordInvalid => e + puts "Error importing topic with post_id #{row['post_id']}: #{e.message}" + raise ActiveRecord::Rollback end - -# # After processing the entire topic, update the highest_processed_post_id to the current topic's post_id (thread-safe) -# update_highest_processed_post_id_thread_safe(post_id) - - rescue ActiveRecord::RecordInvalid => e - puts "Error importing topic with post_id #{row['post_id']}: #{e.message}" end + else + puts "Topic for post_id #{row['post_id']} already exists, skipping creation." end end