Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(dot/sync): improve worker pool #4258

Draft
wants to merge 12 commits into
base: development
Choose a base branch
from

Conversation

haikoschol
Copy link
Contributor

@haikoschol haikoschol commented Oct 15, 2024

The main difference in the worker pool API is that SubmitBatch() does not block until the whole batch has been processed. Instead, it returns an ID which can be used to retrieve the current state of the batch. In addition, Results() returns a channel over which task results are sent as they become available.

The main improvement this brings is increased concurrency, since results can be processed before the whole batch has been completed.

What has not changed is the overall flow of the Strategy interface; getting a new batch of tasks with NextActions() and processing the results with Process().

Changes

  • replaced the code in dot/sync/worker_pool.go
  • adapted SyncService to the API changes of the new worker pool
  • adapted some expectations in tests regarding how often some mocks are called (hopefully without changing the logic being tested)

Tests

go test github.com/ChainSafe/gossamer/dot/sync

Issues

Closes #4198

@CLAassistant
Copy link

CLAassistant commented Oct 15, 2024

CLA assistant check
All committers have signed the CLA.

@haikoschol
Copy link
Contributor Author

haikoschol commented Oct 15, 2024

Created as a draft for two reasons:

  1. I'd like to run a sync from scratch on Westend and/or Paseo as a regression test for a while.
  2. To discuss and possibly address this TODO:
// SubmitBatch accepts a list of tasks and immediately returns a batch ID. The batch ID can be used to query the status
// of the batch using [GetBatchStatus].
// TODO
// If tasks are submitted faster than they are completed, resChan will run full, blocking the calling goroutine.
// Ideally this method would provide backpressure to the caller in that case. The rejected tasks should then stay in
// FullSyncStrategy.requestQueue until the next round. But this would need to be supported in all sync strategies.
func (w *workerPool) SubmitBatch(tasks []Task) (id BatchID, err error) {

@haikoschol haikoschol force-pushed the haiko/sync-worker-pool branch 5 times, most recently from 4c0d5cb to c875d08 Compare October 18, 2024 03:18
workerPool: NewWorkerPool(WorkerPoolConfig{
MaxRetries: 5,
// TODO: This should depend on the actual configuration of the currently used sync strategy.
Capacity: defaultNumOfTasks * 10,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why times 10?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just an arbitrary value that "should be enough" while I was testing the branch on the Westend genesis instance. Apart from wanting to do this testing, this TODO and the other one about back pressure are the reasons why this is still a draft.
Do you think it makes sense to use the actual value for number of tasks configured in the sync strategy? And should we add some extra space here or not?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok got it, we can move it to a constant to self explain the usage.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to try and make this depend on the configuration of the strategy. But if that's too invasive, I'll move it to a constant.

Copy link
Contributor Author

@haikoschol haikoschol Oct 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've done this in b6cf4d7. It required extending the Strategy interface and delaying initialization of the worker pool.

dot/sync/service.go Outdated Show resolved Hide resolved
dot/sync/service.go Outdated Show resolved Hide resolved
dot/sync/service.go Outdated Show resolved Hide resolved
},
})
}
task, ok := result.Task.(*syncTask)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we define result.Task over a generic? so we can skip this casting?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered making this generic and decided against it, but I can't remember why. Will give it a try.

Copy link
Contributor Author

@haikoschol haikoschol Oct 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just tried and the reason I didn't do this is because SyncService is supposed to maintain the worker pool and work with any kind of Strategy. This won't work if the worker pool is a generic type instantiated with *syncTask.

With the current implementation, only the strategies themselves know the type of the tasks processed by the worker pool.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, that makes me think that the strategies should be responsible for creating their own workerpool maybe

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still in favor of having the service manage one pool that is used for all strategies. I think it's easier to manage in terms of concurrency and good separation of concern. Is it worth giving that up to avoid a few casts? The strategies definitely know what concrete type they use as sync tasks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

100% agree that the service should be responsible for the working pool, but I'm still trying to find a way to improve the types. We can keep the casts in the meanwhile

dot/sync/fullsync.go Outdated Show resolved Hide resolved
@haikoschol
Copy link
Contributor Author

haikoschol commented Oct 24, 2024

  1. To discuss and possibly address this TODO:

I've addressed the TODO in f4a9ccc. Not sure if this is the right approach, but I think it is better than always adding new tasks, risking a blocked goroutine.

@haikoschol haikoschol force-pushed the haiko/sync-worker-pool branch 3 times, most recently from 5b1f455 to 03f66ec Compare October 28, 2024 12:29
@haikoschol haikoschol force-pushed the haiko/sync-worker-pool branch 4 times, most recently from e10cc50 to b64d894 Compare November 6, 2024 10:24
The main difference in the worker pool API is that SubmitBatch() does
not block until the whole batch has been processed. Instead, it returns
an ID which can be used to retrieve the current state of the batch.
In addition, Results() returns a channel over which task results are
sent as they become available.

The main improvement this brings is increased concurrency, since results
can be processed before the whole batch has been completed.

What has not changed is the overall flow of the Strategy interface;
getting a new batch of tasks with NextActions() and processing the
results with Process().

Closes #4232
When the worker pool falls behind processing tasks, the service won't
ask the strategy for more tasks and instead directly runs Process()
again.
It doesn't make sense to give up since we need the blocks in that
request to progress.
@haikoschol haikoschol force-pushed the haiko/sync-worker-pool branch 6 times, most recently from 1b5b722 to 3502cd4 Compare November 7, 2024 15:12
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

feat(dot/sync): Improve worker pool
3 participants