浅谈netty

要理解netty,我们需要先了解I/O Models和JAVA NIO,还有观察者模式、多Reactors线程模型等等这些内容。

I/O Models

在这里我们先要回顾一些操作系统的IO相关基础知识:

  • 用户空间与内核空间:
    现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间。
  • 系统调用:
    Linux/Unix内核中设置了一组用于实现各种系统功能的子程序,称为系统调用。用户可以通过系统调用命令在自己的应用程序中调用它们。从某种角度来看,系统调用和普通的函数调用非常相似。区别仅仅在于,系统调用由操作系统核心提供,运行于核心态;而普通的函数调用由函数库或用户自己提供,运行于用户态。
  • 进程切换:

1.为了控制进程的执行,内核必须有能力挂起正在CPU上运行的进程,并恢复以前挂起的某个进程的执行。这种行为被称为进程切换。因此可以说,任何进程都是在操作系统内核的支持下运行的,是与内核紧密相关的。
2.消耗CPU资源。

  • 进程的阻塞:

1.正在执行的进程,由于期待的某些事件未发生,如请求系统资源失败、等待某种操作的完成、新数据尚未到达或无新工作做等,则由系统自动执行阻塞原语(Block),使自己由运行状态变为阻塞状态。可见,进程的阻塞是进程自身的一种主动行为,也因此只有处于运行态的进程(获得CPU),才可能将其转为阻塞状态。
2.当进程进入阻塞状态,不消耗CPU资源。

  • 文件描述符:

1.文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。
2.文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。

  • 缓存 I/O:

1.缓存 I/O 又被称作标准 I/O,大多数文件系统的默认 I/O 操作都是缓存 I/O。在 Linux 的缓存 I/O 机制中,操作系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中,也就是说,数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。
2.缺点:数据在传输过程中需要在应用程序地址空间和内核进行多次数据拷贝操作,这些数据拷贝操作所带来的 CPU 以及内存开销是非常大的。

当一个输入(input)操作发生时,这里会经历两个不同的阶段:
1.等待数据就绪。
2.将数据从内核的缓冲区拷贝到进程中。

下面是5种IO模型
1.blocking I/O
2.nonblocking I/O
3.I/O multiplexing (select and poll)
4.signal driven I/O (SIGIO)
5.asynchronous I/O (the POSIX aio_functions)

Blocking I/O Model

image.png
整个读取IO的数据是同步、阻塞的。

Nonblocking I/O Model

image.png
当一个应用程序像这样对一个非阻塞描述符循环调用recvfrom时,我们称之为轮询(polling),应用程序持续轮询内核,以查看某个操作是否就绪,这么做往往耗费大量CPU时间。通常是在专门提供某一种功能的系统中才有

I/O Multiplexing Model

image.png

  • 在IO复用中,我们调用select或者poll 然后在这两个系统调用中的一个中阻塞,而不是阻塞在真实的I/O系统调用上。
  • 优势:跟阻塞I/O模型比较使用select我们可以等待多个描述符就绪,也就是一个线程可以处理多个描述符。
    signal driven I/O (SIGIO)
    image.png
  • 我们也可以用信号,让内核在描述符就绪时发送SIGIO信号通知我们。
  • 优势:等待数据报到达期间进程不被阻塞。主循环可以继续执行,只要等待来自信号处理函数的通知(既可以是数据已准备好被处理,也可以是数据报已准备好被读取。)
    asynchronous I/O (the POSIX aio_functions)
    告知内核启动某个操作,并让内核在整个操作(包括将数据从内核复制到我们自己的缓冲区)完成后通知我们。信号驱动式I/O是由内核通知我们何时可以启动一个I/O操作,而异步I/O模型是由内核通知我们I/O操作何时完成。通过状态、通知和回调来通知调用者的输入输出操作。

image.png

  • 优势:
    我们调用aio_read函数,给内核传递描述符、缓冲区指针、缓冲区大小和文件偏移,病告诉内核当整个操作完成时如何通知我们。该系统调用立即返回,而且在等待I/O完成期间,我们的进程不被阻塞。
    各种I/O模型的比较
    image.png
  • 前4中模型的主要区别在于第一阶段,因为它们的第二阶段是一样的:在数据从内核复制到调用者的缓冲区期间,进程阻塞于recvfrom调用。
  • 同步I/O操作导致请求进程阻塞,直到I/O操作完成,异步I/O操作不导致请求进程阻塞。前四种都是同步I/O模型,因为其中的I/O操作recvfrom讲阻塞进程。只有异步I/O模型与POSIX定义的异步I/O相匹配。

随着linux内核的不断发展,IO也在不断发展,所以后面有了IO多路复用模型。IO 多路复用是通过linux内核的select、poll、epoll这些来完成的。

select函数

该函数允许进程指示内核等待多个事件中的任何一个发生,并只在有一个或多个事件发生或经历一段指定的时间后才唤醒它。
例如:我们可以调用select,告知内核仅在下列情况发生时才返回:

  • 集合{1,4,5}中的任何描述符准备好读;
  • 集合{2,7}中任何描述符准备好写;
  • 集合{1,4}中的任何描述符有异常条件待处理;
  • 已经历了10.2秒
    下面是select函数的简单结构
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    #include <sys/select.h>
    #include <sys/time.h>

    //返回:若有就绪描述符则为其数目,若超时则为0,若出错则为-1
    int select(int maxfdp1,fd_set *readset,fd_set *writeset, fd_set *exceptest,const struct timeval *timeout);

    struct timeval{
    long tv_sec; /* seconds */
    long tv_used; /* microseconds */
    }

select以后最大的优势是用户可以在一个线程内同时处理多个socket的IO请求
pselect : 能够处理信号阻塞并提供了更高时间分辨率的select的增强版本

poll函数
1
2
3
4
5
6
7
8
9
10
11
#include <poll.h>

//返回:若有就绪描述符则为其数目,若超时则为0,若出错则为-1
int poll (struct pollfd *fdarray, unsigned long nfds, int timeout);

//一个pollfd结构体表示一个被监视的文件描述符
struct pollfd {
int fd; /* descriptor to check */
short events; /* events of interest on fd */
short revents; /* events that occurred on fd */
};

