JVM

JVM

chemex

JVM

并发原理

POSIX Thread

基础 API

jvm C 语言创建线程

os_linux.cpp

pthread_create

java 创建线程

Java 中创建线程只有 new Thread(Runable) 一种方式

public class ThreadHelloWorld {
    public static void main(String[] args) throws Exception{
        // Java 线程和 JVM OS 线程不是同一个对象
        // pthread_create()
        Thread t1 = new Thread(ThreadHelloWorld::helloWorld);
        t1.start();// pthread_create()
        t1.join();// pthread_join()
        // java 线程 isAlive() == false 时, JVM 线程已经消亡(C++: delete this)
        System.out.prinf("线程状态: %s,线程是否存活:%s", t1.getState(), t1.isAlive());
    }
    private static void helloWorld() {
        System.out.printf("Thread[id; %d] - Hello World!", Thread.currentTread().getId());
    }
}

同步 API

互斥(Mutex)

Lock

Java 内存模型(Java Memory Model - JMM)

since jdk 1.5 Doug Lea

共享变量(Shared Variables)

Java 普通对象存放在 Heap 中。 对于一个对象来说他的地址是确定的,但是对象中的属性地址是计算出来的。

重排序 Java 中 String 比较特殊 Java 不是完全的面向对象语言, Object + 基本类型 + 数组

object 地址 + offsets Java 针对对象属性采用相对地址计算真实地址,在 cpu 寄存器做了 cas(compare and set) 操作。

Local variables 线程间是隔离的。

动作(Actionss)

同步区别在于范围

顺序(Order)

程序执行顺序

  1. 顺序执行

单线程是没有数据竟争的,所有程序会顺序执行。 数据竞争应该是可以被 jvm 识别的。

hashmap 读的时候是线程安全的。

  1. 同步顺序

保证程序串行

valotile 保证可见性,写的时候加锁,读的时候 valotile

  1. Thread.start 方法在 Thread.run 方法之前执行

Thread.start method 有 sychronized 关键字。

  1. x

final 线程在对象创建之前,不会读到字段在初始化的中间状态。 对象初始化是安全的

  1. x
  2. xx

interrupt 修改了一个状态,保证该状态其他线程可见

Feture 通过判断线程的 interrupted 状态来控制业务流程

happens of before

这个跟最终一致性不是同一个思想吗

顺序不重要

  1. 同一线程,程序顺序
  2. 构造早于销毁之前

    1. 构造对象是在用户线程(main/子线程)
    2. Finalizer 操作是 JVM 线程(GC)
    3. 对象的 Heap 中, Heap 对于线程是共享的
  3. 同步关系>= heapens-before

    1. 只关心前后关系,不关心是否串行
    2. ConcurrentSkipListMap
  4. 传递性 hb(x,y) hb(y,z) -> hb(x,z)

happens-before

lock -> object.wait -> unlock

thread.join() 语义

private static void threadStartAndJoin() throws InterruptedException {
    Thread t = new Thread(() -> {
        // action
    });
    // main 线程调用线程 t 的 join() 方法
    // 在 join() 方法返回之前, t 所有的 actions 已执行结束
    t.join();
}
  • unlock happens-before 其他 lock
  • volatile 字段写 happens-before 其他 subsequent 的读
  • thread.start() happends-before 此线程其他 action
  • 当前线程中的子线程的 actions happens-before thread.join()
  • 兑现的默认初始化操作 happens-before 其他 actions(除了默认写)

延伸

c 语言来保证 volatile

保证顺序,并不易定安全。不同作用域? 最终一致性

Connection

TRANSACTIONREADCOMMITTED, 结构性不同,单机 多机器

account = 5

t1 add 5

t2 minus 2

t3 minus 3

t3 -> t1 -> t2 5 - 3 = 2 2 + 5 = 7 7 - 2 = 5 可见性,最终一致性。 假如不可见, t3 account = 5 - 3 = 2, t2 account = 5 - 2 = 3 每次读都是从内存中读的,不是 cpu 中的

