1.
1
@SpringBootApplication

这里先单独拎出

1
@SpringBootApplication
注解说一下,虽然我们一般不会主动去使用它。

1
2
3
4
5
6
@SpringBootApplication
public class SpringSecurityJwtGuideApplication {
      public static void main(java.lang.String[] args) {
        SpringApplication.run(SpringSecurityJwtGuideApplication.class, args);
    }
}

我们可以把

1
@SpringBootApplication
看作是
1
@Configuration
1
@EnableAutoConfiguration
1
@ComponentScan
注解的集合。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package org.springframework.boot.autoconfigure;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@SpringBootConfiguration
@EnableAutoConfiguration
@ComponentScan(excludeFilters = {
		@Filter(type = FilterType.CUSTOM, classes = TypeExcludeFilter.class),
		@Filter(type = FilterType.CUSTOM, classes = AutoConfigurationExcludeFilter.class) })
public @interface SpringBootApplication {
   ......
}
package org.springframework.boot;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Configuration
public @interface SpringBootConfiguration {
}

根据 SpringBoot 官网,这三个注解的作用分别是:

  • 1
    
    @EnableAutoConfiguration
    
    :启用 SpringBoot 的自动配置机制
  • 1
    
    @ComponentScan
    
    : 扫描被
    1
    
    @Component
    
    (
    1
    
    @Service
    
    ,
    1
    
    @Controller
    
    )注解的 bean,注解默认会扫描该类所在的包下所有的类。
  • 1
    
    @Configuration
    
    :允许在 Spring 上下文中注册额外的 bean 或导入其他配置类

2. Spring Bean 相关

2.1.
1
@Autowired

自动导入对象到类中,被注入进的类同样要被 Spring 容器管理比如:Service 类注入到 Controller 类中。

1
2
3
4
5
6
7
8
9
10
11
@Service
public class UserService {
  ......
}
@RestController
@RequestMapping("/users")
public class UserController {
   @Autowired
   private UserService userService;
   ......
}

2.2.
1
Component
,
1
@Repository
,
1
@Service
,
1
@Controller

我们一般使用

1
@Autowired
注解让 Spring 容器帮我们自动装配 bean。要想把类标识成可用于
1
@Autowired
注解自动装配的 bean 的类,可以采用以下注解实现:

  • 1
    
    @Component
    
    :通用的注解,可标注任意类为
    1
    
    Spring
    
    组件。如果一个 Bean 不知道属于哪个层,可以使用
    1
    
    @Component
    
    注解标注。
  • 1
    
    @Repository
    
    : 对应持久层即 Dao 层,主要用于数据库相关操作。
  • 1
    
    @Service
    
    : 对应服务层,主要涉及一些复杂的逻辑,需要用到 Dao 层。
  • 1
    
    @Controller
    
    : 对应 Spring MVC 控制层,主要用户接受用户请求并调用 Service 层返回数据给前端页面。

2.3.
1
@RestController

1
@RestController
注解是
1
@Controller和
@
1
ResponseBody
的合集,表示这是个控制器 bean,并且是将函数的返回值直 接填入 HTTP 响应体中,是 REST 风格的控制器。

Guide 哥:现在都是前后端分离,说实话我已经很久没有用过

1
@Controller
。如果你的项目太老了的话,就当我没说。

单独使用

1
@Controller
不加
1
@ResponseBody
的话一般使用在要返回一个视图的情况,这种情况属于比较传统的 Spring MVC 的应用,对应于前后端不分离的情况。
1
@Controller
+
1
@ResponseBody
返回 JSON 或 XML 形式数据

关于

1
@RestController
1
@Controller
的对比,请看这篇文章:@RestController vs @Controller

2.4.
1
@Scope

声明 Spring Bean 的作用域,使用方法:

1
2
3
4
5
@Bean
@Scope("singleton")
public Person personSingleton() {
    return new Person();
}

四种常见的 Spring Bean 的作用域:

  • singleton : 唯一 bean 实例,Spring 中的 bean 默认都是单例的。
  • prototype : 每次请求都会创建一个新的 bean 实例。
  • request : 每一次 HTTP 请求都会产生一个新的 bean,该 bean 仅在当前 HTTP request 内有效。
  • session : 每一次 HTTP 请求都会产生一个新的 bean,该 bean 仅在当前 HTTP session 内有效。

2.5.
1
Configuration

一般用来声明配置类,可以使用

1
@Component
注解替代,不过使用
1
Configuration
注解声明配置类更加语义化。

1
2
3
4
5
6
7
@Configuration
public class AppConfig {
    @Bean
    public TransferService transferService() {
        return new TransferServiceImpl();
    }
}

3. 处理常见的 HTTP 请求类型

5 种常见的请求类型:

  • GET :请求从服务器获取特定资源。举个例子:
    1
    
    GET /users
    
    (获取所有学生)
  • POST :在服务器上创建一个新的资源。举个例子:
    1
    
    POST /users
    
    (创建学生)
  • PUT :更新服务器上的资源(客户端提供更新后的整个资源)。举个例子:
    1
    
    PUT /users/12
    
    (更新编号为 12 的学生)
  • DELETE :从服务器删除特定的资源。举个例子:
    1
    
    DELETE /users/12
    
    (删除编号为 12 的学生)
  • PATCH :更新服务器上的资源(客户端提供更改的属性,可以看做作是部分更新),使用的比较少,这里就不举例子了。

3.1. GET 请求

1
@GetMapping("users")
等价于
1
@RequestMapping(value="/users",method=RequestMethod.GET)

1
2
3
4
@GetMapping("/users")
public ResponseEntity<List<User>> getAllUsers() {
 return userRepository.findAll();
}

3.2. POST 请求

1
@PostMapping("users")
等价于
1
@RequestMapping(value="/users",method=RequestMethod.POST)

关于

1
@RequestBody
注解的使用,在下面的“前后端传值”这块会讲到。

1
2
3
4
@PostMapping("/users")
public ResponseEntity<User> createUser(@Valid @RequestBody UserCreateRequest userCreateRequest) {
 return userRespository.save(user);
}

3.3. PUT 请求

1
@PutMapping("/users/{userId}")
等价于
1
@RequestMapping(value="/users/{userId}",method=RequestMethod.PUT)

1
2
3
4
5
@PutMapping("/users/{userId}")
public ResponseEntity<User> updateUser(@PathVariable(value = "userId") Long userId,
  @Valid @RequestBody UserUpdateRequest userUpdateRequest) {
  ......
}

3.4. DELETE 请求

1
@DeleteMapping("/users/{userId}")
等价于
1
@RequestMapping(value="/users/{userId}",method=RequestMethod.DELETE)

1
2
3
4
@DeleteMapping("/users/{userId}")
public ResponseEntity deleteUser(@PathVariable(value = "userId") Long userId){
  ......
}

3.5. PATCH 请求

一般实际项目中,我们都是 PUT 不够用了之后才用 PATCH 请求去更新数据。

1
2
3
4
5
  @PatchMapping("/profile")
  public ResponseEntity updateStudent(@RequestBody StudentUpdateRequest studentUpdateRequest) {
        studentRepository.updateDetail(studentUpdateRequest);
        return ResponseEntity.ok().build();
    }

4. 前后端传值

掌握前后端传值的正确姿势,是你开始 CRUD 的第一步!

4.1.
1
@PathVariable
1
@RequestParam

1
@PathVariable
用于获取路径参数,
1
@RequestParam
用于获取查询参数。

举个简单的例子:

1
2
3
4
5
6
@GetMapping("/klasses/{klassId}/teachers")
public List<Teacher> getKlassRelatedTeachers(
         @PathVariable("klassId") Long klassId,
         @RequestParam(value = "type", required = false) String type ) {
...
}

如果我们请求的 url 是:

1
/klasses/{123456}/teachers?type=web

那么我们服务获取到的数据就是:

1
klassId=123456,type=web

4.2.
1
@RequestBody

用于读取 Request 请求(可能是 POST,PUT,DELETE,GET 请求)的 body 部分并且Content-Type 为 application/json 格式的数据,接收到数据之后会自动将数据绑定到 Java 对象上去。系统会使用

1
HttpMessageConverter
或者自定义的
1
HttpMessageConverter
将请求的 body 中的 json 字符串转换为 java 对象。

我用一个简单的例子来给演示一下基本使用!

我们有一个注册的接口:

1
2
3
4
5
@PostMapping("/sign-up")
public ResponseEntity signUp(@RequestBody @Valid UserRegisterRequest userRegisterRequest) {
  userService.save(userRegisterRequest);
  return ResponseEntity.ok().build();
}

1
UserRegisterRequest
对象:

1
2
3
4
5
6
7
8
9
10
11
12
@Data
@AllArgsConstructor
@NoArgsConstructor
public class UserRegisterRequest {
    @NotBlank
    private String userName;
    @NotBlank
    private String password;
    @FullName
    @NotBlank
    private String fullName;
}

我们发送 post 请求到这个接口,并且 body 携带 JSON 数据:

1
{"userName":"coder","fullName":"shuangkou","password":"123456"}

这样我们的后端就可以直接把 json 格式的数据映射到我们的

1
UserRegisterRequest
类上。

👉 需要注意的是:一个请求方法只可以有一个

1
@RequestBody
,但是可以有多个
1
@RequestParam
1
@PathVariable
。 如果你的方法必须要用两个
1
@RequestBody
来接受数据的话,大概率是你的数据库设计或者系统设计出问题了!

5. 读取配置信息

很多时候我们需要将一些常用的配置信息比如阿里云 oss、发送短信、微信认证的相关配置信息等等放到配置文件中。

下面我们来看一下 Spring 为我们提供了哪些方式帮助我们从配置文件中读取这些配置信息。

我们的数据源

1
application.yml
内容如下::

1
2
3
4
5
6
7
8
9
10
11
12
13
wuhan2020: 2020年初武汉爆发了新型冠状病毒,疫情严重,但是,我相信一切都会过去!武汉加油!中国加油!
my-profile:
  name: Guide哥
  email: koushuangbwcx@163.com
library:
  location: 湖北武汉加油中国加油
  books:
    - name: 天才基本法
      description: 二十二岁的林朝夕在父亲确诊阿尔茨海默病这天,得知自己暗恋多年的校园男神裴之即将出国深造的消息——对方考取的学校,恰是父亲当年为她放弃的那所。
    - name: 时间的秩序
      description: 为什么我们记得过去,而非未来?时间“流逝”意味着什么?是我们存在于时间之内,还是时间存在于我们之中?卡洛·罗韦利用诗意的文字,邀请我们思考这一亘古难题——时间的本质。
    - name: 了不起的我
      description: 如何养成一个新习惯?如何让心智变得更成熟?如何拥有高质量的关系? 如何走出人生的艰难时刻?

5.1.
1
@value
(常用)

使用

1
@Value("${property}")
读取比较简单的配置信息:

1
2
@Value("${wuhan2020}")
String wuhan2020;