select机制的问题
1.每次调用select,都需要把fd_set集合从用户态拷贝到内核态,如果fd_set集合很大时,那这个开销也很大
2.同时每次调用select都需要在内核遍历传递进来的所有fd_set,如果fd_set集合很大时,那这个开销也很大
3.为了减少数据拷贝带来的性能损坏,内核对被监控的fd_set集合大小做了限制,并且这个是通过宏控制的,大小不可改变(限制为1024) 【poll用数组结构体解决了大小限制问题】

它没有最大连接数的限制,原因是它是基于链表来存储的,但是同样有一些缺点:

  • 大量的fd的数组被整体复制于用户态和内核地址空间之间,而不管这样的复制是不是有意义。
  • poll还有一个特点是“水平触发”,如果报告了fd后,没有被处理,那么下次poll时会再次报告该fd。
epoll函数

epoll在Linux2.6内核正式提出,是基于事件驱动的I/O方式,相对于select来说,epoll没有描述符个数限制,使用一个文件描述符管理多个描述符,将用户关心的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int epoll_create(int size);  // epoll_create 函数创建一个epoll句柄
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); // epoll_ctl 函数注册要监听的事件类型
//  epoll_wait 函数等待事件的就绪,成功时返回就绪的事件数目,调用失败时返回 -1,等待超时返回 0
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */};

typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;

epoll优点:
1.没有最大并发连接的限制,能打开的FD的上限远大于1024(1G的内存上能监听约10万个端口);
2.效率提升,不是轮询的方式,不会随着FD数目的增加效率下降。只有活跃可用的FD才会调用callback函数;即Epoll最大的优点就在于它只管你“活跃”的连接,而跟连接总数无关,因此在实际的网络环境中,Epoll的效率就会远远高于select和poll。

  1. 内存拷贝,利用mmap()文件映射内存加速与内核空间的消息传递;即epoll使用mmap减少复制开销。

它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。

对比:
image.png

总结

综上,在选择select,poll,epoll时要根据具体的使用场合以及这三种方式的自身特点。

1.表面上看epoll的性能最好,但是在连接数少并且连接都十分活跃的情况下,select和poll的性能可能比epoll好,毕竟epoll的通知机制需要很多函数回调。

2.select低效是因为每次它都需要轮询。但低效也是相对的,视情况而定,也可通过良好的设计改善。

JAVA NIO

Java NIO提供了与标准IO不同的IO工作方式:

  • Channels and Buffers(通道和缓冲区):

    标准的IO基于字节流和字符流进行操作的,而NIO是基于通道(Channel)和缓冲区(Buffer)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。
  • 非阻塞IO(Non-blocking IO):

    Java NIO可以让执行非阻塞IO,例如:当线程从通道读取数据到缓冲区时,线程还是可以进行其他事情。当数据被写入到缓冲区时,线程可以继续处理它。从缓冲区写入通道也类似。
  • 选择器(Selectors):

    Java NIO引入了选择器的概念,选择器用于监听多个通道的事件(比如:连接打开,数据到达)。因此,单个的线程可以监听多个数据通道。

java nio 的几个重要组件:
image.png

Channels and Buffers

基本上,所有的 IO 在NIO 中都从一个Channel 开始。Channel 有点象流。 数据可以从Channel读到Buffer中,也可以从Buffer 写到Channel中。
image.png
常见的几种channel:

  • FileChannel
  • DatagramChannel UDP连接
  • SocketChannel 客户端socket连接
  • ServerSocketChannel 服务端socket连接

常见的几种Buffer:

  • ByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer
  • Mappedyteuffer 表示内存映射文件 java nio 提供了这个在某种场景下极大提升效率
selector

Selector允许单线程处理多个 Channel。如果你的应用打开了多个连接(通道),但每个连接的流量都很低,使用Selector就会很方便。例如,在一个聊天服务器中。
image.png

A Thread uses a Selector to handle 3 Channel's 
IO VS NIO

image.png

netty原理

Netty是一个事件驱动、异步IO的网络框架。高性能,吞吐量更高,延迟更低、高性能之处主要来自于其I/O 模型和线程处理模型(Reactor),前者决定如何收发数据,后者决定如何处理数据。
image.png

Reactor模式

image.png
Reactor模式(反应器模式)是一种处理一个或多个客户端并发交付服务请求的事件设计模式。当请求抵达后,服务处理程序使用I/O多路复用策略,然后同步地派发这些请求至相关的请求处理程序。

核心组件交互图如下:
image.png

  • Handle(句柄或描述符,在Windows下称为句柄,在Linux下称为描述符):本质上表示一种资源(比如说文件描述符,或是针对网络编程中的socket描述符),是由操作系统提供的;该资源用于表示一个个的事件,事件既可以来自于外部,也可以来自于内部。
  • Synchronous Event Demultiplexer(同步事件分离器):它本身是一个系统调用,用于等待事件的发生(事件可能是一个,也可能是多个)。调用方在调用它的时候会被阻塞,一直阻塞到同步事件分离器上有事件产生为止。对于Linux来说,同步事件分离器指的就是常用的I/O多路复用机制,比如说select、poll、epoll等。在Java NIO领域中,同步事件分离器对应的组件就是Selector;对应的阻塞方法就是select方法。
  • Event Handler(事件处理器):本身由多个回调方法构成,这些回调方法构成了与应用相关的对于某个事件的反馈机制。在Java NIO领域中并没有提供事件处理器机制让我们调用或去进行回调,是由我们自己编写代码完成的。Netty相比于Java NIO来说,在事件处理器这个角色上进行了一个升级,它为我们开发者提供了大量的回调方法,供我们在特定事件产生时实现相应的回调方法进行业务逻辑的处理,即ChannelHandler。ChannelHandler中的方法对应的都是一个个事件的回调。
  • Initiation Dispatcher(初始分发器):际上就是Reactor角色。它本身定义了一些规范,这些规范用于控制事件的调度方式,同时又提供了应用进行事件处理器的注册、删除等设施。它本身是整个事件处理器的核心所在,Initiation Dispatcher会通过Synchronous Event Demultiplexer来等待事件的发生。一旦事件发生,Initiation Dispatcher首先会分离出每一个事件,然后调用事件处理器,最后调用相关的回调方法来处理这些事件。Netty中ChannelHandler里的一个个回调方法都是由bossGroup或workGroup中的某个EventLoop来调用的。
Basic Reactor Design

image.png

NIO实现Reactor

image.png
image.png

image.png
image.png
image.png

上面是用java nio实现基本Reactor模式,需要自己写很多代码。