锁,串行, 类似 TRANSACTION_SERIALIZEABLE

执行按照定义的顺序

5 + 5 - 2 - 3 = 5

CAS Atomic

内存屏障(Memory Barriers)

jvm orderAccess.hpp

java 几大疑问

  1. synchronized
  2. volatile

    1. 限制重排
  3. final and immutability
  4. native
  5. actomic

Java 并发框架(J.U.C)AQS 原理

AbstratQueuedSynchronizer(AQS)原理

Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues.

java.util.concurrent.locks.AbstractQueuedSynchronizer

双向队列

java debugger: 会锁定所有线程

头线程只有状态

公平锁:严格的 FIFO 非公平锁: 线程池中新入队的线程会竞争锁,所以在 acquireQueued 时再次 tryAcquire //PS: LockSupport.unpark() 解锁

抛出 InterruptedException 时 interrupted 状态被清除

条件变量(ConditionObject)原理

SUMMARY

JVM 并发实现

java.lang.Thread API

基本操作

创建 - new Thread()

new Thread() 创建一个普通的 Java 对象,不涉及线程启动

启动 - thread.start()

thread.start() 引导线程启动,不一定马上启动(非阻塞),最终会调用 thread.run 方法

Thread.java

// 注册 Native 方法
/* Make sure registerNatives is the first thing <clinit> does. */
private static native void registerNatives();
static {
    registerNatives();
}

/**
 * 线程安全
 */
public synchronized void start() {
    ...
    group.add(this); // 将当前线程添加到所在 ThreadGroup
    boolean started = false;
    try {
        start0(); // Native 方法
        started = true;
    } finally {
        try {
            if (!started) {
                group.threadStartFailed(this);
            }
        } catch (Throwable ignore) {
            /* do nothing. If start0 threw a Throwable then
                it will be passed up the call stack */
        }
    }
}

private native void start0(); // JNI Java 调用 JVM 方法

类名: java.lang.Thread 方法名: java.lang.Thread#start0() JNI(C函数): JVM_StartThread

JNIEXPORT void JNICALL
Java_java_lang_Thread_registerNatives(JNIEnv *env, jclass cls)
{
    (*env)->RegisterNatives(env, cls, methods, ARRAY_LENGTH(methods));
}

Thread.c 方法映射

static JNINativeMethod methods[] = {
    {"start0",           "()V",        (vo id *)&JVM_StartThread},
    {"stop0",            "(" OBJ ")V", (void *)&JVM_StopThread},
    {"isAlive",          "()Z",        (void *)&JVM_IsThreadAlive},
    {"suspend0",         "()V",        (void *)&JVM_SuspendThread},
    {"resume0",          "()V",        (void *)&JVM_ResumeThread},
    {"setPriority0",     "(I)V",       (void *)&JVM_SetThreadPriority},
    {"yield",            "()V",        (void *)&JVM_Yield},
    {"sleep",            "(J)V",       (void *)&JVM_Sleep},
    {"currentThread",    "()" THD,     (void *)&JVM_CurrentThread},
    {"interrupt0",       "()V",        (void *)&JVM_Interrupt},
    {"holdsLock",        "(" OBJ ")Z", (void *)&JVM_HoldsLock},
    {"getThreads",        "()[" THD,   (void *)&JVM_GetAllThreads},
    {"dumpThreads",      "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
    {"setNativeName",    "(" STR ")V", (void *)&JVM_SetNativeThreadName},
};

stackSize 修改方式

  1. Xss 设置 VM 全局配置
  2. Thread 针对性设置
 public Thread(ThreadGroup group, Runnable target, String name,
                  long stackSize) {
    init(group, target, name, stackSize);
}

方法签名: java.lang.Thread#start0() Native 方法: JVM_StartThread 实现入口:

JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
...
  {
    ...

    // Since JDK 5 the java.lang.Thread threadStatus is used to prevent
    // re-starting an already started thread, so we should usually find
    // that the JavaThread is null. However for a JNI attached thread
    // there is a small window between the Thread object being created
    // (with its JavaThread set) and the update to its threadStatus, so we
    // have to check for this
    if (java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)) != NULL) {
      throw_illegal_thread_state = true;
    } else {
        jlong size =
             java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread));
      NOT_LP64(if (size > SIZE_MAX) size = SIZE_MAX;)
      size_t sz = size > 0 ? (size_t) size : 0;
      native_thread = new JavaThread(&thread_entry, sz);
        ...
    }
  }