5.2.
1
@ConfigurationProperties
(常用)

通过

1
@ConfigurationProperties
读取配置信息并与 bean 绑定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Component
@ConfigurationProperties(prefix = "library")
class LibraryProperties {
    @NotEmpty
    private String location;
    private List<Book> books;
    @Setter
    @Getter
    @ToString
    static class Book {
        String name;
        String description;
    }
  省略getter/setter
  ......
}

你可以像使用普通的 Spring bean 一样,将其注入到类中使用。

5.3.
1
PropertySource
(不常用)

1
@PropertySource
读取指定 properties 文件

1
2
3
4
5
6
7
8
@Component
@PropertySource("classpath:website.properties")
class WebSite {
    @Value("${url}")
    private String url;
  省略getter/setter
  ......
}

更多内容请查看我的这篇文章:《10 分钟搞定 SpringBoot 如何优雅读取配置文件?》 。

6. 参数校验

数据的校验的重要性就不用说了,即使在前端对数据进行校验的情况下,我们还是要对传入后端的数据再进行一遍校验,避免用户绕过浏览器直接通过一些 HTTP 工具直接向后端请求一些违法数据。

JSR(Java Specification Requests) 是一套 JavaBean 参数校验的标准,它定义了很多常用的校验注解,我们可以直接将这些注解加在我们 JavaBean 的属性上面,这样就可以在需要校验的时候进行校验了,非常方便!

校验的时候我们实际用的是 Hibernate Validator 框架。Hibernate Validator 是 Hibernate 团队最初的数据校验框架,Hibernate Validator 4.x 是 Bean Validation 1.0(JSR 303)的参考实现,Hibernate Validator 5.x 是 Bean Validation 1.1(JSR 349)的参考实现,目前最新版的 Hibernate Validator 6.x 是 Bean Validation 2.0(JSR 380)的参考实现。

SpringBoot 项目的 spring-boot-starter-web 依赖中已经有 hibernate-validator 包,不需要引用相关依赖。如下图所示(通过 idea 插件—Maven Helper 生成):

非 SpringBoot 项目需要自行引入相关依赖包,这里不多做讲解,具体可以查看我的这篇文章:《如何在 Spring/Spring Boot 中做参数校验?你需要了解的都在这里!》。

👉 需要注意的是: 所有的注解,推荐使用 JSR 注解,即

1
javax.validation.constraints
,而不是
1
org.hibernate.validator.constraints

6.1. 一些常用的字段验证的注解

  • 1
    
    @NotEmpty
    
    被注释的字符串的不能为 null 也不能为空
  • 1
    
    @NotBlank
    
    被注释的字符串非 null,并且必须包含一个非空白字符
  • 1
    
    @Null
    
    被注释的元素必须为 null
  • 1
    
    @NotNull
    
    被注释的元素必须不为 null
  • 1
    
    @AssertTrue
    
    被注释的元素必须为 true
  • 1
    
    @AssertFalse
    
    被注释的元素必须为 false
  • 1
    
    @Pattern(regex=,flag=)
    
    被注释的元素必须符合指定的正则表达式
  • 1
    
    @Email
    
    被注释的元素必须是 Email 格式。
  • 1
    
    @Min(value)
    
    被注释的元素必须是一个数字,其值必须大于等于指定的最小值
  • 1
    
    @Max(value)
    
    被注释的元素必须是一个数字,其值必须小于等于指定的最大值
  • 1
    
    @DecimalMin(value)
    
    被注释的元素必须是一个数字,其值必须大于等于指定的最小值
  • 1
    
    @DecimalMax(value)
    
    被注释的元素必须是一个数字,其值必须小于等于指定的最大值
  • 1
    
    @Size(max=, min=)
    
    被注释的元素的大小必须在指定的范围内
  • 1
    
    @Digits (integer, fraction)
    
    被注释的元素必须是一个数字,其值必须在可接受的范围内
  • 1
    
    @Past
    
    被注释的元素必须是一个过去的日期
  • 1
    
    @Future
    
    被注释的元素必须是一个将来的日期
  • ……

6.2. 验证请求体(RequestBody)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Person {
    @NotNull(message = "classId 不能为空")
    private String classId;
    @Size(max = 33)
    @NotNull(message = "name 不能为空")
    private String name;
    @Pattern(regexp = "((^Man$|^Woman$|^UGM$))", message = "sex 值不在可选范围")
    @NotNull(message = "sex 不能为空")
    private String sex;
    @Email(message = "email 格式不正确")
    @NotNull(message = "email 不能为空")
    private String email;
}

我们在需要验证的参数上加上了

1
@Valid
注解,如果验证失败,它将抛出
1
MethodArgumentNotValidException

1
2
3
4
5
6
7
8
@RestController
@RequestMapping("/api")
public class PersonController {
    @PostMapping("/person")
    public ResponseEntity<Person> getPerson(@RequestBody @Valid Person person) {
        return ResponseEntity.ok().body(person);
    }
}

6.3. 验证请求参数(Path Variables 和 Request Parameters)

一定一定不要忘记在类上加上

1
Validated
注解了,这个参数可以告诉 Spring 去校验方法参数。

1
2
3
4
5
6
7
8
9
@RestController
@RequestMapping("/api")
@Validated
public class PersonController {
    @GetMapping("/person/{id}")
    public ResponseEntity<Integer> getPersonByID(@Valid @PathVariable("id") @Max(value = 5,message = "超过 id 的范围了") Integer id) {
        return ResponseEntity.ok().body(id);
    }
}

更多关于如何在 Spring 项目中进行参数校验的内容,请看《如何在 Spring/Spring Boot 中做参数校验?你需要了解的都在这里!》这篇文章。

7. 全局处理 Controller 层异常

介绍一下我们 Spring 项目必备的全局处理 Controller 层异常。

相关注解:

  1. 1
    
    @ControllerAdvice
    
    :注解定义全局异常处理类
  2. 1
    
    @ExceptionHandler
    
    :注解声明异常处理方法

如何使用呢?拿我们在第 5 节参数校验这块来举例子。如果方法参数不对的话就会抛出

1
MethodArgumentNotValidException
,我们来处理这个异常。

1
2
3
4
5
6
7
8
9
10
11
@ControllerAdvice
@ResponseBody
public class GlobalExceptionHandler {
    /**
     * 请求参数异常处理
     */
    @ExceptionHandler(MethodArgumentNotValidException.class)
    public ResponseEntity<?> handleMethodArgumentNotValidException(MethodArgumentNotValidException ex, HttpServletRequest request) {
       ......
    }
}

更多关于 Spring Boot 异常处理的内容,请看我的这两篇文章:

  1. SpringBoot 处理异常的几种常见姿势
  2. 使用枚举简单封装一个优雅的 Spring Boot 全局异常处理!

8. JPA 相关

8.1. 创建表

1
@Entity
声明一个类对应一个数据库实体。

1
@Table
设置表名

1
2
3
4
5
6
7
8
9
10
@Entity
@Table(name = "role")
public class Role {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String name;
    private String description;
    省略getter/setter......
}

8.2. 创建主键

1
@Id
:声明一个字段为主键。

使用

1
@Id
声明之后,我们还需要定义主键的生成策略。我们可以使用
1
@GeneratedValue
指定主键生成策略。

1.通过

1
@GeneratedValue
直接使用 JPA 内置提供的四种主键生成策略来指定主键生成策略。

1
2
3
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;

JPA 使用枚举定义了 4 中常见的主键生成策略,如下:

Guide 哥:枚举替代常量的一种用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public enum GenerationType {
    /**
     * 使用一个特定的数据库表格来保存主键
     * 持久化引擎通过关系数据库的一张特定的表格来生成主键,
     */
    TABLE,
    /**
     *在某些数据库中,不支持主键自增长,比如Oracle、PostgreSQL其提供了一种叫做"序列(sequence)"的机制生成主键
     */
    SEQUENCE,
    /**
     * 主键自增长
     */
    IDENTITY,
    /**
     *把主键生成策略交给持久化引擎(persistence engine),
     *持久化引擎会根据数据库在以上三种主键生成 策略中选择其中一种
     */
    AUTO
}

1
@GeneratedValue
注解默认使用的策略是
1
GenerationType.AUTO

1
2
3
4
public @interface GeneratedValue {
    GenerationType strategy() default AUTO;
    String generator() default "";
}

一般使用 MySQL 数据库的话,使用

1
GenerationType.IDENTITY
策略比较普遍一点(分布式系统的话需要另外考虑使用分布式 ID)。

2.通过

1
@GenericGenerator
声明一个主键策略,然后
1
@GeneratedValue
使用这个策略

1
2
3
4
@Id
@GeneratedValue(generator = "IdentityIdGenerator")
@GenericGenerator(name = "IdentityIdGenerator", strategy = "identity")
private Long id;

等价于:

1
2
3
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;

jpa 提供的主键生成策略有如下几种:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class DefaultIdentifierGeneratorFactory
		implements MutableIdentifierGeneratorFactory, Serializable, ServiceRegistryAwareService {
	@SuppressWarnings("deprecation")
	public DefaultIdentifierGeneratorFactory() {
		register( "uuid2", UUIDGenerator.class );
		register( "guid", GUIDGenerator.class );			// can be done with UUIDGenerator + strategy
		register( "uuid", UUIDHexGenerator.class );			// "deprecated" for new use
		register( "uuid.hex", UUIDHexGenerator.class ); 	// uuid.hex is deprecated
		register( "assigned", Assigned.class );
		register( "identity", IdentityGenerator.class );
		register( "select", SelectGenerator.class );
		register( "sequence", SequenceStyleGenerator.class );
		register( "seqhilo", SequenceHiLoGenerator.class );
		register( "increment", IncrementGenerator.class );
		register( "foreign", ForeignGenerator.class );
		register( "sequence-identity", SequenceIdentityGenerator.class );
		register( "enhanced-sequence", SequenceStyleGenerator.class );
		register( "enhanced-table", TableGenerator.class );
	}
	public void register(String strategy, Class generatorClass) {
		LOG.debugf( "Registering IdentifierGenerator strategy [%s] -> [%s]", strategy, generatorClass.getName() );
		final Class previous = generatorStrategyToClassNameMap.put( strategy, generatorClass );
		if ( previous != null ) {
			LOG.debugf( "    - overriding [%s]", previous.getName() );
		}
	}
}

8.3. 设置字段类型

1
@Column
声明字段。

示例:

设置属性 userName 对应的数据库字段名为 user_name,长度为 32,非空

1
2
@Column(name = "user_name", nullable = false, length=32)
private String userName;

设置字段类型并且加默认值,这个还是挺常用的。

1
2
Column(columnDefinition = "tinyint(1) default 1")
private Boolean enabled;

8.4. 指定不持久化特定字段

1
@Transient
:声明不需要与数据库映射的字段,在保存的时候不需要保存进数据库 。

如果我们想让