Worker Thread Pools 版 Reactor模式

image.png

多Reactor模式

image.png

netty就是使用的就是多Reactor模式:
image.png

Netty的异步处理:

image.png
常见操作:
image.png

netty功能特性

image.png

Netty核心组件

image.png

ChannelPipeline处理入站事件和出站操作

image.png
image.png
image.png

Netty Reactor 工作架构图

image.png
image.png
image.png

下面有一个基于netty的简单的IM demo,可以简单了解netty的编程方法和思想:
Server:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package com.yuanjia.im.netty.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.net.InetSocketAddress;

/**
* Created by bruce on 2019/6/10.
*/
public class Server {
private int port;

public Server(int port) {
this.port = port;
}

public void start() {

// netty服务端ServerBootstrap启动的时候,默认有两个eventloop分别是bossGroup和 workGroup

EventLoopGroup boosGroup = new NioEventLoopGroup(1); // bossGroup
EventLoopGroup workerGroup = new NioEventLoopGroup(); // workGroup
try {
ServerBootstrap sbs = new ServerBootstrap().group(boosGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("decoder", new StringDecoder());
ch.pipeline().addLast("encoder", new StringEncoder());
//ch.pipeline().addLast(new DiscardInboundHandler());
ch.pipeline().addLast(new ServerHandler());
};
}).option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);

ChannelFuture future = sbs.bind(port).sync();
System.out.println("Server start listen at " + port);
future.channel().closeFuture().sync();
} catch (Exception e) {
boosGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8090;
}
new Server(port).start();
}
}

Client:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package com.yuanjia.im.netty.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Scanner;

/**
* Created by bruce on 2019/6/10.
*/
public class Client {

//server 's ip 这里需要用户根据自己server的ip来做修改,例如我这里是10.1.132.194
private static final String HOST = System.getProperty("host", "10.1.132.194");
//port 8090
private static final int PORT = Integer.parseInt(System.getProperty("port", "8090"));

public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>(){
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast("decoder", new StringDecoder());
p.addLast("encoder", new StringEncoder());
p.addLast(new ClientHandler());
}
});
ChannelFuture future = b.connect(HOST, PORT).sync();
//控制台输入消息给服务端让服务端转给给另外一个客户端
//消息如: 认识你真高兴我的小伙伴@10.1.8.30
//消息就转发给了10.1.8.30
Scanner sc = new Scanner(System.in);
while(sc.hasNext()){
String message = sc.nextLine();
future.channel().writeAndFlush(message);
}
future.channel().closeFuture().sync();
} finally {
group.spliterator();
}
}

}

ServerHandler:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package com.yuanjia.im.netty.server;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.internal.PlatformDependent;

import java.util.concurrent.ConcurrentMap;

/**
* Created by bruce on 2019/6/10.
*/
@ChannelHandler.Sharable
public class ServerHandler implements ChannelInboundHandler {

//存放客户端和服务端之间的连接
private static ConcurrentMap<String,ChannelHandlerContext> channelConcurrentMap = PlatformDependent.newConcurrentHashMap();


@Override
public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {

}

@Override
public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {

}

@Override
public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
//获取客户端的ip
String hostString = ((SocketChannel)channelHandlerContext.channel()).remoteAddress().getHostString();
System.out.println(hostString + " online");
//将客户端和服务端之间的连接存放在concurrentHashMap中
channelConcurrentMap.put(hostString,channelHandlerContext);
}

@Override
public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {

}

@Override
public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
System.out.println("ServerHandler channelRead....");
//客户端通过Terminal连接后的输入格式为 message@ip ,这个消息接收者ip会收到message消息
//例如: 你最近还好吗,Bruce@10.1.128.1
String messageString = o.toString();
String[] messages = messageString.split("@");
String message = messages[0];
String targetHost = messages[1];
System.out.println(channelHandlerContext.channel().remoteAddress()+"->Server :"+o.toString());
ChannelHandlerContext targetChannelHandlerContext = channelConcurrentMap.get(targetHost);
targetChannelHandlerContext.write(channelHandlerContext.channel().remoteAddress() + " say : " + message);
targetChannelHandlerContext.flush();
}

@Override
public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {

}

@Override
public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {

}

@Override
public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {

}

@Override
public void handlerAdded(ChannelHandlerContext channelHandlerContext) throws Exception {

}

@Override
public void handlerRemoved(ChannelHandlerContext channelHandlerContext) throws Exception {

}

@Override
public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) throws Exception {

}
}

ClientHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.yuanjia.im.netty.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
* Created by bruce on 2019/6/10.
*/
public class ClientHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Client01Handler Active");
//ctx.fireChannelActive(); // 若把这一句注释掉将无法将event传递给下一个ClientHandler
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

代码地址:https://github.com/bruceChi/nettyIM
参考资料:
1、http://tutorials.jenkov.com/java-nio
2、 UNIX Network Programming
3、 https://www.jianshu.com/p/63a006e5e22d
4、http://tutorials.jenkov.com/netty
5、http://www.dre.vanderbilt.edu/~schmidt/PDF/reactor-siemens.pdf
6、 https://www.cnblogs.com/winner-0715/p/8733787.html
7、 http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

浅谈dubbo微服务

实现一个简单的RPC系统

image.png

provider:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.yuanjia.demo.rpc;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;

/**
* Created by chiyuanjia on 2018/4/16.
*/
public class Provider {

public static void main(String[] args) throws Exception {

//新建一个server socket 端口号为1234,普通的RPC调用可以通过socket来实现
ServerSocket server=new ServerSocket(1234);
//监听socket连接并响应
while(true)
{
Socket socket=server.accept();
ObjectInputStream input=new ObjectInputStream(socket.getInputStream());

//获得服务端要调用的类名
String classname=input.readUTF();
//获得服务端要调用的方法名称
String methodName=input.readUTF();
//获得服务端要调用方法的参数类型
Class<?>[] parameterTypes=(Class<?>[]) input.readObject();
//获得服务端要调用方法的每一个参数的值
Object[] arguments=(Object[]) input.readObject();

//创建类
Class serviceClass=Class.forName(classname);
//创建对象
Object object = serviceClass.newInstance();
//获得该类的对应的方法
Method method=serviceClass.getMethod(methodName, parameterTypes);

//该对象调用指定方法
Object result=method.invoke(object, arguments);

ObjectOutputStream output=new ObjectOutputStream(socket.getOutputStream());
output.writeObject(result);
socket.close();
}
}

}

