线程池

自定义线程池

组件:线程池,阻塞队列。

阻塞队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package cn.xiaohupao.threadpool;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * A minimal bounded FIFO blocking queue guarded by a single {@link ReentrantLock}.
 *
 * <p>Producers wait on {@code fullWaitSet} while the queue holds {@code capacity}
 * elements; consumers wait on {@code emptyWaitSet} while the queue is empty.
 * Elements are stored in an {@link ArrayDeque}, which does not accept {@code null}.
 *
 * <p>None of the blocking methods declare {@link InterruptedException}. When a
 * waiting thread is interrupted, the method stops waiting, restores the thread's
 * interrupt status and returns (see each method for the value returned).
 *
 * @param <T> element type
 * @author xiaohupao
 */
public class BlockingQueue<T> {
    /** Pending elements, oldest first. */
    private final Deque<T> queue = new ArrayDeque<>();

    /** Guards every access to {@link #queue}. */
    private final ReentrantLock lock = new ReentrantLock();

    /** Producers wait here while the queue is full. */
    private final Condition fullWaitSet = lock.newCondition();

    /** Consumers wait here while the queue is empty. */
    private final Condition emptyWaitSet = lock.newCondition();

    /** Upper bound on the number of queued elements. */
    private final int capacity;

    /**
     * Creates an empty queue.
     *
     * @param capacity maximum number of elements held before producers block
     */
    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    /**
     * Removes and returns the head of the queue, waiting up to the given timeout
     * for an element to become available.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return the head element, or {@code null} if the timeout elapsed or the
     *         calling thread was interrupted while waiting (interrupt status is
     *         restored in that case)
     */
    public T poll(long timeout, TimeUnit unit) {
        lock.lock();
        try {
            long nanos = unit.toNanos(timeout);
            while (queue.isEmpty()) {
                if (nanos <= 0) {
                    return null;
                }
                try {
                    // awaitNanos returns the remaining wait time, so spurious
                    // wakeups do not extend the overall timeout.
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    // Preserve the interrupt for the caller instead of swallowing it.
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
            T head = queue.removeFirst();
            fullWaitSet.signal();
            return head;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the head of the queue, waiting as long as necessary
     * for an element to become available.
     *
     * @return the head element, or {@code null} if the calling thread was
     *         interrupted while waiting (interrupt status is restored)
     */
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                emptyWaitSet.await();
            }
            // The removal must happen while the lock is held; ArrayDeque is not
            // thread-safe and the emptiness check above is only valid under the lock.
            T head = queue.removeFirst();
            fullWaitSet.signal();
            return head;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends an element, waiting as long as necessary for space to become
     * available.
     *
     * <p>If the calling thread is interrupted while waiting, the element is NOT
     * added and the thread's interrupt status is restored so the caller can
     * detect it.
     *
     * @param element element to append
     */
    public void put(T element) {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                fullWaitSet.await();
            }
            queue.add(element);
            emptyWaitSet.signal();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of queued elements.
     *
     * @return current element count
     */
    public int size() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }
}

线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package cn.xiaohupao.threadpool;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * A minimal bounded FIFO blocking queue guarded by a single {@link ReentrantLock}.
 *
 * <p>Producers wait on {@code fullWaitSet} while the queue holds {@code capacity}
 * elements; consumers wait on {@code emptyWaitSet} while the queue is empty.
 * Elements are stored in an {@link ArrayDeque}, which does not accept {@code null}.
 *
 * <p>None of the blocking methods declare {@link InterruptedException}. When a
 * waiting thread is interrupted, the method stops waiting, restores the thread's
 * interrupt status and returns (see each method for the value returned).
 *
 * @param <T> element type
 * @author xiaohupao
 */
public class BlockingQueue<T> {
    /** Pending elements, oldest first. */
    private final Deque<T> queue = new ArrayDeque<>();

    /** Guards every access to {@link #queue}. */
    private final ReentrantLock lock = new ReentrantLock();

    /** Producers wait here while the queue is full. */
    private final Condition fullWaitSet = lock.newCondition();

    /** Consumers wait here while the queue is empty. */
    private final Condition emptyWaitSet = lock.newCondition();

    /** Upper bound on the number of queued elements. */
    private final int capacity;

    /**
     * Creates an empty queue.
     *
     * @param capacity maximum number of elements held before producers block
     */
    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    /**
     * Removes and returns the head of the queue, waiting up to the given timeout
     * for an element to become available.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return the head element, or {@code null} if the timeout elapsed or the
     *         calling thread was interrupted while waiting (interrupt status is
     *         restored in that case)
     */
    public T poll(long timeout, TimeUnit unit) {
        lock.lock();
        try {
            long nanos = unit.toNanos(timeout);
            while (queue.isEmpty()) {
                if (nanos <= 0) {
                    return null;
                }
                try {
                    // awaitNanos returns the remaining wait time, so spurious
                    // wakeups do not extend the overall timeout.
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    // Preserve the interrupt for the caller instead of swallowing it.
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
            T head = queue.removeFirst();
            fullWaitSet.signal();
            return head;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the head of the queue, waiting as long as necessary
     * for an element to become available.
     *
     * @return the head element, or {@code null} if the calling thread was
     *         interrupted while waiting (interrupt status is restored)
     */
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                emptyWaitSet.await();
            }
            // The removal must happen while the lock is held; ArrayDeque is not
            // thread-safe and the emptiness check above is only valid under the lock.
            T head = queue.removeFirst();
            fullWaitSet.signal();
            return head;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends an element, waiting as long as necessary for space to become
     * available.
     *
     * <p>If the calling thread is interrupted while waiting, the element is NOT
     * added and the thread's interrupt status is restored so the caller can
     * detect it.
     *
     * @param element element to append
     */
    public void put(T element) {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                fullWaitSet.await();
            }
            queue.add(element);
            emptyWaitSet.signal();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of queued elements.
     *
     * @return current element count
     */
    public int size() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }
}

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package cn.xiaohupao.threadpool;

import java.util.concurrent.TimeUnit;

/**
 * Manual smoke test: submits five tasks to a {@code ThreadPool}; each task
 * prints its own index.
 *
 * @author xiaohupao
 */
public class TestPool {
    public static void main(String[] args) {
        // NOTE(review): the 4-arg ThreadPool constructor is not shown in this
        // section; arguments are presumably (coreSize, timeout, unit, queue capacity).
        ThreadPool pool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10);

        int submitted = 0;
        while (submitted < 5) {
            // Lambdas may only capture effectively final locals, hence the copy.
            final int taskId = submitted++;
            pool.execute(() -> System.out.println(taskId));
        }
    }
}

拒绝策略线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package cn.xiaohupao.threadpool;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * A minimal bounded FIFO blocking queue guarded by a single {@link ReentrantLock},
 * with timed and policy-driven insertion.
 *
 * <p>Producers wait on {@code fullWaitSet} while the queue holds {@code capacity}
 * elements; consumers wait on {@code emptyWaitSet} while the queue is empty.
 * Elements are stored in an {@link ArrayDeque}, which does not accept {@code null}.
 *
 * <p>None of the blocking methods declare {@link InterruptedException}. When a
 * waiting thread is interrupted, the method stops waiting, restores the thread's
 * interrupt status and returns (see each method for the value returned).
 *
 * @param <T> element type
 * @author xiaohupao
 */
public class BlockingQueue<T> {
    /** Pending elements, oldest first. */
    private final Deque<T> queue = new ArrayDeque<>();

    /** Guards every access to {@link #queue}. */
    private final ReentrantLock lock = new ReentrantLock();

    /** Producers wait here while the queue is full. */
    private final Condition fullWaitSet = lock.newCondition();

    /** Consumers wait here while the queue is empty. */
    private final Condition emptyWaitSet = lock.newCondition();

    /** Upper bound on the number of queued elements. */
    private final int capacity;

    /**
     * Creates an empty queue.
     *
     * @param capacity maximum number of elements held before producers block
     */
    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    /**
     * Removes and returns the head of the queue, waiting up to the given timeout
     * for an element to become available.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return the head element, or {@code null} if the timeout elapsed or the
     *         calling thread was interrupted while waiting (interrupt status is
     *         restored in that case)
     */
    public T poll(long timeout, TimeUnit unit) {
        lock.lock();
        try {
            long nanos = unit.toNanos(timeout);
            while (queue.isEmpty()) {
                if (nanos <= 0) {
                    return null;
                }
                try {
                    // awaitNanos returns the remaining wait time, so spurious
                    // wakeups do not extend the overall timeout.
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    // Preserve the interrupt for the caller instead of swallowing it.
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
            T head = queue.removeFirst();
            fullWaitSet.signal();
            return head;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the head of the queue, waiting as long as necessary
     * for an element to become available.
     *
     * @return the head element, or {@code null} if the calling thread was
     *         interrupted while waiting (interrupt status is restored)
     */
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                emptyWaitSet.await();
            }
            // The removal must happen while the lock is held; ArrayDeque is not
            // thread-safe and the emptiness check above is only valid under the lock.
            T head = queue.removeFirst();
            fullWaitSet.signal();
            return head;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends an element, waiting as long as necessary for space to become
     * available.
     *
     * <p>If the calling thread is interrupted while waiting, the element is NOT
     * added and the thread's interrupt status is restored so the caller can
     * detect it.
     *
     * @param element element to append
     */
    public void put(T element) {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                fullWaitSet.await();
            }
            queue.add(element);
            emptyWaitSet.signal();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends an element, waiting up to the given timeout for space to become
     * available.
     *
     * @param task     element to append
     * @param timeout  maximum time to wait; must not be {@code null} (it is unboxed)
     * @param timeUnit unit of {@code timeout}
     * @return {@code true} if the element was added; {@code false} if the timeout
     *         elapsed or the calling thread was interrupted while waiting
     *         (interrupt status is restored in that case)
     */
    public boolean offer(T task, Long timeout, TimeUnit timeUnit) {
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while (queue.size() == capacity) {
                if (nanos <= 0) {
                    return false;
                }
                try {
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            queue.add(task);
            emptyWaitSet.signal();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the number of queued elements.
     *
     * @return current element count
     */
    public int size() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends the task if there is room; otherwise hands it to the reject policy.
     *
     * <p>The policy is invoked while this queue's lock is held. Because the lock
     * is reentrant, the policy may call back into this queue (for example
     * {@link #put} or {@link #offer}) from the same thread.
     *
     * @param rejectPolicy strategy invoked with this queue and the task when the
     *                     queue is full
     * @param task         element to append
     */
    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            if (queue.size() == capacity) {
                rejectPolicy.reject(this, task);
            } else {
                queue.add(task);
                emptyWaitSet.signal();
            }
        } finally {
            lock.unlock();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package cn.xiaohupao.threadpool;

import java.util.HashSet;
import java.util.concurrent.TimeUnit;

/**
 * A small fixed-core thread pool backed by a bounded {@code BlockingQueue}.
 *
 * <p>Up to {@code coreSize} worker threads are created on demand. Once that many
 * workers exist, further tasks are offered to the task queue via
 * {@code tryPut}, which delegates to the configured {@code RejectPolicy} when
 * the queue is full. A worker that finds no task within the configured timeout
 * terminates and removes itself from the pool.
 *
 * @author xiaohupao
 */
public class ThreadPool {

    /** Tasks waiting for a free worker. */
    private final BlockingQueue<Runnable> taskQueue;

    /** Live workers; also used as the monitor guarding itself. */
    private final HashSet<Worker> workers = new HashSet<>();

    /** Maximum number of worker threads. */
    private final int coreSize;

    /** How long an idle worker waits for a task before exiting. */
    private final long timeout;

    /** Unit of {@link #timeout}. */
    private final TimeUnit timeUnit;

    /** Strategy applied when the task queue is full. */
    private final RejectPolicy<Runnable> rejectPolicy;

    /**
     * Creates a pool with no running workers.
     *
     * @param coreSize     maximum number of worker threads
     * @param timeout      idle time after which a worker exits
     * @param timeUnit     unit of {@code timeout}
     * @param capacity     capacity of the task queue
     * @param rejectPolicy strategy applied when the task queue is full
     */
    public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int capacity, RejectPolicy<Runnable> rejectPolicy) {
        this.taskQueue = new BlockingQueue<Runnable>(capacity);
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.rejectPolicy = rejectPolicy;
    }

    /** Worker thread: runs its initial task, then keeps draining the task queue. */
    class Worker extends Thread {

        /** Task to run next; {@code null} means "fetch one from the queue". */
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            // Run the assigned task first, then keep polling the queue. The timed
            // poll (rather than an unbounded take) lets an idle worker fall out of
            // the loop, so the cleanup below is reachable and the thread can end.
            while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
                try {
                    task.run();
                } catch (Exception e) {
                    // A failing task must not kill the worker.
                    e.printStackTrace();
                } finally {
                    task = null;
                }
            }
            synchronized (workers) {
                workers.remove(this);
            }
        }
    }

    /**
     * Submits a task for execution.
     *
     * @param task task to run
     */
    public void execute(Runnable task) {
        synchronized (workers) {
            if (workers.size() < coreSize) {
                // Below the core size: start a dedicated worker for this task.
                Worker worker = new Worker(task);
                workers.add(worker);
                worker.start();
            } else {
                // At the core size: enqueue, letting the reject policy decide what
                // happens when the queue is full (wait forever, wait with timeout,
                // drop the task, throw, or run it on the calling thread).
                taskQueue.tryPut(rejectPolicy, task);
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
package cn.xiaohupao.threadpool;

/**
 * Strategy invoked by {@code BlockingQueue.tryPut} when a task is submitted to
 * a queue that is already full. The call is made while the queue's lock is held.
 *
 * @param <T> element type of the queue
 * @author xiaohupao
 */
@FunctionalInterface
public interface RejectPolicy<T> {
/**
 * Decides what to do with a task that could not be enqueued.
 *
 * @param queue the full queue the task was submitted to
 * @param task  the task that was not enqueued
 */
void reject(BlockingQueue<T> queue, T task);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package cn.xiaohupao.threadpool;

import java.util.concurrent.TimeUnit;

/**
 * Manual smoke test: submits five tasks to a {@code ThreadPool} whose reject
 * policy blocks the submitter (via {@code BlockingQueue.put}) when the task
 * queue is full. Each task prints its own index.
 *
 * @author xiaohupao
 */
public class TestPool {
    public static void main(String[] args) {
        // Reject policy: wait on the queue until there is room for the task.
        ThreadPool pool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10, BlockingQueue::put);

        int submitted = 0;
        while (submitted < 5) {
            // Lambdas may only capture effectively final locals, hence the copy.
            final int taskId = submitted++;
            pool.execute(() -> System.out.println(taskId));
        }
    }
}
Donate comment here