1
secrect
这个字段不被持久化,可以使用
1
@Transient
关键字声明。

1
2
3
4
5
6
Entity(name="USER")
public class User {
    ......
    @Transient
    private String secrect; // not persistent because of @Transient
}

除了

1
@Transient
关键字声明, 还可以采用下面几种方法:

1
2
3
static String secrect; // not persistent because of static
final String secrect = Satish; // not persistent because of final
transient String secrect; // not persistent because of transient

一般使用注解的方式比较多。

8.5. 声明大字段

1
@Lob
:声明某个字段为大字段。

1
2
@Lob
private String content;

更详细的声明:

1
2
3
4
5
6
@Lob
//指定 Lob 类型数据的获取策略, FetchType.EAGER 表示非延迟 加载,而 FetchType. LAZY 表示延迟加载 ;
@Basic(fetch = FetchType.EAGER)
//columnDefinition 属性指定数据表对应的 Lob 字段类型
@Column(name = "content", columnDefinition = "LONGTEXT NOT NULL")
private String content;

8.6. 创建枚举类型的字段

可以使用枚举类型的字段,不过枚举字段要用

1
@Enumerated
注解修饰。

1
2
3
4
5
6
7
8
public enum Gender {
    MALE("男性"),
    FEMALE("女性");
    private String value;
    Gender(String str){
        value=str;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
@Entity
@Table(name = "role")
public class Role {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String name;
    private String description;
    @Enumerated(EnumType.STRING)
    private Gender gender;
    省略getter/setter......
}

数据库里面对应存储的是 MAIL/FEMAIL。

8.7. 增加审计功能

只要继承了

1
AbstractAuditBase
的类都会默认加上下面四个字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Data
@AllArgsConstructor
@NoArgsConstructor
@MappedSuperclass
@EntityListeners(value = AuditingEntityListener.class)
public abstract class AbstractAuditBase {
    @CreatedDate
    @Column(updatable = false)
    @JsonIgnore
    private Instant createdAt;
    @LastModifiedDate
    @JsonIgnore
    private Instant updatedAt;
    @CreatedBy
    @Column(updatable = false)
    @JsonIgnore
    private String createdBy;
    @LastModifiedBy
    @JsonIgnore
    private String updatedBy;
}

我们对应的审计功能对应地配置类可能是下面这样的(Spring Security 项目):

1
2
3
4
5
6
7
8
9
10
11
@Configuration
@EnableJpaAuditing
public class AuditSecurityConfiguration {
    @Bean
    AuditorAware<String> auditorAware() {
        return () -> Optional.ofNullable(SecurityContextHolder.getContext())
                .map(SecurityContext::getAuthentication)
                .filter(Authentication::isAuthenticated)
                .map(Authentication::getName);
    }
}

简单介绍一下上面设计到的一些注解:

  1. 1
    
    @CreatedDate
    
    : 表示该字段为创建时间时间字段,在这个实体被 insert 的时候,会设置值
  2. 1
    
    @CreatedBy
    
    :表示该字段为创建人,在这个实体被 insert 的时候,会设置值

    1
    
    @LastModifiedDate
    
    1
    
    @LastModifiedBy
    
    同理。

1
@EnableJpaAuditing
:开启 JPA 审计功能。

8.8. 删除/修改数据

1
@Modifying
注解提示 JPA 该操作是修改操作,注意还要配合
1
@Transactional
注解使用。

1
2
3
4
5
6
@Repository
public interface UserRepository extends JpaRepository<User, Integer> {
    @Modifying
    @Transactional(rollbackFor = Exception.class)
    void deleteByUserName(String userName);
}

8.9. 关联关系

  • 1
    
    @OneToOne
    
    声明一对一关系
  • 1
    
    @OneToMany
    
    声明一对多关系
  • 1
    
    @ManyToOne
    
    声明多对一关系
  • 1
    
    MangToMang
    
    声明多对多关系

更多关于 Spring Boot JPA 的文章请看我的这篇文章:一文搞懂如何在 Spring Boot 正确中使用 JPA

9. 事务
1
@Transactional

在要开启事务的方法上使用

1
@Transactional
注解即可!

1
2
3
4
@Transactional(rollbackFor = Exception.class)
public void save() {
  ......
}

我们知道 Exception 分为运行时异常 RuntimeException 和非运行时异常。在

1
@Transactional
注解中如果不配置
1
rollbackFor
属性,那么事物只会在遇到
1
RuntimeException
的时候才会回滚,加上
1
rollbackFor=Exception.class
,可以让事物在遇到非运行时异常时也回滚。

1
@Transactional
注解一般用在可以作用在
1
或者
1
方法
上。

  • 作用于类:当把
    1
    
    @Transactional 注解放在类上时,表示所有该类的
    
    public 方法都配置相同的事务属性信息。
  • 作用于方法:当类配置了
    1
    
    @Transactional
    
    ,方法也配置了
    1
    
    @Transactional
    
    ,方法的事务会覆盖类的事务配置信息。

更多关于关于 Spring 事务的内容请查看:

  1. 可能是最漂亮的 Spring 事务管理详解
  2. 一口气说出 6 种 @Transactional 注解失效场景

10. json 数据处理

10.1. 过滤 json 数据

1
@JsonIgnoreProperties
作用在类上用于过滤掉特定字段不返回或者不解析。

1
2
3
4
5
6
7
8
9
//生成json时将userRoles属性过滤
@JsonIgnoreProperties({"userRoles"})
public class User {
    private String userName;
    private String fullName;
    private String password;
    @JsonIgnore
    private List<UserRole> userRoles = new ArrayList<>();
}

1
@JsonIgnore
一般用于类的属性上,作用和上面的
1
@JsonIgnoreProperties
一样。

1
2
3
4
5
6
7
8
public class User {
    private String userName;
    private String fullName;
    private String password;
   //生成json时将userRoles属性过滤
    @JsonIgnore
    private List<UserRole> userRoles = new ArrayList<>();
}

10.2. 格式化 json 数据

1
@JsonFormat
一般用来格式化 json 数据。:

比如:

1
2
@JsonFormat(shape=JsonFormat.Shape.STRING, pattern="yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", timezone="GMT")
private Date date;

10.3. 扁平化对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Getter
@Setter
@ToString
public class Account {
    @JsonUnwrapped
    private Location location;
    @JsonUnwrapped
    private PersonInfo personInfo;
  @Getter
  @Setter
  @ToString
  public static class Location {
     private String provinceName;
     private String countyName;
  }
  @Getter
  @Setter
  @ToString
  public static class PersonInfo {
    private String userName;
    private String fullName;
  }
}

未扁平化之前:

1
2
3
4
5
6
7
8
9
10
{
    "location": {
        "provinceName":"湖北",
        "countyName":"武汉"
    },
    "personInfo": {
        "userName": "coder1234",
        "fullName": "shaungkou"
    }
}

使用

1
@JsonUnwrapped
扁平对象之后:

1
2
3
4
5
6
7
8
9
10
@Getter
@Setter
@ToString
public class Account {
    @JsonUnwrapped
    private Location location;
    @JsonUnwrapped
    private PersonInfo personInfo;
    ......
}
1
2
3
4
5
6
{
  "provinceName":"湖北",
  "countyName":"武汉",
  "userName": "coder1234",
  "fullName": "shaungkou"
}

11. 测试相关

1
@ActiveProfiles
一般作用于测试类上, 用于声明生效的 Spring 配置文件。

1
2
3
4
5
6
@SpringBootTest(webEnvironment = RANDOM_PORT)
@ActiveProfiles("test")
@Slf4j
public abstract class TestBase {
  ......
}

1
@Test
声明一个方法为测试方法

1
@Transactional
被声明的测试方法的数据会回滚,避免污染测试数据。

1
@WithMockUser
Spring Security 提供的,用来模拟一个真实用户,并且可以赋予权限。

1
2
3
4
5
6
    @Test
    @Transactional
    @WithMockUser(username = "user-id-18163138155", authorities = "ROLE_TEACHER")
    void should_import_student_success() throws Exception {
        ......
    }

1. 什么是设计模式

在软件工程中,设计模式(design pattern)是对软件设计中普遍存在(反复出现)的各种问题 ,所提出的解决方案。这个术语是由埃里希·伽玛(Erich Gamma)等人在1990年代从建筑设计领 域引入到计算机科学的。

著名的4人帮: Erich Gamma,Richard Helm, Ralph Johnson ,John Vlissides (Gof)

《设计模式:可复用面向对象软件的基础》收录23种模式

2. 单例模式

单例对象的类必须保证只有一个实例存在。许多时候整个系统只需要拥有一个的全局对象,这样有利于我们协调系统整体的行为

比如:全局信息配置

单例模式最简单的实现:

public class Singleton {
	private Singleton() {
		System.out.println("Singleton is create");
	}
	private static Singleton instance = new Singleton();
	public static Singleton getInstance() {
		return instance;
	}
}

由私有构造方法和static来确定唯一性。

缺点:何时产生实例 不好控制

虽然我们知道,在类Singleton第一次被加载的时候,就产生了一个实例。

但是如果这个类中有其他属性

public class Singleton {
	public static int STATUS=1; 
	private Singleton() {
		System.out.println("Singleton is create");
	}
	private static Singleton instance = new Singleton();
	public static Singleton getInstance() {
		return instance;
	}
}

当使用

System.out.println(Singleton.STATUS);

这个实例就被产生了。也许此时你并不希望产生这个实例。

如果系统特别在意这个问题,这种单例的实现方法就不太好。

第二种单例模式的解决方式:

public class Singleton {
	private Singleton() {
		System.out.println("Singleton is create");
	}
	private static Singleton instance = null;
	public static synchronized Singleton getInstance() {
		if (instance == null)
			instance = new Singleton();
		return instance;
	}
}

让instance只有在调用getInstance()方式时被创建,并且通过synchronized来确保线程安全。

这样就控制了何时创建实例。

这种方法是延迟加载的典型。

但是有一个问题就是,在高并发的场景下性能会有影响,虽然只有一个判断就return了,但是在并发量很高的情况下,或多或少都会有点影响,因为都要去拿synchronized的锁。

为了高效,有了第三种方式:

public class StaticSingleton {
	private StaticSingleton(){  
		System.out.println("StaticSingleton is create");
	}
	private static class SingletonHolder {
		private static StaticSingleton instance = new StaticSingleton();
	}
	public static StaticSingleton getInstance() {
		return SingletonHolder.instance;
	}
}

由于加载一个类时,其内部类不会被加载。这样保证了只有调用getInstance()时才会产生实例,控制了生成实例的时间,实现了延迟加载。

并且去掉了synchronized,让性能更优,用static来确保唯一性。

3. 不变模式

一个类的内部状态创建后,在整个生命期间都不会发生变化时,就是不变类

不变模式不需要同步

创建一个不变的类:

public final class Product {
	// 确保无子类
	private final String no;
	// 私有属性,不会被其他对象获取
	private final String name;
	// final保证属性不会被2次赋值
	private final double price;

	public Product(String no, String name, double price) {
		// 在创建对象时,必须指定数据
		super();
		// 因为创建之后,无法进行修改
		this.no = no;
		this.name = name;
		this.price = price;
	}

	public String getNo() {
		return no;
	}

	public String getName() {
		return name;
	}

	public double getPrice() {
		return price;
	}

}

Java中不变的模式的案例有:

  • java.lang.String
  • java.lang.Boolean
  • java.lang.Byte
  • java.lang.Character
  • java.lang.Double
  • java.lang.Float
  • java.lang.Integer
  • java.lang.Long
  • java.lang.Short

4. Future模式

核心思想是异步调用

第一次的call_return由于任务还没完成,所以返回的是一个空的。

但是这个返回类似于购物中的订单,将来可以根据这个订单来得到一个结果。

所以这个Future模式意思就是,“未来”可以得到,就是指这个订单或者说是契约,“承诺”未来就会给结果。

Future模式简单的实现:

调用者得到的是一个Data,一开始可能是一个FutureData,因为RealData构建很慢。在未来的某个时间,可以通过FutureData来得到RealData。

代码实现:

public interface Data {     
	public String getResult (); 
}
public class FutureData implements Data {     
	protected RealData realdata = null;   //FutureData是RealData的包装     
	protected boolean isReady = false;     
	public synchronized void setRealData(RealData realdata) {         
		if (isReady) {              
			return;         
		}         
		this.realdata = realdata;         
		isReady = true;         
		notifyAll();    //RealData已经被注入,通知getResult()     
	}     
	public synchronized String getResult()//会等待RealData构造完成         
	{  
		while (!isReady) {             
			try {                 
				wait();    //一直等待,知道RealData被注入            
			} catch (InterruptedException e) {             
				}         
		}         
		return realdata.result;  //由RealData实现       
	} 
}
public class RealData implements Data {
	protected final String result;
	public RealData(String para) {
		// RealData的构造可能很慢,需要用户等待很久,这里使用sleep模拟
		StringBuffer sb = new StringBuffer();
		for (int i = 0; i < 10; i++) {
			sb.append(para);
			try {
				// 这里使用sleep,代替一个很慢的操作过程
				Thread.sleep(100);
			} catch (InterruptedException e) {
			}
		}
		result = sb.toString();
	}
	public String getResult() {
		return result;
	}
}
public class Client {     
	public Data request(final String queryStr) {         
		final FutureData future = new FutureData();         
		new Thread() {
			public void run() 
			{
				// RealData的构建很慢,           
				//所以在单独的线程中进行                
				RealData realdata = new RealData(queryStr);                 
				future.setRealData(realdata);             
			}                                                        
		}.start();         
		return future; // FutureData会被立即返回     
	} 
}
public static void main(String[] args) {
		Client client = new Client();
		// 这里会立即返回,因为得到的是FutureData而不是RealData
		Data data = client.request("name");
		System.out.println("请求完毕");
		try {
			// 这里可以用一个sleep代替了对其他业务逻辑的处理
			// 在处理这些业务逻辑的过程中,RealData被创建,从而充分利用了等待时间
			Thread.sleep(2000);
		} catch (InterruptedException e) {
		}
		// 使用真实的数据
		System.out.println("数据 = " + data.getResult());
	}

JDK中也有多Future模式的支持:

接下来使用JDK提供的类和方法来实现刚刚的代码:

import java.util.concurrent.Callable;

public class RealData implements Callable<String> {
	private String para;

	public RealData(String para) {
		this.para = para;
	}

	@Override
	public String call() throws Exception {
		StringBuffer sb = new StringBuffer();
		for (int i = 0; i < 10; i++) {
			sb.append(para);
			try {
				Thread.sleep(100);
			} catch (InterruptedException e) {

			}
		}
		return sb.toString();
	}
}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class FutureMain {
	public static void main(String[] args) throws InterruptedException,
			ExecutionException {
		// 构造FutureTask
		FutureTask<String> future = new FutureTask<String>(new RealData("a"));
		ExecutorService executor = Executors.newFixedThreadPool(1);
		// 执行FutureTask,相当于上例中的 client.request("a") 发送请求
		// 在这里开启线程进行RealData的call()执行
		executor.submit(future);
		System.out.println("请求完毕");
		try {
			// 这里依然可以做额外的数据操作,这里使用sleep代替其他业务逻辑的处理
			Thread.sleep(2000);
		} catch (InterruptedException e) {
		}
		// 相当于data.getResult (),取得call()方法的返回值
		// 如果此时call()方法没有执行完成,则依然会等待
		System.out.println("数据 = " + future.get());
	}
}

这里要注意的是FutureTask是即具有 Future功能又具有Runnable功能的类。所以又可以运行,最后还能get。

当然如果在调用到future.get()时,真实数据还没准备好,仍然会产生阻塞状况,直到数据准备完成。

当然还有更加简便的方式:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureMain2 {
	public static void main(String[] args) throws InterruptedException,
			ExecutionException {
		ExecutorService executor = Executors.newFixedThreadPool(1);
		// 执行FutureTask,相当于上例中的 client.request("a") 发送请求
		// 在这里开启线程进行RealData的call()执行
		Future<String> future = executor.submit(new RealData("a"));
		System.out.println("请求完毕");
		try {
			// 这里依然可以做额外的数据操作,这里使用sleep代替其他业务逻辑的处理
			Thread.sleep(2000);
		} catch (InterruptedException e) {
		}
		// 相当于data.getResult (),取得call()方法的返回值
		// 如果此时call()方法没有执行完成,则依然会等待
		System.out.println("数据 = " + future.get());
	}
}

由于Callable是有返回值的,可以直接返回future对象。

5. 生产者消费者

生产者-消费者模式是一个经典的多线程设计模式。它为多线程间的协作提供了良好的解决方案。 在生产者-消费者模式中,通常由两类线程,即若干个生产者线程和若干个消费者线程。生产者线 程负责提交用户请求,消费者线程则负责具体处理生产者提交的任务。生产者和消费者之间则通 过共享内存缓冲区进行通信。

1. 线程池的基本使用

1.1.为什么需要线程池

平时的业务中,如果要使用多线程,那么我们会在业务开始前创建线程,业务结束后,销毁线程。但是对于业务来说,线程的创建和销毁是与业务本身无关的,只关心线程所执行的任务。因此希望把尽可能多的cpu用在执行任务上面,而不是用在与业务无关的线程创建和销毁上面。而线程池则解决了这个问题,线程池的作用就是将线程进行复用。

1.2.JDK为我们提供了哪些支持

JDK中的相关类图如上图所示。

其中要提到的几个特别的类。

Callable类和Runable类相似,但是区别在于Callable有返回值。

ThreadPoolExecutor是线程池的一个重要实现。

而Executors是一个工厂类。

1.3.线程池的使用

1.3.1.线程池的种类

  • new FixedThreadPool 固定数量的线程池,线程池中的线程数量是固定的,不会改变。
  • new SingleThreadExecutor 单一线程池,线程池中只有一个线程。
  • new CachedThreadPool 缓存线程池,线程池中的线程数量不固定,会根据需求的大小进行改变。
  • new ScheduledThreadPool 计划任务调度的线程池,用于执行计划任务,比如每隔5分钟怎么样,
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

从方法上来看,显然 FixedThreadPool,SingleThreadExecutor,CachedThreadPool都是ThreadPoolExecutor的不同实例,只是参数不同。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
}

下面来简述下 ThreadPoolExecutor构造函数中参数的含义。