consumer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.yuanjia.demo.rpc;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.Socket;
/**
* Created by duheng on 2018/4/16.
*/
public class Consumer {

public void doBusiness() throws Exception{
//设置调用类的路径和要调用的方法
String classname="com.dfire.demo.rpc.RPCServiceImpl";
String method="sayHello";
Class[] argumentsType={String.class};

//获取本机计算机名称
InetAddress inetAddress = InetAddress.getLocalHost();
String hostName = inetAddress.getHostName().toString();

Object[] arguments={hostName};

//与10.1.134.145主机建立socket连接进行通讯
Socket socket = new Socket("127.0.0.1",1234);

ObjectOutputStream output=new ObjectOutputStream(socket.getOutputStream());

//输入数据
output.writeUTF(classname);
output.writeUTF(method);
output.writeObject(argumentsType);
output.writeObject(arguments);

//得到返回数据
ObjectInputStream input=new ObjectInputStream(socket.getInputStream());
Object result=input.readObject();
System.out.println(Thread.currentThread().getName()+" "+result);
socket.close();
}

public static void main(String[] args) {
try {
new Consumer().doBusiness();
}catch (Exception e){
e.printStackTrace();
}

}

}

输入命令查看:

1
lsof -i tcp:1234

image.png
上图显示线程16419在监听TCP的通讯,做为RPC的server端
下面我们看下我们的dubbo的通讯监听情况:
先看服务提供者:

1
lsof -i tcp:20881

image.png
下面为服务消费者:

1
lsof -i tcp:20880

image.png
最后边的许多连接表示10.1.134.145连接了多个机器的服务【服务提供者端口为20880】

上面通过两个简单的类Provider和Consumer实现了一个简单的RPC系统,大家应该就能清楚认识到RPC的整个架构和通讯是比较简单的,当然上面的这个RPC系统有很多缺点:

  • BIO,不能支持高并发。
  • 不支持负载均衡
  • 不支持容错机制
  • 不支持SPI扩展
  • 不支持各种序列化
  • 不支持自动注册和发现
    当然还有其他很多缺点,业界有很多RPC的框架,例如:dubbo,我们下面来讲一下讲。

下面是从dubbo官网copy下面的一段话,对dubbo的简单描述:
image.png
image.png

dubbo的拓扑图

image.png

image.png
我们上面自己实现的RPC系统就缺少Registry和Monitor这两个模块,当然其他模块也很弱

整体设计

image.png

  • config 配置层:对外配置接口,以 ServiceConfig, ReferenceConfig 为中心,可以直接初始化配置类,也可以通过 spring 解析配置生成配置类

  • proxy 服务代理层:服务接口透明代理,生成服务的客户端 Stub 和服务器端 Skeleton, 以 ServiceProxy 为中心,扩展接口为 ProxyFactory

  • registry 注册中心层:封装服务地址的注册与发现,以服务 URL 为中心,扩展接口为 RegistryFactory, Registry, RegistryService

  • cluster 路由层:封装多个提供者的路由及负载均衡,并桥接注册中心,以 Invoker 为中心,扩展接口 为 Cluster, Directory, Router, LoadBalance

  • monitor 监控层:RPC 调用次数和调用时间监控,以 Statistics 为中心,扩展接口为 MonitorFactory, Monitor, MonitorServiceprotocol 远程调用层:封装 RPC 调用,以 Invocation, Result 为中心,扩展接口为 Protocol, Invoker, Exporter

  • exchange 信息交换层:封装请求响应模式,同步转异步,以 Request, Response 为中心,扩展接口为Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer

  • transport 网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codecserialize 数据序列化层:可复用的一些工具,扩展接口为 Serialization, ObjectInput, ObjectOutput, ThreadPool

    调用链:

    image.png
    ##服务提供者暴露一个服务的过程:
    image.png

    暴露服务时序图

    image.png
    ##服务消费者消费一个服务的过程
    image.png
    ##引用服务时序
    image.png

    服务提供 Invoker 和服务消费 Invoker

    image.png

    核心领域模型(Microkernel + Plugin 模式)

  • Protocol 是服务域,它是 Invoker 暴露和引用的主功能入口,它负责 Invoker 的生命周期管理。

  • functionalities是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可

    执行体,可向它发起 invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可
    能一个集群实现。
  • Invocation 是会话域,它持有调用过程中的变量,比如方法名,参数等。

    dubbo:// 数据传输

    Dubbo 缺省协议采用单一长连接和 NIO 异步通讯,适合于小数据量大并发的服务调用,以及服务消费者机器数远大于服务提供者机器数的情况。反之,Dubbo 缺省协议不适合传送大数据量的服务,比如传文件,传视频等,除非请求量很低。
    image.png

    + Transporter:     mina, netty, grizzy
    + Serialization:    dubbo, hessian2, java, json
    + Dispatcher: all, direct, message, execution, connection
    + ThreadPool: fixed, cached, limited

    Dispatcher

    对于Dubbo集群中的Provider角色,有IO线程池和业务处理线程池(默认200)两个线程池,所以当业务的并发比较高,或者某些业务处理变慢,业务线程池就很容易被“打满”,抛出“RejectedExecutionException: Thread pool is EXHAUSTED! ”异常。

  1. all 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。
  2. direct 所有消息都不派发到线程池,全部在 IO 线程上直接执行。
  3. message 只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在IO线程上执行。
  4. execution 只请求消息派发到线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在 IO 线程上执行。
  5. connection 在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。
    1
    <dubbo:provider version="1.0" delay="-5000" timeout="5000" dispatcher="all" threads="400" loadbalance="leastactive" actives="400" />

ThreadPool

  1. fixed 固定大小线程池,启动时建立线程,不关闭,一直持有。(缺省)。

  2. cached 缓存线程池,空闲一分钟自动删除,需要时重建。

  3. limited 可伸缩线程池,但池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。

  4. eager 优先创建Worker线程池。在任务数量大于corePoolSize但是小于maximumPoolSize时,优先创建Worker来处理任务。当任务数量大于maximumPoolSize时,将任务放入阻塞队列中。阻塞队列充满时抛出RejectedExecutionException。(相比于cached:cached在任务数量超
    过maximumPoolSize时直接抛出异常而不是将任务放入阻塞队列)。

