deadlock after finishing post migration fixed (needs test)

This commit is contained in:
Ross Trottier 2024-06-08 15:48:39 -06:00
parent 7f73716b9a
commit 60a8597b15
2 changed files with 54 additions and 19 deletions

View File

@ -7,15 +7,15 @@ import (
)
// WP Config
const baseUrl = "http://go-api-playground.local/"
const wordpressKey = "admin"
const wordpressSecret = "JXDP 3EgE h1RF jhln b8F5 A1Xk"
const baseUrl = "https://slowtwitch.cloud/"
const wordpressKey = "admin@slowtwitch.cloud"
const wordpressSecret = "hGZ0 GWlR TuUN CVyk SXh6 05uP"
// DB Config
const slowtwitchAdminUser = "admin"
const slowtwitchAdminPass = "yxnh93Ybbz2Nm8#mp28zCVv"
const slowtwitchDbName = "slowtwitch"
const migrationDbName = "slowtwitch_transfer_threaded_test"
const migrationDbName = "slowtwitch_transfer"
const federatedDbUrl = "slowtwitch.northend.network"
const federatedDbPort = "3306"

View File

@ -41,16 +41,25 @@ func (migration MigratePosts) Execute() []PostResult {
batchSize := 5
workInputChannel := make(chan int)
workOutputChannel := make(chan PostResult)
doneChannel := make(chan bool)
defer close(doneChannel)
var wg sync.WaitGroup
wg.Add(1)
wg.Add(len(slowtwitchPostIdsForMigration))
for i := 0; i < batchSize; i++ {
go func() {
for postId := range workInputChannel {
result := createPost(postId, migration, wpTagData)
workOutputChannel <- result
wg.Done()
for {
select {
case <-doneChannel:
return
case postId, ok := <-workInputChannel:
if ok {
result := createPost(postId, migration, wpTagData)
workOutputChannel <- result
} else {
return
}
}
}
}()
}
@ -59,20 +68,34 @@ func (migration MigratePosts) Execute() []PostResult {
defer close(workInputChannel)
for _, postId := range slowtwitchPostIdsForMigration {
workInputChannel <- postId
select {
case <-doneChannel:
return
case workInputChannel <- postId:
}
}
wg.Done()
}()
go func() {
for postResult := range workOutputChannel {
postResults = append(postResults, postResult)
defer close(workOutputChannel)
for {
select {
case <-doneChannel:
return
case postResult, ok := <-workOutputChannel:
if ok {
postResults = append(postResults, postResult)
wg.Done()
} else {
return
}
}
}
}()
wg.Wait()
close(workOutputChannel)
doneChannel <- true
updatePostRelationships(postResults, migration)
@ -292,6 +315,8 @@ func updatePostRelationships(postResults []PostResult, migration MigratePosts) {
fmt.Println("Updating post relationships")
batchSize := 5
postResultWorkInput := make(chan PostResult)
doneChannel := make(chan bool)
defer close(doneChannel)
//Set up a wait group to wait for the input channel to close after execution
var wg sync.WaitGroup
wg.Add(1)
@ -301,9 +326,14 @@ func updatePostRelationships(postResults []PostResult, migration MigratePosts) {
//maxed out at the batch size
for i := 0; i < batchSize; i++ {
go func() {
for result := range postResultWorkInput {
updatePostRelationship(result, migration)
wg.Done()
for {
select {
case <-doneChannel:
return
case result := <-postResultWorkInput:
updatePostRelationship(result, migration)
wg.Done()
}
}
}()
}
@ -315,13 +345,18 @@ func updatePostRelationships(postResults []PostResult, migration MigratePosts) {
defer close(postResultWorkInput)
for _, result := range postResults {
postResultWorkInput <- result
select {
case <-doneChannel:
return
case postResultWorkInput <- result:
}
}
wg.Done()
}()
wg.Wait()
doneChannel <- true
fmt.Println("Done with post relationships")
}