4年前 (2021-07-20)  Java系列 |   抢沙发  3667 
文章评分 1 次,平均分 5.0

Spring5.0中添加了反应式堆栈web框架Spring WebFlux。它是完全无阻塞的,支持反应流背压,并在Netty、Undertow和servlet3.1+容器等服务器上运行。在这个Spring webflux教程中,我们将学习反应式编程背后的基本概念、webflux api和一个功能齐全的helloworld示例。

反应式编程

反应式编程是一种编程范式,它提倡异步、非阻塞、事件驱动的数据处理方法。反应式编程涉及将数据和事件建模为可观察的数据流,并实现数据处理例程以对这些流中的变化作出反应。

在深入了解被动世界之前,首先要了解阻塞和非阻塞请求处理之间的区别。

阻塞与非阻塞(异步)请求处理

阻止请求处理

在传统的MVC应用程序中,当请求到达服务器时,会创建一个servlet线程。它将请求委托给工作线程进行I/O操作,如数据库访问等。在工作线程繁忙期间,servlet线程(请求线程)保持等待状态,因此被阻塞。它也称为同步请求处理。

Spring WebFlux使用教程

由于服务器可以有有限数量的请求线程,它限制了服务器在最大服务器负载下处理该数量的请求的能力。它可能会影响服务器的性能并限制服务器功能的充分利用。

非阻塞请求处理

在非阻塞或异步请求处理中,没有线程处于等待状态。通常只有一个请求线程接收请求。

所有传入请求都带有事件处理程序和回调信息。请求线程将传入的请求委托给线程池(通常是少量线程),线程池将请求委托给它的处理函数,并立即开始处理来自请求线程的其他传入请求。

处理函数完成后,池中的一个线程收集响应并将其传递给回调函数。

Spring WebFlux使用教程

线程的非阻塞特性有助于扩展应用程序的性能。少量线程意味着更少的内存利用率和更少的上下文切换。

什么是反应式编程?

术语“反应式”指的是建立在对变化作出反应的基础上的编程模型。它是围绕发布者-订阅者模式(observer模式)构建的。在反应式编程风格中,我们请求资源并开始执行其他事情。当数据可用时,我们可以通过回调函数得到通知和数据通知。在回调函数中,我们根据应用程序/用户的需要处理响应。

要记住的一点是背压。在非阻塞代码中,控制事件的速率变得非常重要,这样快速的生产者就不会压倒其目的地。

反应式web编程对于具有流式数据的应用程序以及使用流式数据并将其流式传输给用户的客户端来说非常有用。对于开发传统的CRUD应用程序来说,这并不是一件好事。如果你正在开发下一个拥有大量数据的Facebook或Twitter,那么一个反应式API可能正是你想要的。

反应流API

新的Reactive Streams API由Netflix、Pivotal、Lightbend、RedHat、Twitter和Oracle等公司的工程师创建,现在是Java 9的一部分。它定义了四个接口:

Publisher:根据从订阅服务器收到的请求,向订阅服务器发出一系列事件。发布者可以为多个订户提供服务。

它有一个单一的方法:

public interface Publisher<T> 
{
    public void subscribe(Subscriber<? super T> s);
}

Subscriber:接收和处理发布服务器发出的事件。请注意,在调用Subscription#request(long)发出请求信号之前,不会收到任何通知。

它有四种方法来处理收到的各种响应。