zookeeper 注册中心

Zookeeper 是 Apacahe Hadoop 的子项目,是一个树型的目录服务,支持变更推送,适合作为 Dubbo 服务的注册中心,工业强度较高,可用于生产环境,并推荐使用。
image.png

流程说明:
  1. 服务提供者启动时: 向 /dubbo/com.foo.BarService/providers 目录下写入自己的 URL 地址。
  2. 服务消费者启动时: 订阅 /dubbo/com.foo.BarService/providers 目录下的提供者 URL 地址。并向 /dubbo/com.foo.BarService/consumers 目录下写入自己的 URL 地址。
  3. 监控中心启动时: 订阅 /dubbo/com.foo.BarService 目录下的所有提供者和消费者 URL 地址。

    Fault Tolerance 容错

  4. Failover – FailoverClusterInvoker
    失败自动切换,尝试其他服务器。 (默认的方案)
  5. Failfast – FailfastClusterInvoker
    失败立即抛出异常,。通常用于非幂等性的写操作,比如新增记录。
  6. Failsafe – FailsafeClusterInvoker
    失败忽略异常。通常用于写入审计日志等操作。
  7. Failback – FailbackClusterInvoker
    失败自动恢复,记录日志并定时重试。 通常用于消息通知操作。
  8. Forking – ForkingClusterInvoker
    并行调用多个服务,一个成功立即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过 forks=“2” 来设置最大并行数。
  9. Broadcast – BroadcastClusterInvoker
    广播调用所有提供者,任意一个报错则报错

Load Balancing 负载均衡

  1. random (随机,按权重设置随机概率) 默认的策略

    1. round-robin (轮循,按公约后的权重设置轮循比率)
      存在慢的提供者累积请求问题,比如:第二台机器很慢,但没挂,
      当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。

    2. least-active (最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。) 使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大,不支持权重。 【线上用得比较多,注意dubbo低版本有bug】

    3. consistent-hash(一致性Hash,相同参数的请求总是发到同一提供者)
      当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动,会导致压力分摊不均。

    4. 支持扩展

      需要实现AbstractLoadBalance接口

SPI扩展实现

一、调用拦截扩展
扩展说明:服务提供方和服务消费方调用过程拦截,Dubbo 本身的大多功能均基于此扩展点实现,每次远程方法执行,该拦截都会被执行,请注意对性能的影响。dubbo层的限流和熔断可以用这个filter扩展来实现。

约定:

  • 用户自定义 filter 默认在内置 filter 之后。
  • 特殊值 default,表示缺省扩展点插入的位置。比如:filter=“xxx,default,yyy”,表示 xxx 在缺省 filter 之前,yyy 在缺省 filter 之后。
  • 特殊符号 -,表示剔除。比如:filter=“-foo1”,剔除添加缺省扩展点 foo1。比如:filter=“-default”,剔除添加所有缺省扩展点。
  • provider 和 service 同时配置的 filter 时,累加所有 filter,而不是覆盖。比如:<dubbo:provider filter=”xxx,yyy”/> 和 <dubbo:service filter=”aaa,bbb” />,则 xxx,yyy,aaa,bbb 均会生效。如果要覆盖,需配置:<dubbo:service filter=”-xxx,-yyy,aaa,bbb” />

接口com.alibaba.dubbo.rpc.Filter
扩展配置
<dubbo:reference filter= “xxx,yyy” />
<dubbo:consumer filter= “xxx,yyy” />
<dubbo:service filter= “xxx,yyy” />
<dubbo:provider filter= “xxx,yyy” />
已知扩展
com.alibaba.dubbo.rpc.filter.EchoFilter
com.alibaba.dubbo.rpc.filter.GenericFilter
com.alibaba.dubbo.rpc.filter.GenericImplFilter
com.alibaba.dubbo.rpc.filter.TokenFilter
com.alibaba.dubbo.rpc.filter.AccessLogFilter
com.alibaba.dubbo.rpc.filter.CountFilter
com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
com.alibaba.dubbo.rpc.filter.ClassLoaderFilter
com.alibaba.dubbo.rpc.filter.ContextFilter
com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
com.alibaba.dubbo.rpc.filter.ExceptionFilter
com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
com.alibaba.dubbo.rpc.filter.DeprecatedFilter

总结得比较简单,摘自我的培训课件,适合根据这个文章来做分享,欢迎大家一起交流。

深入理解volatile

#JMM Java 内存模型
Java的内存模型指定了Java虚拟机如何与计算机的内存进行工作
image.png
Java内存模型决定了一个线程对共享变量的写入何时对其他线程可见,Java内存模型定义了线程和主内存之间的抽象关系,具体如下:
1、共享变量存储在主内存中,每个线程都可以访问。
2、每个线程都有私有的工作内存。
3、工作内存只存储该线程对共享变量的副本。
4、线程不能直接操作主内存,只有先操作了工作内存之后才能写入内存。
假设主内存的共享变量为0,线程1和线程2分享拥有共享变量X的副本,假设线程1此时将工作内存中的X修改为1,同时刷新到主内存中,当线程2想要去使用副本X的时候,就会发现该变量已经失效了,必须到主内存中再次获取然后存入自己的工作内容中,这一点和CPU与CPU Cache之间的关系非常类似。
image.png
当同一个数据被分别存储到了计算机的各个内存区域时,就会导致多个线程在各自的工作内存中看到的可能不一样。后面会讲到Java语言中如何保证不通线程对某个共享变量的可见性。

#多线程可见性例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import java.util.concurrent.TimeUnit;
/**
* Created by chiyuanjia on 2019/7/17.
*/
public class VolatileFoo {

//init_value 的最大值
final static int MAX = 5 ;

//init_value 的初始值
static int init_value = 0;

public static void main(String[] args) {
//启动一个Reader线程 ,当发现local_value 和 init_value 不同时,则输出 init_value 被修改的信息
new Thread(new Runnable() {
public void run() {
int localValue = init_value;
while (localValue < MAX){
if(init_value != localValue){
System.out.printf("The init_value is updated to [%d]\n",init_value);
//对localValue 进行重新赋值
localValue = init_value;
}
}
}
},"Reader").start();

//启动Updater线程,主要用于对init_value的修改,当local_value>=5的时候则退出生命周期
new Thread(new Runnable() {
public void run() {
int localValue = init_value;
while(localValue < MAX){
//修改init_value
System.out.printf("The init_value will be changed to [%d]\n",++localValue);
init_value = localValue;
try {
//短暂休眠 目的是为了使Reader线程来得及输出变化内容
TimeUnit.SECONDS.sleep(2);
}catch (Exception e){
e.printStackTrace();
}
}
}
}).start();

}
}