  • corePoolSize 线程池中核心线程数的数目
  • maximumPoolSize 线程池中最多能容纳多少个线程
  • keepAliveTime 当现在线程数目大于corePoolSize时,超过keepAliveTime时间后,多出corePoolSize的那些线程将被终结。
  • unit keepAliveTime的单位
  • workQueue 当任务数量很大,线程池中线程无法满足时,提交的任务会被放到阻塞队列中,线程空闲下来则会不断从阻塞队列中取数据。

这样在来看上面所说的FixedThreadPool,它的线程的核心数目和最大容纳数目都是一样的,以至于在工作期间,并不会创建和销毁线程。当任务数量很大,线程池中的线程无法满足时,任务将被保存到LinkedBlockingQueue中,而LinkedBlockingQueue的大小是Integer.MAX_VALUE。这就意味着,任务不断地添加,会使内存消耗越来越大。

而CachedThreadPool则不同,它的核心线程数量是0,最大容纳数目是Integer.MAX_VALUE,它的阻塞队列是SynchronousQueue,这是一个特别的队列,它的大小是0。由于核心线程数量是0,所以必然要将任务添加到SynchronousQueue中,这个队列只有一个线程在从中添加数据,同时另一个线程在从中获取数据时,才能成功。独自往这个队列中添加数据会返回失败。当返回失败时,则线程池开始扩展线程,这就是为什么CachedThreadPool的线程数目是不固定的。当60s该线程仍未被使用时,线程则被销毁。

1.4.线程池使用的小例子

1.4.1.简单线程池

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolDemo {
	public static class MyTask implements Runnable {
		@Override
		public void run() {
			System.out.println(System.currentTimeMillis() + "Thread ID:"
					+ Thread.currentThread().getId());
			try {
				Thread.sleep(1000);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}

	public static void main(String[] args) {
		MyTask myTask = new MyTask();
		ExecutorService es = Executors.newFixedThreadPool(5);
		for (int i = 0; i < 10; i++) {
			es.submit(myTask);
		}
	}
}

由于使用的newFixedThreadPool(5),但是启动了10个线程,所以每次执行5个,并且 可以很明显的看到线程的复用,ThreadId是重复的,也就是前5个任务和后5个任务都是同一批线程去执行的。

这里用的是

es.submit(myTask);

还有一种提交方式:

es.execute(myTask);

区别在于submit会返回一个Future对象,这个将在以后介绍。

1.4.2.ScheduledThreadPool

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ThreadPoolDemo {
	public static void main(String[] args) {
		ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
		//如果前面的任务还未完成,则调度不会启动。
		ses.scheduleWithFixedDelay(new Runnable() {

			@Override
			public void run() {
				try {
					Thread.sleep(1000);
					System.out.println(System.currentTimeMillis()/1000);
				} catch (Exception e) {
					// TODO: handle exception
				}

			}
		}, 0, 2, TimeUnit.SECONDS);//启动0秒后执行,然后周期2秒执行一次
	}
}

输出:

1454832514
1454832517
1454832520
1454832523
1454832526
...

由于任务执行需要1秒,任务调度必须等待前一个任务完成。也就是这里的每隔2秒的意思是,前一个任务完成后2秒再开启新的一个任务。

2. 扩展和增强线程池

2.1.回调接口

线程池中有一些回调的api来给我们提供扩展的操作。

ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS,
				new LinkedBlockingQueue<Runnable>()){

					@Override
					protected void beforeExecute(Thread t, Runnable r) {
						System.out.println("准备执行");
					}

					@Override
					protected void afterExecute(Runnable r, Throwable t) {
						System.out.println("执行完成");
					}

					@Override
					protected void terminated() {
						System.out.println("线程池退出");
					}

		};

我们可以通过实现ThreadPoolExecutor的子类去覆盖ThreadPoolExecutor的beforeExecute,afterExecute,terminated方法来实现在线程执行前后,线程池退出时的日志管理或其他操作。

2.2.拒绝策略

有时候,任务非常繁重,导致系统负载太大。在上面说过,当任务量越来越大时,任务都将放到FixedThreadPool的阻塞队列中,导致内存消耗太大,最终导致内存溢出。这样的情况是应该要避免的。因此当我们发现线程数量要超过最大线程数量时,我们应该放弃一些任务。丢弃时,我们应该把任务记下来,而不是直接丢掉。

ThreadPoolExecutor中还有另一个构造函数。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

threadFactory我们在后面再介绍。

而handler就是拒绝策略的实现,它会告诉我们,如果任务不能执行了,该怎么做。

共有以上4种策略。

AbortPolicy:如果不能接受任务了,则抛出异常。

CallerRunsPolicy:如果不能接受任务了,则让调用的线程去完成。

DiscardOldestPolicy:如果不能接受任务了,则丢弃最老的一个任务,由一个队列来维护。

DiscardPolicy:如果不能接受任务了,则丢弃任务。

ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS,
				new LinkedBlockingQueue<Runnable>(),
				new RejectedExecutionHandler() {

					@Override
					public void rejectedExecution(Runnable r,
							ThreadPoolExecutor executor) {
						System.out.println(r.toString() + "is discard");
					}
				});

当然我们也可以自己实现RejectedExecutionHandler接口来自己定义拒绝策略。

2.3.自定义ThreadFactory

刚刚已经看到了,在ThreadPoolExecutor的构造函数中可以指定threadFactory。

线程池中的线程都是由线程工厂创建出来,我们可以自定义线程工厂。

默认的线程工厂:

static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

3. ForkJoin

3.1.思想

就是分而治之的思想。

fork/join类似MapReduce算法,两者区别是:Fork/Join 只有在必要时如任务非常大的情况下才分割成一个个小任务,而 MapReduce总是在开始执行第一步进行分割。看来,Fork/Join更适合一个JVM内线程级别,而MapReduce适合分布式系统。

4.2.使用接口

RecursiveAction:无返回值

RecursiveTask:有返回值

4.3.简单例子

import java.util.ArrayList;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class CountTask extends RecursiveTask<Long>{

	private static final int THRESHOLD = 10000;
	private long start;
	private long end;

	public CountTask(long start, long end) {
		super();
		this.start = start;
		this.end = end;
	}

	@Override
	protected Long compute() {
		long sum = 0;
		boolean canCompute = (end - start) < THRESHOLD;
		if(canCompute)
		{
			for (long i = start; i <= end; i++) {
				sum = sum + i;
			}
		}else
		{
			//分成100个小任务
			long step = (start + end)/100;
			ArrayList<CountTask> subTasks = new ArrayList<CountTask>();
			long pos = start;
			for (int i = 0; i < 100; i++) {
				long lastOne = pos + step;
				if(lastOne > end )
				{
					lastOne = end;
				}
				CountTask subTask = new CountTask(pos, lastOne);
				pos += step + 1;
				subTasks.add(subTask);
				subTask.fork();//把子任务推向线程池
			}
			for (CountTask t : subTasks) {
				sum += t.join();//等待所有子任务结束
			}
		}
		return sum;
	}

	public static void main(String[] args) {
		ForkJoinPool forkJoinPool = new ForkJoinPool();
		CountTask task = new CountTask(0, 200000L);
		ForkJoinTask<Long> result = forkJoinPool.submit(task);
		try {
			long res = result.get();
			System.out.println("sum = " + res);
		} catch (Exception e) {
			// TODO: handle exception
			e.printStackTrace();
		}
	}

}

上述例子描述了一个累加和的任务。将累加任务分成100个任务,每个任务只执行一段数字的累加和,最后join后,把每个任务计算出的和再累加起来。

4.4.实现要素

4.4.1.WorkQueue与ctl

每一个线程都会有一个工作队列

static final class WorkQueue

在工作队列中,会有一系列对线程进行管理的字段

volatile int eventCount;   // encoded inactivation count; < 0 if inactive
        int nextWait;              // encoded record of next event waiter
        int nsteals;               // number of steals
        int hint;                  // steal index hint
        short poolIndex;           // index of this queue in pool
        final short mode;          // 0: lifo, > 0: fifo, < 0: shared
        volatile int qlock;        // 1: locked, -1: terminate; else 0
        volatile int base;         // index of next slot for poll
        int top;                   // index of next slot for push
        ForkJoinTask<?>[] array;   // the elements (initially unallocated)
        final ForkJoinPool pool;   // the containing pool (may be null)
        final ForkJoinWorkerThread owner; // owning thread or null if shared
        volatile Thread parker;    // == owner during call to park; else null
        volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
        ForkJoinTask<?> currentSteal; // current non-local task being executed

这里要注意的是,JDK7和JDK8在ForkJoin的实现上有了很大的差别。我们这里介绍的是JDK8中的。 在线程池中,有时不是所有的线程都在执行的,部分线程会被挂起,那些挂起的线程会被存放到一个栈中。内部通过一个链表表示。

nextWait会指向下一个等待的线程。

poolIndex线程在线程池中的下标索引。

eventCount 在初始化时,eventCount与poolIndex有关。总共32位,第一位表示是否被激活,15位表示被挂起的次数eventCount,剩下的表示poolIndex。用一个字段来表示多个意思。

工作队列WorkQueue用ForkJoinTask<?>[] array来表示。而top,base来表示队列的两端,数据在这两者之间。

在ForkJoinPool中维护着ctl(64位long型)

volatile long ctl;
* Field ctl is a long packed with:
     * AC: Number of active running workers minus target parallelism (16 bits)
     * TC: Number of total workers minus target parallelism (16 bits)
     * ST: true if pool is terminating (1 bit)
     * EC: the wait count of top waiting thread (15 bits)
     * ID: poolIndex of top of Treiber stack of waiters (16 bits)

AC表示活跃的线程数减去并行度(大概就是CPU个数)

TC表示总的线程数减去并行度

ST表示线程池本身是否是激活的

EC表示顶端等待线程的挂起数

ID表示顶端等待线程的poolIndex

很明显ST+EC+ID就是我们刚刚所说的 eventCount 。

那么为什么明明5个变量,非要合成一个变量呢。其实用5个变量占用容量也差不多。

用一个变量代码的可读性上会差很多。

那么为什么用一个变量呢?其实这点才是最巧妙的地方,因为这5个变量是一个整体,在多线程中,如果用5个变量,那么当修改其中一个变量时,如何保证5个变量的整体性。那么用一个变量则就解决了这个问题。如果用锁解决,则会降低性能。

用一个变量则保证了数据的一致性和原子性。

在ForkJoin中队ctl的更改都是使用CAS操作,在前面系列的文章中已经介绍过,CAS是无锁的操作,性能很好。

由于CAS操作也只能针对一个变量,所以这种设计是最优的。

4.4.2.工作窃取

接下来要介绍下整个线程池的工作流程。

每个线程都会调用runWorker

final void runWorker(WorkQueue w) {
        w.growArray(); // allocate queue
        for (int r = w.hint; scan(w, r) == 0; ) {
            r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
        }
    }

scan()函数是扫描是否有任务要做。

r是一个相对随机的数字。

private final int scan(WorkQueue w, int r) {
        WorkQueue[] ws; int m;
        long c = ctl;                            // for consistency check
        if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && w != null) {
            for (int j = m + m + 1, ec = w.eventCount;;) {
                WorkQueue q; int b, e; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
                if ((q = ws[(r - j) & m]) != null &&
                    (b = q.base) - q.top < 0 && (a = q.array) != null) {
                    long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
                    if ((t = ((ForkJoinTask<?>)
                              U.getObjectVolatile(a, i))) != null) {
                        if (ec < 0)
                            helpRelease(c, ws, w, q, b);
                        else if (q.base == b &&
                                 U.compareAndSwapObject(a, i, t, null)) {
                            U.putOrderedInt(q, QBASE, b + 1);
                            if ((b + 1) - q.top < 0)
                                signalWork(ws, q);
                            w.runTask(t);
                        }
                    }
                    break;
                }
                else if (--j < 0) {
                    if ((ec | (e = (int)c)) < 0) // inactive or terminating
                        return awaitWork(w, c, ec);
                    else if (ctl == c) {         // try to inactivate and enqueue
                        long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
                        w.nextWait = e;
                        w.eventCount = ec | INT_SIGN;
                        if (!U.compareAndSwapLong(this, CTL, c, nc))
                            w.eventCount = ec;   // back out
                    }
                    break;
                }
            }
        }
        return 0;
    }

