JVM
JVM
并发原理
os_linux.cpp
pthread_create
Java 中创建线程只有 new Thread(Runable) 一种方式
public class ThreadHelloWorld {
public static void main(String[] args) throws Exception{
// Java 线程和 JVM OS 线程不是同一个对象
// pthread_create()
Thread t1 = new Thread(ThreadHelloWorld::helloWorld);
t1.start();// pthread_create()
t1.join();// pthread_join()
// java 线程 isAlive() == false 时, JVM 线程已经消亡(C++: delete this)
System.out.prinf("线程状态: %s,线程是否存活:%s", t1.getState(), t1.isAlive());
}
private static void helloWorld() {
System.out.printf("Thread[id; %d] - Hello World!", Thread.currentTread().getId());
}
}
since jdk 1.5 Doug Lea
Java 普通对象存放在 Heap 中。 对于一个对象来说他的地址是确定的,但是对象中的属性地址是计算出来的。
重排序 Java 中 String 比较特殊 Java 不是完全的面向对象语言, Object + 基本类型 + 数组
object 地址 + offsets Java 针对对象属性采用相对地址计算真实地址,在 cpu 寄存器做了 cas(compare and set) 操作。
Local variables 线程间是隔离的。
同步区别在于范围
单线程是没有数据竟争的,所有程序会顺序执行。 数据竞争应该是可以被 jvm 识别的。
hashmap 读的时候是线程安全的。
保证程序串行
valotile 保证可见性,写的时候加锁,读的时候 valotile
Thread.start method 有 sychronized 关键字。
final 线程在对象创建之前,不会读到字段在初始化的中间状态。 对象初始化是安全的
interrupt 修改了一个状态,保证该状态其他线程可见
Feture 通过判断线程的 interrupted 状态来控制业务流程
这个跟最终一致性不是同一个思想吗
顺序不重要
构造早于销毁之前
同步关系>= heapens-before
happens-before
lock -> object.wait -> unlock
thread.join() 语义
private static void threadStartAndJoin() throws InterruptedException {
Thread t = new Thread(() -> {
// action
});
// main 线程调用线程 t 的 join() 方法
// 在 join() 方法返回之前, t 所有的 actions 已执行结束
t.join();
}
c 语言来保证 volatile
保证顺序,并不易定安全。不同作用域? 最终一致性
Connection
TRANSACTIONREADCOMMITTED, 结构性不同,单机 多机器
account = 5
t1 add 5
t2 minus 2
t3 minus 3
t3 -> t1 -> t2 5 - 3 = 2 2 + 5 = 7 7 - 2 = 5 可见性,最终一致性。 假如不可见, t3 account = 5 - 3 = 2, t2 account = 5 - 2 = 3 每次读都是从内存中读的,不是 cpu 中的
锁,串行, 类似 TRANSACTION_SERIALIZEABLE
执行按照定义的顺序
5 + 5 - 2 - 3 = 5
CAS Atomic
jvm orderAccess.hpp
volatile
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues.
java.util.concurrent.locks.AbstractQueuedSynchronizer
双向队列
java debugger: 会锁定所有线程
头线程只有状态
公平锁:严格的 FIFO 非公平锁: 线程池中新入队的线程会竞争锁,所以在 acquireQueued 时再次 tryAcquire //PS: LockSupport.unpark() 解锁
抛出 InterruptedException 时 interrupted 状态被清除
new Thread() 创建一个普通的 Java 对象,不涉及线程启动
thread.start() 引导线程启动,不一定马上启动(非阻塞),最终会调用 thread.run 方法
Thread.java
// 注册 Native 方法
/* Make sure registerNatives is the first thing <clinit> does. */
private static native void registerNatives();
static {
registerNatives();
}
/**
* 线程安全
*/
public synchronized void start() {
...
group.add(this); // 将当前线程添加到所在 ThreadGroup
boolean started = false;
try {
start0(); // Native 方法
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
private native void start0(); // JNI Java 调用 JVM 方法
类名: java.lang.Thread 方法名: java.lang.Thread#start0() JNI(C函数): JVM_StartThread
JNIEXPORT void JNICALL
Java_java_lang_Thread_registerNatives(JNIEnv *env, jclass cls)
{
(*env)->RegisterNatives(env, cls, methods, ARRAY_LENGTH(methods));
}
Thread.c 方法映射
static JNINativeMethod methods[] = {
{"start0", "()V", (vo id *)&JVM_StartThread},
{"stop0", "(" OBJ ")V", (void *)&JVM_StopThread},
{"isAlive", "()Z", (void *)&JVM_IsThreadAlive},
{"suspend0", "()V", (void *)&JVM_SuspendThread},
{"resume0", "()V", (void *)&JVM_ResumeThread},
{"setPriority0", "(I)V", (void *)&JVM_SetThreadPriority},
{"yield", "()V", (void *)&JVM_Yield},
{"sleep", "(J)V", (void *)&JVM_Sleep},
{"currentThread", "()" THD, (void *)&JVM_CurrentThread},
{"interrupt0", "()V", (void *)&JVM_Interrupt},
{"holdsLock", "(" OBJ ")Z", (void *)&JVM_HoldsLock},
{"getThreads", "()[" THD, (void *)&JVM_GetAllThreads},
{"dumpThreads", "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
{"setNativeName", "(" STR ")V", (void *)&JVM_SetNativeThreadName},
};
stackSize 修改方式
public Thread(ThreadGroup group, Runnable target, String name,
long stackSize) {
init(group, target, name, stackSize);
}
方法签名: java.lang.Thread#start0() Native 方法: JVM_StartThread 实现入口:
JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
...
{
...
// Since JDK 5 the java.lang.Thread threadStatus is used to prevent
// re-starting an already started thread, so we should usually find
// that the JavaThread is null. However for a JNI attached thread
// there is a small window between the Thread object being created
// (with its JavaThread set) and the update to its threadStatus, so we
// have to check for this
if (java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread)) != NULL) {
throw_illegal_thread_state = true;
} else {
jlong size =
java_lang_Thread::stackSize(JNIHandles::resolve_non_null(jthread));
NOT_LP64(if (size > SIZE_MAX) size = SIZE_MAX;)
size_t sz = size > 0 ? (size_t) size : 0;
native_thread = new JavaThread(&thread_entry, sz);
...
}
}
native_thread 创建了 JavaThread 类型对象
&threadentry 即函数指针引用函数 threadentry:
static void thread_entry(JavaThread* thread, TRAPS) {
HandleMark hm(THREAD);
Handle obj(THREAD, thread->threadObj()); // 1 thread->threadObj() 返回 java 代码创建 java.lang.Thread 对象
JavaValue result(T_VOID);
JavaCalls::call_virtual(&result,
obj,
SystemDictionary::Thread_klass(),
vmSymbols::run_method_name(), // 2 java.lang.Thread#run 方法名称
vmSymbols::void_method_signature(), // void 方法签名
THREAD);
}
通过 thread_entry 方法查找 vmSymbols.hpp 模版查找到 Thread 对象的 run() 方法
JavaThread::JavaThread 构造函数
JavaThread::JavaThread(ThreadFunction entry_point, size_t stack_sz) :
Thread() {
...
initialize();
_jni_attach_state = _not_attaching_via_jni;
set_entry_point(entry_point); // 设置 thread_entry 到当前对象
// Create the native thread itself.
// %note runtime_23
os::ThreadType thr_type = os::java_thread;
thr_type = entry_point == &compiler_thread_entry ? os::compiler_thread : os::java_thread;
os::create_thread(this, thr_type, stack_sz); // 创建操作系统线程
}
第一个构造参数 entrypoint 实际指 threadentry
os::createthread linux 实现(oslinux.cpp)
bool os::create_thread(Thread* thread, ThreadType thr_type,
size_t req_stack_size) {
...
// Allocate the OSThread object
OSThread* osthread = new OSThread(NULL, NULL);
if (osthread == NULL) {
return false;
}
...
// init thread attributes
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
// Calculate stack size if it's not specified by caller.
size_t stack_size = os::Posix::get_initial_stack_size(thr_type, req_stack_size);
// In glibc versions prior to 2.7 the guard size mechanism
// is not implemented properly. The posix standard requires adding
// the size of the guard pages to the stack size, instead Linux
// takes the space out of 'stacksize'. Thus we adapt the requested
// stack_size by the size of the guard pages to mimick proper
// behaviour. However, be careful not to end up with a size
// of zero due to overflow. Don't add the guard page in that case.
size_t guard_size = os::Linux::default_guard_size(thr_type);
// Configure glibc guard page. Must happen before calling
// get_static_tls_area_size(), which uses the guard_size.
pthread_attr_setguardsize(&attr, guard_size);
size_t stack_adjust_size = 0;
if (AdjustStackSizeForTLS) {
// Adjust the stack_size for on-stack TLS - see get_static_tls_area_size().
stack_adjust_size += get_static_tls_area_size(&attr);
} else {
stack_adjust_size += guard_size;
}
stack_adjust_size = align_up(stack_adjust_size, os::vm_page_size());
if (stack_size <= SIZE_MAX - stack_adjust_size) {
stack_size += stack_adjust_size;
}
assert(is_aligned(stack_size, os::vm_page_size()), "stack_size not aligned");
int status = pthread_attr_setstacksize(&attr, stack_size);
assert_status(status == 0, status, "pthread_attr_setstacksize");
ThreadState state;
{
pthread_t tid;
int ret = pthread_create(&tid, &attr, (void* (*)(void*)) thread_native_entry, thread);
char buf[64];
if (ret == 0) {
log_info(os, thread)("Thread started (pthread id: " UINTX_FORMAT ", attributes: %s). ",
(uintx) tid, os::Posix::describe_pthread_attr(buf, sizeof(buf), &attr));
} else {
log_warning(os, thread)("Failed to start thread - pthread_create failed (%s) for attributes: %s.",
os::errno_name(ret), os::Posix::describe_pthread_attr(buf, sizeof(buf), &attr));
// Log some OS information which might explain why creating the thread failed.
log_info(os, thread)("Number of threads approx. running in the VM: %d", Threads::number_of_threads());
LogStream st(Log(os, thread)::info());
os::Posix::print_rlimit_info(&st);
os::print_memory_info(&st);
os::Linux::print_proc_sys_info(&st);
os::Linux::print_container_info(&st);
}
...
pthread_attr_destroy(&attr);
...
return true;
}
threadnativeentry thread->call_run() 方法执行 java.lang.Thread#run()
当 java.lang.Thread#start() 方法调用完成, JVM 所创建的 JavaThread 对象就移除(delete this)。此时 java.lang.Thread 对象还没有回收。
自旋锁(spinlocks) thread.cpp SpinAcquire CLH 队列
变种 - JDK AQS Node(阻塞)
调用 Object 的 native 方法 wait:
public class Object {
...
public final native void wait(long timeout) throws InterruptedException;
...
}
代码位置(jdk8): Object.c (java.lang.Object 中的 native 方法都在这里做了映射) native 方法映射 wait - JVMMonitorWait jvm.cpp JVMENTRY(void, JVM_MonitorWait)
JVM_ENTRY(void, JVM_MonitorWait(JNIEnv* env, jobject handle, jlong ms))
JVMWrapper("JVM_MonitorWait");
Handle obj(THREAD, JNIHandles::resolve_non_null(handle));
JavaThreadInObjectWaitState jtiows(thread, ms != 0);
if (JvmtiExport::should_post_monitor_wait()) {
JvmtiExport::post_monitor_wait((JavaThread *)THREAD, (oop)obj(), ms);
// The current thread already owns the monitor and it has not yet
// been added to the wait queue so the current thread cannot be
// made the successor. This means that the JVMTI_EVENT_MONITOR_WAIT
// event handler cannot accidentally consume an unpark() meant for
// the ParkEvent associated with this ObjectMonitor.
}
ObjectSynchronizer::wait(obj, ms, CHECK);
JVM_END
ObjectSynchronizer::wait(obj, ms, CHECK); 会创建膨胀锁
CLH 自旋 AQS.Node CLH 变体双向队列节点阻塞]
JVM parkEvent->park 及 jdk LockSupport.park(this) 底层同为 pthread_cond_wait 函数
volatile 是一个 c++ 语意。
CLH Unsafe OS wait park yield notify