public interface Subscriber<T> 
{
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

Processor:表示由订阅者和发布者组成的处理阶段,并遵守订阅者和发布者的契约。

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> 
{
}

两种流行的反应流实现是RxJava(https://github.com/ReactiveX/RxJava)和Project Reactor(https://projectreactor.io/).

什么是Spring WebFlux

Spring WebFlux是springmvc的并行版本,支持完全无阻塞的反应流。它支持背压概念,并使用Netty作为内置服务器来运行响应式应用程序。如果您熟悉spring  mvc编程风格,那么也可以轻松地使用webflux。

Spring WebFlux使用project reactor作为反应库。反应器是一个反应流库,因此,所有操作人员都支持无阻塞背压。它是与Spring密切合作开发的。

Spring WebFlux大量使用两个发布服务器:

Mono:返回0或1个元素。

Mono<String> mono = Mono.just("Alex");
Mono<String> mono = Mono.empty();

Flux:返回0…N个元素。通量可以是无止境的,这意味着它可以永远保持发射元素。它还可以返回一系列元素,然后在返回所有元素后发送完成通知。

Flux<String> flux = Flux.just("A", "B", "C");
Flux<String> flux = Flux.fromArray(new String[]{"A", "B", "C"});
Flux<String> flux = Flux.fromIterable(Arrays.asList("A", "B", "C"));
 
//To subscribe call method
 
flux.subscribe();

在Spring WebFlux中,我们调用返回monosfluxes的反应式api/函数,您的控制器将返回monos和fluxes。当您调用返回mono或flux的API时,它将立即返回。函数调用的结果将在可用时通过mono或flux传递给您。

要构建真正的无阻塞应用程序,我们必须将其所有组件创建/使用为无阻塞,即客户端、控制器、中间服务甚至数据库。如果他们中的一个阻止了请求,我们的目标就会失败。

Spring Boot WebFlux示例

在这个springboot2应用程序中,我创建了员工管理系统。我之所以选择它,是因为在学习的过程中,您可以将它与传统的MVC风格的应用程序进行比较。为了使其完全无阻塞,我使用mongodb作为后端数据库。

Maven依赖项

包括spring-boot-starter-webflux, spring-boot-starter-data-mongodb-reactive, spring-boot-starter-test 和 reactor-test依赖项。

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
 
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.1.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>
 
    <groupId>com.howtodoinjava</groupId>
    <artifactId>spring-webflux-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
 
    <name>spring-webflux-demo</name>
    <url>http://maven.apache.org</url>
 
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
    </properties>
 
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
 
        <dependency>
            <groupId>javax.xml.bind</groupId>
            <artifactId>jaxb-api</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <version>3.1.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
 
</project>

WebFlux配置

import org.springframework.context.annotation.Configuration;
 
@Configuration
@EnableWebFlux
public class WebFluxConfig implements WebFluxConfigurer 
{   
}

MongoDB配置

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.config.AbstractReactiveMongoConfiguration;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.repository.config.EnableReactiveMongoRepositories;
 
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
 
@Configuration
@EnableReactiveMongoRepositories(basePackages = "com.howtodoinjava.demo.dao")
public class MongoConfig extends AbstractReactiveMongoConfiguration 
{   
    @Value("${port}")
    private String port;
     
    @Value("${dbname}")
    private String dbName;
 
    @Override
    public MongoClient reactiveMongoClient() {
        return MongoClients.create();
    }
 
    @Override
    protected String getDatabaseName() {
        return dbName;
    }
 
    @Bean
    public ReactiveMongoTemplate reactiveMongoTemplate() {
        return new ReactiveMongoTemplate(reactiveMongoClient(), getDatabaseName());
    }
}

应用程序配置

import org.springframework.beans.factory.config.PropertyPlaceholderConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
 
@Configuration
public class AppConfig 
{
    @Bean
    public static PropertyPlaceholderConfigurer getPropertyPlaceholderConfigurer() 
    {
        PropertyPlaceholderConfigurer ppc = new PropertyPlaceholderConfigurer();
        ppc.setLocation(new ClassPathResource("application.properties"));
        ppc.setIgnoreUnresolvablePlaceholders(true);
        return ppc;
    }
}

Properties文件

port=27017
dbname=testdb

Logging配置

<configuration>
 
    <appender name="STDOUT"
        class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{5} - %msg%n
            </pattern>
        </encoder>
    </appender>
 
    <logger name="org.springframework" level="DEBUG"
        additivity="false">
        <appender-ref ref="STDOUT" />
    </logger>
 
    <root level="ERROR">
        <appender-ref ref="STDOUT" />
    </root>
 
</configuration>

Spring Boot应用程序

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
 
@SpringBootApplication
public class WebfluxFunctionalApp {
 
    public static void main(String[] args) {
        SpringApplication.run(WebfluxFunctionalApp.class, args);
    }
}

Rest Controller

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
 
import com.howtodoinjava.demo.model.Employee;
import com.howtodoinjava.demo.service.EmployeeService;
 
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
 
@RestController
public class EmployeeController {
    @Autowired
    private EmployeeService employeeService;
 
    @RequestMapping(value = { "/create", "/" }, method = RequestMethod.POST)
    @ResponseStatus(HttpStatus.CREATED)
    public void create(@RequestBody Employee e) {
        employeeService.create(e);
    }
 
    @RequestMapping(value = "/{id}", method = RequestMethod.GET)
    public ResponseEntity<Mono<Employee>> findById(@PathVariable("id") Integer id) {
        Mono<Employee> e = employeeService.findById(id);
        HttpStatus status = e != null ? HttpStatus.OK : HttpStatus.NOT_FOUND;
        return new ResponseEntity<Mono<Employee>>(e, status);
    }
 
    @RequestMapping(value = "/name/{name}", method = RequestMethod.GET)
    public Flux<Employee> findByName(@PathVariable("name") String name) {
        return employeeService.findByName(name);
    }
 
    @RequestMapping(method = RequestMethod.GET, produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Employee> findAll() {
        Flux<Employee> emps = employeeService.findAll();
        return emps;
    }
 
    @RequestMapping(value = "/update", method = RequestMethod.PUT)
    @ResponseStatus(HttpStatus.OK)
    public Mono<Employee> update(@RequestBody Employee e) {
        return employeeService.update(e);
    }
 
    @RequestMapping(value = "/delete/{id}", method = RequestMethod.DELETE)
    @ResponseStatus(HttpStatus.OK)
    public void delete(@PathVariable("id") Integer id) {
        employeeService.delete(id).subscribe();
    }
 
}

Service类

import com.howtodoinjava.demo.model.Employee;
 
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
 
public interface IEmployeeService 
{
    void create(Employee e);
     
    Mono<Employee> findById(Integer id);
 
    Flux<Employee> findByName(String name);
 
    Flux<Employee> findAll();
 
    Mono<Employee> update(Employee e);
 
    Mono<Void> delete(Integer id);
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
 
import com.howtodoinjava.demo.dao.EmployeeRepository;
import com.howtodoinjava.demo.model.Employee;
 
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
 
@Service
public class EmployeeService implements IEmployeeService {
     
    @Autowired
    EmployeeRepository employeeRepo;
 
    public void create(Employee e) {
        employeeRepo.save(e).subscribe();
    }
 
    public Mono<Employee> findById(Integer id) {
        return employeeRepo.findById(id);
    }
 
    public Flux<Employee> findByName(String name) {
        return employeeRepo.findByName(name);
    }
 
    public Flux<Employee> findAll() {
        return employeeRepo.findAll();
    }
 
    public Mono<Employee> update(Employee e) {
        return employeeRepo.save(e);
    }
 
    public Mono<Void> delete(Integer id) {
        return employeeRepo.deleteById(id);
    }
 
}

DAO Repository

import org.springframework.data.mongodb.repository.Query;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
 
import com.howtodoinjava.demo.model.Employee;
 
import reactor.core.publisher.Flux;
 
public interface EmployeeRepository extends ReactiveMongoRepository<Employee, Integer> {
    @Query("{ 'name': ?0 }")
    Flux<Employee> findByName(final String name);
}

Model

import org.springframework.context.annotation.Scope;
import org.springframework.context.annotation.ScopedProxyMode;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
 
@Scope(scopeName = "request", proxyMode = ScopedProxyMode.TARGET_CLASS)
@Document
public class Employee {
 
    @Id
    int id;
    String name;
    long salary;
 
    //Getters and setters
 
    @Override
    public String toString() {
        return "Employee [id=" + id + ", name=" + name + ", salary=" + salary + "]";
    }
}

演示

启动应用程序并检查请求和响应。

HTTP发布http://localhost:8080/create

API请求1

{
    "id":1,
    "name":"user_1",
    "salary":101
}

API请求2

{
    "id":2,
    "name":"user_2",
    "salary":102
}

HTTP PUT http://localhost:8080/update

{
    "id":2,
    "name":"user_2",
    "salary":103
}

HTTP GET http://localhost:8080/

data:{"id":1,"name":"user_1","salary":101}
 
data:{"id":2,"name":"user_2","salary":102}

Spring WebFlux使用教程

注意,我正在用Postman chrome浏览器扩展测试API,它是一个阻塞客户机。只有当它收集了两个员工的响应时,才会显示结果。

要验证非阻塞响应功能,请直接点击chrome浏览器中的URL。结果将以事件(文本/事件流)的形式逐一显示。为了更好地查看结果,请考虑向控制器API添加延迟。

Spring WebFlux使用教程

Spring WebFlux 总结

spring mvc和Spring WebFlux都支持client-server体系结构,但是在并发模型和阻塞性质和线程的默认行为上有一个关键的区别。在springmvc中,假设应用程序可以阻塞当前线程,而在webflux中,默认情况下线程是非阻塞的。这是Spring WebFlux和mvc的主要区别。

被动和非阻塞通常不会使应用程序运行得更快。反应式和非阻塞的预期好处是能够用少量固定数量的线程和较少的内存需求来扩展应用程序。它使应用程序在负载下更具弹性,因为它们以更可预测的方式扩展。

原文地址:https://howtodoinjava.com/spring-webflux/spring-webflux-tutorial/

 

除特别注明外,本站所有文章均为老K的Java博客原创,转载请注明出处来自https://javakk.com/2144.html

关于

发表评论

表情 格式

暂无评论

登录

忘记密码 ?

切换登录

注册