3年前 (2021-10-30)  Java系列 |   抢沙发  387 
文章评分 0 次,平均分 0.0
[收起] 文章目录

java.util.concurrent包提供了创建并发应用程序的工具。

在本文中,我们将对整个包进行概述。

主要组件

java.util.concurrent包含的特性太多,无法在一次编写中讨论。在本文中,我们将主要关注此软件包中一些最有用的实用程序,如:

  • Executor
  • ExecutorService
  • ScheduledExecutorService
  • Future
  • CountDownLatch
  • CyclicBarrier
  • Semaphore
  • ThreadFactory
  • BlockingQueue
  • DelayQueue
  • Locks
  • Phaser

Executor

Executor是一个接口,表示执行所提供任务的对象。

如果任务应该在新线程或当前线程上运行,这取决于特定的实现(从哪里启动调用)。因此,使用此接口,我们可以将任务执行流与实际的任务执行机制解耦。

这里需要注意的一点是,Executor并不严格要求任务执行是异步的。在最简单的情况下,执行者可以在调用线程中立即调用提交的任务。

我们需要创建一个调用程序来创建executor实例:

public class Invoker implements Executor {
    @Override
    public void execute(Runnable r) {
        r.run();
    }
}

现在,我们可以使用这个调用程序来执行任务。

public void execute() {
    Executor executor = new Invoker();
    executor.execute( () -> {
        // task to be performed
    });
}

这里需要注意的一点是,如果executor不能接受任务执行,它将抛出RejectedExecutionException

ExecutorService

ExecutorService是异步处理的完整解决方案。它管理内存队列,并根据线程可用性安排提交的任务。

要使用ExecutorService,我们需要创建一个可运行类。

public class Task implements Runnable {
    @Override
    public void run() {
        // task details
    }
}

现在我们可以创建ExecutorService实例并分配此任务。在创建时,我们需要指定线程池的大小。

ExecutorService executor = Executors.newFixedThreadPool(10);

如果要创建单线程executor服务实例,可以使用newSingleThreadExecutor(ThreadFactory ThreadFactory)创建实例。

一旦创建了executor,我们就可以使用它来提交任务。

public void execute() { 
    executor.submit(new Task()); 
}

我们还可以在提交任务时创建可运行实例。

executor.submit(() -> {
    new Task();
});

它还提供了两种开箱即用的执行终止方法。第一个是shutdown();它等待所有提交的任务完成执行。另一个方法是shutdownNow(),它立即终止所有挂起/正在执行的任务。

还有另一种等待终止的方法awaitTermination(long timeout, TimeUnit unit),在触发关机事件或发生执行超时或执行线程本身中断后,强制阻止所有任务完成执行,

try {
    executor.awaitTermination( 20l, TimeUnit.NANOSECONDS );
} catch (InterruptedException e) {
    e.printStackTrace();
}

ScheduledExecutorService

ScheduledExecutorService是与ExecutorService类似的接口,但它可以定期执行任务。

Executor和ExecutorService的方法在现场安排,不会引入任何人为延迟。零或任何负值表示需要立即执行请求。

我们可以使用RunnableCallable接口来定义任务。

public void execute() {
    ScheduledExecutorService executorService
      = Executors.newSingleThreadScheduledExecutor();

    Future<String> future = executorService.schedule(() -> {
        // ...
        return "Hello world";
    }, 1, TimeUnit.SECONDS);

    ScheduledFuture<?> scheduledFuture = executorService.schedule(() -> {
        // ...
    }, 1, TimeUnit.SECONDS);

    executorService.shutdown();
}

ScheduledExecutorService还可以在给定的固定延迟后安排任务:

executorService.scheduleAtFixedRate(() -> {
    // ...
}, 1, 10, TimeUnit.SECONDS);

executorService.scheduleWithFixedDelay(() -> {
    // ...
}, 1, 10, TimeUnit.SECONDS);

在这里,scheduleAtFixedRate( Runnable command, long initialDelay, long period, TimeUnit unit )方法创建并执行一个周期性动作,该动作首先在提供的初始延迟之后调用,然后在给定的时间段内调用,直到服务实例关闭。

scheduleWithFixedDelay( Runnable command, long initialDelay, long delay, TimeUnit unit )方法创建并执行一个周期性动作,该动作在提供的初始延迟之后首先被调用,并在执行动作的终止和下一个动作的调用之间以给定的延迟重复调用。

Future

Future用于表示异步操作的结果。它提供了检查异步操作是否完成、获取计算结果等方法。

此外,cancel(boolean mayInterruptIfRunning)API取消操作并释放正在执行的线程。如果MayInterruptFrunning的值为true,则执行任务的线程将立即终止。

否则,将允许完成正在进行的任务。

我们可以使用下面的代码片段来创建未来的实例:

public void invoke() {
    ExecutorService executorService = Executors.newFixedThreadPool(10);

    Future<String> future = executorService.submit(() -> {
        // ...
        Thread.sleep(10000l);
        return "Hello world";
    });
}

我们可以使用以下代码片段检查未来的结果是否准备就绪,并在计算完成后获取数据:

if (future.isDone() && !future.isCancelled()) {
    try {
        str = future.get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

我们还可以为给定的操作指定超时。如果任务花费的时间超过此时间,则会引发TimeoutException

try {
    future.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    e.printStackTrace();
}

CountDownLatch

CountDownLatch(在JDK5中引入)是一个实用程序类,它在某些操作完成之前阻止一组线程。

使用counter(Integer type)初始化倒计时锁存器;当从属线程完成执行时,此计数器递减。但是一旦计数器达到零,其他线程就会被释放。

CyclicBarrier

CyclicBarrier的工作原理与CountDownLatch几乎相同,只是我们可以重用它。与CountDownLatch不同,它允许多个线程在调用最终任务之前使用wait()方法(称为barrier condition)互相等待。

我们需要创建一个可运行的任务实例来启动屏障条件:

public class Task implements Runnable {

    private CyclicBarrier barrier;

    public Task(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public void run() {
        try {
            LOG.info(Thread.currentThread().getName() + 
              " is waiting");
            barrier.await();
            LOG.info(Thread.currentThread().getName() + 
              " is released");
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }

}

现在我们可以调用一些线程来竞争屏障条件:

public void start() {

    CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
        // ...
        LOG.info("All previous tasks are completed");
    });

    Thread t1 = new Thread(new Task(cyclicBarrier), "T1"); 
    Thread t2 = new Thread(new Task(cyclicBarrier), "T2"); 
    Thread t3 = new Thread(new Task(cyclicBarrier), "T3"); 

    if (!cyclicBarrier.isBroken()) { 
        t1.start(); 
        t2.start(); 
        t3.start(); 
    }
}

这里,isBroken()方法检查执行期间是否有任何线程被中断。在执行实际流程之前,我们应该始终执行此检查。

Semaphore

Semaphore用于阻止对物理或逻辑资源某些部分的线程级访问。Semaphore包含一组许可证;每当线程试图进入临界区时,它都需要检查Semaphore是否有许可证可用。

如果许可证不可用(通过tryAcquire()),则不允许线程跳入临界段;但是,如果许可证可用,则允许访问,并且许可证计数器减少。

一旦执行线程释放临界段,许可计数器再次增加(通过release()方法完成)。

我们可以使用tryAcquire(longtimeout,TimeUnit)方法指定获取访问权限的超时。

我们还可以检查可用许可证的数量或等待获取semaphore的线程的数量。

以下代码段可用于实现semaphore

static Semaphore semaphore = new Semaphore(10);

public void execute() throws InterruptedException {

    LOG.info("Available permit : " + semaphore.availablePermits());
    LOG.info("Number of threads waiting to acquire: " + 
      semaphore.getQueueLength());

    if (semaphore.tryAcquire()) {
        try {
            // ...
        }
        finally {
            semaphore.release();
        }
    }

}

ThreadFactory

顾名思义,ThreadFactory充当一个线程(不存在)池,根据需要创建一个新线程。它消除了实现高效线程创建机制所需的大量样板代码。

我们可以定义ThreadFactory

public class BaeldungThreadFactory implements ThreadFactory {
    private int threadId;
    private String name;

    public BaeldungThreadFactory(String name) {
        threadId = 1;
        this.name = name;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, name + "-Thread_" + threadId);
        LOG.info("created new thread with id : " + threadId +
            " and name : " + t.getName());
        threadId++;
        return t;
    }
}

我们可以使用此newThread(Runnable r)方法在运行时创建新线程:

BaeldungThreadFactory factory = new BaeldungThreadFactory( 
    "BaeldungThreadFactory");
for (int i = 0; i < 10; i++) { 
    Thread t = factory.newThread(new Task());
    t.start(); 
}

BlockingQueue

在异步编程中,最常见的集成模式之一是生产者-消费者模式。concurrent包附带了一个称为BlockingQueue的数据结构,它在这些异步场景中非常有用。

DelayQueue

DelayQueue是一个无限大小的元素阻塞队列,其中一个元素只有在其过期时间(称为用户定义的延迟)完成时才能被提取。因此,最顶端的元素(head)将具有最多的延迟量,并且它将最后被轮询。

这里提供了更多信息和一个工作示例。

Locks

毫不奇怪,Lock是一个实用工具,用于阻止其他线程访问特定代码段,而当前正在执行该代码段的线程除外。

锁和同步块之间的主要区别在于,同步块完全包含在方法中;但是,我们可以在单独的方法中使用Lock API的Lock()unlock()操作。

这里提供了更多信息和一个工作示例。

Phaser

Phaser是一种比CyclicBarrierCountDownLatch更灵活的解决方案,用于充当可重用的barrier,动态数量的线程需要在其上等待才能继续执行。我们可以协调多个执行阶段,为每个程序阶段重用一个Phaser实例。

结论

在这篇概述性的高级文章中,我们重点介绍了java.util.concurrent包的不同实用程序。

与往常一样,完整的源代码可以在GitHub上获得:https://github.com/eugenp/tutorials/tree/master/core-java-modules/core-java-concurrency-basic

原文地址:https://www.baeldung.com/java-util-concurrent

 

除特别注明外,本站所有文章均为老K的Java博客原创,转载请注明出处来自https://javakk.com/2488.html

关于

发表评论

表情 格式

暂无评论

登录

忘记密码 ?

切换登录

注册