native_thread 创建了 JavaThread 类型对象 &threadentry 即函数指针引用函数 threadentry:

static void thread_entry(JavaThread* thread, TRAPS) {
  HandleMark hm(THREAD);
  Handle obj(THREAD, thread->threadObj()); // 1 thread->threadObj() 返回 java 代码创建 java.lang.Thread 对象
  JavaValue result(T_VOID);
  JavaCalls::call_virtual(&result,
                          obj,             
                          SystemDictionary::Thread_klass(), 
                          vmSymbols::run_method_name(), // 2 java.lang.Thread#run 方法名称
                          vmSymbols::void_method_signature(), // void 方法签名
                          THREAD);
}

通过 thread_entry 方法查找 vmSymbols.hpp 模版查找到 Thread 对象的 run() 方法

JavaThread::JavaThread 构造函数

JavaThread::JavaThread(ThreadFunction entry_point, size_t stack_sz) :
                       Thread() {
    ...
    initialize();
  _jni_attach_state = _not_attaching_via_jni;
  set_entry_point(entry_point); // 设置 thread_entry 到当前对象
  // Create the native thread itself.
  // %note runtime_23
  os::ThreadType thr_type = os::java_thread;
  thr_type = entry_point == &compiler_thread_entry ? os::compiler_thread : os::java_thread;
  os::create_thread(this, thr_type, stack_sz); // 创建操作系统线程
}

第一个构造参数 entrypoint 实际指 threadentry

os::createthread linux 实现(oslinux.cpp)

