针对单体应用中线程处理过程中如何保证消费服务的顺序性

很多刚入行业的新手来说,对于如何保证业务代码(特指)执行的顺序性问题,一直是丈二和尚摸不着头。

举个例子来说(只针对单体,高并发下的分布式架构后面扯):

一个完成的买产品流程包括多个服务之间的流转,比如说你购买一个MacBook pro,至少包括第一商品的SKU的结果,然后去查库存是否有货,其次如果有货的情况下,锁单,并且进行提交,包括调用用户的信息,然后在生成订单,根据订单不管成功与否丢入死信队列做延迟,最后等你支付(不管支付或者超时未付亦或着是取消订单等)由这些操作完成后再去从死信中出来的结果做处理。撇开后面的处理,前面至少3-4个业务服务进行流转,且每个流转之间有依赖前一个业务的,也有可以并行操作的。那这个时候在开发如何处理呢?

 

首先,思考第1个问题:

用户的请求是什么来着?

用户的每一次请求都是一个不同的线程,关于线程数的管理,请注意一下如果使用了TOMCAT自行看一下配置文件其中的Connecter。

那请问有了线程之后,是否还会有很多新手那时候学到的知识那样,代码执行是自上而下的呢?

 

第二个问题:

线程1----S1

线程2---s2

线程3--- s3

线程 服务

如果我直接执行会有什么结果?会不会是s1-s2-s3这样的执行?!

其实学过线程都知道有可能也有不可能,记得我前段时间问一个某二厂工作3年的开发人员,他们的数据是从数据中台获得的,你们的操作方式是什么?---当我听到了一个方法后,我惊呆了,对方回答:不知道要执行多久,我就sleep一下,等sleep完了就好了,差不多数据获取就需要那么久时间,我更久一点也无所谓。看似没有问题的一句话,这样的操作如果在一线大厂或者说并发请求比较大的小公司中会出现什么问题?

几种可能:

1)中台数据在5秒处理完成,你睡10秒后收到,开心开心!!!---用户在后面等了10秒的转圈圈,然后每次查一次,转圈至少10秒,于是卸载。

2)中台突然压力过大,编程11秒处理完成,好,你睡了10秒,然后。。。。。没有然后了,客户在后面的了10秒后,刷出个寂寞来~~~

 

在我们实际开发的过程中,我们讲究的是体验,尤其是互联网项目,为什么说互联网项目最能锻炼开发者遇到问题和处理问题的能力,注意这个能力是褒义词,是遇到问题如何最大化优化,而不是负优化!所以不管to c,to b 或者to g,你都不能只想着自己如何划水,而放弃客户的体验。

所以遇到上面的问题,我们有没有好的解决办法呢?当然有!!!!!

注意,这边暂时只针对单体应用,分布式项目由于不是同一个JVM等各类问题,会造成还是会出现不一致的情况,当然解决办法也有,比如说分布式锁,后面再说!

那开始进入正题:

  1. 如何能够顺序保证第一个执行的结果传入给下一个线程处理,当是也会遇到用户体验的问题,但是能够保证除非获得不到,否则一定会拿到最终结果。

    如果有印象的应该还记得一个线程的处理方式,Callable以及Future

    上述处理是用于在线程执行后返回一个想要的结果,然后放入FutureTask处理并提交给下面一个线程,这就是第一种处理方案。阻塞时线程处理。

    import java.util.Arrays;
    import java.util.concurrent.*;
    
    public class FutureDemo01 {
        public static void main(String[] args) {
            ExecutorService executor =    new ThreadPoolExecutor(4, 8,
                    1000L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>(10),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.AbortPolicy());
    
            //有返回值的
            Future future = executor.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    Thread.sleep(2000);
                    return 12;
                }
            });
    
             	ture.isDone());   //false  2秒阻塞
            try {
                System.out.println(future.get());   //2秒过后拿到结果  12
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            //模拟一个场景当上面获得到结果后在给其他线程调用
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName()+":"+future.get());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    }
                }
            });
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName()+":"+future.get());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    }
                }
            });
    
            executor.shutdown();
        }
    }
    