大家先猜一下,运行结果是怎么样的?可能会大失所望
运行结果如下:

1
2
3
4
5
6
/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/bin/java -Dvisualvm.id=83453442187216 -Xms20m -Xmx20m -XX:+HeapDumpOnOutOfMemoryError -Didea.launcher.port=7534 "-Didea.launcher.bin.path=/Applications/IntelliJ IDEA.app/Contents/bin" -Dfile.encoding=UTF-8 -classpath "/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/deploy.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/cldrdata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/dnsns.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/jaccess.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/jfxrt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/localedata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/nashorn.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/sunec.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/sunjce_provider.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/sunpkcs11.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/zipfs.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/javaws.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/jfxswt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/management-agent.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/plugin.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/ant-javafx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/dt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/javafx-mx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/jconsole.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/packager.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/sa-jdi.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/tools.jar:/Users/bruce/2dfire/workspace/jvmstudy/target/classes:/Applications/IntelliJ IDEA.app/Contents/lib/idea_rt.jar" com.intellij.rt.execution.application.AppMain com.just.study.jvm.concurrent.VolatileFoo
The init_value will be changed to [1]
The init_value will be changed to [2]
The init_value will be changed to [3]
The init_value will be changed to [4]
The init_value will be changed to [5]

通过控制台的输出我们发现:Reader线程压根就没有感知到init_value的变化、并且进入了死循环线程没有退出
我们对代码做一个调整,将init_value变量设置为volatile:

1
2
//init_value 的初始值
static volatile int init_value = 0;

运行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/bin/java -Dvisualvm.id=83688650193139 -Xms20m -Xmx20m -XX:+HeapDumpOnOutOfMemoryError -Didea.launcher.port=7535 "-Didea.launcher.bin.path=/Applications/IntelliJ IDEA.app/Contents/bin" -Dfile.encoding=UTF-8 -classpath "/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/deploy.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/cldrdata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/dnsns.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/jaccess.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/jfxrt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/localedata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/nashorn.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/sunec.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/sunjce_provider.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/sunpkcs11.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/zipfs.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/javaws.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/jfxswt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/management-agent.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/plugin.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/ant-javafx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/dt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/javafx-mx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/jconsole.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/packager.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/sa-jdi.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/tools.jar:/Users/bruce/2dfire/workspace/jvmstudy/target/classes:/Applications/IntelliJ IDEA.app/Contents/lib/idea_rt.jar" com.intellij.rt.execution.application.AppMain com.just.study.jvm.concurrent.VolatileFoo
The init_value will be changed to [1]
The init_value is updated to [1]
The init_value will be changed to [2]
The init_value is updated to [2]
The init_value will be changed to [3]
The init_value is updated to [3]
The init_value will be changed to [4]
The init_value is updated to [4]
The init_value will be changed to [5]
The init_value is updated to [5]

Process finished with exit code 0

为啥会发生这样的改变、后面会慢慢讲到、这里是因为volatile可以保证多线程环境下的可见性、还有volatile的变量是先被写后再被读(后续会讲到)。

#CPU缓存模型和缓存一致性
CPU在速度上的发展要快与内存在速度上的发展,由于两边速度严重的不等,所以为了增加吞吐量,缩小CPU和内存的速度差,建立了CPU Cache模型,就是大家所熟知的L1、L2、L3 CPU高速缓存。CPU Cache又由很多个Cache Line构成,Cache Line可以认为是CPU Cache中最小的缓存单元。
image.png

image.png

程序运行过程中,会将运算锁需要的数据从内存复制一份到CPU Cache中,然后进行读取和写入,当运算结束之后,在将CPU Cache中的最新数据刷新到内存中,这样通过CPU Cache在中间做交互,提高了CPU的吞吐能力。
image.png

CPU Cache虽然提高了CPU的吞吐能力,同时也带来了一个问题:缓存不一致的问题,比如i++这个操作,运行的过程如下;
1、读取主内存的i到CPU Cache中。
2、对i进行加一的操作。
3、将结果写回到CPU Cache中。
4、将数据刷新到主内存中。
i++在单线程的情况下不会有任何问题,但在多线程的情况下就会有问题,每个线程都有自己的工作内存(对于于CPU的Cache),变量i会在多个线程的本地内存中都存在一个副本。如果同时有两个线程执行i++操作,假设i的初始值为0,每一个线程都从主内存获取i的值存入CPU Cache中,然后经过计算在写入主内存中,很有可能i在经过了两次自增之后结果还是1,这就是典型的缓存不一致问题。
主要有两种解决方法:
1、通过总线加锁。
2、通过缓存一致性协议。
第一种是悲观的实现方式,CPU和其他组件的通信都是通过总线来进行,会有阻塞,效率低下。

第二种:
image.png
在缓存一致性中最为出名的是Intel的MESI协议,MESI协议保证了每一个缓存汇中使用的是共享变量副本都是一致的,大概意思就是当CPU在操作Cache中的数据时,如果发现该变量是一个共享变量,也就是说在其他的CPU Cache中也存在一个副本,那么进行如下操作:
1、读取操作,不做任何处理,只是将Cache中的数据读取到寄存器。
2、写入操作,发出信号通知其他CPU将该变量的Cache line置为无效状态,其他CPU在进行该变量读取的时候不得不到主内存中再次获取。

#并发编程三大特性:原子性、可见性、有序性

##原子性
原子性是值指在一次的操作或者多次操作中,要么所有的操作全部得到执行,要么所有的操作都不执行。i++ 是由三个原子操作组成get i, i+1 ,set i = x,但是i++就不是原子性操作。volatile不保证原子性,synchronized保证原子性,JUC的原子性类型保证原子性,例如:AtomicInteger,通过volatile和CAS来实现。

##可见性
可见性是指当一个线程对共享变量进行了修改,那么另外的线程可以立即看到修改后的新值。例如上面我们的例子Reader线程会将init_value从内存缓存到CPU Cache中,也就是从主内存缓存到线程的工作内存中,Updater线程对init_value的修改对Reader线程是不可见的。