我们接下来看看scan方法,scan的一个参数是WorkQueue,上面已经说过,每个线程都会拥有一个WorkQueue,那么多个线程的WorkQueue就会保存在workQueues里面,r是一个随机数,通过r来找到某一个WorkQueue,在WorkQueue里面有所要做的任务。

然后通过WorkQueue的base,取得base的偏移量。

b = q.base
..
long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
..

然后通过偏移量得到最后一个的任务,运行这个任务

t = ((ForkJoinTask<?>)U.getObjectVolatile(a, i))
..
w.runTask(t);
..

通过这个大概的分析理解了过程,我们发现,当前线程调用scan方法后,不会执行当前的WorkQueue中的任务,而是通过一个随机数r,来得到其他 WorkQueue的任务。这就是ForkJoinPool的主要的一个机理。

当前线程不会只着眼于自己的任务,而是优先完成其他任务。这样做来,防止了饥饿现象的发生。这样就预防了某些线程因为卡死或者其他原因而无法及时完成任务,或者某个线程的任务量很大,其他线程却没事可做。

然后来看看runTask方法

final void runTask(ForkJoinTask<?> task) {
            if ((currentSteal = task) != null) {
                ForkJoinWorkerThread thread;
                task.doExec();
                ForkJoinTask<?>[] a = array;
                int md = mode;
                ++nsteals;
                currentSteal = null;
                if (md != 0)
                    pollAndExecAll();
                else if (a != null) {
                    int s, m = a.length - 1;
                    ForkJoinTask<?> t;
                    while ((s = top - 1) - base >= 0 &&
                           (t = (ForkJoinTask<?>)U.getAndSetObject
                            (a, ((m & s) << ASHIFT) + ABASE, null)) != null) {
                        top = s;
                        t.doExec();
                    }
                }
                if ((thread = owner) != null) // no need to do in finally clause
                    thread.afterTopLevelExec();
            }
        }

有一个有趣的命名:currentSteal,偷得的任务,的确是刚刚解释的那样。