执行结果:(先等待了2秒后出结果)

12
pool-1-thread-2:12

上面这种处理方式是利用第一次通过线程获得结果,然后通过isDone()来获得是否执行完毕,当实行完毕后,就可以获得到线程处理后的结果,最终利用获得到的结果future.get()传递给其他线程,保证线程一致性,这个也可以看成是阻塞线程处理。

那这种方式的缺点也很明显,虽然 Future<V> 可以获取任务执行结果,但是获取方式十方不变。我们不得不使用 Future#get 阻塞调用线程,或者使用轮询方式判断 Future#isDone 任务是否结束,再获取结果。

这两种处理方式都不是很优雅,JDK8 之前并发类库没有提供相关的异步回调实现方式。没办法,我们只好借助第三方类库,如 Guava,扩展 Future,增加支持回调功能。相关代码如下:

package cn.majian1210.thread.guava;
​
import com.google.common.util.concurrent.*;
import org.checkerframework.checker.nullness.qual.Nullable;
​
import java.util.Random;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
​
/**
* 通过guava提供的ListeningExecutorService实现异步处理并可回调结果
*/
public class ListeningExecutorDemo01 {
   //自定义线程工厂实现自定义名称
   static class MyThreadPoolFactory implements ThreadFactory {
​
       private static final AtomicInteger poolNumber = new AtomicInteger(1);
       private final ThreadGroup group;
       private final AtomicInteger threadNumber = new AtomicInteger(1);
       private final String namePrefix;
​
       MyThreadPoolFactory() {
           SecurityManager s = System.getSecurityManager();
           group = (s != null) ? s.getThreadGroup() :
                   Thread.currentThread().getThreadGroup();
           namePrefix = "线程池-" +
                   poolNumber.getAndIncrement() +
                   "-线程-";
      }
​
       public Thread newThread(Runnable r) {
           Thread t = new Thread(group, r,
                   namePrefix + threadNumber.getAndIncrement(),
                   0);
           if (t.isDaemon())
               t.setDaemon(false);
           if (t.getPriority() != Thread.NORM_PRIORITY)
               t.setPriority(Thread.NORM_PRIORITY);
           return t;
      }
  }
​
   public static void main(String[] args) {
   //实现guava线程池
       ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
​
       //尝试定义一个线程先执行返回结果2
       ListenableFuture<Integer> listenableFuture01 = executorService.submit(new Callable<Integer>() {
           @Override
           public Integer call() throws Exception {
               try {
                   TimeUnit.SECONDS.sleep(2);
                   //随机0-100中间一个结果
              } catch (InterruptedException e) {
                   e.printStackTrace();
              }
               return new Random().nextInt(100);
          }
      });
​
       try {
           System.out.println(listenableFuture01.get() );
      } catch (InterruptedException e) {
           e.printStackTrace();
      } catch (ExecutionException e) {
           e.printStackTrace();
      }
​
      ExecutorService service =  new ThreadPoolExecutor(
               4, 8,10, TimeUnit.SECONDS,
               new LinkedBlockingQueue(100),new MyThreadPoolFactory() );
​
       Futures.addCallback(listenableFuture01, new FutureCallback<Integer>() {
                   @Override
                   public void onSuccess(@Nullable Integer integer) {
                       System.out.println(integer);
                       service.execute(()->{
                           System.out.println("---------"+integer+"----------");
                      });
                       service.shutdown();
                  }
​
                   @Override
                   public void onFailure(Throwable throwable) {
                       System.out.println(throwable.getMessage());
                  }
              },executorService);
​
​
               executorService.shutdown();
​
  }
}

实行结果:

48
48
---------48----------

上述通过Guava的实现是通过监听来操作,好处在于不会一致等待,通过监听到onSuccess来执行其他线程内容。但是又发现了问题就是无法解决多个线程之间相互依赖的问题。这个时候如果需要这种多链式依赖可以通过CompletableFuture来实现。

