v0.41 Further improve FULL concurrency support, for both MySQL/MariaDB _and_ importantly, the PostGreSQL Discourse DB additions and changes with ActiveRecord connection pooling and Mutex
This commit is contained in:
parent
168dcc9db7
commit
a005cda0ae
@ -1,7 +1,7 @@
|
|||||||
# Federated Computer, Inc.
|
# Federated Computer, Inc.
|
||||||
# David Sainty <saint@federated.computer> 2024 A.D.
|
# David Sainty <saint@federated.computer> 2024 A.D.
|
||||||
# Gossamer Threads to Discourse -- Migration-Import Script
|
# Gossamer Threads to Discourse -- Migration-Import Script
|
||||||
# v0.40 Move to per thread MySQL/MariaDB connection
|
# v0.41 Further improve FULL concurrency support, for both MySQL/MariaDB _and_ importantly, the PostGreSQL Discourse DB additions and changes with ActiveRecord connection pooling and Mutex
|
||||||
|
|
||||||
require 'mysql2'
|
require 'mysql2'
|
||||||
require 'open-uri'
|
require 'open-uri'
|
||||||
@ -1072,16 +1072,20 @@ class GossamerForumsImporter < ImportScripts::Base
|
|||||||
current_post_batch = 0 # Set our current batch number. This tracks the current batch of posts being processed.
|
current_post_batch = 0 # Set our current batch number. This tracks the current batch of posts being processed.
|
||||||
is_complete = false # Flag to indicate whether the import process is complete.
|
is_complete = false # Flag to indicate whether the import process is complete.
|
||||||
|
|
||||||
|
# Mutex to control access to shared resources
|
||||||
|
mutex = Mutex.new
|
||||||
|
|
||||||
# Run until all posts have been processed.
|
# Run until all posts have been processed.
|
||||||
until is_complete
|
until is_complete
|
||||||
# Query in batches, create pool, wait for termination, do it again
|
# Query in batches, create pool, wait for termination, do it again
|
||||||
current_post_batch_max = current_post_batch + batch_size
|
current_post_batch_max = current_post_batch + batch_size
|
||||||
|
|
||||||
## Static pool size based on number of CPUs
|
# # Static pool size based on number of CPUs
|
||||||
# pool = Concurrent::FixedThreadPool.new(Concurrent.processor_count) #create thread pool that is bounded by processors avaialable, however play with the number to see what works best
|
# # pool = Concurrent::FixedThreadPool.new(Concurrent.processor_count) # Create a thread pool that is bounded by processors avaialable
|
||||||
|
# # pool = Concurrent::FixedThreadPool.new(8) # Create a thread pool of 8 pool members
|
||||||
|
|
||||||
# Dynamically calculate the pool size based on system load to optimise performance
|
# Dynamically calculate the pool size based on system load to optimise performance
|
||||||
pool_size = calculate_dynamic_pool_size
|
pool_size = calculate_dynamic_pool_size # Dynamically calculate what the pool size "ought" to be.
|
||||||
pool = Concurrent::FixedThreadPool.new(pool_size) # Create a thread pool with the calculated size
|
pool = Concurrent::FixedThreadPool.new(pool_size) # Create a thread pool with the calculated size
|
||||||
|
|
||||||
# Process each post in the current batch
|
# Process each post in the current batch
|
||||||
@ -1102,19 +1106,26 @@ class GossamerForumsImporter < ImportScripts::Base
|
|||||||
password: "yxnh93Ybbz2Nm8#mp28zCVv",
|
password: "yxnh93Ybbz2Nm8#mp28zCVv",
|
||||||
database: "slowtwitch"
|
database: "slowtwitch"
|
||||||
)
|
)
|
||||||
begin
|
|
||||||
puts "Processing post ID: #{post_id}"
|
# Use connection ppoling for PostgreSQL and synchronize access to shared resources
|
||||||
topic_import_job(post_id, mysql_client) # Import topic and its replies
|
ActiveRecord::Base.connection_pool.with_connection do
|
||||||
mark_post_as_complete(post_id) # Mark as complete in SQLite table
|
mutex.synchronize do
|
||||||
rescue => e
|
begin
|
||||||
puts "Error processing post ID #{post_id}: #{e.message}"
|
puts "Processing post ID: #{post_id}"
|
||||||
mark_post_as_failed(post_id)
|
topic_import_job(post_id, mysql_client) # Import topic and its replies
|
||||||
ensure
|
mark_post_as_complete(post_id) # Mark as complete in SQLite table
|
||||||
mysql_client.close if mysql_client
|
rescue => e
|
||||||
|
puts "Error processing post ID #{post_id}: #{e.message}"
|
||||||
|
mark_post_as_failed(post_id)
|
||||||
|
ensure
|
||||||
|
# Ensure the MariaDB connection is closed after processing
|
||||||
|
mysql_client.close if mysql_client
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
else
|
else
|
||||||
puts "Skipping post_id #{post_id}, already processed."
|
puts "Skipping post_id #{post_id}, already processed."
|
||||||
end
|
end
|
||||||
|
|
||||||
current_post_batch += 1 # Increment, moving to next post in the batch
|
current_post_batch += 1 # Increment, moving to next post in the batch
|
||||||
@ -1172,134 +1183,140 @@ class GossamerForumsImporter < ImportScripts::Base
|
|||||||
|
|
||||||
# Check if the topic has already been imported using the custom field 'original_gossamer_id'
|
# Check if the topic has already been imported using the custom field 'original_gossamer_id'
|
||||||
unless TopicCustomField.exists?(name: 'original_gossamer_id', value: row['post_id'])
|
unless TopicCustomField.exists?(name: 'original_gossamer_id', value: row['post_id'])
|
||||||
# Create the new topic in Discourse
|
ActiveRecord::Base.transaction do
|
||||||
begin
|
# Create the new topic in Discourse
|
||||||
suffix = 1
|
begin
|
||||||
topic_created = false
|
suffix = 1
|
||||||
|
topic_created = false
|
||||||
while !topic_created
|
|
||||||
begin
|
while !topic_created
|
||||||
puts "CREATE TOPIC unique_title #{unique_title} title #{title} discourse_user_id #{discourse_user_id} category_id #{discourse_category_id}"
|
begin
|
||||||
topic = Topic.create!(
|
puts "CREATE TOPIC unique_title #{unique_title} title #{title} discourse_user_id #{discourse_user_id} category_id #{discourse_category_id}"
|
||||||
title: unique_title,
|
topic = Topic.create!(
|
||||||
user_id: discourse_user_id,
|
title: unique_title,
|
||||||
created_at: Time.at(row['post_time']),
|
user_id: discourse_user_id,
|
||||||
updated_at: Time.at(row['post_time']),
|
created_at: Time.at(row['post_time']),
|
||||||
category_id: discourse_category_id,
|
updated_at: Time.at(row['post_time']),
|
||||||
views: post_views || 0,
|
category_id: discourse_category_id,
|
||||||
posts_count: 0
|
views: post_views || 0,
|
||||||
)
|
posts_count: 0
|
||||||
topic.custom_fields['original_gossamer_id'] = row['post_id']
|
)
|
||||||
topic.save!
|
topic.custom_fields['original_gossamer_id'] = row['post_id']
|
||||||
|
topic.save!
|
||||||
topic_created = true
|
|
||||||
rescue ActiveRecord::RecordInvalid => e
|
topic_created = true
|
||||||
if e.message.include?("Title has already been used")
|
rescue ActiveRecord::RecordInvalid => e
|
||||||
unique_title = "#{title} (#{suffix})"
|
if e.message.include?("Title has already been used")
|
||||||
suffix += 1
|
unique_title = "#{title} (#{suffix})"
|
||||||
else
|
suffix += 1
|
||||||
raise e
|
else
|
||||||
|
raise e
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
|
||||||
|
# Update the database with the last post time and user for the topic
|
||||||
|
update_db_topic_last_post_time(topic.id, Time.at(row['post_time']).to_i)
|
||||||
|
update_db_topic_last_post_user(topic.id, discourse_user_id)
|
||||||
|
|
||||||
|
# Increment the topic count for the user
|
||||||
|
update_db_user_topic_count(discourse_user_id, fetch_db_user_topic_count(discourse_user_id).to_i + 1)
|
||||||
|
|
||||||
|
# Sanitize and prepare the post message for Discourse
|
||||||
|
sanitized_post_message = sanitize_post_message(row['post_message'])
|
||||||
|
|
||||||
|
puts "CREATE POST topic.id #{topic.id} discourse_user_id #{discourse_user_id}"
|
||||||
|
|
||||||
|
# Increment the post count for the topic
|
||||||
|
post_number = fetch_db_topic_post_numbers(topic.id).to_i + 1
|
||||||
|
update_db_topic_post_numbers(topic.id, post_number)
|
||||||
|
|
||||||
# Update the database with the last post time and user for the topic
|
# Create the initial post in the new topic
|
||||||
update_db_topic_last_post_time(topic.id, Time.at(row['post_time']).to_i)
|
post = Post.create!(
|
||||||
update_db_topic_last_post_user(topic.id, discourse_user_id)
|
topic_id: topic.id,
|
||||||
|
user_id: discourse_user_id,
|
||||||
# Increment the topic count for the user
|
raw: sanitized_post_message,
|
||||||
update_db_user_topic_count(discourse_user_id, fetch_db_user_topic_count(discourse_user_id).to_i + 1)
|
created_at: Time.at(row['post_time']),
|
||||||
|
updated_at: Time.at(row['post_time']),
|
||||||
# Sanitize and prepare the post message for Discourse
|
reads: post_views || 0,
|
||||||
sanitized_post_message = sanitize_post_message(row['post_message'])
|
post_number: post_number
|
||||||
|
)
|
||||||
puts "CREATE POST topic.id #{topic.id} discourse_user_id #{discourse_user_id}"
|
post.custom_fields['original_gossamer_id'] = row['post_id']
|
||||||
|
post.save!
|
||||||
# Increment the post count for the topic
|
|
||||||
post_number = fetch_db_topic_post_numbers(topic.id).to_i + 1
|
# Increment the post count for the topic and user
|
||||||
update_db_topic_post_numbers(topic.id, post_number)
|
update_db_topic_post_count(topic.id, fetch_db_topic_post_count(topic.id).to_i + 1)
|
||||||
|
update_db_user_post_count(discourse_user_id, fetch_db_user_post_count(discourse_user_id).to_i + 1)
|
||||||
# Create the initial post in the new topic
|
|
||||||
post = Post.create!(
|
# Handle any attachments associated with the post
|
||||||
topic_id: topic.id,
|
handle_post_attachments(row['post_id'], post, discourse_user_id)
|
||||||
user_id: discourse_user_id,
|
|
||||||
raw: sanitized_post_message,
|
# Create URL mappings for the new topic
|
||||||
created_at: Time.at(row['post_time']),
|
new_url = "https://new/t/#{topic.slug}/#{topic.id}"
|
||||||
updated_at: Time.at(row['post_time']),
|
insert_url_mapping(row['post_id'], new_url, unique_title)
|
||||||
reads: post_views || 0,
|
|
||||||
post_number: post_number
|
# Fetch and import all replies to this topic
|
||||||
)
|
replies = execute_query("SELECT post_id, user_id_fk, post_message, post_time, FROM gforum_Post WHERE post_root_id = #{post_id} ORDER BY post_time ASC")
|
||||||
post.custom_fields['original_gossamer_id'] = row['post_id']
|
|
||||||
post.save!
|
# Import each reply sequentially
|
||||||
|
replies.each do |reply_row|
|
||||||
# Increment the post count for the topic and user
|
begin
|
||||||
update_db_topic_post_count(topic.id, fetch_db_topic_post_count(topic.id).to_i + 1)
|
# Fetch the discourse user ID for the reply
|
||||||
update_db_user_post_count(discourse_user_id, fetch_db_user_post_count(discourse_user_id).to_i + 1)
|
reply_user_id = fetch_user_id_mapping(reply_row['user_id_fk'])
|
||||||
|
|
||||||
# Handle any attachments associated with the post
|
# Sanitize and prepare the reply message for Discourse
|
||||||
handle_post_attachments(row['post_id'], post, discourse_user_id)
|
sanitized_reply_message = sanitize_post_message(reply_row['post_message'])
|
||||||
|
|
||||||
# Create URL mappings for the new topic
|
puts "CREATE REPLY in topic_id #{topic.id}"
|
||||||
new_url = "https://new/t/#{topic.slug}/#{topic.id}"
|
|
||||||
insert_url_mapping(row['post_id'], new_url, unique_title)
|
# Increment the post count for the topic
|
||||||
|
post_number = fetch_db_topic_post_numbers(topic.id).to_i + 1
|
||||||
# Now fetch and import all replies to this topic
|
update_db_topic_post_numbers(topic.id, post_number)
|
||||||
replies = execute_query("SELECT post_id, user_id_fk, post_message, post_time, FROM gforum_Post WHERE post_root_id = #{post_id} ORDER BY post_time ASC")
|
|
||||||
|
# Create the reply post in the existing topic
|
||||||
# Import each reply sequentially
|
post = Post.create!(
|
||||||
replies.each do |reply_row|
|
topic_id: topic.id,
|
||||||
begin
|
user_id: reply_user_id,
|
||||||
# Fetch the discourse user ID for the reply
|
raw: sanitized_reply_message,
|
||||||
reply_user_id = fetch_user_id_mapping(reply_row['user_id_fk'])
|
created_at: Time.at(reply_row['post_time']),
|
||||||
|
updated_at: Time.at(reply_row['post_time']),
|
||||||
# Sanitize and prepare the reply message for Discourse
|
post_number: post_number
|
||||||
sanitized_reply_message = sanitize_post_message(reply_row['post_message'])
|
)
|
||||||
|
post.custom_fields['original_gossamer_id'] = reply_row['post_id']
|
||||||
puts "CREATE REPLY in topic_id #{topic.id}"
|
post.save!
|
||||||
|
|
||||||
# Increment the post count for the topic
|
# Increment the post count for the topic and user
|
||||||
post_number = fetch_db_topic_post_numbers(topic.id).to_i + 1
|
update_db_topic_post_count(topic.id, fetch_db_topic_post_count(topic.id).to_i + 1)
|
||||||
update_db_topic_post_numbers(topic.id, post_number)
|
update_db_user_post_count(reply_user_id, fetch_db_user_post_count(reply_user_id).to_i + 1)
|
||||||
|
|
||||||
# Create the reply post in the existing topic
|
# Update last post time and user for the topic
|
||||||
post = Post.create!(
|
if fetch_db_topic_last_post_time(topic.id).nil? || Time.at(reply_row['post_time']).to_i > fetch_db_topic_last_post_time(topic.id).to_i
|
||||||
topic_id: topic.id,
|
update_db_topic_last_post_time(topic.id, Time.at(reply_row['post_time']).to_i)
|
||||||
user_id: reply_user_id,
|
update_db_topic_last_post_user(topic.id, reply_user_id)
|
||||||
raw: sanitized_reply_message,
|
end
|
||||||
created_at: Time.at(reply_row['post_time']),
|
|
||||||
updated_at: Time.at(reply_row['post_time']),
|
# Handle any attachments associated with the reply
|
||||||
post_number: post_number
|
handle_post_attachments(reply_row['post_id'], post, reply_user_id)
|
||||||
)
|
|
||||||
post.custom_fields['original_gossamer_id'] = reply_row['post_id']
|
# # Update the highest processed post_id in the database (thread-safe)
|
||||||
post.save!
|
# update_highest_processed_post_id_thread_safe(reply_row['post_id'])
|
||||||
|
|
||||||
# Increment the post count for the topic and user
|
# rescue ActiveRecord::RecordInvalid => e
|
||||||
update_db_topic_post_count(topic.id, fetch_db_topic_post_count(topic.id).to_i + 1)
|
rescue => e
|
||||||
update_db_user_post_count(reply_user_id, fetch_db_user_post_count(reply_user_id).to_i + 1)
|
puts "Error importing reply with post_id #{reply_row['post_id']}: #{e.message}"
|
||||||
|
|
||||||
# Update last post time and user for the topic
|
|
||||||
if fetch_db_topic_last_post_time(topic.id).nil? || Time.at(reply_row['post_time']).to_i > fetch_db_topic_last_post_time(topic.id).to_i
|
|
||||||
update_db_topic_last_post_time(topic.id, Time.at(reply_row['post_time']).to_i)
|
|
||||||
update_db_topic_last_post_user(topic.id, reply_user_id)
|
|
||||||
end
|
end
|
||||||
|
|
||||||
# Handle any attachments associated with the reply
|
|
||||||
handle_post_attachments(reply_row['post_id'], post, reply_user_id)
|
|
||||||
|
|
||||||
# # Update the highest processed post_id in the database (thread-safe)
|
|
||||||
# update_highest_processed_post_id_thread_safe(reply_row['post_id'])
|
|
||||||
|
|
||||||
rescue ActiveRecord::RecordInvalid => e
|
|
||||||
puts "Error importing reply with post_id #{reply_row['post_id']}: #{e.message}"
|
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# # After processing the entire topic, update the highest_processed_post_id to the current topic's post_id (thread-safe)
|
||||||
|
# update_highest_processed_post_id_thread_safe(post_id)
|
||||||
|
|
||||||
|
rescue ActiveRecord::RecordInvalid => e
|
||||||
|
puts "Error importing topic with post_id #{row['post_id']}: #{e.message}"
|
||||||
|
raise ActiveRecord::Rollback
|
||||||
end
|
end
|
||||||
|
|
||||||
# # After processing the entire topic, update the highest_processed_post_id to the current topic's post_id (thread-safe)
|
|
||||||
# update_highest_processed_post_id_thread_safe(post_id)
|
|
||||||
|
|
||||||
rescue ActiveRecord::RecordInvalid => e
|
|
||||||
puts "Error importing topic with post_id #{row['post_id']}: #{e.message}"
|
|
||||||
end
|
end
|
||||||
|
else
|
||||||
|
puts "Topic for post_id #{row['post_id']} already exists, skipping creation."
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user