diff --git a/gossamer_forums.rb b/gossamer_forums.rb index 5327f2f..0e016e0 100644 --- a/gossamer_forums.rb +++ b/gossamer_forums.rb @@ -1,7 +1,7 @@ # Federated Computer, Inc. # David Sainty 2024 A.D. # Gossamer Threads to Discourse -- Migration-Import Script -# v0.36.8 Move SQLite3 DB to persistent storage /bitnami/discourse/sqlite +# v0.37 Huge move to threaded support for topic-post import; disable topic-post import for current user re-import 202408162130 require 'mysql2' require 'open-uri' @@ -929,26 +929,71 @@ class GossamerForumsImporter < ImportScripts::Base #THREADING OUTLINE HERE -------------------------------------------- + # Method to dynamically calculate the optimal thread pool size based on system load + def calculate_dynamic_pool_size + # Fetch current CPU load average using Sys::ProcTable.loadavg + # load_avg = Sys::ProcTable.loadavg.last # Get the 15-minute load average + load_avg = Sys::ProcTable.loadavg + + # Calculate the pool size based on the load average + # Adjust the multiplier and threshold as needed + # pool_size = [(Concurrent.processor_count / (load_avg + 0.1)).to_i, 1].max + + # Extract the 1-minute load average from the fetched data + one_minute_load_avg = load_avg[0] + + # Log the current load and CPU information for debugging and monitoring purposes + puts "1-minute Load Average: #{one_minute_load_avg}, CPU Count: #{cpu_count}" + + # Calculate the initial pool size based on the ratio of the 1-minute load average to the number of CPUs + # This ratio gives an idea of how many threads should be running to efficiently utilize the CPU resources + initial_pool_size = (cpu_count / one_minute_load_avg).ceil + + # Ensure the pool size is at least 1 to avoid creating a pool with zero threads + initial_pool_size = 1 if initial_pool_size < 1 + + # Cap the maximum pool size to twice the number of CPUs + # This prevents overloading the system with too many threads, which could lead to diminishing returns + max_pool_size = cpu_count * 2 + + # Adjust the final pool size to be within the valid range (1 to max_pool_size) + pool_size = [[initial_pool_size, max_pool_size].min, 1].max + + puts "Calculated and adjusted dynamic pool size: #{pool_size}" # Log the dynamically adjusted pool size + pool_size + end + def threaded_topic_import - # Get list of IDS that have no parent ID - SELECT post_id FROM gforum_Post WHERE post_root_id = 0; + # Get list of TOPICS / OP posts, i.e. post ids that have no parent / root id - SELECT post_id FROM gforum_Post WHERE post_root_id = 0; parent_post_ids = execute_query("SELECT post_id FROM gforum_Post WHERE post_root_id = 0") parent_post_count = parent_post_ids.count - batch_size = 100 #set our batch size - current_post_batch = 0 #set our current batch number - is_complete = false + batch_size = 100 # Set our batch size for number of posts to import in a single batch + current_post_batch = 0 # Set our current batch number. This tracks the current batch of posts being processed. + is_complete = false # Flag to indicate whether the import process is complete. until is_complete # Query in batches, create pool, wait for termination, do it again # SELECT post_id FROM gforum_Post WHERE post_root_id = 0 ORDER BY post_id current_post_batch_max = current_post_batch + batch_size - pool = Concurrent::FixedThreadPool.new(Concurrent.processor_count) #create thread pool that is bounded by processors avaialable, however play with the number to see what works best + + # pool = Concurrent::FixedThreadPool.new(Concurrent.processor_count) #create thread pool that is bounded by processors avaialable, however play with the number to see what works best + + # Dynamically calculate the pool size based on system load + pool_size = calculate_dynamic_pool_size + + pool = Concurrent::FixedThreadPool.new(pool_size) while current_post_batch < current_post_batch_max post_id = parent_post_ids[current_post_batch] + + puts "Starting import for post_id #{post_id} in batch #{current_post_batch / batch_size + 1} with #{thread_count} threads" + pool.post do + puts "Processing post ID: #{post_id}" topic_import_job(post_id) end + current_post_batch += 1 break if current_post_batch >= parent_post_count end @@ -956,20 +1001,179 @@ class GossamerForumsImporter < ImportScripts::Base pool.shutdown pool.wait_for_termination - if current_post_batch >= parent_post_count - is_complete = true - end + is_complete = true if current_post_batch >= parent_post_count end end + # Method to import an entire topic, including its first post and all subsequent replies def topic_import_job(post_id) #Here is where you can import the entire topic #Get post -- SELECT post_id, user_id_fk, forum_id_fk, post_root_id, post_subject, post_time, post_message, post_father_id, post_replies FROM gforum_Post WHERE post_id = post_id #check if exists, create if not #get children, create -- SELECT post_id, user_id_fk, forum_id_fk, post_root_id, post_subject, post_time, post_message, post_father_id, post_replies FROM gforum_Post WHERE post_root_id = post_id #this parts needs to be synchronously to avoid race conditions + + # Fetch the post data for the given post_id (this is the first post in the topic) + row = execute_query("SELECT post_id, user_id_fk, forum_id_fk, post_root_id, post_subject, post_time, post_message, post_father_id, post_likes, post_replies FROM gforum_Post WHERE post_id = #{post_id}").first + + # Early return if the post data is not found + return unless row + + # Extract key values from the fetched row + post_id = row['post_id'].to_i + puts "Processing post_id #{row['post_id']} post_root_id #{row['post_root_id']} post_subject/title #{row['post_subject']} forum_id_fk/category_id #{row['forum_id_fk']}" + + # Fetch the mapped Discourse user and category ID based on Gossamer data + discourse_user_id = fetch_user_id_mapping(row['user_id_fk']) + discourse_category_id = fetch_category_id_mapping(row['forum_id_fk']) + puts "discourse_user_id #{discourse_user_id} discourse_category_id #{discourse_category_id}" + return unless discourse_user_id && discourse_category_id + + # Ensure the topic title is valid and generate a unique title if needed + title = ensure_valid_title(row['post_subject']) + unique_title = title + + # Fetch the number of views the post has had + post_views = fetch_post_views(row['post_id']) + + # Check if the topic has already been imported using the custom field 'original_gossamer_id' + unless TopicCustomField.exists?(name: 'original_gossamer_id', value: row['post_id']) + # Create the new topic in Discourse + begin + suffix = 1 + topic_created = false + + while !topic_created + begin + puts "CREATE TOPIC unique_title #{unique_title} title #{title} discourse_user_id #{discourse_user_id} category_id #{discourse_category_id}" + topic = Topic.create!( + title: unique_title, + user_id: discourse_user_id, + created_at: Time.at(row['post_time']), + updated_at: Time.at(row['post_time']), + category_id: discourse_category_id, + views: post_views || 0, + posts_count: 0 + ) + topic.custom_fields['original_gossamer_id'] = row['post_id'] + topic.save! + + topic_created = true + rescue ActiveRecord::RecordInvalid => e + if e.message.include?("Title has already been used") + unique_title = "#{title} (#{suffix})" + suffix += 1 + else + raise e + end + end + end + + # Update the database with the last post time and user for the topic + update_db_topic_last_post_time(topic.id, Time.at(row['post_time']).to_i) + update_db_topic_last_post_user(topic.id, discourse_user_id) + + # Increment the topic count for the user + update_db_user_topic_count(discourse_user_id, fetch_db_user_topic_count(discourse_user_id).to_i + 1) + + # Sanitize and prepare the post message for Discourse + sanitized_post_message = sanitize_post_message(row['post_message']) + + puts "CREATE POST topic.id #{topic.id} discourse_user_id #{discourse_user_id}" + + # Increment the post count for the topic + post_number = fetch_db_topic_post_numbers(topic.id).to_i + 1 + update_db_topic_post_numbers(topic.id, post_number) + + # Create the initial post in the new topic + post = Post.create!( + topic_id: topic.id, + user_id: discourse_user_id, + raw: sanitized_post_message, + created_at: Time.at(row['post_time']), + updated_at: Time.at(row['post_time']), + like_count: row['post_likes'] || 0, + reads: post_views || 0, + post_number: post_number + ) + post.custom_fields['original_gossamer_id'] = row['post_id'] + post.save! + + # Increment the post count for the topic and user + update_db_topic_post_count(topic.id, fetch_db_topic_post_count(topic.id).to_i + 1) + update_db_user_post_count(discourse_user_id, fetch_db_user_post_count(discourse_user_id).to_i + 1) + + # Handle any attachments associated with the post + handle_post_attachments(row['post_id'], post, discourse_user_id) + + # Create URL mappings for the new topic + new_url = "https://new/t/#{topic.slug}/#{topic.id}" + insert_url_mapping(row['post_id'], new_url, unique_title) + + # Update the highest processed post_id in the database + puts "Updated highest processed post_id #{post_id}" + update_highest_processed_post_id(post_id) + + # Now fetch and import all replies to this topic + replies = execute_query("SELECT post_id, user_id_fk, post_message, post_time, post_likes FROM gforum_Post WHERE post_root_id = #{post_id} ORDER BY post_time ASC") + + # Import each reply sequentially + replies.each do |reply_row| + begin + # Fetch the discourse user ID for the reply + reply_user_id = fetch_user_id_mapping(reply_row['user_id_fk']) + + # Sanitize and prepare the reply message for Discourse + sanitized_reply_message = sanitize_post_message(reply_row['post_message']) + + puts "CREATE REPLY in topic_id #{topic.id}" + + # Increment the post count for the topic + post_number = fetch_db_topic_post_numbers(topic.id).to_i + 1 + update_db_topic_post_numbers(topic.id, post_number) + + # Create the reply post in the existing topic + post = Post.create!( + topic_id: topic.id, + user_id: reply_user_id, + raw: sanitized_reply_message, + created_at: Time.at(reply_row['post_time']), + updated_at: Time.at(reply_row['post_time']), + like_count: reply_row['post_likes'] || 0, + post_number: post_number + ) + post.custom_fields['original_gossamer_id'] = reply_row['post_id'] + post.save! + + # Increment the post count for the topic and user + update_db_topic_post_count(topic.id, fetch_db_topic_post_count(topic.id).to_i + 1) + update_db_user_post_count(reply_user_id, fetch_db_user_post_count(reply_user_id).to_i + 1) + + # Update last post time and user for the topic + if fetch_db_topic_last_post_time(topic.id).nil? || Time.at(reply_row['post_time']).to_i > fetch_db_topic_last_post_time(topic.id).to_i + update_db_topic_last_post_time(topic.id, Time.at(reply_row['post_time']).to_i) + update_db_topic_last_post_user(topic.id, reply_user_id) + end + + # Handle any attachments associated with the reply + handle_post_attachments(reply_row['post_id'], post, reply_user_id) + + # Update the highest processed post_id in the database + puts "Updated highest processed post_id #{reply_row['post_id']}" + update_highest_processed_post_id(reply_row['post_id']) + + rescue ActiveRecord::RecordInvalid => e + puts "Error importing reply with post_id #{reply_row['post_id']}: #{e.message}" + end + end + + rescue ActiveRecord::RecordInvalid => e + puts "Error importing topic with post_id #{row['post_id']}: #{e.message}" + end + end end + #------------------------------------------------------------------- # Import topics and posts from Gossamer Forums to Discourse @@ -993,7 +1197,7 @@ class GossamerForumsImporter < ImportScripts::Base # Attachment example: highest_processed_post_id = 1359862 # Execute the query to get all posts ordered by post_id - execute_query("SELECT post_id, user_id_fk, forum_id_fk, post_root_id, post_subject, post_time, post_message, post_father_id, post_replies FROM gforum_Post ORDER BY post_id").each do |row| + execute_query("SELECT post_id, user_id_fk, forum_id_fk, post_root_id, post_subject, post_time, post_message, post_father_id, post_likes, post_replies FROM gforum_Post ORDER BY post_id").each do |row| post_id = row['post_id'].to_i # Skip posts that have already been processed @@ -1433,7 +1637,8 @@ class GossamerForumsImporter < ImportScripts::Base import_categories - import_topics_and_posts_with_attachments + # import_topics_and_posts_with_attachments + threaded_topic_import update_topic_stats update_user_stats export_url_mapping_to_csv("gossamer-migration-url-mapping#{timestamp}")