其中这个CompletableFuture实际还是jdk原生提供的哦,不过需要配合一些JDK1.8的新特性,比如函数式接口以及Lambda表达式,我这边就简单说明一下,其他不去深究。

函数式接口就是一个打上@FunctionalInterface的接口,里面只有一个抽象方法,然后调用可以通过Lambda表达式调用。

/**
* 函数式接口有且只有一个抽象方法
*/
@FunctionalInterface
 interface  Test{
//   public void a(){
//       System.out.println("aaaa");
//   }
   public   String b();
​
//   public void c(String s);
}

实现:

Test t2 = () ->{
  return "ewewewew";
};

上面就相当于

写了一个实现类,然后生成对象,然后对象在去调用b()方法,明白了不?~

其中()代表b()是无参数的,其实这个()都可以不写,直接= ->{}来实现,最后还有一个return就是返回的那个字符串。

好,继续进入正题:那如何通过CompletableFuture来实现呢?

其实CompletableFuture是一个类似于链式执行的线程处理方式,可以把我要执行的一类线程放在前面,当执行完毕之后,我要如何处理。下面附上一张图---饿,不是我画的,但是基本上涵盖了所有的方法。

 

1)首先第一步先创建CompletableFuture completableFuture的对象。在这个实现中包括5种实现的方案--网络图

 

其中自定义线程池还好说,基本上直接就可以操作线程池对象即可。另外一个是forkjoin,需要在线程中fork线程。

第一个方法创建一个具有默认结果的 CompletableFuture,这个没啥好讲。我们重点讲述下下面四个异步方法。

前两个方法 runAsync 不支持返回值,而 supplyAsync 可以支持返回结果。

这个两个方法默认将会使用公共的 ForkJoinPool 线程池执行,这个线程池默认线程数是 CPU 的核数。

可以设置 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数

使用共享线程池将会有个弊端,一旦有任务被阻塞,将会造成其他任务没机会执行。所以强烈建议使用后两个方法,根据任务类型不同,主动创建线程池,进行资源隔离,避免互相干扰。

​
​
import java.util.concurrent.*;
import java.util.function.BiConsumer;
import java.util.function.Function;
​
public class CompletableFutureDemo01 {
​
   public static void main(String[] args) {
       ExecutorService executor =    new ThreadPoolExecutor(4, 8,
               1000L, TimeUnit.MILLISECONDS,
               new LinkedBlockingQueue<Runnable>(10),
               Executors.defaultThreadFactory(),
               new ThreadPoolExecutor.AbortPolicy());
​
       //有返回值的---串行执行
     CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(()->{
         try {
             TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
        }
         System.out.println("执行 ...");
                return 100;
    },executor).thenApplyAsync(num->num = num +100).thenApplyAsync(num -> num = num + 200);/**/
​
       System.out.println(completableFuture.join());
​
​
       CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(()->{
           try {
               TimeUnit.SECONDS.sleep(2);
          } catch (InterruptedException e) {
          }
           System.out.println("执行1 ...");
           return "订航班";
      },executor);
​
       CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(()->{
           try {
               TimeUnit.SECONDS.sleep(2);
          } catch (InterruptedException e) {
          }
           System.out.println("执行2 ...");
           return "订酒店";
      },executor);
​
       CompletableFuture<String> completableFuture3 = completableFuture2.thenCombineAsync(completableFuture1,(hotel,airplane) -> {
           System.out.println(airplane+hotel+"之后执行");
           try {
               Thread.sleep(1000L);
          } catch (InterruptedException e) {
               e.printStackTrace();
          }
           return "租车";
      });
​
       System.out.println(completableFuture3.join());
​
​
//       System.out.println("==");
//
//       // 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
//       try {
//           TimeUnit.SECONDS.sleep(2);
//       } catch (InterruptedException e) {
//           e.printStackTrace();
//       }
​
       executor.shutdown();
  }
​
}

执行结果是:

执行 ...
400
执行1 ...
执行2 ...
订航班订酒店之后执行
租车
​
​
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
​
public class CompletableFutureDemo02 {
​
   public static void printThread(String tag){
       System.out.println(new StringJoiner("\t")
                          .add(Thread.currentThread().getName())
                            .add(tag));
  }
​
   public static void main(String[] args) {
       CompletableFutureDemo02.printThread("大Logo进入餐厅");
       CompletableFutureDemo02.printThread("黄晓明接待,点餐");
​
       CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(()->{
           CompletableFutureDemo02.printThread("服务员,给大Logo最贵的菜");
           try {
               TimeUnit.MILLISECONDS.sleep(2000);
          } catch (InterruptedException e) {
               e.printStackTrace();
          }
           return "黄晓明专属辣椒炒肉";
      }).thenCompose(foodname -> CompletableFuture.supplyAsync(()->{
           CompletableFutureDemo02.printThread("厨师做起来" + foodname);
           try {
               TimeUnit.MILLISECONDS.sleep(2000);
          } catch (InterruptedException e) {
               e.printStackTrace();
          }
           return foodname;
      }).thenComposeAsync(food ->CompletableFuture.supplyAsync(()->{
           CompletableFutureDemo02.printThread("服务员上菜");
           return food + "大份米饭";
      })));
       cf1.join();
       CompletableFutureDemo02.printThread("大logo开吃");
​
  }
}

执行结果:

main大Logo进入餐厅
main黄晓明接待,点餐
ForkJoinPool.commonPool-worker-3服务员,给大Logo最贵的菜
ForkJoinPool.commonPool-worker-5厨师做起来黄晓明专属辣椒炒肉
ForkJoinPool.commonPool-worker-3服务员上菜
main大logo开吃

细品细品~~!!!

热门文章

暂无图片
编程学习 ·

exe4j详细使用教程(附下载安装链接)

一、exe4j介绍 ​ exe4j是一个帮助你集成Java应用程序到Windows操作环境的java可执行文件生成工具&#xff0c;无论这些应用是用于服务器&#xff0c;还是图形用户界面&#xff08;GUI&#xff09;或命令行的应用程序。如果你想在任务管理器中及Windows XP分组的用户友好任务栏…
暂无图片
编程学习 ·

AUTOSAR从入门到精通100讲(126)-浅谈车载充电系统通信方案

01 引言 本文深入研究车载充电系统策略,设计出一套基于电动汽车电池管理系统与车载充电机的CAN通信协议,可供电动汽车设计人员参考借鉴。 02 电动汽车充电系统通讯网络 电动汽车整车控制系统中采用的是CAN总线通信方式,由一个整车内部高速CAN网络、内部低速CAN网络和一个充电…
暂无图片
编程学习 ·

CMake(九):生成器表达式

当运行CMake时&#xff0c;开发人员倾向于认为它是一个简单的步骤&#xff0c;需要读取项目的CMakeLists.txt文件&#xff0c;并生成相关的特定于生成器的项目文件集(例如Visual Studio解决方案和项目文件&#xff0c;Xcode项目&#xff0c;Unix Makefiles或Ninja输入文件)。然…
暂无图片
编程学习 ·

47.第十章 网络协议和管理配置 -- 网络配置(八)

4.3.3 route 命令 路由表管理命令 路由表主要构成: Destination: 目标网络ID,表示可以到达的目标网络ID,0.0.0.0/0 表示所有未知网络,又称为默认路由,优先级最低Genmask:目标网络对应的netmaskIface: 到达对应网络,应该从当前主机哪个网卡发送出来Gateway: 到达非直连的网络,…
暂无图片
编程学习 ·

元宇宙技术基础

请看图&#xff1a; 1、通过AR、VR等交互技术提升游戏的沉浸感 回顾游戏的发展历程&#xff0c;沉浸感的提升一直是技术突破的主要方向。从《愤怒的小鸟》到CSGO,游戏建模方式从2D到3D的提升使游戏中的物体呈现立体感。玩家在游戏中可以只有切换视角&#xff0c;进而提升沉浸…
暂无图片
编程学习 ·

flink的伪分布式搭建