task.doExec();

将会完成这个任务。

完成了别人的任务以后,将会完成自己的任务。

通过得到top来获得自己任务第一个任务

while ((s = top - 1) - base >= 0 && (t = (ForkJoinTask<?>)U.getAndSetObject(a, ((m & s) << ASHIFT) + ABASE, null)) != null)
{
       top = s;
       t.doExec();
}

接下来,通过一个图来总结下刚刚线程池的流程

比如有T1,T2两个线程,T1会通过T2的base来获得T2的最后一个任务(当然实际上是通过一个随机数r来取得某个线程最后一个任务),T1也会通过自己的top来执行自己的第一个任务。反之,T2也会如此。

拿其他线程的任务都是从base开始拿的,自己拿自己的任务是从top开始拿的。这样可以减少冲突

如果没有找到其他任务

else if (--j < 0) {
                    if ((ec | (e = (int)c)) < 0) // inactive or terminating
                        return awaitWork(w, c, ec);
                    else if (ctl == c) {         // try to inactivate and enqueue
                        long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
                        w.nextWait = e;
                        w.eventCount = ec | INT_SIGN;
                        if (!U.compareAndSwapLong(this, CTL, c, nc))
                            w.eventCount = ec;   // back out
                    }
                    break;
                }

那么首先会通过一系列运行来改变ctl的值,获得了nc,然后用CAS将新的值赋值。然后就调用awaitWork()将线程进入等待状态(调用的前面系列文章中提到的unsafe的park方法)。

这里要说明的是改变ctl值这里,首先是将ctl中的AC-1,AC是占ctl的前16位,所以不能直接-1,而是通过AC_UNIT(0×1000000000000)来达到使ctl的前16位-1的效果。

前面说过eventCount中有保存poolIndex,通过poolIndex以及WorkQueue中的nextWait,就能遍历所有的等待线程。

在高并发Java(2):多线程基础中,我们已经初步提到了基本的线程同步操作。这次要提到的是在并发包中的同步控制工具。

1. 各种同步控制工具的使用

1.1 ReentrantLock

ReentrantLock感觉上是synchronized的增强版,synchronized的特点是使用简单,一切交给JVM去处理,但是功能上是比较薄弱的。在JDK1.5之前,ReentrantLock的性能要好于synchronized,由于对JVM进行了优化,现在的JDK版本中,两者性能是不相上下的。如果是简单的实现,不要刻意去使用ReentrantLock。

相比于synchronized,ReentrantLock在功能上更加丰富,它具有可重入、可中断、可限时、公平锁等特点。

首先我们通过一个例子来说明ReentrantLock最初步的用法:

package test;
 
import java.util.concurrent.locks.ReentrantLock;
 
public class Test implements Runnable
{
    public static ReentrantLock lock = new ReentrantLock();
    public static int i = 0;
 
    @Override
    public void run()
    {
        for (int j = 0; j < 10000000; j++)
        {
            lock.lock();
            try
            {
                i++;
            }
            finally
            {
                lock.unlock();
            }
        }
    }
 
    public static void main(String[] args) throws InterruptedException
    {
        Test test = new Test();
        Thread t1 = new Thread(test);
        Thread t2 = new Thread(test);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(i);
    }
 
}

有两个线程都对i进行++操作,为了保证线程安全,使用了 ReentrantLock,从用法上可以看出,与 synchronized相比,ReentrantLock就稍微复杂一点。因为必须在finally中进行解锁操作,如果不在 finally解锁,有可能代码出现异常锁没被释放,而synchronized是由JVM来释放锁。

那么ReentrantLock到底有哪些优秀的特点呢?

1.1.1 可重入

单线程可以重复进入,但要重复退出

lock.lock();
lock.lock();
try
{
    i++;
 
}           
finally
{
    lock.unlock();
    lock.unlock();
}

由于ReentrantLock是重入锁,所以可以反复得到相同的一把锁,它有一个与锁相关的获取计数器,如果拥有锁的某个线程再次得到锁,那么获取计数器就加1,然后锁需要被释放两次才能获得真正释放(重入锁)。这模仿了 synchronized 的语义;如果线程进入由线程已经拥有的监控器保护的 synchronized 块,就允许线程继续进行,当线程退出第二个(或者后续) synchronized 块的时候,不释放锁,只有线程退出它进入的监控器保护的第一个synchronized 块时,才释放锁。

public class Child extends Father implements Runnable{
    final static Child child = new Child();//为了保证锁唯一
    public static void main(String[] args) {
        for (int i = 0; i < 50; i++) {
            new Thread(child).start();
        }
    }
 
    public synchronized void doSomething() {
        System.out.println("1child.doSomething()");
        doAnotherThing(); // 调用自己类中其他的synchronized方法
    }
 
    private synchronized void doAnotherThing() {
        super.doSomething(); // 调用父类的synchronized方法
        System.out.println("3child.doAnotherThing()");
    }
 
    @Override
    public void run() {
        child.doSomething();
    }
}
class Father {
    public synchronized void doSomething() {
        System.out.println("2father.doSomething()");
    }
}

我们可以看到一个线程进入不同的 synchronized方法,是不会释放之前得到的锁的。所以输出还是顺序输出。所以synchronized也是重入锁

1child.doSomething()
2father.doSomething()
3child.doAnotherThing()
1child.doSomething()
2father.doSomething()
3child.doAnotherThing()
1child.doSomething()
2father.doSomething()
3child.doAnotherThing()
...

1.1.2.可中断

与synchronized不同的是,ReentrantLock对中断是有响应的。中断相关知识查看高并发Java(2):多线程基础

普通的lock.lock()是不能响应中断的,lock.lockInterruptibly()能够响应中断。

我们模拟出一个死锁现场,然后用中断来处理死锁

package test;
 
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.locks.ReentrantLock;
 
public class Test implements Runnable
{
    public static ReentrantLock lock1 = new ReentrantLock();
    public static ReentrantLock lock2 = new ReentrantLock();
 
    int lock;
 
    public Test(int lock)
    {
        this.lock = lock;
    }
 
    @Override
    public void run()
    {
        try
        {
            if (lock == 1)
            {
                lock1.lockInterruptibly();
                try
                {
                    Thread.sleep(500);
                }
                catch (Exception e)
                {
                    // TODO: handle exception
                }
                lock2.lockInterruptibly();
            }
            else
            {
                lock2.lockInterruptibly();
                try
                {
                    Thread.sleep(500);
                }
                catch (Exception e)
                {
                    // TODO: handle exception
                }
                lock1.lockInterruptibly();
            }
        }
        catch (Exception e)
        {
            // TODO: handle exception
        }
        finally
        {
            if (lock1.isHeldByCurrentThread())
            {
                lock1.unlock();
            }
            if (lock2.isHeldByCurrentThread())
            {
                lock2.unlock();
            }
            System.out.println(Thread.currentThread().getId() + ":线程退出");
        }
    }
 
    public static void main(String[] args) throws InterruptedException
    {
        Test t1 = new Test(1);
        Test t2 = new Test(2);
        Thread thread1 = new Thread(t1);
        Thread thread2 = new Thread(t2);
        thread1.start();
        thread2.start();
        Thread.sleep(1000);
        //DeadlockChecker.check();
    }
 
    static class DeadlockChecker
    {
        private final static ThreadMXBean mbean = ManagementFactory
                .getThreadMXBean();
        final static Runnable deadlockChecker = new Runnable()
        {
            @Override
            public void run()
            {
                // TODO Auto-generated method stub
                while (true)
                {
                    long[] deadlockedThreadIds = mbean.findDeadlockedThreads();
                    if (deadlockedThreadIds != null)
                    {
                        ThreadInfo[] threadInfos = mbean.getThreadInfo(deadlockedThreadIds);
                        for (Thread t : Thread.getAllStackTraces().keySet())
                        {
                            for (int i = 0; i < threadInfos.length; i++)
                            {
                                if(t.getId() == threadInfos[i].getThreadId())
                                {
                                    t.interrupt();
                                }
                            }
                        }
                    }
                    try
                    {
                        Thread.sleep(5000);
                    }
                    catch (Exception e)
                    {
                        // TODO: handle exception
                    }
                }
 
            }
        };
 
        public static void check()
        {
            Thread t = new Thread(deadlockChecker);
            t.setDaemon(true);
            t.start();
        }
    }
 
}

上述代码有可能会发生死锁,线程1得到lock1,线程2得到lock2,然后彼此又想获得对方的锁。

我们用jstack查看运行上述代码后的情况

的确发现了一个死锁。

DeadlockChecker.check();方法用来检测死锁,然后把死锁的线程中断。中断后,线程正常退出。

1.1.3.可限时

超时不能获得锁,就返回false,不会永久等待构成死锁

使用lock.tryLock(long timeout, TimeUnit unit)来实现可限时锁,参数为时间和单位。

举个例子来说明下可限时:

package test;
 
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
 
public class Test implements Runnable
{
    public static ReentrantLock lock = new ReentrantLock();
 
    @Override
    public void run()
    {
        try
        {
            if (lock.tryLock(5, TimeUnit.SECONDS))
            {
                Thread.sleep(6000);
            }
            else
            {
                System.out.println("get lock failed");
            }
        }
        catch (Exception e)
        {
        }
        finally
        {
            if (lock.isHeldByCurrentThread())
            {
                lock.unlock();
            }
        }
    }
 
    public static void main(String[] args)
    {
        Test t = new Test();
        Thread t1 = new Thread(t);
        Thread t2 = new Thread(t);
        t1.start();
        t2.start();
    }
 
}

使用两个线程来争夺一把锁,当某个线程获得锁后,sleep6秒,每个线程都只尝试5秒去获得锁。

所以必定有一个线程无法获得锁。无法获得后就直接退出了。

输出:

get lock failed

1.1.4.公平锁

使用方式:

public ReentrantLock(boolean fair) 
 
public static ReentrantLock fairLock = new ReentrantLock(true);

一般意义上的锁是不公平的,不一定先来的线程能先得到锁,后来的线程就后得到锁。不公平的锁可能会产生饥饿现象。

公平锁的意思就是,这个锁能保证线程是先来的先得到锁。虽然公平锁不会产生饥饿现象,但是公平锁的性能会比非公平锁差很多。

1.2 Condition

Condition与ReentrantLock的关系就类似于synchronized与Object.wait()/signal()

await()方法会使当前线程等待,同时释放当前锁,当其他线程中使用signal()时或者signalAll()方法时,线 程会重新获得锁并继续执行。或者当线程被中断时,也能跳出等待。这和Object.wait()方法很相似。

awaitUninterruptibly()方法与await()方法基本相同,但是它并不会再等待过程中响应中断。 singal()方法用于唤醒一个在等待中的线程。相对的singalAll()方法会唤醒所有在等待中的线程。这和Obejct.notify()方法很类似。

这里就不再详细介绍了。举个例子来说明:

package test;
 
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
 
