- 程序 - Program
- 进程 - Process
- 线程 - Thread
- 继承Thread类,重写run()方法
- 实现Runnable接口,重写run()方法
- Lambda 表达式
package com.thread;
* 创建线程的三种基本方式
public class ThreadBasic {
* 第一种方式:继承Thread类,重写run()方法;
* 调用方式:new MyThread().start();
public static class MyThread extends Thread {
public void run() {
System.out.println("This is MyThread");
* 第二种方式:实现Runnable接口,重写run()方法;
* 调用方式:new Thread(new MyRunnable()).start();
public static class MyRunnable implements Runnable {
public void run() {
System.out.println("This is MyRunnable");
public static void main(String[] args) {
new MyThread().start();
new Thread(new MyRunnable()).start();
* 第三种方式:Lambda 表达式
new Thread(() -> {
System.out.println("This is Lambda");
Callable 与 Future
与 Runnable 功能类似,但是多一个返回值,可以通过 Future 来获取返回值。
- Runnable
public interface Runnable {
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
* @see java.lang.Thread#run()
public abstract void run();
- Callable
public interface Callable<V> {
* Computes a result, or throws an exception if unable to do so.
* @return computed result
* @throws Exception if unable to compute a result
V call() throws Exception;
- Future
public interface Future<V> {
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when {@code cancel} is called,
* this task should never run. If the task has already started,
* then the {@code mayInterruptIfRunning} parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task.
* <p>After this method returns, subsequent calls to {@link #isDone} will
* always return {@code true}. Subsequent calls to {@link #isCancelled}
* will always return {@code true} if this method returned {@code true}.
* @param mayInterruptIfRunning {@code true} if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
* @return {@code false} if the task could not be cancelled,
* typically because it has already completed normally;
* {@code true} otherwise
boolean cancel(boolean mayInterruptIfRunning);
* Returns {@code true} if this task was cancelled before it completed
* normally.
* @return {@code true} if this task was cancelled before it completed
boolean isCancelled();
* Returns {@code true} if this task completed.
* Completion may be due to normal termination, an exception, or
* cancellation -- in all of these cases, this method will return
* {@code true}.
* @return {@code true} if this task completed
boolean isDone();
* Waits if necessary for the computation to complete, and then
* retrieves its result.
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
V get() throws InterruptedException, ExecutionException;
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
关于 Callable 与 Future 的案例:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<String> c = new Callable() {
public String call() throws Exception {
return "Hello Callable";
ExecutorService service = Executors.newCachedThreadPool();
Future<String> future = service.submit(c); // 异步
System.out.println(future.get());// 阻塞
sleep & yield & join & wait & notify
// 当前线程休眠500毫秒,休眠结束进入就绪状态
// 当前线程让出CPU,进入就绪状态
// 当前有t1,t2两个线程同时运行,在t1线程里面调用t2.join()表明进入t2线程去运行,当t2运行完后t1再运行
synchronized (this) {
// 1. 配合synchronized使用 2. 会释放锁
notify / notifyAll
synchronized (this) {
// 1. 配合synchronized使用 2. 不会释放锁
// 这里synchronized的是当前对象this
public synchronized void m() {
// 这里synchronized的是当前T.class
public synchronized static void m() {
synchronized 是可重入锁,针对同一线程,同一对象,也是重量级锁。
重入锁实现可重入性原理或机制是:每一个锁关联一个线程持有者和计数器,当计数器为 0 时表示该锁没有被任何线程持有,那么任何线程都可能获得该锁而调用相应的方法;当某一线程请求成功后,JVM会记下锁的持有线程,并且将计数器置为 1;此时其它线程请求该锁,则必须等待;而该持有锁的线程如果再次请求这个锁,就可以再次拿到这个锁,同时计数器会递增;当线程退出同步代码块时,计数器会递减,如果计数器为 0,则释放该锁。
[无锁] -> [偏向锁(markword 记录这个线程ID)] -竞争-> [自旋锁] -10次以后-> [重量级锁]
线程数少 - 自旋锁
线程数多 - 重量级锁
操作消耗时间长 - 重量级锁
底层使用 MESI 缓存一致性协议
Compare And Set 自旋 乐观锁
AtomicXXX 底层由CAS实现
cas(V, Expected, NewValue)
- V 被修改的对象(基础类型没有ABA问题)
- Expected 期望值
- NewValue 新的值
例:cas(o, 1, 2); 当整型o的值是期望值1时,将1改为2
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
volatile int state + 监控state的CLH队列(双向链表-FIFO),每个node装的是线程
- State
* The synchronization state.
private volatile int state;
- Node
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
static final int PROPAGATE = -3;
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
volatile int waitStatus;
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
volatile Node prev;
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
volatile Node next;
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
volatile Thread thread;
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
Node nextWaiter;
* Returns true if node is waiting in shared mode.
final boolean isShared() {
return nextWaiter == SHARED;
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
* @return the predecessor of this node
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
return p;
Node() { // Used to establish initial head or SHARED marker
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
- Exclusive
- Share
共享,多个线程可同时执行,如Semaphore / CountDownLatch
- tryAcquire(int)
- tryRelease(int)
- tryAcquireShared(int)
- tryReleaseShared(int)
- isHeldExclusively()
部分摘自 Java并发之AQS详解
- 可重入锁
- 底层是CAS
- 可以tryLock()
- 可以有公平锁(排队)
- 可以替代sync,但是需要解锁,写在finally
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class T02_ReentrantLock2 {
Lock lock = new ReentrantLock();
void m1() {
try {
lock.lock(); //synchronized(this)
for (int i = 0; i < 10; i++) {
} catch (InterruptedException e) {
} finally {
void m2() {
try {
System.out.println("m2 ...");
} finally {
public static void main(String[] args) {
T02_ReentrantLock2 rl = new T02_ReentrantLock2();
new Thread(rl::m1).start();
try {
} catch (InterruptedException e) {
new Thread(rl::m2).start();
public class T06_TestCountDownLatch {
public static void main(String[] args) {
private static void usingCountDownLatch() {
Thread[] threads = new Thread[100];
CountDownLatch latch = new CountDownLatch(threads.length);
for(int i=0; i<threads.length; i++) {
threads[i] = new Thread(()->{
int result = 0;
for(int j=0; j<10000; j++) result += j;
for (int i = 0; i < threads.length; i++) {
try {
} catch (InterruptedException e) {
System.out.println("end latch");
private static void usingJoin() {
Thread[] threads = new Thread[100];
for(int i=0; i<threads.length; i++) {
threads[i] = new Thread(()->{
int result = 0;
for(int j=0; j<10000; j++) result += j;
for (int i = 0; i < threads.length; i++) {
for (int i = 0; i < threads.length; i++) {
try {
} catch (InterruptedException e) {
System.out.println("end join");
public class T07_TestCyclicBarrier {
public static void main(String[] args) {
//CyclicBarrier barrier = new CyclicBarrier(20);
CyclicBarrier barrier = new CyclicBarrier(20, () -> System.out.println(""));
/*CyclicBarrier barrier = new CyclicBarrier(20, new Runnable() {
public void run() {
for(int i=0; i<100; i++) {
new Thread(()->{
try {
} catch (InterruptedException e) {
} catch (BrokenBarrierException e) {
public class T08_TestPhaser {
static Random r = new Random();
static MarriagePhaser phaser = new MarriagePhaser();
static void milliSleep(int milli) {
try {
} catch (InterruptedException e) {
public static void main(String[] args) {
for(int i=0; i<5; i++) {
final int nameIndex = i;
new Thread(()->{
Person p = new Person("person " + nameIndex);
static class MarriagePhaser extends Phaser {
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0:
return false;
case 1:
return false;
case 2:
return true;
return true;
static class Person {
String name;
public Person(String name) {
this.name = name;
public void arrive() {
System.out.printf("%s 到达现场!\n", name);
public void eat() {
System.out.printf("%s 吃完!\n", name);
public void leave() {
System.out.printf("%s 离开!\n", name);
public class T10_TestReadWriteLock {
static Lock lock = new ReentrantLock();
private static int value;
static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
static Lock readLock = readWriteLock.readLock();
static Lock writeLock = readWriteLock.writeLock();
public static void read(Lock lock) {
try {
System.out.println("read over!");
} catch (InterruptedException e) {
} finally {
public static void write(Lock lock, int v) {
try {
value = v;
System.out.println("write over!");
} catch (InterruptedException e) {
} finally {
public static void main(String[] args) {
//Runnable readR = ()-> read(lock);
Runnable readR = ()-> read(readLock);
//Runnable writeR = ()->write(lock, new Random().nextInt());
Runnable writeR = ()->write(writeLock, new Random().nextInt());
for(int i=0; i<18; i++) new Thread(readR).start();
for(int i=0; i<2; i++) new Thread(writeR).start();
public class T11_TestSemaphore {
public static void main(String[] args) {
//Semaphore s = new Semaphore(2);
Semaphore s = new Semaphore(2, true);
//Semaphore s = new Semaphore(1);
new Thread(()->{
try {
System.out.println("T1 running...");
System.out.println("T1 running...");
} catch (InterruptedException e) {
} finally {
new Thread(()->{
try {
System.out.println("T2 running...");
System.out.println("T2 running...");
} catch (InterruptedException e) {
public class T12_TestExchanger {
static Exchanger<String> exchanger = new Exchanger<>();
public static void main(String[] args) {
new Thread(()->{
String s = "T1";
try {
s = exchanger.exchange(s);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " " + s);
}, "t1").start();
new Thread(()->{
String s = "T2";
try {
s = exchanger.exchange(s);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " " + s);
}, "t2").start();
public class T13_TestLockSupport {
public static void main(String[] args) {
Thread t = new Thread(()->{
for (int i = 0; i < 10; i++) {
if(i == 5) {
try {
} catch (InterruptedException e) {
/*try {
} catch (InterruptedException e) {
System.out.println("after 8 senconds!");
ThreadLocal是JDK包提供的,它提供线程本地变量,使用 set(T value) 方法时,设置变量到了当前线程的 ThreadLocalMap 中,即调用了 ThreadLocalMap 中的 set(ThreadLocal<?> key, Object value) 方法,key 是指向 ThreadLocal 弱引用。所以访问该变量时候是访问的当前线程里的变量的值,在实际多线程操作的时候,操作的是自己本地内存中的变量,从而规避了线程安全问题。
- ThreadLocal
* Sets the current thread's copy of this thread-local variable
* to the specified value. Most subclasses will have no need to
* override this method, relying solely on the {@link #initialValue}
* method to set the values of thread-locals.
* @param value the value to be stored in the current thread's copy of
* this thread-local.
public void set(T value) {
Thread t = Thread.currentThread();
// 获取当前线程的 ThreadLocalMap
ThreadLocalMap map = getMap(t);
if (map != null)
// 调用 ThreadLocalMap的set()方法 并将当前ThreadLocal对象设置为key
map.set(this, value);
createMap(t, value);
* Get the map associated with a ThreadLocal. Overridden in
* InheritableThreadLocal.
* @param t the current thread
* @return the map
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
- Thread
/* ThreadLocal values pertaining to this thread. This map is maintained
* by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;
- ThreadLocalMap
* Set the value associated with key.
* @param key the thread local object
* @param value the value to be set
private void set(ThreadLocal<?> key, Object value) {
// We don't use a fast path as with get() because it is at
// least as common to use set() to create new entries as
// it is to replace existing ones, in which case, a fast
// path would fail more often than not.
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
if (k == key) {
e.value = value;
if (k == null) {
replaceStaleEntry(key, value, i);
tab[i] = new Entry(key, value);
int sz = ++size;
if (!cleanSomeSlots(i, sz) && sz >= threshold)
- 强引用
强引用用来描述普通的 new 对象。
平常使用的对象引用就是强引用,当 object = null;时,内存空间被回收。
Object object = new Object();
- 软引用
当内存不够用时,软引用所指向的内存空间才会被回收,即使让 m = null;内存空间也不会被回收,除非内存不足。
SoftReference<byte[]> m = new SoftReference<>(new byte[1024*1024*10]);
- 弱引用
WeakReference<M> m = new WeakReference<>(new M());
ThreadLocal<M> tl = new ThreadLocal<>();
tl.set(new M());
* ThreadLocalMap is a customized hash map suitable only for
* maintaining thread local values. No operations are exported
* outside of the ThreadLocal class. The class is package private to
* allow declaration of fields in class Thread. To help deal with
* very large and long-lived usages, the hash table entries use
* WeakReferences for keys. However, since reference queues are not
* used, stale entries are guaranteed to be removed only when
* the table starts running out of space.
static class ThreadLocalMap {
* The entries in this hash map extend WeakReference, using
* its main ref field as the key (which is always a
* ThreadLocal object). Note that null keys (i.e. entry.get()
* == null) mean that the key is no longer referenced, so the
* entry can be expunged from table. Such entries are referred to
* as "stale entries" in the code that follows.
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
value = v;
* Set the value associated with key.
* @param key the thread local object
* @param value the value to be set
private void set(ThreadLocal<?> key, Object value) {
// We don't use a fast path as with get() because it is at
// least as common to use set() to create new entries as
// it is to replace existing ones, in which case, a fast
// path would fail more often than not.
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
if (k == key) {
e.value = value;
if (k == null) {
replaceStaleEntry(key, value, i);
tab[i] = new Entry(key, value);
int sz = ++size;
if (!cleanSomeSlots(i, sz) && sz >= threshold)
* Remove the entry for key.
private void remove(ThreadLocal<?> key) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
if (e.get() == key) {
- 内存溢出
- 内存泄露
public class Test {
Object object;
public void test() {
object = new Object();
- 在test方法结束处,显示给object = null
- 将object作为test方法内部的局部变量
- 虚引用
ReferenceQueue<M> QUEUE = new ReferenceQueue<>();
PhantomReference<M> phantomReference = new PhantomReference<>(new M(), QUEUE);
- 构造方法
* corePoolSize 核心线程池数量
* maximumPoolSize 最大线程池数量
* keepAliveTime 线程存活时间
* unit 线程存活时间单位(时分秒等)
* workQueue 阻塞队列
* threadFactory 线程工厂
* handler 拒绝策略
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
ThreadPoolExecutor.AbortPolicy // 丢弃任务并抛出RejectedExecutionException异常
ThreadPoolExecutor.DiscardPolicy // 丢弃任务,但不抛出异常
ThreadPoolExecutor.DiscardOldestPolicy // 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy // 由调用线程处理该任务
- 执行方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
* Proceed in 3 steps:
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
int c = ctl.get();
// 1. 判断核心线程池是否已满,未满则创建线程执行当前任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
c = ctl.get();
// 2. 判断阻塞队列是否已满,未满则将任务存储在队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
// 3. 判断线程池是否已满,未满则创建线程执行任务
else if (!addWorker(command, false))
// 1. `ctl`,可以看做一个int类型的数字,高3位表示线程池状态,低29位表示worker数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 2. `COUNT_BITS`,`Integer.SIZE`为32,所以`COUNT_BITS`为29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 3. `CAPACITY`,线程池允许的最大线程数。1左移29位,然后减1,即为 2^29 - 1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
// 5. `runStateOf()`,获取线程池状态,通过按位与操作,低29位将全部变成0
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 6. `workerCountOf()`,获取线程池worker数量,通过按位与操作,高3位将全部变成0
private static int workerCountOf(int c) { return c & CAPACITY; }
// 7. `ctlOf()`,根据线程池状态和线程池worker数量,生成ctl值
private static int ctlOf(int rs, int wc) { return rs | wc; }
* Bit field accessors that don't require unpacking ctl.
* These depend on the bit layout and on workerCount being never negative.
// 8. `runStateLessThan()`,线程池状态小于xx
private static boolean runStateLessThan(int c, int s) {
return c < s;
// 9. `runStateAtLeast()`,线程池状态大于等于xx
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
* Proceed in 3 steps:
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
int c = ctl.get();
// worker数量比核心线程数小,直接创建worker执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
c = ctl.get();
// worker数量超过核心线程数,任务直接进入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 线程池状态不是RUNNING状态,说明执行过shutdown命令,需要对新加入的任务执行reject()操作。
// 这儿为什么需要recheck,是因为任务入队列前后,线程池的状态可能会发生变化。
if (! isRunning(recheck) && remove(command))
// 这儿为什么需要判断0值,主要是在线程池构造方法中,核心线程数允许为0
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
// 如果线程池不是运行状态,或者任务进入队列失败,则尝试创建worker执行任务。
// 这儿有3点需要注意:
// 1. 线程池不是运行状态时,addWorker内部会判断线程池状态
// 2. addWorker第2个参数表示是否创建核心线程
// 3. addWorker返回false,则说明任务执行失败,需要执行reject操作
else if (!addWorker(command, false))
private boolean addWorker(Runnable firstTask, boolean core) {
// 外层自旋
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 这个条件写得比较难懂,我对其进行了调整,和下面的条件等价
// (rs > SHUTDOWN) ||
// (rs == SHUTDOWN && firstTask != null) ||
// (rs == SHUTDOWN && workQueue.isEmpty())
// 1. 线程池状态大于SHUTDOWN时,直接返回false
// 2. 线程池状态等于SHUTDOWN,且firstTask不为null,直接返回false
// 3. 线程池状态等于SHUTDOWN,且队列为空,直接返回false
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 内层自旋
for (;;) {
int wc = workerCountOf(c);
// worker数量超过容量,直接返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 使用CAS的方式增加worker数量。
// 若增加成功,则直接跳出外层循环进入到第二部分
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 线程池状态发生变化,对外层循环进行自旋
if (runStateOf(c) != rs)
continue retry;
// 其他情况,直接内层循环进行自旋即可
// else CAS failed due to workerCount change; retry inner loop
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// worker的添加必须是串行的,因此需要加锁
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 这儿需要重新检查线程池状态
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// worker已经调用过了start()方法,则不再创建worker
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// worker创建并添加到workers成功
// 更新`largestPoolSize`变量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
} finally {
// 启动worker线程
if (workerAdded) {
workerStarted = true;
} finally {
// worker线程启动失败,说明线程池状态发生了变化(关闭操作被执行),需要进行shutdown相关操作
if (! workerStarted)
return workerStarted;
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 这儿是Worker的关键所在,使用了线程工厂创建了一个线程。传入的参数为当前worker
this.thread = getThreadFactory().newThread(this);
/** Delegates main run loop to outer runWorker */
public void run() {
// 省略代码...
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 调用unlock()是为了让外部可以中断
w.unlock(); // allow interrupts
// 这个变量用于判断是否进入过自旋(while循环)
boolean completedAbruptly = true;
try {
// 这儿是自旋
// 1. 如果firstTask不为null,则执行firstTask;
// 2. 如果firstTask为null,则调用getTask()从队列获取任务。
// 3. 阻塞队列的特性就是:当队列为空时,当前线程会被阻塞等待
while (task != null || (task = getTask()) != null) {
// 这儿对worker进行加锁,是为了达到下面的目的
// 1. 降低锁范围,提升性能
// 2. 保证每个worker执行的任务是串行的
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 如果线程池正在停止,则对当前线程进行中断操作
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
// 执行任务,且在执行前后通过`beforeExecute()`和`afterExecute()`来扩展其功能。
// 这两个方法在当前类里面为空实现。
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
} finally {
// 帮助gc
task = null;
// 已完成任务数加一
completedAbruptly = false;
} finally {
// 自旋操作被退出,说明线程池正在结束
processWorkerExit(w, completedAbruptly);
wait / notify
- Producer
public class Producer implements Runnable { private Goods goods; public Producer(Goods goods) { this.goods = goods; } @Override public void run() { for (int i = 0; i < 2; i++) { if (i % 2 == 0) { goods.set("娃哈哈","矿泉水"); } else { goods.set("旺仔","小馒头"); } } } }
- Consumer
public class Consumer implements Runnable { private Goods goods; public Consumer(Goods goods) { this.goods = goods; } @Override public void run() { for (int i = 0; i < 2; i++) { goods.get(); } } }
- Goods
public class Goods { private String brand; private String name; // 默认是不存在商品的,如果值等于true的话,代表有商品 private boolean flag = false; public String getBrand() { return brand; } public void setBrand(String brand) { this.brand = brand; } public String getName() { return name; } public void setName(String name) { this.name = name; } // 消费者获取商品 public synchronized void get() { /* * 如果flag等于false的话,意味着生产者没有生产商品,此时消费者无法消费,需要让消费者线程进入到阻塞状态,等待生产者生产,当 有商品之后,再开始消费 */ System.out.println("消费者开始消费"); if (!flag) { try { System.out.println("消费者开始进入wait状态,并释放锁"); wait(); System.out.println("消费者被唤醒"); } catch (InterruptedException e) { e.printStackTrace(); } } try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("消费者取走了" + this.getBrand() + "----" + this.getName()); flag = false; // 唤醒生产者去进行生产 System.out.println("消费者开始唤醒生产者"); notify(); } // 生产者生产商品 public synchronized void set(String brand, String name) { // 当生产者抢占到cpu资源之后会判断当前对象是否有值,如果有的话,以为着消费者还没有消费,需要提醒消费者消费,同时 // 当前线程进入阻塞状态,等待消费者取走商品之后,再次生产,如果没有的话,不需要等待,不需要进入阻塞状态,直接生产即可 System.out.println("生产者开始生产"); if (flag) { try { System.out.println("生产者开始进入wait状态,并释放锁"); wait(); System.out.println("生产者被唤醒"); } catch (InterruptedException e) { e.printStackTrace(); } } this.setBrand(brand); try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } this.setName(name); System.out.println("生产者生产了" + this.getBrand() + "--" + this.getName()); // 如果代码执行到此处,意味着已经生产完成,需要将flag设置为true flag = true; // 唤醒消费者去进行消费 System.out.println("生产者开始唤醒消费者"); notify(); } }
- Main
public class Test { public static void main(String[] args) { Goods goods = new Goods(); Producer producer = new Producer(goods); Consumer consumer = new Consumer(goods); Thread t1 = new Thread(producer); Thread t2 = new Thread(consumer); t1.start(); t2.start(); } }
- Producer
public class ProducerQueue implements Runnable { private BlockingQueue<Goods> blockingQueue; public ProducerQueue(BlockingQueue blockingQueue) { this.blockingQueue = blockingQueue; } @Override public void run() { for (int i = 0; i < 10; i++) { Goods goods = null; if (i % 2 == 0) { goods = new Goods("娃哈哈", "矿泉水"); } else { goods = new Goods("旺仔", "小馒头"); } System.out.println("生产者开始生产商品:" + goods.getBrand() + "--" + goods.getName()); try { blockingQueue.put(goods); } catch (InterruptedException e) { e.printStackTrace(); } } } }
- Consumer
public class ConsumerQueue implements Runnable { private BlockingQueue<Goods> blockingQueue; public ConsumerQueue(BlockingQueue blockingQueue) { this.blockingQueue = blockingQueue; } @Override public void run() { for (int i = 0; i < 10; i++) { try { Goods goods = blockingQueue.take(); System.out.println("消费者消费的商品是:" + goods.getBrand() + "--" + goods.getName()); Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } }
- Goods
public class Goods { private String brand; private String name; public Goods(String brand, String name) { this.brand = brand; this.name = name; } public String getBrand() { return brand; } public void setBrand(String brand) { this.brand = brand; } public String getName() { return name; } public void setName(String name) { this.name = name; } }
- Main
public class Test { public static void main(String[] args) { BlockingQueue<Goods> queue = new ArrayBlockingQueue<Goods>(5); ProducerQueue producerQueue = new ProducerQueue(queue); ConsumerQueue consumerQueue = new ConsumerQueue(queue); new Thread(producerQueue).start(); new Thread(consumerQueue).start(); } }
wait() / notify()
/** * 曾经的面试题:(淘宝?) * 实现一个容器,提供两个方法,add,size * 写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束 * * 给lists添加volatile之后,t2能够接到通知,但是,t2线程的死循环很浪费cpu,如果不用死循环,该怎么做呢? * * 这里使用wait和notify做到,wait会释放锁,而notify不会释放锁 * 需要注意的是,运用这种方法,必须要保证t2先执行,也就是首先让t2监听才可以 * * 阅读下面的程序,并分析输出结果 * 可以读到输出结果并不是size=5时t2退出,而是t1结束时t2才接收到通知而退出 * 想想这是为什么? * * notify之后,t1必须释放锁,t2退出后,也必须notify,通知t1继续执行 * 整个通信过程比较繁琐 * */ public class T04_NotifyFreeLock { //添加volatile,使t2能够得到通知 volatile List lists = new ArrayList(); public void add(Object o) { lists.add(o); } public int size() { return lists.size(); } public static void main(String[] args) { T04_NotifyFreeLock c = new T04_NotifyFreeLock(); final Object lock = new Object(); new Thread(() -> { synchronized(lock) { System.out.println("t2启动"); if(c.size() != 5) { try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("t2 结束"); //通知t1继续执行 lock.notify(); } }, "t2").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e1) { e1.printStackTrace(); } new Thread(() -> { System.out.println("t1启动"); synchronized(lock) { for(int i=0; i<10; i++) { c.add(new Object()); System.out.println("add " + i); if(c.size() == 5) { lock.notify(); //释放锁,让t2得以执行 try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } }, "t1").start(); } }
/** * 曾经的面试题:(淘宝?) * 实现一个容器,提供两个方法,add,size * 写两个线程,线程1添加10个元素到容器中,线程2实现监控元素的个数,当个数到5个时,线程2给出提示并结束 * * 给lists添加volatile之后,t2能够接到通知,但是,t2线程的死循环很浪费cpu,如果不用死循环,该怎么做呢? * * 这里使用wait和notify做到,wait会释放锁,而notify不会释放锁 * 需要注意的是,运用这种方法,必须要保证t2先执行,也就是首先让t2监听才可以 * * 阅读下面的程序,并分析输出结果 * 可以读到输出结果并不是size=5时t2退出,而是t1结束时t2才接收到通知而退出 * 想想这是为什么? * * notify之后,t1必须释放锁,t2退出后,也必须notify,通知t1继续执行 * 整个通信过程比较繁琐 * * 使用Latch(门闩)替代wait notify来进行通知 * 好处是通信方式简单,同时也可以指定等待时间 * 使用await和countdown方法替代wait和notify * CountDownLatch不涉及锁定,当count的值为零时当前线程继续运行 * 当不涉及同步,只是涉及线程通信的时候,用synchronized + wait/notify就显得太重了 * 这时应该考虑countdownlatch/cyclicbarrier/semaphore * */ public class T06_LockSupport { // 添加volatile,使t2能够得到通知 volatile List lists = new ArrayList(); public void add(Object o) { lists.add(o); } public int size() { return lists.size(); } public static void main(String[] args) { T06_LockSupport c = new T06_LockSupport(); CountDownLatch latch = new CountDownLatch(1); Thread t2 = new Thread(() -> { System.out.println("t2启动"); if (c.size() != 5) { LockSupport.park(); } System.out.println("t2 结束"); }, "t2"); t2.start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e1) { e1.printStackTrace(); } new Thread(() -> { System.out.println("t1启动"); for (int i = 0; i < 10; i++) { c.add(new Object()); System.out.println("add " + i); if (c.size() == 5) { LockSupport.unpark(t2); } /*try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }*/ } }, "t1").start(); } }
wait() / notify()
/** * 面试题:写一个固定容量同步容器,拥有put和get方法,以及getCount方法, * 能够支持2个生产者线程以及10个消费者线程的阻塞调用 * * 使用wait和notify/notifyAll来实现 * */ public class MyContainer1<T> { final private LinkedList<T> lists = new LinkedList<>(); final private int MAX = 10; //最多10个元素 private int count = 0; public synchronized void put(T t) { while(lists.size() == MAX) { //想想为什么用while而不是用if? try { this.wait(); //effective java } catch (InterruptedException e) { e.printStackTrace(); } } lists.add(t); ++count; this.notifyAll(); //通知消费者线程进行消费 } public synchronized T get() { T t = null; while(lists.size() == 0) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } t = lists.removeFirst(); count --; this.notifyAll(); //通知生产者进行生产 return t; } public static void main(String[] args) { MyContainer1<String> c = new MyContainer1<>(); //启动消费者线程 for(int i=0; i<10; i++) { new Thread(()->{ for(int j=0; j<5; j++) System.out.println(c.get()); }, "c" + i).start(); } try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } //启动生产者线程 for(int i=0; i<2; i++) { new Thread(()->{ for(int j=0; j<25; j++) c.put(Thread.currentThread().getName() + " " + j); }, "p" + i).start(); } } }
Lock / Condition
/** * 面试题:写一个固定容量同步容器,拥有put和get方法,以及getCount方法, * 能够支持2个生产者线程以及10个消费者线程的阻塞调用 * * 使用wait和notify/notifyAll来实现 * * 使用Lock和Condition来实现 * 对比两种方式,Condition的方式可以更加精确的指定哪些线程被唤醒 * */ public class MyContainer2<T> { final private LinkedList<T> lists = new LinkedList<>(); final private int MAX = 10; //最多10个元素 private int count = 0; private Lock lock = new ReentrantLock(); private Condition producer = lock.newCondition(); private Condition consumer = lock.newCondition(); public void put(T t) { try { lock.lock(); while(lists.size() == MAX) { //想想为什么用while而不是用if? producer.await(); } lists.add(t); ++count; consumer.signalAll(); //通知消费者线程进行消费 } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public T get() { T t = null; try { lock.lock(); while(lists.size() == 0) { consumer.await(); } t = lists.removeFirst(); count --; producer.signalAll(); //通知生产者进行生产 } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } return t; } public static void main(String[] args) { MyContainer2<String> c = new MyContainer2<>(); //启动消费者线程 for(int i=0; i<10; i++) { new Thread(()->{ for(int j=0; j<5; j++) System.out.println(c.get()); }, "c" + i).start(); } try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } //启动生产者线程 for(int i=0; i<2; i++) { new Thread(()->{ for(int j=0; j<25; j++) c.put(Thread.currentThread().getName() + " " + j); }, "p" + i).start(); } } }
Lock / Condition
public class T08_00_lock_condition { public static void main(String[] args) { char[] aI = "1234567".toCharArray(); char[] aC = "ABCDEFG".toCharArray(); Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); new Thread(()->{ try { lock.lock(); for(char c : aI) { System.out.print(c); condition.signal(); condition.await(); } condition.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } }, "t1").start(); new Thread(()->{ try { lock.lock(); for(char c : aC) { System.out.print(c); condition.signal(); condition.await(); } condition.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } }, "t2").start(); } }
public class T13_TransferQueue { public static void main(String[] args) { char[] aI = "1234567".toCharArray(); char[] aC = "ABCDEFG".toCharArray(); TransferQueue<Character> queue = new LinkedTransferQueue<Character>(); new Thread(()->{ try { for (char c : aI) { System.out.print(queue.take()); queue.transfer(c); } } catch (InterruptedException e) { e.printStackTrace(); } }, "t1").start(); new Thread(()->{ try { for (char c : aC) { queue.transfer(c); System.out.print(queue.take()); } } catch (InterruptedException e) { e.printStackTrace(); } }, "t2").start(); } }