一 flink的伪分布式搭建 1.1 执行架构图 1.Flink程序需要提交给 Job Client2.Job Client将作业提交给 Job Manager3.Job Manager负责协调资源分配和作业执行。 资源分配完成后&#xff0c;任务将提交给相应的 Task Manage。4.Task Manager启动一个线程以开始执行。Task Manage…
暂无图片
编程学习 ·

十进制正整数与二进制字符串的转换(C++)

Function one&#xff1a; //十进制数字转成二进制字符串 string Binary(int x) {string s "";while(x){if(x % 2 0) s 0 s;else s 1 s;x / 2;}return s; } Function two&#xff1a; //二进制字符串变为十进制数字 int Decimal(string s) {int num 0, …
暂无图片
编程学习 ·

[含lw+源码等]微信小程序校园辩论管理平台+后台管理系统[包运行成功]Java毕业设计计算机毕设

项目功能简介: 《微信小程序校园辩论管理平台后台管理系统》该项目含有源码、论文等资料、配套开发软件、软件安装教程、项目发布教程等 本系统包含微信小程序做的辩论管理前台和Java做的后台管理系统&#xff1a; 微信小程序——辩论管理前台涉及技术&#xff1a;WXML 和 WXS…
暂无图片
编程学习 ·

树莓派驱动DHT11温湿度传感器

1&#xff0c;直接使用python库 代码如下 import RPi.GPIO as GPIO import dht11 import time import datetimeGPIO.setwarnings(True) GPIO.setmode(GPIO.BCM)instance dht11.DHT11(pin14)try:while True:result instance.read()if result.is_valid():print(ok)print(&quo…
暂无图片
编程学习 ·

ELK简介

ELK简介 ELK是三个开源软件的缩写&#xff0c;Elasticsearch、Logstash、Kibana。它们都是开源软件。不过现在还新增了一个 Beats&#xff0c;它是一个轻量级的日志收集处理工具(Agent)&#xff0c;Beats 占用资源少&#xff0c;适合于在各个服务器上搜集日志后传输给 Logstas…
暂无图片
编程学习 ·

Linux 基础

通常大数据框架都部署在 Linux 服务器上&#xff0c;所以需要具备一定的 Linux 知识。Linux 书籍当中比较著名的是 《鸟哥私房菜》系列&#xff0c;这个系列很全面也很经典。但如果你希望能够快速地入门&#xff0c;这里推荐《Linux 就该这么学》&#xff0c;其网站上有免费的电…
暂无图片
编程学习 ·

Windows2022 无线网卡装不上驱动

想来 Windows2022 和 windows10/11 的驱动应该差不多通用的&#xff0c;但是死活装不上呢&#xff1f; 搜一下&#xff0c;有人提到 “默认安装时‘无线LAN服务’是关闭的&#xff0c;如果需要开启&#xff0c;只需要在“添加角色和功能”中&#xff0c;选择开启“无线LAN服务…
暂无图片
编程学习 ·

【嵌入式面试宝典】版本控制工具Git常用命令总结

目录 创建仓库 查看信息 版本回退 版本检出 远程库 Git 创建仓库 git initgit add <file> 可反复多次使用&#xff0c;添加多个文件git commit -m <message> 查看信息 git status 仓库当前的状态git diff 差异对比git log 历史记录&#xff0c;提交日志--pret…
暂无图片
编程学习 ·

用Postman生成测试报告

newman newman是一款基于nodejs开发的可以运行postman脚本的工具&#xff0c;使用Newman&#xff0c;可以直接从命令运行和测试postman集合。 安装nodejs 下载地址&#xff1a;https://nodejs.org/en/download/ 选择自己系统相对应的版本内容进行下载&#xff0c;然后傻瓜式安…
暂无图片
编程学习 ·

Java面向对象之多态、向上转型和向下转型

文章目录前言一、多态二、引用类型之间的转换Ⅰ.向上转型Ⅱ.向下转型总结前言 今天继续Java面向对象的学习&#xff0c;学习面向对象的第三大特征&#xff1a;多态&#xff0c;了解多态的意义&#xff0c;以及两种引用类型之间的转换&#xff1a;向上转型、向下转型。  希望能…