public class Test implements Runnable
{
    public static ReentrantLock lock = new ReentrantLock();
    public static Condition condition = lock.newCondition();
 
    @Override
    public void run()
    {
        try
        {
            lock.lock();
            condition.await();
            System.out.println("Thread is going on");
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        finally
        {
            lock.unlock();
        }
    }
 
    public static void main(String[] args) throws InterruptedException
    {
        Test t = new Test();
        Thread thread = new Thread(t);
        thread.start();
        Thread.sleep(2000);
 
        lock.lock();
        condition.signal();
        lock.unlock();
    }
 
}

上述例子很简单,让一个线程await住,让主线程去唤醒它。condition.await()/signal只能在得到锁以后使用。

1.3.Semaphore

对于锁来说,它是互斥的排他的。意思就是,只要我获得了锁,没人能再获得了。

而对于Semaphore来说,它允许多个线程同时进入临界区。可以认为它是一个共享锁,但是共享的额度是有限制的,额度用完了,其他没有拿到额度的线程还是要阻塞在临界区外。当额度为1时,就相等于lock

下面举个例子:

package test;
 
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
 
public class Test implements Runnable
{
    final Semaphore semaphore = new Semaphore(5);
    @Override
    public void run()
    {
        try
        {
            semaphore.acquire();
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getId() + " done");
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }finally {
            semaphore.release();
        }
    }
 
    public static void main(String[] args) throws InterruptedException
    {
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        final Test t = new Test();
        for (int i = 0; i < 20; i++)
        {
            executorService.submit(t);
        }
    }
 
}

有一个20个线程的线程池,每个线程都去 Semaphore的许可,Semaphore的许可只有5个,运行后可以看到,5个一批,一批一批地输出。

当然一个线程也可以一次申请多个许可

public void acquire(int permits) throws InterruptedException

1.4 ReadWriteLock

ReadWriteLock是区分功能的锁。读和写是两种不同的功能,读-读不互斥,读-写互斥,写-写互斥。

这样的设计是并发量提高了,又保证了数据安全。

使用方式:

private static ReentrantReadWriteLock readWriteLock=new ReentrantReadWriteLock(); 
private static Lock readLock = readWriteLock.readLock(); 
private static Lock writeLock = readWriteLock.writeLock();

详细例子可以查看 Java实现生产者消费者问题与读者写者问题,这里就不展开了。

1.5 CountDownLatch

倒数计时器

一种典型的场景就是火箭发射。在火箭发射前,为了保证万无一失,往往还要进行各项设备、仪器的检查。 只有等所有检查完毕后,引擎才能点火。这种场景就非常适合使用CountDownLatch。它可以使得点火线程 ,等待所有检查线程全部完工后,再执行

使用方式:

static final CountDownLatch end = new CountDownLatch(10);
end.countDown(); 
end.await();

示意图:

一个简单的例子:

package test;
 
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
public class Test implements Runnable
{
    static final CountDownLatch countDownLatch = new CountDownLatch(10);
    static final Test t = new Test();
    @Override
    public void run()
    {
        try
        {
            Thread.sleep(2000);
            System.out.println("complete");
            countDownLatch.countDown();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }
 
    public static void main(String[] args) throws InterruptedException
    {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++)
        {
            executorService.execute(t);
        }
        countDownLatch.await();
        System.out.println("end");
        executorService.shutdown();
    }
 
}

主线程必须等待10个线程全部执行完才会输出”end”。

1.6 CyclicBarrier

和CountDownLatch相似,也是等待某些线程都做完以后再执行。与CountDownLatch区别在于这个计数器可以反复使用。比如,假设我们将计数器设置为10。那么凑齐第一批1 0个线程后,计数器就会归零,然后接着凑齐下一批10个线程

使用方式:

public CyclicBarrier(int parties, Runnable barrierAction) 
 
barrierAction就是当计数器一次计数完成后系统会执行的动作
 
await()

下面举个例子:

package test;
 
import java.util.concurrent.CyclicBarrier;
 
public class Test implements Runnable
{
    private String soldier;
    private final CyclicBarrier cyclic;
 
    public Test(String soldier, CyclicBarrier cyclic)
    {
        this.soldier = soldier;
        this.cyclic = cyclic;
    }
 
    @Override
    public void run()
    {
        try
        {
            //等待所有士兵到齐
            cyclic.await();
            dowork();
            //等待所有士兵完成工作
            cyclic.await();
        }
        catch (Exception e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
 
    }
 
    private void dowork()
    {
        // TODO Auto-generated method stub
        try
        {
            Thread.sleep(3000);
        }
        catch (Exception e)
        {
            // TODO: handle exception
        }
        System.out.println(soldier + ": done");
    }
 
    public static class BarrierRun implements Runnable
    {
 
        boolean flag;
        int n;
 
        public BarrierRun(boolean flag, int n)
        {
            super();
            this.flag = flag;
            this.n = n;
        }
 
        @Override
        public void run()
        {
            if (flag)
            {
                System.out.println(n + "个任务完成");
            }
            else
            {
                System.out.println(n + "个集合完成");
                flag = true;
            }
 
        }
 
    }
 
    public static void main(String[] args)
    {
        final int n = 10;
        Thread[] threads = new Thread[n];
        boolean flag = false;
        CyclicBarrier barrier = new CyclicBarrier(n, new BarrierRun(flag, n));
        System.out.println("集合");
        for (int i = 0; i < n; i++)
        {
            System.out.println(i + "报道");
            threads[i] = new Thread(new Test("士兵" + i, barrier));
            threads[i].start();
        }
    }
 
}

打印结果:

集合
0报道
1报道
2报道
3报道
4报道
5报道
6报道
7报道
8报道
9报道
10个集合完成
士兵5: done
士兵7: done
士兵8: done
士兵3: done
士兵4: done
士兵1: done
士兵6: done
士兵2: done
士兵0: done
士兵9: done
10个任务完成

1.7 LockSupport

提供线程阻塞原语

和suspend类似

LockSupport.park(); 
LockSupport.unpark(t1);

与suspend相比 不容易引起线程冻结

LockSupport的思想呢,和 Semaphore有点相似,内部有一个许可,park的时候拿掉这个许可,unpark的时候申请这个许可。所以如果unpark在park之前,是不会发生线程冻结的。

下面的代码是高并发Java(2):多线程基础中suspend示例代码,在使用suspend时会发生死锁。

package test;
 
import java.util.concurrent.locks.LockSupport;
 
public class Test
{
    static Object u = new Object();
    static TestSuspendThread t1 = new TestSuspendThread("t1");
    static TestSuspendThread t2 = new TestSuspendThread("t2");
 
    public static class TestSuspendThread extends Thread
    {
        public TestSuspendThread(String name)
        {
            setName(name);
        }
 
        @Override
        public void run()
        {
            synchronized (u)
            {
                System.out.println("in " + getName());
                //Thread.currentThread().suspend();
                LockSupport.park();
            }
        }
    }
 
    public static void main(String[] args) throws InterruptedException
    {
        t1.start();
        Thread.sleep(100);
        t2.start();
//        t1.resume();
//        t2.resume();
        LockSupport.unpark(t1);
        LockSupport.unpark(t2);
        t1.join();
        t2.join();
    }
}

而使用 LockSupport则不会发生死锁。

另外

park()能够响应中断,但不抛出异常。中断响应的结果是,park()函数的返回,可以从Thread.interrupted()得到中断标志。

在JDK当中有大量地方使用到了park,当然LockSupport的实现也是使用unsafe.park()来实现的。

public static void park() {
        unsafe.park(false, 0L);
    }

1.8 ReentrantLock 的实现

下面来介绍下ReentrantLock的实现,ReentrantLock的实现主要由3部分组成:

  • CAS状态
  • 等待队列
  • park()

ReentrantLock的父类中会有一个state变量来表示同步的状态

/**
     * The synchronization state.
     */
    private volatile int state;

通过CAS操作来设置state来获取锁,如果设置成了1,则将锁的持有者给当前线程

final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

如果拿锁不成功,则会做一个申请

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

首先,再去申请下试试看tryAcquire,因为此时可能另一个线程已经释放了锁。

如果还是没有申请到锁,就addWaiter,意思是把自己加到等待队列中去

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

其间还会有多次尝试去申请锁,如果还是申请不到,就会被挂起

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

同理,如果在unlock操作中,就是释放了锁,然后unpark,这里就不具体讲了。

2. 并发容器及典型源码分析

2.1 ConcurrentHashMap

我们知道HashMap不是一个线程安全的容器,最简单的方式使HashMap变成线程安全就是使用Collections.synchronizedMap,它是对HashMap的一个包装

public static Map m=Collections.synchronizedMap(new HashMap());

同理对于List,Set也提供了相似方法。

但是这种方式只适合于并发量比较小的情况。

我们来看下synchronizedMap的实现

private final Map<K,V> m;     // Backing Map
        final Object      mutex;        // Object on which to synchronize
 
        SynchronizedMap(Map<K,V> m) {
            if (m==null)
                throw new NullPointerException();
            this.m = m;
            mutex = this;
        }
 
        SynchronizedMap(Map<K,V> m, Object mutex) {
            this.m = m;
            this.mutex = mutex;
        }
 
        public int size() {
            synchronized (mutex) {return m.size();}
        }
        public boolean isEmpty() {
            synchronized (mutex) {return m.isEmpty();}
        }
        public boolean containsKey(Object key) {
            synchronized (mutex) {return m.containsKey(key);}
        }
        public boolean containsValue(Object value) {
            synchronized (mutex) {return m.containsValue(value);}
        }
        public V get(Object key) {
            synchronized (mutex) {return m.get(key);}
        }
 
        public V put(K key, V value) {
            synchronized (mutex) {return m.put(key, value);}
        }
        public V remove(Object key) {
            synchronized (mutex) {return m.remove(key);}
        }
        public void putAll(Map<? extends K, ? extends V> map) {
            synchronized (mutex) {m.putAll(map);}
        }
        public void clear() {
            synchronized (mutex) {m.clear();}
        }

它会将HashMap包装在里面,然后将HashMap的每个操作都加上synchronized。

由于每个方法都是获取同一把锁(mutex),这就意味着,put和remove等操作是互斥的,大大减少了并发量。

下面来看下ConcurrentHashMap是如何实现的

public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }

在 ConcurrentHashMap内部有一个Segment段,它将大的HashMap切分成若干个段(小的HashMap),然后让数据在每一段上Hash,这样多个线程在不同段上的Hash操作一定是线程安全的,所以只需要同步同一个段上的线程就可以了,这样实现了锁的分离,大大增加了并发量。

在使用ConcurrentHashMap.size时会比较麻烦,因为它要统计每个段的数据和,在这个时候,要把每一个段都加上锁,然后再做数据统计。这个就是把锁分离后的小小弊端,但是size方法应该是不会被高频率调用的方法。

