从字节码角度分析线程安全问题
话不多说看代码
package thread;
public class ThreadSafe extends Thread{
private static int sum = 0;
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
sum++;
}
}
public static void main(String[] args) throws InterruptedException {
ThreadSafe threadSafe1 = new ThreadSafe();
ThreadSafe threadSafe2 = new ThreadSafe();
threadSafe1.start();
threadSafe2.start();
threadSafe1.join();
threadSafe2.join();
System.out.println(sum);
}
}
请问上面代码执行结果是多少?
20000?
16146
为什么会出现这样的问题呢? 出现了线程安全问题,通过前面简单的学习我们发现多个线程去读写一个共享的变量static int sum
会导致资源争抢,从而引发线程安全问题。
反编译字节码
javap -v -p XXX.class
javap -v -p ThreadSafe.class
Classfile /Volumes/software/Project/java/javaSE/out/production/javaSE/thread/ThreadSafe.class
Last modified 2021-10-2; size 942 bytes
MD5 checksum cdbe3b16310948dd2581a224e989dd18
Compiled from "ThreadSafe.java"
public class thread.ThreadSafe extends java.lang.Thread
minor version: 0
major version: 52
flags: ACC_PUBLIC, ACC_SUPER
Constant pool:
#1 = Methodref #9.#33 // java/lang/Thread."<init>":()V
#2 = Fieldref #3.#34 // thread/ThreadSafe.sum:I
#3 = Class #35 // thread/ThreadSafe
#4 = Methodref #3.#33 // thread/ThreadSafe."<init>":()V
#5 = Methodref #3.#36 // thread/ThreadSafe.start:()V
#6 = Methodref #3.#37 // thread/ThreadSafe.join:()V
#7 = Fieldref #38.#39 // java/lang/System.out:Ljava/io/PrintStream;
#8 = Methodref #40.#41 // java/io/PrintStream.println:(I)V
#9 = Class #42 // java/lang/Thread
#10 = Utf8 sum
#11 = Utf8 I
#12 = Utf8 <init>
#13 = Utf8 ()V
#14 = Utf8 Code
#15 = Utf8 LineNumberTable
#16 = Utf8 LocalVariableTable
#17 = Utf8 this
#18 = Utf8 Lthread/ThreadSafe;
#19 = Utf8 run
#20 = Utf8 i
#21 = Utf8 StackMapTable
#22 = Utf8 main
#23 = Utf8 ([Ljava/lang/String;)V
#24 = Utf8 args
#25 = Utf8 [Ljava/lang/String;
#26 = Utf8 threadSafe1
#27 = Utf8 threadSafe2
#28 = Utf8 Exceptions
#29 = Class #43 // java/lang/InterruptedException
#30 = Utf8 <clinit>
#31 = Utf8 SourceFile
#32 = Utf8 ThreadSafe.java
#33 = NameAndType #12:#13 // "<init>":()V
#34 = NameAndType #10:#11 // sum:I
#35 = Utf8 thread/ThreadSafe
#36 = NameAndType #44:#13 // start:()V
#37 = NameAndType #45:#13 // join:()V
#38 = Class #46 // java/lang/System
#39 = NameAndType #47:#48 // out:Ljava/io/PrintStream;
#40 = Class #49 // java/io/PrintStream
#41 = NameAndType #50:#51 // println:(I)V
#42 = Utf8 java/lang/Thread
#43 = Utf8 java/lang/InterruptedException
#44 = Utf8 start
#45 = Utf8 join
#46 = Utf8 java/lang/System
#47 = Utf8 out
#48 = Utf8 Ljava/io/PrintStream;
#49 = Utf8 java/io/PrintStream
#50 = Utf8 println
#51 = Utf8 (I)V
{
private static int sum;
descriptor: I
flags: ACC_PRIVATE, ACC_STATIC
public thread.ThreadSafe();
descriptor: ()V
flags: ACC_PUBLIC
Code:
stack=1, locals=1, args_size=1
0: aload_0
1: invokespecial #1 // Method java/lang/Thread."<init>":()V
4: return
LineNumberTable:
line 3: 0
LocalVariableTable:
Start Length Slot Name Signature
0 5 0 this Lthread/ThreadSafe;
public void run();
descriptor: ()V
flags: ACC_PUBLIC
Code:
stack=2, locals=2, args_size=1
0: iconst_0
1: istore_1
2: iload_1
3: sipush 10000
6: if_icmpge 23
9: getstatic #2 // Field sum:I
12: iconst_1
13: iadd
14: putstatic #2 // Field sum:I
17: iinc 1, 1
20: goto 2
23: return
LineNumberTable:
line 6: 0
line 7: 9
line 6: 17
line 9: 23
LocalVariableTable:
Start Length Slot Name Signature
2 21 1 i I
0 24 0 this Lthread/ThreadSafe;
StackMapTable: number_of_entries = 2
frame_type = 252 /* append */
offset_delta = 2
locals = [ int ]
frame_type = 250 /* chop */
offset_delta = 20
public static void main(java.lang.String[]) throws java.lang.InterruptedException;
descriptor: ([Ljava/lang/String;)V
flags: ACC_PUBLIC, ACC_STATIC
Code:
stack=2, locals=3, args_size=1
0: new #3 // class thread/ThreadSafe
3: dup
4: invokespecial #4 // Method "<init>":()V
7: astore_1
8: new #3 // class thread/ThreadSafe
11: dup
12: invokespecial #4 // Method "<init>":()V
15: astore_2
16: aload_1
17: invokevirtual #5 // Method start:()V
20: aload_2
21: invokevirtual #5 // Method start:()V
24: aload_1
25: invokevirtual #6 // Method join:()V
28: aload_2
29: invokevirtual #6 // Method join:()V
32: getstatic #7 // Field java/lang/System.out:Ljava/io/PrintStream;
35: getstatic #2 // Field sum:I
38: invokevirtual #8 // Method java/io/PrintStream.println:(I)V
41: return
LineNumberTable:
line 14: 0
line 15: 8
line 16: 16
line 17: 20
line 18: 24
line 19: 28
line 20: 32
line 21: 41
LocalVariableTable:
Start Length Slot Name Signature
0 42 0 args [Ljava/lang/String;
8 34 1 threadSafe1 Lthread/ThreadSafe;
16 26 2 threadSafe2 Lthread/ThreadSafe;
Exceptions:
throws java.lang.InterruptedException
static {};
descriptor: ()V
flags: ACC_STATIC
Code:
stack=1, locals=0, args_size=0
0: iconst_0
1: putstatic #2 // Field sum:I
4: return
LineNumberTable:
line 11: 0
}
SourceFile: "ThreadSafe.java"
我们分析这里
9: getstatic #2 // Field sum:I
12: iconst_1
13: iadd
14: putstatic #2 // Field sum:I
简单解释下这里是对sum++
进行的分解操作
我们都知道 sum++ 的本质上是 sum=sum+1
- getstatic 读取静态变量
- iconst_1 常量1
- iadd 自增
- putstatic 修改后的值存入静态变量
问题出现原因: sum++
是非原子操作,程序内部有个计数器,用来记录上次执行位置,当cpu上下文切换时,因为是多个线程共享一个变量,别的线程修改了当前共享的变量,从而导致当前线程读取出错,进而导致结果出错。
如何解决
用原子操作类解决
package thread;
import java.util.concurrent.atomic.AtomicInteger;
public class ThreadSafe extends Thread{
private static AtomicInteger sum = new AtomicInteger();
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
sum.addAndGet(1);
}
}
public static void main(String[] args) throws InterruptedException {
ThreadSafe threadSafe1 = new ThreadSafe();
ThreadSafe threadSafe2 = new ThreadSafe();
threadSafe1.start();
threadSafe2.start();
threadSafe1.join();
threadSafe2.join();
System.out.println(sum);
}
}
20000
Callable 与 FutureTask原理剖析
创建流程
- 新建一个类实现
Callable
方法 - 实例化实现
Callable
方法的类 - 实例化一个
FutureTask
用来接受返回值 - 通过get方法获取返回值
package thread;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class ThreadSync implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println(Thread.currentThread().getName()+" start");
Thread.sleep(3_000);
System.out.println(Thread.currentThread().getName()+" end");
return 1;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadSync threadSync = new ThreadSync();
FutureTask<Integer> task = new FutureTask<Integer>(threadSync);
new Thread(task).start();
System.out.println(task.get());
System.out.println(Thread.currentThread().getName()+" end");
}
}
手动实现一个带返回值的异步线程
首先实现写一个接口
package thread;
public interface Callable<T> {
T call();
}
然后实现任务包装类
package thread;
public class TaskWrapper <T> implements Runnable{
private Object lock = new Object();
private T res;
private Callable<T> callable;
public TaskWrapper(Callable callable) {
this.callable = callable;
}
@Override
public void run() {
synchronized (lock){
res = callable.call();
lock.notify();
}
}
public T getAndWait() throws InterruptedException {
synchronized (lock){
lock.wait();
return res;
}
}
}
最后测试
package thread;
public class ThreadWrapperTest {
public static void main(String[] args) throws InterruptedException {
Callable<Integer> callable = () -> {
try {
Thread.sleep(3_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
};
TaskWrapper taskWrapper = new TaskWrapper(callable);
new Thread(taskWrapper).start();
System.out.println(taskWrapper.getAndWait());
}
}
1
原理剖析: 我们使用 synchronized 锁来实现异步任务,首先创建一个callable
接口用来包装任务,然后使用TaskWrapper
来包装callable
接口,当创建线程时执行call方法,然后使用 线程之间的通信机制完成,异步任务返回值的获取。FutureTask原理亦是如此
用LockSupport改进代码
package thread;
import java.util.Optional;
import java.util.concurrent.locks.LockSupport;
public class TaskWrapper<T> implements Runnable {
private Thread cuTH = null;
private T res;
private Callable<T> callable;
public TaskWrapper(Callable callable) {
this.callable = callable;
}
@Override
public void run() {
res = callable.call();
// 解除阻塞状态
Optional.ofNullable(cuTH).ifPresent(LockSupport::unpark);
}
public T getAndWait() throws InterruptedException {
// 记录线程
cuTH = Thread.currentThread();
// 进入阻塞状态
LockSupport.park();
return res;
}
}
手动实现一个异步日志框架
这时候就有人说了,日志框架有啥难的。无非就是写入数据到磁盘文件嘛,事情真的如此吗高频率读写,会形成io密集型程序,要知道频繁读写磁盘文件开销是非常大的,这时候我们可以参考下计算机组成原理里面的内容设立缓冲区,缓冲区满了之后写出到文件,从而减少读写文件的频率,进而提高性能。那么问题来了,这里写文件是异步的,因此需要多线程的支持,话不多说那就动手实现吧。
package thread;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingDeque;
class FastLog {
// 缓冲区
private static LinkedBlockingDeque<String> protoBuffer = new LinkedBlockingDeque<>();
// stringBuilder 缓冲区
private static final int SAVE_LIMIT = 5;
// 私有构造器 防止被 new
private FastLog() {
new SaveThread().start();
}
// 内部写日志线程 default timer: 100ms
private class SaveThread extends Thread {
@Override
public void run() {
while (true) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
StringBuffer buffer = new StringBuffer();
for (int i = 0; i < SAVE_LIMIT; i++) {
Optional.ofNullable(protoBuffer.poll()).map(s -> s + '\n').ifPresent(buffer::append);
}
Optional.of(buffer.toString()).filter(s -> s.length() != 0).ifPresent(this::saveToFile);
//如果发生意外则直接保存
if (this.isInterrupted()) {
for (int i = 0; i < protoBuffer.size(); i++) {
String poll = protoBuffer.poll();
saveToFile(poll);
}
}
}
}
// 写日志到文件
public void saveToFile(String str) {
System.out.println(str);
}
}
// builder
public static FastLog logBuilder() {
return new FastLog();
}
// 信息日志
public void info(String line) {
protoBuffer.add(String.format("%s %s %s", new Date().toString(), "[INFO]", line));
}
// 警告日志
public void waring(String line) {
protoBuffer.add(String.format("%s %s %s", new Date().toString(), "[WARNING]", line));
}
// 错误日志
public void err(String line) {
protoBuffer.add(String.format("%s %s %s", new Date().toString(), "[ERROR]", line));
}
// 可变日志
public void def(String flag, String line) {
protoBuffer.add(String.format("%s %s %s", new Date().toString(), "[" + flag + "]", line));
}
// 查看缓冲区内容
public static void _cat() {
for (String line : protoBuffer) {
System.out.println(line);
}
}
}
class Main {
public static void main(String[] args) {
// 建造者模式
FastLog fastLog = FastLog.logBuilder();
fastLog.info("hello");
fastLog.waring("hello");
fastLog.err("hello");
}
}