并发编程基础

怎么获取共享变量的监视器锁?

方法一:通过sychronized(共享变量)

怎么验证当前线程有没有获取到锁?

可以通过调用wait()方法来验证,如果没有获取到那么会报IllegalMonitorStateException类型错误

例子:

1
2
3
4
5
6
public class getThreadExample {
public static volatile String a = "1";
public static void main(String[] args) throws InterruptedException {
a.wait();
}
}
1
38DF6C78C22E52C0F0D210263BC69DE50465BD814F903CCAD491CD88A7991864

运行结果

1
2
3
4
5
com.rose.xiaoqi.createThread.getThreadExample
Exception in thread "main" java.lang.IllegalMonitorStateException
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:502)
at com.rose.xiaoqi.createThread.getThreadExample.main(getThreadExample.java:6)
尝试wait() 和 notify()方法的玩耍

情况一:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class getThreadExample {
public static String a = "1";
public static void main(String[] args) throws InterruptedException {
RunnableTask runnableTask = new RunnableTask();
RunnableTask1 runnableTask1 = new RunnableTask1();
new Thread(runnableTask).start();
// Thread.sleep(1000);
new Thread(runnableTask1).start();
}
static class RunnableTask implements Runnable {
@Override
public void run() {
synchronized (a) {
try {
System.out.println("线程悬挂");
// a += "1";
a.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
static class RunnableTask1 implements Runnable {
@Override
public void run() {
synchronized (a) {
System.out.println("线程唤醒");
// a += "2";
a.notify();
}
}
}
}

上面代码会发生死锁现象

image-20231216001227871

为什么会这样,因为如果你主线程不加个睡眠时间的话,很有可能后面的线程先抢的锁先调用了notify(),导致后面线程拿到锁之后调用wait()方法一直阻塞

情况二:

把以上注解全部解开,会发现

image-20231216004633919

为什么会这样?

因为对共享变量改了之后不是同一个对象了,所以也就没有新的对象的监视器锁

image-20231216004702870

方法二: 通过sychronized void method(int a)

守护线程

守护线程应用场景:
  1. 垃圾回收器是守护线程
  2. Tomcat当中NIO实现NioEnpoint负责维护一组接受请求和处理请求业务的线程,这些线程都是守护线程
特点

JVM只会关心有没有用户线程处理完,如果不存在用户线程还没有处理完的线程,那么JVM会自动退出,不管守护线程死活,JVM退出之前会创建一个DestroyJavaVM线程来看用户线程是否有没执行完的

code show:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class DeamonExample {
public static void main(String[] args) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
for (;;) {
System.out.println("能结束吗?");
}
}
});
thread.setDaemon(true);
thread.start();
System.out.println("main is over");
}
}

image-20231224163224175

看到没,上面”能结束吗? “连一次打印的机会都没有,主线程根本不关心守护线程有没有执行完,直接结束

ThreadLcal

T get()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if(map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue()
}
-----------------------
private T setInitialValue() {
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
return value;
}
------------------------
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
------------------------
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
table = new Entry[INITIAL_CAPACITY];
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
table[i] = new Entry(firstKey, firstValue);
size = 1;
setThreshold(INITIAL_CAPACITY);
}
set()
1
2
3
4
5
6
7
8
9
10
11
12
13
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if(map != null) {
map.set(this,value);
} else {
createMap(t,value);
}
}
----------------------------
public void createMap(Thread t, T value) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}

InheritThreadLocal

我们从Thread构造方法来说这个ThreadLocal的子类, 下面代码有部分省略了,读者可以自己去看源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public Thread(Runnable target) {
init(null, target, "Thread-" + nextThreadNum(), 0);
}
----------------------------------------------
private void init(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc,
boolean inheritThreadLocals) {
Thread parent = Thread.currentThread();
if(inheritThreadLocals && parent.inheritableThreadLocals != null) {
this.inheritThreadLocal = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
}
}
----------------------------------------------
static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
return new ThreadLocalMap(parentMap);
}
----------------------------------------------
private ThreadLocalMap(ThreadLocalMap parentMap) {
Entry[] parentTable = parentMap.table;
int len = parentTable.length;
setThreshold(len);
table = new Entry[len];

for (int j = 0; j < len; j++) {
Entry e = parentTable[j];
if (e != null) {
@SuppressWarnings("unchecked")
ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
if (key != null) {
Object value = key.childValue(e.value);
Entry c = new Entry(key, value);
int h = key.threadLocalHashCode & (len - 1);
while (table[h] != null)
h = nextIndex(h, len);
table[h] = c;
size++;
}
}
}
}

注意上面的ThreadLocalMap构造方法只会被createInheritedMap使用,源码里面的注释写了的

通过以上源码,说明了在new Thread()的时候,它会去判断父类线程当中inheritedThreadLocal是否为空,不为空的话,把值拷贝一份给当前线程的局部变量inheritedThreadLocals

2023-12-24-17:03 今天终于可以抽出时间来写一下博客了

Synchronized

能够保证操作变量可见性和原子性

进入

清除线程的本地内存(控制器当中的寄存器数据,Cache 1, Cache 2)

退出

把修改的数据回写给主内存