在实现上,不使用synchronized和lock.lock而是尽量使用trylock,同时在HashMap的实现上,也做了一点优化。这里就不提了。

2.2 BlockingQueue

BlockingQueue不是一个高性能的容器。但是它是一个非常好的共享数据的容器。是典型的生产者和消费者的实现。

1 无锁类的原理详解

1.1 CAS

CAS算法的过程是这样:它包含3个参数CAS(V,E,N)。V表示要更新的变量,E表示预期值,N表示新值。仅当V值等于E值时,才会将V的值设为N,如果V值和E值不同,则说明已经有其他线程做了更新,则当前线程什么都不做。最后,CAS返回当前V的真实值。CAS操作是抱着乐观的态度进行的,它总是认为自己可以成功完成操作。当多个线程同时使用CAS操作一个变量时,只有一个会胜出,并成功更新,其余均会失败。失败的线程不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。基于这样的原理,CAS操作即使没有锁,也可以发现其他线程对当前线程的干扰,并进行恰当的处理。

我们会发现,CAS的步骤太多,有没有可能在判断V和E相同后,正要赋值时,切换了线程,更改了值。造成了数据不一致呢?

事实上,这个担心是多余的。CAS整一个操作过程是一个原子操作,它是由一条CPU指令完成的。

1.2 CPU指令

CAS的CPU指令是cmpxchg

指令代码如下:

/*
    accumulator = AL, AX, or EAX, depending on whether
    a byte, word, or doubleword comparison is being performed
    */
    if(accumulator == Destination) {
    ZF = 1;
    Destination = Source;
    }
    else {
    ZF = 0;
    accumulator = Destination;
    }

目标值和寄存器里的值相等的话,就设置一个跳转标志,并且把原始数据设到目标里面去。如果不等的话,就不设置跳转标志了。

Java当中提供了很多无锁类,下面来介绍下无锁类。

2 无所类的使用

我们已经知道,无锁比阻塞效率要高得多。我们来看看Java是如何实现这些无锁类的。

2.1. AtomicInteger

AtomicInteger和Integer一样,都继承与Number类

public class AtomicInteger extends Number implements java.io.Serializable

AtomicInteger里面有很多CAS操作,典型的有:

public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

这里来解释一下unsafe.compareAndSwapInt方法,他的意思是,对于this这个类上的偏移量为valueOffset的变量值如果与期望值expect相同,那么把这个变量的值设为update。

其实偏移量为valueOffset的变量就是value

static {
      try {
        valueOffset = unsafe.objectFieldOffset
            (AtomicInteger.class.getDeclaredField("value"));
      } catch (Exception ex) { throw new Error(ex); }
}

我们此前说过,CAS是有可能会失败的,但是失败的代价是很小的,所以一般的实现都是在一个无限循环体内,直到成功为止。

public final int getAndIncrement() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return current;
        }
    }

2.2 Unsafe

从类名就可知,Unsafe操作是非安全的操作,比如:

  • 根据偏移量设置值(在刚刚介绍的AtomicInteger中已经看到了这个功能)
  • park()(把这个线程停下来,在以后的Blog中会提到)
  • 底层的CAS操作

非公开API,在不同版本的JDK中,可能有较大差异

2.3. AtomicReference

前面已经提到了AtomicInteger,当然还有AtomicBoolean,AtomicLong等等,都大同小异。

这里要介绍的是AtomicReference。

AtomicReference是一种模板类

public class AtomicReference<V>  implements java.io.Serializable

它可以用来封装任意类型的数据。

比如String

package test;
 
import java.util.concurrent.atomic.AtomicReference;
 
public class Test
{ 
    public final static AtomicReference<String> atomicString = new AtomicReference<String>("hosee");
    public static void main(String[] args)
    {
        for (int i = 0; i < 10; i++)
        {
            final int num = i;
            new Thread() {
                public void run() {
                    try
                    {
                        Thread.sleep(Math.abs((int)Math.random()*100));
                    }
                    catch (Exception e)
                    {
                        e.printStackTrace();
                    }
                    if (atomicString.compareAndSet("hosee", "ztk"))
                    {
                        System.out.println(Thread.currentThread().getId() + "Change value");
                    }else {
                        System.out.println(Thread.currentThread().getId() + "Failed");
                    }
                };
            }.start();
        }
    }
}

结果:

10Failed
13Failed
9Change value
11Failed
12Failed
15Failed
17Failed
14Failed
16Failed
18Failed

可以看到只有一个线程能够修改值,并且后面的线程都不能再修改。

2.4.AtomicStampedReference

我们会发现CAS操作还是有一个问题的

比如之前的AtomicInteger的incrementAndGet方法

public final int incrementAndGet() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return next;
        }
    }

假设当前value=1当某线程int current = get()执行后,切换到另一个线程,这个线程将1变成了2,然后又一个线程将2又变成了1。此时再切换到最开始的那个线程,由于value仍等于1,所以还是能执行CAS操作,当然加法是没有问题的,如果有些情况,对数据的状态敏感时,这样的过程就不被允许了。

此时就需要AtomicStampedReference类。

其内部实现一个Pair类来封装值和时间戳。

private static class Pair<T> {
        final T reference;
        final int stamp;
        private Pair(T reference, int stamp) {
            this.reference = reference;
            this.stamp = stamp;
        }
        static <T> Pair<T> of(T reference, int stamp) {
            return new Pair<T>(reference, stamp);
        }
    }

这个类的主要思想是加入时间戳来标识每一次改变。

//比较设置 参数依次为:期望值 写入新值 期望时间戳 新时间戳
public boolean compareAndSet(V   expectedReference,
                                 V   newReference,
                                 int expectedStamp,
                                 int newStamp) {
        Pair<V> current = pair;
        return
            expectedReference == current.reference &&
            expectedStamp == current.stamp &&
            ((newReference == current.reference &&
              newStamp == current.stamp) ||
             casPair(current, Pair.of(newReference, newStamp)));
    }

当期望值等于当前值,并且期望时间戳等于现在的时间戳时,才写入新值,并且更新新的时间戳。

这里举个用AtomicStampedReference的场景,可能不太适合,但是想不到好的场景了。

场景背景是,某公司给余额少的用户免费充值,但是每个用户只能充值一次。

package test;
 
import java.util.concurrent.atomic.AtomicStampedReference;
 
public class Test
{
    static AtomicStampedReference<Integer> money = new AtomicStampedReference<Integer>(
            19, 0);
 
    public static void main(String[] args)
    {
        for (int i = 0; i < 3; i++)
        {
            final int timestamp = money.getStamp();
            new Thread()
            {
                public void run()
                {
                    while (true)
                    {
                        while (true)
                        {
                            Integer m = money.getReference();
                            if (m < 20)
                            {
                                if (money.compareAndSet(m, m + 20, timestamp,
                                        timestamp + 1))
                                {
                                    System.out.println("充值成功,余额:"
                                            + money.getReference());
                                    break;
                                }
                            }
                            else
                            {
                                break;
                            }
                        }
                    }
                };
            }.start();
        }
 
        new Thread()
        {
            public void run()
            {
                for (int i = 0; i < 100; i++)
                {
                    while (true)
                    {
                        int timestamp = money.getStamp();
                        Integer m = money.getReference();
                        if (m > 10)
                        {
                            if (money.compareAndSet(m, m - 10, timestamp,
                                    timestamp + 1))
                            {
                                System.out.println("消费10元,余额:"
                                            + money.getReference());
                                break;
                            }
                        }else {
                            break;
                        }
                    }
                    try
                    {
                        Thread.sleep(100);
                    }
                    catch (Exception e)
                    {
                        // TODO: handle exception
                    }
                }
            };
        }.start();
    }
 
}

解释下代码,有3个线程在给用户充值,当用户余额少于20时,就给用户充值20元。有100个线程在消费,每次消费10元。用户初始有9元,当使用AtomicStampedReference来实现时,只会给用户充值一次,因为每次操作使得时间戳+1。运行结果:

充值成功余额:39
消费10元余额:29
消费10元余额:19
消费10元余额:9

如果使用AtomicReference或者 Atomic Integer来实现就会造成多次充值。

充值成功余额:39
消费10元余额:29
消费10元余额:19
充值成功余额:39
消费10元余额:29
消费10元余额:19
充值成功余额:39
消费10元余额:29

2.5. AtomicIntegerArray

与AtomicInteger相比,数组的实现不过是多了一个下标。

public final boolean compareAndSet(int i, int expect, int update) {
        return compareAndSetRaw(checkedByteOffset(i), expect, update);
    }

它的内部只是封装了一个普通的array

private final int[] array;

里面有意思的是运用了二进制数的前导零来算数组中的偏移量。

shift = 31 - Integer.numberOfLeadingZeros(scale);

前导零的意思就是比如8位表示12,00001100,那么前导零就是1前面的0的个数,就是4。

具体偏移量如何计算,这里就不再做介绍了。

2.6. AtomicIntegerFieldUpdater

AtomicIntegerFieldUpdater类的主要作用是让普通变量也享受原子操作。

就比如原本有一个变量是int型,并且很多地方都应用了这个变量,但是在某个场景下,想让int型变成AtomicInteger,但是如果直接改类型,就要改其他地方的应用。AtomicIntegerFieldUpdater就是为了解决这样的问题产生的。

package test;
 
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
public class Test
{
    public static class V{
        int id;
        volatile int score;
        public int getScore()
        {
            return score;
        }
        public void setScore(int score)
        {
            this.score = score;
        }
 
    }
    public final static AtomicIntegerFieldUpdater<V> vv = AtomicIntegerFieldUpdater.newUpdater(V.class, "score");
 
    public static AtomicInteger allscore = new AtomicInteger(0);
 
    public static void main(String[] args) throws InterruptedException
    {
        final V stu = new V();
        Thread[] t = new Thread[10000];
        for (int i = 0; i < 10000; i++)
        {
            t[i] = new Thread() {
                @Override
                public void run()
                {
                    if(Math.random()>0.4)
                    {
                        vv.incrementAndGet(stu);
                        allscore.incrementAndGet();
                    }
                }
            };
            t[i].start();
        }
        for (int i = 0; i < 10000; i++)
        {
            t[i].join();
        }
        System.out.println("score="+stu.getScore());
        System.out.println("allscore="+allscore);
    }
}

上述代码将score使用 AtomicIntegerFieldUpdater变成 AtomicInteger。保证了线程安全。

这里使用allscore来验证,如果score和allscore数值相同,则说明是线程安全的。

小说明:

  • Updater只能修改它可见范围内的变量。因为Updater使用反射得到这个变量。如果变量不可见,就会出错。比如如果某变量申明为private,就是不可行的。
  • 为了确保变量被正确的读取,它必须是volatile类型的。如果我们原有代码中未申明这个类型,那么简单得申明一下就行,这不会引起什么问题。
  • 由于CAS操作会通过对象实例中的偏移量直接进行赋值,因此,它不支持static字段(Unsafe.objectFieldOffset()不支持静态变量)。