##有序性
有序性就是程序代码在执行过程中的先后顺序,Java在编译器以及运行期的优化,会产生指令重排序,导致了代码的执行顺序不一定是编写代码时的顺序,指令重排序是在不影响运行结果的情况下进行重排序,对于单线程来说指令重排序不会有问题。例如:

1
2
3
4
int x = 10;
int y = 0;
x++;
y=20;

但是在多线程的情况下,如果有序性得不到保证,那么很有可能就会出现问题,例如如下代码:

1
2
3
4
5
6
7
8
9
private boolean initialized = false;
private Context context = null;
public Context load(){
if(!initialized){
context = loadContext();
initialized = true;
}
return context;
}

在单线程情况,这段代码重排序,录入把 initialized = true;放到 context = loadContext();调换位置,不会有问题,但是如果多线程情况下第二个线程在调用load方法后可能会得到一个null。

#JMM如何保证原子性、可见性、有序性
JVM采用内存模型的机制来屏蔽哥哥平台与操作系统之间内存访问的差异,以实现让Java程序在各种平台下达到一致的内存访问效果。比如C语言中的整型变量,在某些平台下占用了两个字节的内存,在某些平台下则占用了四个字节的内存,Java则在任何平台下,int类型就是四个字节,这就是一直内存访问效果。

##JMM与原子性
在Java语言中,对基本数据类型的变量读取赋值操作都是原子性的,对引用类型的变量读取和赋值的操作也是原子性的。
1、x=10 原子操作
2、y=x 非原子操作【两个原子操作合在一起就不是原子操作】
1)执行线程从主内存中读取x的值(如果在工作内存就直接从工作内存获取)
2)在执行线程的工作内存中修改y的值为x,然后将y的值写入主内存之中。
3、y++ 自增操作 不是原子的,因为包含三个原子操作:
1)执行线程从主内存中读取y的值(如果y已存在于执行线程的工作内存中,则直接获取),然后将其存入当前线程的工作内存中。
2)在执行线程工作内存中为y执行加1的操作。
3)将y的值写入主内存。
结论:
a、多个原子性操作在一起就不在是原子性操作了。
b、简单的读取和赋值操作是原子性操作,将一个变量赋给另外一个变量的操作不是原子性操作。
c、Java内存模型只保证了基本读取和赋值的原子性操作,其他的均不保证,如果先更要使得某些代码片段具备原子性,需使用关键字synchronized,或者JUC中的lock。原子封装类:AtomicInteger等。
总结:volatile不具备保证原子性的语义

#JMM与可见性
在多线程的环境下,如果某个线程首次读取共享变量,首先到主内存获取该变量,然后存入工作内存中,以后只需要在工作内存中读取该变量即可。同样如果对该变量执行了修改的操作,则先将新值写入工作内存中,然后在刷新至主内存中。但是什么时候最新的值会被刷新至主内存是不太确定的,这就解释了为什么没有加volatile关键字的时候VolatileFoo中的Reader线程始终无法获取到init_value最新的变化。
Java提供三种方式来保证可见性:
1)使用关键字volatile,共享资源的读操作直接在内存中进行。写操作是先写工作内存,然后立刻刷新到主内存中。
2)synchronized保证可见性,synchronized关键字能够保证同一时刻只有一个线程获得锁,然后执行同步方法,并且还会确保在锁释放之前,会将对变量的修改刷新到主内存中。
3)通过JUC提供的显示锁lock也能够保证可见性,Lock的lock防范能够保证在同一时刻只有一个线程获得锁然后执行同步方法,并且会确保在锁释放之前会将对变量的修改刷新到主内存当中。

注:1、JVM禁用JIT即时编译器的后多线程环境下共享变量也具有可见性
例如下面代码,如果添加JVM参数 -server -Djava.compiler=NONE 或者 -Xint 多线程环境下flag共享变量就具有可见性
2、System.out.print() 输出流会加锁,也具有可见性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* Created by chiyuanjia on 2019/7/26.
*/
public class Zuo {

private static boolean flag = true;

public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
public void run() {
print();
}
}).start();
TimeUnit.SECONDS.sleep(2);
flag = false;
System.out.println("flag set to false");
}

private static void print() {
while (flag) {
}
}
}

#JMM与有序性
在Java内存模型中,允许编译器和处理器对指令进行重排序,在单线程的情况下,重排序不会有问题,但是多线程的情况下,会影响程序的正确运行。
Java提供了三种保证有序性的方式:

  • 使用关键字volatile。
  • synchronized关键字。
  • 使用显示Lock。
    后两者是采用同步。

