java进阶-线程安全

luckyFang 2021年10月02日 36次浏览

从字节码角度分析线程安全问题

话不多说看代码

package thread;

/**
 * Demonstration of a data race: two threads increment one shared static counter
 * without any synchronization.
 *
 * <p>The code is intentionally left unsafe. {@code sum++} is compiled into a separate
 * read, add and write-back, so increments from the two threads can overwrite each other
 * and the printed total may be lower than 20000.
 */
public class ThreadSafe  extends Thread{
    // Counter shared by every ThreadSafe thread; accessed without locking or atomics.
    private  static int sum = 0;

    /** Increments the shared counter 10000 times. */
    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
           // Not atomic: read sum, add 1, write sum back.
           sum++;
        }
    }

    /**
     * Starts two counting threads, waits for both to finish and prints the counter.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while joining
     */
    public static void main(String[] args) throws InterruptedException {
        ThreadSafe threadSafe1 = new ThreadSafe();
        ThreadSafe threadSafe2 = new ThreadSafe();
        threadSafe1.start();
        threadSafe2.start();
        // join() makes sure both workers are done before the counter is read.
        threadSafe1.join();
        threadSafe2.join();
        System.out.println(sum);
    }

}

请问上面代码执行结果是多少?
20000?

16146

为什么会出现这样的问题呢? 出现了线程安全问题,通过前面简单的学习我们发现多个线程去读写一个共享的变量static int sum 会导致资源争抢,从而引发线程安全问题。

反编译字节码

javap -v -p XXX.class
javap -v -p ThreadSafe.class

Classfile /Volumes/software/Project/java/javaSE/out/production/javaSE/thread/ThreadSafe.class
  Last modified 2021-10-2; size 942 bytes
  MD5 checksum cdbe3b16310948dd2581a224e989dd18
  Compiled from "ThreadSafe.java"
public class thread.ThreadSafe extends java.lang.Thread
  minor version: 0
  major version: 52
  flags: ACC_PUBLIC, ACC_SUPER
Constant pool:
   #1 = Methodref          #9.#33         // java/lang/Thread."<init>":()V
   #2 = Fieldref           #3.#34         // thread/ThreadSafe.sum:I
   #3 = Class              #35            // thread/ThreadSafe
   #4 = Methodref          #3.#33         // thread/ThreadSafe."<init>":()V
   #5 = Methodref          #3.#36         // thread/ThreadSafe.start:()V
   #6 = Methodref          #3.#37         // thread/ThreadSafe.join:()V
   #7 = Fieldref           #38.#39        // java/lang/System.out:Ljava/io/PrintStream;
   #8 = Methodref          #40.#41        // java/io/PrintStream.println:(I)V
   #9 = Class              #42            // java/lang/Thread
  #10 = Utf8               sum
  #11 = Utf8               I
  #12 = Utf8               <init>
  #13 = Utf8               ()V
  #14 = Utf8               Code
  #15 = Utf8               LineNumberTable
  #16 = Utf8               LocalVariableTable
  #17 = Utf8               this
  #18 = Utf8               Lthread/ThreadSafe;
  #19 = Utf8               run
  #20 = Utf8               i
  #21 = Utf8               StackMapTable
  #22 = Utf8               main
  #23 = Utf8               ([Ljava/lang/String;)V
  #24 = Utf8               args
  #25 = Utf8               [Ljava/lang/String;
  #26 = Utf8               threadSafe1
  #27 = Utf8               threadSafe2
  #28 = Utf8               Exceptions
  #29 = Class              #43            // java/lang/InterruptedException
  #30 = Utf8               <clinit>
  #31 = Utf8               SourceFile
  #32 = Utf8               ThreadSafe.java
  #33 = NameAndType        #12:#13        // "<init>":()V
  #34 = NameAndType        #10:#11        // sum:I
  #35 = Utf8               thread/ThreadSafe
  #36 = NameAndType        #44:#13        // start:()V
  #37 = NameAndType        #45:#13        // join:()V
  #38 = Class              #46            // java/lang/System
  #39 = NameAndType        #47:#48        // out:Ljava/io/PrintStream;
  #40 = Class              #49            // java/io/PrintStream
  #41 = NameAndType        #50:#51        // println:(I)V
  #42 = Utf8               java/lang/Thread
  #43 = Utf8               java/lang/InterruptedException
  #44 = Utf8               start
  #45 = Utf8               join
  #46 = Utf8               java/lang/System
  #47 = Utf8               out
  #48 = Utf8               Ljava/io/PrintStream;
  #49 = Utf8               java/io/PrintStream
  #50 = Utf8               println
  #51 = Utf8               (I)V
{
  private static int sum;
    descriptor: I
    flags: ACC_PRIVATE, ACC_STATIC

  public thread.ThreadSafe();
    descriptor: ()V
    flags: ACC_PUBLIC
    Code:
      stack=1, locals=1, args_size=1
         0: aload_0
         1: invokespecial #1                  // Method java/lang/Thread."<init>":()V
         4: return
      LineNumberTable:
        line 3: 0
      LocalVariableTable:
        Start  Length  Slot  Name   Signature
            0       5     0  this   Lthread/ThreadSafe;

  public void run();
    descriptor: ()V
    flags: ACC_PUBLIC
    Code:
      stack=2, locals=2, args_size=1
         0: iconst_0
         1: istore_1
         2: iload_1
         3: sipush        10000
         6: if_icmpge     23
         9: getstatic     #2                  // Field sum:I
        12: iconst_1
        13: iadd
        14: putstatic     #2                  // Field sum:I
        17: iinc          1, 1
        20: goto          2
        23: return
      LineNumberTable:
        line 6: 0
        line 7: 9
        line 6: 17
        line 9: 23
      LocalVariableTable:
        Start  Length  Slot  Name   Signature
            2      21     1     i   I
            0      24     0  this   Lthread/ThreadSafe;
      StackMapTable: number_of_entries = 2
        frame_type = 252 /* append */
          offset_delta = 2
          locals = [ int ]
        frame_type = 250 /* chop */
          offset_delta = 20

  public static void main(java.lang.String[]) throws java.lang.InterruptedException;
    descriptor: ([Ljava/lang/String;)V
    flags: ACC_PUBLIC, ACC_STATIC
    Code:
      stack=2, locals=3, args_size=1
         0: new           #3                  // class thread/ThreadSafe
         3: dup
         4: invokespecial #4                  // Method "<init>":()V
         7: astore_1
         8: new           #3                  // class thread/ThreadSafe
        11: dup
        12: invokespecial #4                  // Method "<init>":()V
        15: astore_2
        16: aload_1
        17: invokevirtual #5                  // Method start:()V
        20: aload_2
        21: invokevirtual #5                  // Method start:()V
        24: aload_1
        25: invokevirtual #6                  // Method join:()V
        28: aload_2
        29: invokevirtual #6                  // Method join:()V
        32: getstatic     #7                  // Field java/lang/System.out:Ljava/io/PrintStream;
        35: getstatic     #2                  // Field sum:I
        38: invokevirtual #8                  // Method java/io/PrintStream.println:(I)V
        41: return
      LineNumberTable:
        line 14: 0
        line 15: 8
        line 16: 16
        line 17: 20
        line 18: 24
        line 19: 28
        line 20: 32
        line 21: 41
      LocalVariableTable:
        Start  Length  Slot  Name   Signature
            0      42     0  args   [Ljava/lang/String;
            8      34     1 threadSafe1   Lthread/ThreadSafe;
           16      26     2 threadSafe2   Lthread/ThreadSafe;
    Exceptions:
      throws java.lang.InterruptedException

  static {};
    descriptor: ()V
    flags: ACC_STATIC
    Code:
      stack=1, locals=0, args_size=0
         0: iconst_0
         1: putstatic     #2                  // Field sum:I
         4: return
      LineNumberTable:
        line 11: 0
}
SourceFile: "ThreadSafe.java"

我们分析这里

9: getstatic     #2                  // Field sum:I
12: iconst_1
13: iadd
14: putstatic     #2                  // Field sum:I

简单解释下这里是对sum++进行的分解操作

我们都知道 sum++ 本质上等价于 sum = sum + 1,它在字节码层面被拆成了下面四条指令:

  • getstatic 读取静态变量 sum 的值并压入操作数栈
  • iconst_1 将 int 常量 1 压入操作数栈
  • iadd 将栈顶的两个 int 值相加,并把结果压回操作数栈
  • putstatic 将栈顶的计算结果写回静态变量 sum

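问题出现原因: sum++ 不是原子操作,而是"读取、加一、写回"几个独立的步骤。每个线程都有自己的程序计数器,用来记录当前执行到的位置;如果线程 A 在 getstatic 读到旧值之后、putstatic 写回之前发生了 CPU 上下文切换,而线程 B 在此期间修改了共享变量 sum,那么线程 A 恢复执行后仍然会用自己操作数栈上的旧值加一并写回,把线程 B 的更新覆盖掉(丢失更新),最终结果就会小于 20000。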

如何解决

用原子操作类解决

package thread;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Thread-safe variant of the counter demo: two threads add to one shared
 * {@link AtomicInteger}, so no increment is lost and the printed total is 20000.
 */
public class ThreadSafe extends Thread {
    // Shared counter; every update goes through an atomic read-modify-write.
    private static AtomicInteger sum = new AtomicInteger();

    /** Atomically adds one to the shared counter, 10000 times. */
    @Override
    public void run() {
        int remaining = 10000;
        while (remaining > 0) {
            sum.incrementAndGet();
            remaining--;
        }
    }

    /**
     * Launches two counting threads, waits for both and prints the final total.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while joining
     */
    public static void main(String[] args) throws InterruptedException {
        ThreadSafe[] workers = {new ThreadSafe(), new ThreadSafe()};
        for (ThreadSafe worker : workers) {
            worker.start();
        }
        // Both workers are already running; now wait for each in turn.
        for (ThreadSafe worker : workers) {
            worker.join();
        }
        System.out.println(sum);
    }

}

20000

Callable 与 FutureTask原理剖析

创建流程

  • 新建一个类实现 Callable 接口
  • 实例化这个实现了 Callable 接口的类
  • 用该实例创建一个 FutureTask 并交给 Thread 启动,FutureTask 用来接收返回值
  • 通过 FutureTask 的 get 方法获取返回值(任务尚未完成时会阻塞等待)
package thread;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/**
 * Example task for {@link FutureTask}: announces itself, sleeps for three seconds
 * and then yields the value {@code 1}.
 */
public class ThreadSync implements Callable<Integer> {

    /** Prints the current thread's name followed by the given suffix. */
    private static void announce(String suffix) {
        System.out.println(Thread.currentThread().getName() + suffix);
    }

    /**
     * Simulates slow work on the calling thread.
     *
     * @return always {@code 1}
     * @throws Exception if the sleep is interrupted
     */
    @Override
    public Integer call() throws Exception {
        announce(" start");
        Thread.sleep(3_000);
        announce(" end");
        return 1;
    }

    /**
     * Runs the task on a new thread through a {@link FutureTask} and prints its result.
     *
     * @param args unused
     * @throws ExecutionException   if the task itself failed
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<Integer> task = new FutureTask<>(new ThreadSync());

        Thread runner = new Thread(task);
        runner.start();

        // get() blocks until call() has returned on the runner thread.
        Integer outcome = task.get();
        System.out.println(outcome);

        announce(" end");
    }
}

手动实现一个带返回值的异步线程

首先写一个接口

package thread;

/**
 * A task that computes a value and, unlike {@code java.util.concurrent.Callable},
 * declares no checked exception.
 *
 * <p>Marked as a functional interface because callers create instances with lambdas;
 * the annotation makes the compiler reject a second abstract method.
 *
 * @param <T> type of the computed value
 */
@FunctionalInterface
public interface Callable<T> {
    /**
     * Computes the task's result.
     *
     * @return the computed value
     */
    T call();
}

然后实现任务包装类

package thread;

/**
 * Wraps a {@link Callable} so it can be executed by a thread and its result collected
 * by another thread through {@link #getAndWait()}.
 *
 * <p>Completion is tracked with an explicit {@code done} flag guarded by {@code lock}.
 * This keeps {@link #getAndWait()} correct when the task finishes before anyone waits
 * (no lost notification) and when {@code wait()} returns spuriously.
 *
 * @param <T> type of the task's result
 */
public class TaskWrapper <T> implements Runnable{
    /** Monitor guarding {@code res} and {@code done}. */
    private final Object lock = new Object();
    private final Callable<T> callable;

    private T res;
    /** True once {@link #run()} has finished, whether normally or by exception. */
    private boolean done;


    /**
     * @param callable the task to execute when {@link #run()} is invoked
     */
    public TaskWrapper(Callable<T> callable) {
        this.callable = callable;
    }

    /**
     * Executes the task and wakes up every thread blocked in {@link #getAndWait()}.
     *
     * <p>The task runs outside the monitor so waiters are never blocked on lock entry
     * for the duration of the task. Completion is signalled in {@code finally}, so a
     * task that throws does not leave waiters blocked forever; the result is then
     * {@code null}.
     */
    @Override
    public void run() {
        T value = null;
        try {
            value = callable.call();
        } finally {
            synchronized (lock){
                res = value;
                done = true;
                lock.notifyAll();
            }
        }
    }


    /**
     * Blocks until the task has finished and returns its result.
     *
     * @return the value produced by the task, or {@code null} if the task threw
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public T getAndWait() throws InterruptedException {
        synchronized (lock){
            // Loop on the condition: covers early completion and spurious wakeups.
            while (!done) {
                lock.wait();
            }
            return res;
        }
    }
}

最后测试

package thread;

/**
 * Demo for {@code TaskWrapper}: runs a three-second task on a separate thread and
 * prints the value it returns.
 */
public class ThreadWrapperTest {
    /**
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting for the result
     */
    public static void main(String[] args) throws InterruptedException {
        Callable<Integer> callable = () -> {
            try {
                Thread.sleep(3_000);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to whoever owns this thread instead of swallowing it.
                Thread.currentThread().interrupt();
            }
            return 1;
        };
        // Parameterized instead of raw, so the result type is checked by the compiler.
        TaskWrapper<Integer> taskWrapper = new TaskWrapper<>(callable);

        new Thread(taskWrapper).start();

        System.out.println(taskWrapper.getAndWait());
    }
}

1

原理剖析: 我们使用 synchronized 锁配合 wait/notify 来实现异步任务结果的获取。首先创建一个 Callable 接口用来描述任务,然后使用 TaskWrapper 来包装 Callable;线程启动后在 run 方法中执行 call 方法并保存结果,再通过线程之间的通信机制(wait/notify)唤醒等待的线程,从而完成异步任务返回值的获取。FutureTask 的思路与此类似。

用LockSupport改进代码

package thread;

import java.util.Optional;
import java.util.concurrent.locks.LockSupport;

/**
 * {@link LockSupport}-based wrapper that executes a {@link Callable} on one thread and
 * lets another thread block in {@link #getAndWait()} until the result is ready.
 *
 * <p>Only one waiting thread is supported at a time: a later caller of
 * {@link #getAndWait()} replaces the recorded waiter.
 *
 * @param <T> type of the task's result
 */
public class TaskWrapper<T> implements Runnable {
    private final Callable<T> callable;
    /** Thread blocked in {@link #getAndWait()}, if any; volatile so {@link #run()} sees it. */
    private volatile Thread cuTH = null;
    /** Set after {@code res} is written; the volatile write also publishes {@code res}. */
    private volatile boolean done;
    private T res;

    /**
     * @param callable the task to execute when {@link #run()} is invoked
     */
    public TaskWrapper(Callable<T> callable) {
        this.callable = callable;
    }

    /**
     * Executes the task, marks it complete and unparks the waiting thread, if one is recorded.
     *
     * <p>Completion is signalled in {@code finally}, so a task that throws does not leave
     * the waiter parked forever; the result is then {@code null}.
     */
    @Override
    public void run() {
        try {
            res = callable.call();
        } finally {
            // Publish completion first, then look for a waiter. The waiter records itself
            // before checking the flag, so one side always observes the other.
            done = true;
            Optional.ofNullable(cuTH).ifPresent(LockSupport::unpark);
        }
    }

    /**
     * Blocks until the task has finished and returns its result.
     *
     * @return the value produced by the task, or {@code null} if the task threw
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public T getAndWait() throws InterruptedException {
        // Record the waiter before checking the flag.
        cuTH = Thread.currentThread();
        // park() may return without a matching unpark, so re-check the condition each time.
        while (!done) {
            LockSupport.park();
            // park() returns immediately while the interrupt flag is set; report it
            // instead of spinning.
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
        }
        return res;
    }
}

手动实现一个异步日志框架

这时候就有人说了,日志框架有啥难的,无非就是把数据写入磁盘文件嘛。事情真的如此吗?高频率的读写会形成 IO 密集型程序,要知道频繁读写磁盘文件的开销是非常大的。这时候我们可以参考计算机组成原理里面的内容,设立缓冲区,缓冲区满了之后再写出到文件,从而减少读写文件的频率,进而提高性能。那么问题来了,这里写文件是异步的,因此需要多线程的支持。话不多说,那就动手实现吧。

image.png

package thread;

import java.util.Date;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingDeque;

/**
 * Minimal asynchronous logger.
 *
 * <p>Log calls only append a formatted line to a shared in-memory queue. A background
 * writer thread wakes up periodically and writes the queued lines in small batches, so
 * callers never wait for the output.
 */
class FastLog {
    /** Pending log lines, shared by all FastLog instances and drained by the writer thread. */
    private static final LinkedBlockingDeque<String> protoBuffer = new LinkedBlockingDeque<>();
    /** Maximum number of lines written in one flush cycle. */
    private static final int SAVE_LIMIT = 5;
    /** Pause between two flush cycles, in milliseconds. */
    private static final long FLUSH_INTERVAL_MS = 100;
    /** Guarded by the class monitor; true once the writer thread has been started. */
    private static boolean writerStarted;

    // Private constructor: instances are obtained through logBuilder().
    private FastLog() {
        startWriterOnce();
    }

    /**
     * Starts the background writer the first time a logger is created. The queue is
     * static, so one writer serves every instance; starting one per instance would
     * leak a never-ending thread on each logBuilder() call.
     */
    private static synchronized void startWriterOnce() {
        if (!writerStarted) {
            writerStarted = true;
            new SaveThread().start();
        }
    }

    /**
     * Background writer: every {@code FLUSH_INTERVAL_MS} it writes up to
     * {@code SAVE_LIMIT} queued lines. Static because it needs no FastLog instance.
     */
    private static class SaveThread extends Thread {
        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(FLUSH_INTERVAL_MS);
                } catch (InterruptedException e) {
                    // sleep() clears the interrupt flag when it throws, so the interruption
                    // must be handled here: write everything still queued, restore the
                    // flag and stop.
                    drainAll();
                    Thread.currentThread().interrupt();
                    return;
                }
                flushBatch();
            }
        }

        /** Writes at most SAVE_LIMIT queued lines as a single newline-terminated chunk. */
        private void flushBatch() {
            StringBuilder batch = new StringBuilder();
            for (int i = 0; i < SAVE_LIMIT; i++) {
                String line = protoBuffer.poll();
                if (line == null) {
                    break;
                }
                batch.append(line).append('\n');
            }
            if (batch.length() != 0) {
                saveToFile(batch.toString());
            }
        }

        /** Writes every remaining line; polls until the queue reports empty. */
        private void drainAll() {
            String line;
            while ((line = protoBuffer.poll()) != null) {
                saveToFile(line);
            }
        }

        /**
         * Persists one chunk of log text. Currently prints to standard output.
         *
         * @param str the text to write
         */
        public void saveToFile(String str) {
            System.out.println(str);
        }

    }


    /**
     * Static factory for loggers.
     *
     * @return a new logger backed by the shared queue and writer thread
     */
    public static FastLog logBuilder() {
        return new FastLog();
    }

    /** Queues one line in the form {@code "<date> [<level>] <line>"}. */
    private static void enqueue(String level, String line) {
        protoBuffer.add(String.format("%s %s %s", new Date().toString(), "[" + level + "]", line));
    }


    /** Logs a line at INFO level. */
    public void info(String line) {
        enqueue("INFO", line);
    }

    /** Logs a line at WARNING level. */
    public void warning(String line) {
        enqueue("WARNING", line);
    }

    /** Misspelled legacy name of {@link #warning(String)}; kept so existing callers still compile. */
    public void waring(String line) {
        warning(line);
    }

    /** Logs a line at ERROR level. */
    public void err(String line) {
        enqueue("ERROR", line);
    }

    /**
     * Logs a line with a caller-supplied level tag.
     *
     * @param flag text placed between the square brackets
     * @param line the message
     */
    public void def(String flag, String line) {
        enqueue(flag, line);
    }

    /** Prints the lines currently waiting in the queue without removing them. */
    public static void _cat() {
        for (String line : protoBuffer) {
            System.out.println(line);
        }
    }

}


/** Demo entry point: writes one message at each of three levels through {@code FastLog}. */
class Main {
    /**
     * @param args unused
     */
    public static void main(String[] args) {
        // Obtain a logger from FastLog's static factory method.
        final FastLog logger = FastLog.logBuilder();
        final String message = "hello";

        logger.info(message);
        logger.waring(message);
        logger.err(message);
    }
}