Volatile

可见性
指令重排序

伪共享

在多线程的场景下,一个缓存行缓存了多个变量,但是一个缓存行同时只能由一个线程去访问,性能不如在一个变量存储在一个缓存行;

在单线程的场景下,如果访问的是数组,数组的元素地址是连续的,缓存行存储某个变量时会把该变量附件相连的变量也附加进去,而数组恰好是连续的,这样能够提高一个访问性能

我们用代码来演示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class FalseShare {
public static int row = 1024;
public static int col = 1024;
public static void main(String[] args) {
long t1 = System.currentTimeMillis();
int[][] arr = new int[row][col];
for (int i = 0; i < row; i++) {
for (int j = 0; j < col; j++) {
arr[i][j] = 0;
}
}
long t2 = System.currentTimeMillis();
System.out.println(t2 - t1);
}
}
-------------------------------------------------------------
public class FalseShare {
public static int row = 1024;
public static int col = 1024;
public static void main(String[] args) {
long t1 = System.currentTimeMillis();
int[][] arr = new int[row][col];
for (int i = 0; i < row; i++) {
for (int j = 0; j < col; j++) {
arr[j][i] = 0;
}
}
long t2 = System.currentTimeMillis();
System.out.println(t2 - t1);
}
}

经过多次的测试,发现代码一执行的时间不会超过10,而代码二执行的时间最小是11

image-20240101235052360

举个例子什么是伪共享

缓存行当中有a,b两个变量
线程A去访问缓存行中的变量a,此时线程B读取了变量b, 线程A并对a就行修改,在线程A回写变量a到主内存前,根据一致性协议msci线程B读取的变量b缓存失效

什么是缓存行

在CPU的缓存中,缓存是一个个缓存行组成,一般一个缓存行是64Byte, 同时缓存行也是缓存与主内存交互的数据单位

如何避免伪共享问题?
  1. @sun.misc.Contented注解实现字节填充

  2. 在自己的变量手动添加一些变量,如添加p1,p2,p3,p4,p5,p6,p7来实现填充

    1
    2
    3
    4
    public class Test {
    volatile Long value = 0;
    Long p1,p2,p3,p4,p5,p6,p7;
    }

乐观锁和悲观锁

这是根据思想来分

CAS

Sychorinzed

独占锁和共享锁
可重入锁

Sychronized,ReentryLock

公平锁和非公平锁

公平锁下,多个线程会排队等待持有锁的线程释放锁,而非公平锁不这么玩,不管你前面有没有线程等待

Java并发包–并发编程高级篇

ThreadLocalRandom

Seed
Bound

AtomicLong原子类

LongAddr

Cells

AQS

AQS的架构

image-20240120152909097

基于AQS实现同步器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class NoReentrantLock implements Lock,Serializable {
private final Sync sync = new Sync();

public boolean tryLock() {
return sync.tryAcquire(1);
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}

public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
public Condition newCondition() {
return sync.newCondition();
}

private static class Sync extends AbstractQueuedSynchronizer {
public boolean tryAcquire(int acquires) {
assert acquires == 1;
if(compareAndSetState(0,1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
public boolean tryRelease(int releases) {
assert releases == 1;
if(getState() == 0 || Thread.currentThread() != getExclusiveOwnerThread()) {
throw new IllegalMonitorStateException(null);
}
setState(0);
setExclusiveOwnerThread(null);
return true;
}

Condition newCondition() {
return new ConditionObject();
}

public boolean isHeldExclusively() {
return getState() != 0;
}

}
}
使用手写的NoReentryLock实现生产-消费模型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class ProducerConsumerModel {
final static NoReentrantLock lock = new NoReentrantLock();
final static Condition isFull = lock.newCondition();
final static Condition isEmpty = lock.newCondition();
final static Queue<String> queue = new LinkedBlockingQueue<>();
final static int queueSize = 10;

public static void main(String[] args) {
Thread consumer = new Thread(new Runnable() {
@Override
public void run() {
lock.lock();
try {
while (queue.size() == 0) {
isEmpty.await();
}
String poll = queue.poll();
isFull.signalAll();
// 把条件队列里面的节点全部给唤醒,去争夺锁
// 每个ObjectCondition对应一个节点
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
});
Thread producer = new Thread(new Runnable() {
@Override
public void run() {
lock.lock();
try {
while (queue.size() == queueSize) {
isFull.await();
}
queue.add("ele");
isEmpty.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
});
producer.start();
consumer.start();
}

}
条件变量

条件变量里面能够维护一个单链表实现的条件队列

以下代码为自己看完源码后写的,并不是真正源码

增加一个Node节点
1
2
3
4
5
6
7
8
9
10
11
public Node addCondition() {
Node t = lastWaiter;
Node node = new Node(Thread.currentThread(), Node.Condition);
if(t == null) {
firstWaiter = node;
} else {
t.next = node;
}
lastWaiter = node;
return lastWaiter;
}
减少一个节点
1
2
3
4
5
6
public void signal() {
if(!isHeldExclusively()) throw new IllegalMonitorStateException();
Node next = firstWaiter.next;
firstWaiter.next = null;
firstWaiter = next;
}

ReentryLock