Skip to content

Commit ee3a100

Browse files
authored
Merge pull request #15 from devchat-ai/test
Add configurable task queue size to GoPool
2 parents 29eb3b0 + bf1ff45 commit ee3a100

File tree

5 files changed

+91
-3
lines changed

5 files changed

+91
-3
lines changed

README.md

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ ok github.com/devchat-ai/gopool 3.946s
8181
<img src="./logo/gopool.png" width="750">
8282
</div>
8383

84-
- [x] **Task Queue**: GoPool uses a thread-safe task queue to store tasks waiting to be processed. Multiple workers can simultaneously fetch tasks from this queue.
84+
- [x] **Task Queue**: GoPool uses a thread-safe task queue to store tasks waiting to be processed. Multiple workers can simultaneously fetch tasks from this queue. The size of the task queue can be configured.
8585

8686
- [x] **Concurrency Control**: GoPool can control the number of concurrent tasks to prevent system overload.
8787

@@ -163,6 +163,35 @@ func main() {
163163
}
164164
```
165165

166+
## Configurable Task Queue Size
167+
168+
GoPool uses a thread-safe task queue to store tasks waiting to be processed. Multiple workers can simultaneously fetch tasks from this queue. The size of the task queue can be configured when creating the pool using the `WithTaskQueueSize` option.
169+
170+
Here is an example of how to use GoPool with a configurable task queue size:
171+
172+
```go
173+
package main
174+
175+
import (
176+
"time"
177+
178+
"github.com/devchat-ai/gopool"
179+
)
180+
181+
func main() {
182+
pool := gopool.NewGoPool(100, gopool.WithTaskQueueSize(5000))
183+
defer pool.Release()
184+
185+
for i := 0; i < 1000; i++ {
186+
pool.AddTask(func() (interface{}, error){
187+
time.Sleep(10 * time.Millisecond)
188+
return nil, nil
189+
})
190+
}
191+
pool.Wait()
192+
}
193+
```
194+
166195
## Dynamic Worker Adjustment
167196

168197
GoPool supports dynamic worker adjustment. This means that the number of workers in the pool can increase or decrease based on the number of tasks in the queue. This feature can be enabled by setting the MinWorkers option when creating the pool.

README_zh.md

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ ok github.com/devchat-ai/gopool 3.946s
8181
<img src="./logo/gopool.png" width="750">
8282
</div>
8383

84-
- [x] **任务队列**:GoPool 使用一个线程安全的任务队列来存储等待处理的任务。多个工作器可以同时从这个队列中获取任务。
84+
- [x] **任务队列**:GoPool 使用一个线程安全的任务队列来存储等待处理的任务。多个工作器可以同时从这个队列中获取任务。任务队列的大小可配置。
8585

8686
- [x] **并发控制**:GoPool 可以控制并发任务的数量,防止系统过载。
8787

@@ -163,6 +163,35 @@ func main() {
163163
}
164164
```
165165

166+
## 配置任务队列大小
167+
168+
GoPool 使用一个线程安全的任务队列来存储等待处理的任务。多个工作器可以同时从这个队列中获取任务。任务队列的大小可配置。可以通过在创建池时设置 `WithQueueSize` 选项来配置任务队列的大小。
169+
170+
这是一个如何配置 GoPool 任务队列大小的示例:
171+
172+
```go
173+
package main
174+
175+
import (
176+
"time"
177+
178+
"github.com/devchat-ai/gopool"
179+
)
180+
181+
func main() {
182+
pool := gopool.NewGoPool(100, gopool.WithTaskQueueSize(5000))
183+
defer pool.Release()
184+
185+
for i := 0; i < 1000; i++ {
186+
pool.AddTask(func() (interface{}, error){
187+
time.Sleep(10 * time.Millisecond)
188+
return nil, nil
189+
})
190+
}
191+
pool.Wait()
192+
}
193+
```
194+
166195
## 动态工作器调整
167196

168197
GoPool 支持动态工作器调整。这意味着池中的工作器数量可以根据队列中的任务数量增加或减少。可以通过在创建池时设置 MinWorkers 选项来启用此功能。

gopool.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ type GoPool interface {
1818
Running() int
1919
// GetWorkerCount returns the number of workers.
2020
GetWorkerCount() int
21+
// GetTaskQueueSize returns the size of the task queue.
22+
GetTaskQueueSize() int
2123
}
2224

2325
// task represents a function that will be executed by a worker.
@@ -33,6 +35,8 @@ type goPool struct {
3335
minWorkers int
3436
// tasks are added to this channel first, then dispatched to workers. Default buffer size is 1 million.
3537
taskQueue chan task
38+
// Set by WithTaskQueueSize(), used to set the size of the task queue. Default is 1e6.
39+
taskQueueSize int
3640
// Set by WithRetryCount(), used to retry a task when it fails. Default is 0.
3741
retryCount int
3842
lock sync.Locker
@@ -60,7 +64,8 @@ func NewGoPool(maxWorkers int, opts ...Option) GoPool {
6064
// workers and workerStack should be initialized after WithMinWorkers() is called
6165
workers: nil,
6266
workerStack: nil,
63-
taskQueue: make(chan task, 1e6),
67+
taskQueue: nil,
68+
taskQueueSize: 1e6,
6469
retryCount: 0,
6570
lock: new(sync.Mutex),
6671
timeout: 0,
@@ -73,6 +78,7 @@ func NewGoPool(maxWorkers int, opts ...Option) GoPool {
7378
opt(pool)
7479
}
7580

81+
pool.taskQueue = make(chan task, pool.taskQueueSize)
7682
pool.workers = make([]*worker, pool.minWorkers)
7783
pool.workerStack = make([]int, pool.minWorkers)
7884

@@ -213,3 +219,10 @@ func (p *goPool) GetWorkerCount() int {
213219
defer p.lock.Unlock()
214220
return len(p.workers)
215221
}
222+
223+
// GetTaskQueueSize returns the size of the task queue.
224+
func (p *goPool) GetTaskQueueSize() int {
225+
p.lock.Lock()
226+
defer p.lock.Unlock()
227+
return p.taskQueueSize
228+
}

gopool_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,4 +129,14 @@ var _ = Describe("Gopool", func() {
129129
Expect(pool.GetWorkerCount()).To(Equal(minWorkers))
130130
})
131131
})
132+
133+
Describe("With TaskQueueSize", func() {
134+
It("should work correctly", func() {
135+
size := 5000
136+
pool := gopool.NewGoPool(100, gopool.WithTaskQueueSize(size))
137+
defer pool.Release()
138+
139+
Expect(pool.GetTaskQueueSize()).To(Equal(size))
140+
})
141+
})
132142
})

option.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,10 @@ func WithRetryCount(retryCount int) Option {
5050
p.retryCount = retryCount
5151
}
5252
}
53+
54+
// WithTaskQueueSize sets the size of the task queue for the pool.
55+
func WithTaskQueueSize(size int) Option {
56+
return func(p *goPool) {
57+
p.taskQueueSize = size
58+
}
59+
}

0 commit comments

Comments
 (0)