Java内存的天生有一些有序性规则-Happens-before原则。如果两个操作无法从happens-before推导出来,那么他们就无法保证有序性。

  • 程序次序规则:在一个线程内,代码按照编写时的次序执行,编写在后面的操作发生于编写在前面的操作之后,虚拟机还是会对程序代码的指令进行重排序,只要确保在一个线程内最终的结果和代码顺序执行的结果一致即可。
  • 锁定规则:一个unlock操作要先行发生在对同一个锁的lock操作。
  • volatile变量规则:对一个变量的写操作要早与对这个变量之后的读操作。意思是一个变量volatile,一个线程对它进行读,一个线程对它进行写,写操作一定是先行发生于读操作。
  • 传递规则:如果操作A先于操作B,而操作B又先于操作C,则A先于操作C。
  • 线程启动规则:Thread对象的start()方法先行于线程的任何动作。
  • 线程中断规则:对线程执行interrupt()方法肯定要优先于捕捉到中断信号,意
    思是如果线程收到了中断信号,那么在此之前势必要有interrupt()。
  • 线程的终结规则:线程中所有的操作都要先行发生于线程的终止检测,意识是线程的任务执行,逻辑单元执行肯定要发生于线程死亡之前。
  • 对象的终结规则:一个对象初始化的完成先行发生于finalize()方法之前,意思是先生后死。
    总结:volatile关键字保证有序性
    #volatile关键字深入解析
    volatile具有两个语义:
  • 保证了不同线程之间对共享变量操作时的可见性,也就是说当一个线程修改volatile修改的变量,另外一个线程会立即看到最新的值。
  • 禁止对指令进行重排序操作。
    (1)理解volatile保证可见性:
    VolatileFoo例子,Updater线程对init_value变量的每一次更改都会使得Reader线程能够看到(happens-before规则中,第三条volatile变量规则:对一个变量的写操作要早于对这个变量之后的读操作),步骤:
  1. Reader线程从主内存获取init_value的值为0,并且将其缓存到本地工作内存中。
  2. Updater线程将init_value的值在本地工作内存中修改为1,然后立即刷新至主内存中。
  3. Reader线程在本地工作内存中的init_value失效。(反映到硬件上就是CPU Cache 的 Cache Line失效)
  4. 由于Reader线程的工作内存中的init_value失效,因此需要从主内存中从新读取init_value的值。
    (2)理解volatile保证有序性
    volatile关键字对有序性的保证比较粗暴,直接静止JVM和处理器对volatile关键字修改的指令重排序,但是对volatile前后无依赖关系的指令则可以随便怎么排序。
    (3)理解volatile不保证原子性
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicInteger;

    /**
    * Created by chiyuanjia on 2019/7/21.
    * //没次的运行结果不一样,具体原因是 i++ 不是一个原子操作,i++操作分三步:
    * 1、从主内存中获取i的值,然后魂村至线程工作内存中。
    * 2、在线程工作内存中为进行加1的操作。
    * 3、将i的最新值写入主内存中。
    * 上面三个操作单独的每一个操作都是原子性操作,但是合起来就不是原子性操作了。
    */
    public class VolatileTest {

    //使用volatile修改共享资源i
    private static volatile int i = 0;
    //private static AtomicInteger i = new AtomicInteger(0);
    //10个线程
    private static final CountDownLatch latch = new CountDownLatch(10);

    private static void inc(){
    i++;
    //i.addAndGet(1);
    }

    public static void main(String[] args) throws InterruptedException {

    for (int i = 0; i< 10;i++){
    new Thread(new Runnable() {
    public void run() {

    for (int x = 0; x < 1000; x++){
    inc();
    }
    //使计算器减1
    latch.countDown();
    }
    }).start();
    }
    //等待所有的线程完成工作
    latch.await();
    System.out.println(i);

    }
    }

运行结果:

1
2
/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/bin/java -Dvisualvm.id=89098433865570 -Xms20m -Xmx20m -XX:+HeapDumpOnOutOfMemoryError -Didea.launcher.port=7533 "-Didea.launcher.bin.path=/Applications/IntelliJ IDEA.app/Contents/bin" -Dfile.encoding=UTF-8 -classpath "/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/deploy.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/cldrdata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/dnsns.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/jaccess.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/jfxrt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/localedata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/nashorn.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/sunec.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/sunjce_provider.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/sunpkcs11.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/ext/zipfs.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/javaws.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/jfxswt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/management-agent.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/plugin.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/ant-javafx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/dt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/javafx-mx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/jconsole.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/packager.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/sa-jdi.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/lib/tools.jar:/Users/bruce/2dfire/workspace/jvmstudy/target/classes:/Applications/IntelliJ IDEA.app/Contents/lib/idea_rt.jar" com.intellij.rt.execution.application.AppMain com.just.study.jvm.concurrent.VolatileTest
9656

上面代码创建了10个线程,每个线程执行1000次对共享变量i的自增操作,但是最终结果可能不是10000,因为这段代码的 i++ 操作其实是3个原子操作合起来的,3个原子操作合起来就不是原子操作了。

#volatile的原理和实现机制
下面为OpenJDK下的unsafe.cpp源码,会发现被volatile装饰的变量存在于一个”lock”的前缀,源码如下:
image.png

#volatile的使用场景
虽然volatile有部分synchronized关键字的语义,但是volatile不可能完全替代synchronized关键字,因为volatile关键字不具备原子性操作语义,我们在使用volatile关键字的时候也是充分利用它的可见性以及有序性(防止重排序)特点。

  1. 开关控制-利用可见性的特点
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    /**
    * Created by chiyuanjia on 2019/7/25.
    */
    public class ThreadCloseable extends Thread {

    //volatile 关键字保证了started线程的可见性
    private volatile boolean started = true;

    @Override
    public void run() {

    while (started) {
    //do work
    System.out.println("I am working");
    }

    }

    public void shutdown() {
    this.started = false;
    }
    }

2.状态标记顺序性

1
2
3
4
5
6
7
8
9
10
11
12
//阻止重排序
private volatile boolean initialized = false;
private Context context;
public Context load() {
if(!initialized){
context = loadContext();
//如果这里的initialized变量不是volatile的,那么指令重排序后
//假设 initialized = true;重排到context = loadContext();之前多线程访问情况下就会出现问题
initialized = true;
}
return context;
}

3.单例模式的double-check也利用了volatile的有序性

#volatile和synchronized对比
(1)使用上的区别

  • volatile关键字只能用于修改实例变量或者类变量,不能用于修改方法以及方法参数和局部变量、常量等。
  • synchronized关键字不能用于对变量的修饰,只能用于修饰方法或者语句块。
  • volatile修饰的变量可以为null,synchronized关键字同步块的monitor对象不能为null。
    (2)对原子性的保证
  • volatile无法保证原子性。
  • 由于synchronized是一种排他的机制,因此被synchronized关键字修饰的同步代码是无法被中途打断的,因此其能够保证代码的原子性。
    (3)对可见性的保证
  • 两者均可以保证资源在多线程间的可见性,但是实现机制完全不同。
  • synchronized借助于JVM指令monitor enter 和 monitor exit对通过排他的方式使得同步代码串行化,在monitor exit时所有共享资源都会被刷新到主内存中。
  • 相比较于synchronized关键字volatile使用机器指令(偏硬件)“lock;”的方式迫使其他线程工作内存中的数据失效,需要到主内存中进行再次加载。
    (4)对有序性的保证
  • volatile关键字禁止JVM编译器以及处理器对其进行重排序。
  • 虽然synchronized关键字所修饰的同步方法也可以保证顺序性,但是这种顺序性是以程序的串行化执行换来的,在synchronized关键字所修饰的代码中代码指令也会发生指令重排序的情况,比如:
    1
    2
    3
    4
    5
    6
    synchronized(this){
    int x = 10;
    int y =20;
    x++;
    y = y+1;
    }

x和y谁先定义谁最先进行运算,对结果没有影响。达到了最终的输出结果和代码编写顺序的一致性。
(5)其他

  • volatile不会使线程陷入阻塞。
  • synchronized会使线程进入阻塞状态。