From 60a8597b152c4ef5f23bf3a20c33b4f8f053f5a3 Mon Sep 17 00:00:00 2001 From: Ross Trottier Date: Sat, 8 Jun 2024 15:48:39 -0600 Subject: [PATCH] deadlock after finishing post migration fixed (needs test) --- main.go | 8 ++-- services/migration/migrate-posts.go | 65 ++++++++++++++++++++++------- 2 files changed, 54 insertions(+), 19 deletions(-) diff --git a/main.go b/main.go index 2012da3..5036457 100644 --- a/main.go +++ b/main.go @@ -7,15 +7,15 @@ import ( ) // WP Config -const baseUrl = "http://go-api-playground.local/" -const wordpressKey = "admin" -const wordpressSecret = "JXDP 3EgE h1RF jhln b8F5 A1Xk" +const baseUrl = "https://slowtwitch.cloud/" +const wordpressKey = "admin@slowtwitch.cloud" +const wordpressSecret = "hGZ0 GWlR TuUN CVyk SXh6 05uP" // DB Config const slowtwitchAdminUser = "admin" const slowtwitchAdminPass = "yxnh93Ybbz2Nm8#mp28zCVv" const slowtwitchDbName = "slowtwitch" -const migrationDbName = "slowtwitch_transfer_threaded_test" +const migrationDbName = "slowtwitch_transfer" const federatedDbUrl = "slowtwitch.northend.network" const federatedDbPort = "3306" diff --git a/services/migration/migrate-posts.go b/services/migration/migrate-posts.go index f05eb52..be8e247 100644 --- a/services/migration/migrate-posts.go +++ b/services/migration/migrate-posts.go @@ -41,16 +41,25 @@ func (migration MigratePosts) Execute() []PostResult { batchSize := 5 workInputChannel := make(chan int) workOutputChannel := make(chan PostResult) + doneChannel := make(chan bool) + defer close(doneChannel) var wg sync.WaitGroup - wg.Add(1) wg.Add(len(slowtwitchPostIdsForMigration)) for i := 0; i < batchSize; i++ { go func() { - for postId := range workInputChannel { - result := createPost(postId, migration, wpTagData) - workOutputChannel <- result - wg.Done() + for { + select { + case <-doneChannel: + return + case postId, ok := <-workInputChannel: + if ok { + result := createPost(postId, migration, wpTagData) + workOutputChannel <- result + } else { + return + } + } } }() } @@ -59,20 +68,34 @@ func (migration MigratePosts) Execute() []PostResult { defer close(workInputChannel) for _, postId := range slowtwitchPostIdsForMigration { - workInputChannel <- postId + select { + case <-doneChannel: + return + case workInputChannel <- postId: + } } - - wg.Done() }() go func() { - for postResult := range workOutputChannel { - postResults = append(postResults, postResult) + defer close(workOutputChannel) + + for { + select { + case <-doneChannel: + return + case postResult, ok := <-workOutputChannel: + if ok { + postResults = append(postResults, postResult) + wg.Done() + } else { + return + } + } } }() wg.Wait() - close(workOutputChannel) + doneChannel <- true updatePostRelationships(postResults, migration) @@ -292,6 +315,8 @@ func updatePostRelationships(postResults []PostResult, migration MigratePosts) { fmt.Println("Updating post relationships") batchSize := 5 postResultWorkInput := make(chan PostResult) + doneChannel := make(chan bool) + defer close(doneChannel) //Set up a wait group to wait for the input channel to close after execution var wg sync.WaitGroup wg.Add(1) @@ -301,9 +326,14 @@ func updatePostRelationships(postResults []PostResult, migration MigratePosts) { //maxed out at the batch size for i := 0; i < batchSize; i++ { go func() { - for result := range postResultWorkInput { - updatePostRelationship(result, migration) - wg.Done() + for { + select { + case <-doneChannel: + return + case result := <-postResultWorkInput: + updatePostRelationship(result, migration) + wg.Done() + } } }() } @@ -315,13 +345,18 @@ func updatePostRelationships(postResults []PostResult, migration MigratePosts) { defer close(postResultWorkInput) for _, result := range postResults { - postResultWorkInput <- result + select { + case <-doneChannel: + return + case postResultWorkInput <- result: + } } wg.Done() }() wg.Wait() + doneChannel <- true fmt.Println("Done with post relationships") }