v0.42.1 Rewrite and simplification of concurrent-ruby support

David Sainty 2024-08-17 22:23:25 +10:00
parent b44e9baf57
commit f90d0fb6ae


@@ -1,7 +1,7 @@
# Federated Computer, Inc.
# David Sainty <saint@federated.computer> 2024 A.D.
# Gossamer Threads to Discourse -- Migration-Import Script
# v0.42 Mutex addition for SQLite (which may be very important)
# v0.42.1 Rewrite and simplification of concurrent-ruby support
require 'mysql2'
require 'open-uri'
@@ -1077,7 +1077,9 @@ class GossamerForumsImporter < ImportScripts::Base
parent_post_count = parent_post_ids.count
batch_size = 100 # Set our batch size for number of posts to import in a single batch
current_post_batch = 0 # Set our current batch number. This tracks the current batch of posts being processed.
#### current_post_batch = 0 # Set our current batch number. This tracks the current batch of posts being processed.
is_complete = false # Flag to indicate whether the import process is complete.
# Mutex to control access to shared resources
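(The mutex set up here serialises the worker threads' writes to the SQLite status table, since a single SQLite handle must not be written from several threads at once. A minimal sketch of the pattern, assuming a hypothetical database path and status table; only Mutex#synchronize and the idea of marking posts complete/failed come from this commit:

    require 'sqlite3'

    sqlite_mutex = Mutex.new
    db = SQLite3::Database.new('import_status.db') # hypothetical path and schema

    # Each worker thread wraps its status write in the shared mutex, as the
    # commit does around mark_post_as_complete / mark_post_as_failed below.
    workers = (1..4).map do |post_id|
      Thread.new do
        sqlite_mutex.synchronize do
          db.execute('UPDATE post_status SET status = 1 WHERE post_id = ?', [post_id])
        end
      end
    end
    workers.each(&:join)
)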
@@ -1086,72 +1088,82 @@ class GossamerForumsImporter < ImportScripts::Base
# Run until all posts have been processed.
until is_complete
# Query in batches, create pool, wait for termination, do it again
current_post_batch_max = current_post_batch + batch_size
#### # Query in batches, create pool, wait for termination, do it again
#### current_post_batch_max = current_post_batch + batch_size
# Get the next batch of posts
current_post_batch = parent_post_ids.shift(batch_size)
break if current_post_batch.empty?
# Process each post in the current batch
current_post_batch.each do |post_id|
####### # Static pool size based on number of CPUs
# # pool = Concurrent::FixedThreadPool.new(Concurrent.processor_count) # Create a thread pool that is bounded by processors available
# # pool = Concurrent::FixedThreadPool.new(8) # Create a thread pool of 8 pool members
# Dynamically calculate the pool size based on system load to optimise performance
pool_size = calculate_dynamic_pool_size # Dynamically calculate what the pool size "ought" to be.
pool = Concurrent::FixedThreadPool.new(pool_size) # Create a thread pool with the calculated size
#### # Dynamically calculate the pool size based on system load to optimise performance
#### pool_size = calculate_dynamic_pool_size # Dynamically calculate what the pool size "ought" to be.
#### pool = Concurrent::FixedThreadPool.new(pool_size) # Create a thread pool with the calculated size
# Process each post in the current batch
while current_post_batch < current_post_batch_max
post_id = parent_post_ids[current_post_batch] # Fetch the post_id for the current post
#### # Process each post in the current batch
#### while current_post_batch < current_post_batch_max
#### post_id = parent_post_ids[current_post_batch] # Fetch the post_id for the current post
# Check if the post has already been processed or is incomplete
post_status = post_status(post_id)
if post_status.nil? || post_status == 0
puts "Starting import for post_id #{post_id} in batch #{current_post_batch / batch_size + 1} with #{pool_size} threads"
#### # Check if the post has already been processed or is incomplete
#### post_status = post_status(post_id)
# Submit the import job for the current post_id to the thread pool
pool.post do
# Submit the import job for the current post_id to the thread pool
pool.post do
puts "PP 11 -- #{post_id}"
# Initialise a new MariaDB / Mysql2 client inside of each thread
mysql_client = Mysql2::Client.new(
host: "slowtwitch.northend.network",
username: "admin",
password: "yxnh93Ybbz2Nm8#mp28zCVv",
database: "slowtwitch"
)
# Initialise a new MariaDB / Mysql2 client inside of each thread
mysql_client = Mysql2::Client.new(
host: "slowtwitch.northend.network",
username: "admin",
password: "yxnh93Ybbz2Nm8#mp28zCVv",
database: "slowtwitch"
)
puts "PP 22 -- ${post_id}"
begin
# Use connection pooling for PostgreSQL and synchronize access to shared resources
ActiveRecord::Base.connection_pool.with_connection do
### mutex.synchronize do
begin
puts "Processing post ID: #{post_id}"
topic_import_job(post_id, mysql_client, sqlite_mutex) # Import topic and its replies
sqlite_mutex.synchronize do
mark_post_as_complete(post_id) # Mark as complete in SQLite table
end
rescue => e
puts "Error processing post ID #{post_id}: #{e.message}"
sqlite_mutex.synchronize do
mark_post_as_failed(post_id)
end
ensure
# Ensure the MariaDB connection is closed after processing
mysql_client.close if mysql_client
post_status = post_status(post_id)
if post_status.nil? || post_status == 0
puts "Starting import for post_id #{post_id}"
topic_import_job(post_id, mysql_client, sqlite_mutex) # Import topic and its replies
sqlite_mutex.synchronize do
mark_post_as_complete(post_id) # Mark as complete in SQLite table
end
### end
else
puts "Skipping post_id #{post_id}, already processed."
end
end
rescue => e
puts "Error processing post ID #{post_id}: #{e.message}"
sqlite_mutex.synchronize do
mark_post_as_failed(post_id)
end
ensure
# Ensure the MariaDB connection is closed after processing
mysql_client.close if mysql_client
end
else
puts "Skipping post_id #{post_id}, already processed."
end
current_post_batch += 1 # Increment, moving to next post in the batch
break if current_post_batch >= parent_post_count
end
#### current_post_batch += 1 # Increment, moving to next post in the batch
#### break if current_post_batch >= parent_post_count
# Wait for all jobs in the current batch to finish before proceeding
pool.shutdown # Initiate thread pool shutdown after all jobs submitted
pool.wait_for_termination # Wait for all threads to finish exec
# Set when all posts have been processed
is_complete = true if current_post_batch >= parent_post_count
# Check if all posts have been processed
#### is_complete = true if current_post_batch >= parent_post_count
is_complete = parent_post_ids.empty?
end
end
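Taken together, the rewrite replaces index arithmetic (the current_post_batch / current_post_batch_max counters) with destructive Array#shift, so each batch is whatever shift returned and completion reduces to the ID list running dry; pool.shutdown plus pool.wait_for_termination still gives the same per-batch barrier as before. A self-contained sketch of the resulting pattern, assuming a placeholder body for calculate_dynamic_pool_size (its real implementation is outside the hunks shown) and stub import helpers:

    require 'concurrent'

    # Placeholder for the script's load-aware helper; the real body is not in
    # this diff, so scaling with CPU count is just an assumption here.
    def calculate_dynamic_pool_size
      [2, Concurrent.processor_count].max
    end

    # Stub for the per-post import work. The real job opens its own
    # Mysql2::Client inside each thread, since a client must not be shared
    # across threads, and closes it in an ensure block.
    def topic_import_job(post_id)
      puts "imported post #{post_id}"
    end

    parent_post_ids = (1..250).to_a # stand-in for the IDs queried from MariaDB
    batch_size = 100
    is_complete = false

    until is_complete
      # Destructively take the next batch; shift drains parent_post_ids as we go
      current_post_batch = parent_post_ids.shift(batch_size)
      break if current_post_batch.empty?

      pool = Concurrent::FixedThreadPool.new(calculate_dynamic_pool_size)
      current_post_batch.each do |post_id|
        pool.post { topic_import_job(post_id) }
      end

      pool.shutdown             # stop accepting jobs for this batch
      pool.wait_for_termination # block until every submitted job finishes

      # Completion no longer needs a counter: the ID list itself is the state
      is_complete = parent_post_ids.empty?
    end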