bool os::create_thread(Thread* thread, ThreadType thr_type,
                       size_t req_stack_size) {
 ...
  // Allocate the OSThread object
  OSThread* osthread = new OSThread(NULL, NULL);
  if (osthread == NULL) {
    return false;
  }

 ...

  // init thread attributes
  pthread_attr_t attr;
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

  // Calculate stack size if it's not specified by caller.
  size_t stack_size = os::Posix::get_initial_stack_size(thr_type, req_stack_size);
  // In glibc versions prior to 2.7 the guard size mechanism
  // is not implemented properly. The posix standard requires adding
  // the size of the guard pages to the stack size, instead Linux
  // takes the space out of 'stacksize'. Thus we adapt the requested
  // stack_size by the size of the guard pages to mimick proper
  // behaviour. However, be careful not to end up with a size
  // of zero due to overflow. Don't add the guard page in that case.
  size_t guard_size = os::Linux::default_guard_size(thr_type);
  // Configure glibc guard page. Must happen before calling
  // get_static_tls_area_size(), which uses the guard_size.
  pthread_attr_setguardsize(&attr, guard_size);

  size_t stack_adjust_size = 0;
  if (AdjustStackSizeForTLS) {
    // Adjust the stack_size for on-stack TLS - see get_static_tls_area_size().
    stack_adjust_size += get_static_tls_area_size(&attr);
  } else {
    stack_adjust_size += guard_size;
  }

  stack_adjust_size = align_up(stack_adjust_size, os::vm_page_size());
  if (stack_size <= SIZE_MAX - stack_adjust_size) {
    stack_size += stack_adjust_size;
  }
  assert(is_aligned(stack_size, os::vm_page_size()), "stack_size not aligned");

  int status = pthread_attr_setstacksize(&attr, stack_size);
  assert_status(status == 0, status, "pthread_attr_setstacksize");

  ThreadState state;

  {
    pthread_t tid;
    int ret = pthread_create(&tid, &attr, (void* (*)(void*)) thread_native_entry, thread);

    char buf[64];
    if (ret == 0) {
      log_info(os, thread)("Thread started (pthread id: " UINTX_FORMAT ", attributes: %s). ",
        (uintx) tid, os::Posix::describe_pthread_attr(buf, sizeof(buf), &attr));
    } else {
      log_warning(os, thread)("Failed to start thread - pthread_create failed (%s) for attributes: %s.",
        os::errno_name(ret), os::Posix::describe_pthread_attr(buf, sizeof(buf), &attr));
      // Log some OS information which might explain why creating the thread failed.
      log_info(os, thread)("Number of threads approx. running in the VM: %d", Threads::number_of_threads());
      LogStream st(Log(os, thread)::info());
      os::Posix::print_rlimit_info(&st);
      os::print_memory_info(&st);
      os::Linux::print_proc_sys_info(&st);
      os::Linux::print_container_info(&st);
    }
    ...
    pthread_attr_destroy(&attr);
    ...
  return true;
}

threadnativeentry thread->call_run() 方法执行 java.lang.Thread#run()

  1. java 调用 native 方法 start0()
  2. start0 映射到 jvm 中的 JVMStartThread(&threadentry, sz)
  3. JVM_StartThread 中 new JavaThread 对象,第一个构造参数映射 java 对象相关信息
  4. JavaThread 构造函数会调用 os 的 creat_thread 方法
  5. creatthread 调用 POSIX Thread `pthreadcreate`
  6. pthread_create 执行 run 回调函数
  7. java.lang.Thread#start() 方法调用完成,JVM 所创建的 JavaThread 对象就移除(delete this)

当 java.lang.Thread#start() 方法调用完成, JVM 所创建的 JavaThread 对象就移除(delete this)。此时 java.lang.Thread 对象还没有回收。

自旋锁(spinlocks) thread.cpp SpinAcquire CLH 队列

  • 原生 - JVM ParkEvent(自旋)
  • 变种 - JDK AQS Node(阻塞)

    • LockSupport#park()
join

调用 Object 的 native 方法 wait:

public class Object {
    ...
    public final native void wait(long timeout) throws InterruptedException;
    ...
}

代码位置(jdk8): Object.c (java.lang.Object 中的 native 方法都在这里做了映射) native 方法映射 wait - JVMMonitorWait jvm.cpp JVMENTRY(void, JVM_MonitorWait)

JVM_ENTRY(void, JVM_MonitorWait(JNIEnv* env, jobject handle, jlong ms))
  JVMWrapper("JVM_MonitorWait");
  Handle obj(THREAD, JNIHandles::resolve_non_null(handle));
  JavaThreadInObjectWaitState jtiows(thread, ms != 0);
  if (JvmtiExport::should_post_monitor_wait()) {
    JvmtiExport::post_monitor_wait((JavaThread *)THREAD, (oop)obj(), ms);

    // The current thread already owns the monitor and it has not yet
    // been added to the wait queue so the current thread cannot be
    // made the successor. This means that the JVMTI_EVENT_MONITOR_WAIT
    // event handler cannot accidentally consume an unpark() meant for
    // the ParkEvent associated with this ObjectMonitor.
  }
  ObjectSynchronizer::wait(obj, ms, CHECK);
JVM_END

ObjectSynchronizer::wait(obj, ms, CHECK); 会创建膨胀锁

CLH 自旋 AQS.Node CLH 变体双向队列节点阻塞]

JVM parkEvent->park 及 jdk LockSupport.park(this) 底层同为 pthread_cond_wait 函数

volatile 原理

volatile 是一个 c++ 语意。

垃圾回收

线程

I/O

jdk11

quote

TODO

CLH Unsafe OS wait park yield notify

  • JMM
  • thread 原理