精华内容
下载资源
问答
  • 在App.java中开启容错保护(加入此注解开启容错机制@EnableCircuitBreaker)@SpringBootApplication@EnableEurekaClient@EnableCircuitBreakerpublic class RequestApp {/*** @param args*/publ...

    Spring-cloud-hystrix-容错机制(当服务调用异常时进行响应)

    1.在App.java中开启容错保护(加入此注解开启容错机制@EnableCircuitBreaker)

    @SpringBootApplication

    @EnableEurekaClient

    @EnableCircuitBreaker

    public class RequestApp {

    /**

    * @param args

    */

    public static void main(String[] args) {

    // TODO Auto-generated method stub

    SpringApplication.run(RequestApp.class, args);

    }

    @Bean

    @LoadBalanced

    public RestTemplate newRestTemplate(){

    return new RestTemplate();

    }

    }

    2.pom.xml中引用hystrix依赖

    org.springframework.boot

    spring-boot-starter-parent

    1.3.7.RELEASE

    org.springframework.boot

    spring-boot-starter-web

    org.springframework.boot

    spring-boot-starter-test

    test

    org.springframework.cloud

    spring-cloud-starter-eureka

    org.springframework.cloud

    spring-cloud-starter-hystrix

    org.springframework.cloud

    spring-cloud-dependencies

    Camden.SR3

    pom

    import

    3。Controller

    @RequestMapping(value = "CircuitBreakerInfo")

    public List circuitBreakerInfo(){

    return circuitBreakerserver.circuitBreakerInfoService();

    }

    Service

    @HystrixCommand(fallbackMethod = "circuiterrorMethod")

    public List circuitBreakerInfoService(){

    System.out.println("测试Hystrix / circuitBreaker 方法");

    Map maps = new HashMap();

    maps.put("name", "request");

    return restTemplate.getForObject(ribbonUrl, List.class,maps);

    }

    展开全文
  • import java.io.BufferedReader; import java.io.FileInputStream; import java.io.InputStreamReader; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import org...

    Storm中有个特殊的Executor叫acker,他们负责跟踪spout发出的每一个Tuple的Tuple树。当acker发现一个Tuple树已经处理完成了,它会告诉框架回调Spout的ack(),否则回调Spout的fail()。

    Acker的跟踪算法是Storm的主要突破之一,对任意大的一个Tuple树,它只需要恒定的20字节就可以进行跟踪。

    我们期望的是,如果某个Tuple被Bolt执行失败了,则Spout端可以重新发送该Tuple。但很遗憾的是,框架不会自动重新发送,需要我们自己手工编码实现。后续给大家实战案例!

    什么是Tuple树?

    83bb030c52fa0448f49b08b32babb984.png

    2589ec5901d868d207bdadde2c0cda56.png

    8dd4a94e7bced3b73c02f3523c23d9fc.png

    Spout类代码如下:

    package les19.Ack_Fail;

    import java.io.BufferedReader;

    import java.io.FileInputStream;

    import java.io.InputStreamReader;

    import java.util.Map;

    import java.util.UUID;

    import java.util.concurrent.ConcurrentHashMap;

    import org.apache.storm.spout.SpoutOutputCollector;

    import org.apache.storm.task.TopologyContext;

    import org.apache.storm.topology.IRichSpout;

    import org.apache.storm.topology.OutputFieldsDeclarer;

    import org.apache.storm.tuple.Fields;

    import org.apache.storm.tuple.Values;

    public class AckSpout implements IRichSpout{

    /**

    *

    */

    private static final long serialVersionUID = 1L;

    FileInputStream fis;

    InputStreamReader isr;

    BufferedReader br;

    private ConcurrentHashMap _pending;//线程安全的Map,存储emit过的tuple

    private ConcurrentHashMap fail_pending;//存储失败的tuple和其失败次数

    SpoutOutputCollector collector = null;

    String str = null;

    @Override

    public void nextTuple() {

    try {

    while ((str = this.br.readLine()) != null) {

    // 过滤动作

    UUID msgId = UUID.randomUUID();

    String arr[] = str.split("\t");

    String date = arr[2].substring(0, 10);

    String orderAmt = arr[1];

    Values val = new Values(date,orderAmt);

    this._pending.put(msgId, val);

    collector.emit(val, msgId);

    System.out.println("_pending.size()="+_pending.size());

    }

    } catch (Exception e) {

    // TODO: handle exception

    }

    }

    @Override

    public void close() {

    // TODO Auto-generated method stub

    try {

    br.close();

    isr.close();

    fis.close();

    } catch (Exception e) {

    // TODO: handle exception

    e.printStackTrace();

    }

    }

    @Override

    //初始化函数

    public void open(Map conf, TopologyContext context,

    SpoutOutputCollector collector) {

    try {

    this.collector = collector;

    this.fis = new FileInputStream("order.log");

    this.isr = new InputStreamReader(fis, "UTF-8");

    this.br = new BufferedReader(isr);

    _pending = new ConcurrentHashMap();

    fail_pending = new ConcurrentHashMap();

    } catch (Exception e) {

    e.printStackTrace();

    }

    // TODO Auto-generated method stub

    }

    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    // TODO Auto-generated method stub

    declarer.declare(new Fields("date","orderAmt"));

    }

    @Override

    public Map getComponentConfiguration() {

    // TODO Auto-generated method stub

    return null;

    }

    @Override

    public void ack(Object msgId) {

    // TODO Auto-generated method stub

    System.out.println("_pending size 共有:"+_pending.size());

    System.out.println("spout ack:"+msgId.toString()+"---"+msgId.getClass());

    this._pending.remove(msgId);

    System.out.println("_pending size 剩余:"+_pending.size());

    }

    @Override

    public void activate() {

    // TODO Auto-generated method stub

    }

    @Override

    public void deactivate() {

    // TODO Auto-generated method stub

    }

    @Override

    public void fail(Object msgId) {

    // TODO Auto-generated method stub

    System.out.println("spout fail:"+msgId.toString());

    Integer fail_count = fail_pending.get(msgId);//获取该Tuple失败的次数

    if (fail_count == null) {

    fail_count = 0;

    }

    fail_count ++ ;

    if (fail_count>=3) {

    //重试次数已满,不再进行重新emit

    fail_pending.remove(msgId);

    }else {

    //记录该tuple失败次数

    fail_pending.put(msgId, fail_count);

    //重发

    this.collector.emit(this._pending.get(msgId), msgId);

    }

    }

    }

    Bolt如下:

    package les19.Ack_Fail;

    import java.util.Map;

    import org.apache.storm.task.OutputCollector;

    import org.apache.storm.task.TopologyContext;

    import org.apache.storm.topology.IRichBolt;

    import org.apache.storm.topology.OutputFieldsDeclarer;

    import org.apache.storm.tuple.Fields;

    import org.apache.storm.tuple.Tuple;

    import org.apache.storm.tuple.Values;

    public class AckBolt implements IRichBolt {

    /**

    *

    */

    private static final long serialVersionUID = 1L;

    OutputCollector collector = null;

    TopologyContext context = null;

    @Override

    public void cleanup() {

    // TODO Auto-generated method stub

    }

    int num = 0;

    String url = null;

    String session_id = null;

    String date = null;

    String province_id = null;

    @Override

    public void execute(Tuple input) {

    try {

    date = input.getStringByField("date") ;

    Double orderAmt = Double.parseDouble(input.getStringByField("orderAmt"));

    collector.emit(input,new Values(date,orderAmt));//注意参数,第一个参数是Tuple本身

    collector.ack(input);

    //     Thread.sleep(300);

    } catch (Exception e) {

    collector.fail(input);

    e.printStackTrace();

    }

    }

    //初始化,对应spout的open函数

    @Override

    public void prepare(Map stormConf, TopologyContext context,

    OutputCollector collector) {

    // TODO Auto-generated method

    this.context = context ;

    this.collector = collector ;

    }

    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    // TODO Auto-generated method stub

    declarer.declare(new Fields("date","orderAmt")) ;

    }

    @Override

    public Map getComponentConfiguration() {

    // TODO Auto-generated method stub

    return null;

    }

    }

    TOPO类如下:

    package les19.Ack_Fail;

    import org.apache.storm.Config;

    import org.apache.storm.LocalCluster;

    import org.apache.storm.StormSubmitter;

    import org.apache.storm.topology.TopologyBuilder;

    public class Ack_FailTopo {

    /**

    * @param args

    */

    public static void main(String[] args) {

    // TODO Auto-generated method stub

    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("spout", new AckSpout(), 1);

    builder.setBolt("bolt", new AckBolt(), 1).shuffleGrouping("spout");

    Config conf = new Config() ;

    //conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0);

    conf.setDebug(false);

    if (args.length > 0) {

    try {

    StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

    } catch (Exception e) {

    e.printStackTrace();

    }

    }else {

    LocalCluster localCluster = new LocalCluster();

    localCluster.submitTopology("mytopology", conf, builder.createTopology());

    }

    }

    }

    ,本节来自第18-19讲。

    展开全文
  • 1 Dubbo简介Dubbo是一款高性能、轻量级的开源Java RPC框架,它提供了三大核心能力:面向接口的远程方法调用,智能容错和负载均衡,以及服务自动注册和发现。作为一个轻量级RPC框架,Dubbo的设计架构简洁清晰,主要...

    1 Dubbo简介

    Dubbo是一款高性能、轻量级的开源Java RPC框架,它提供了三大核心能力:面向接口的远程方法调用,智能容错和负载均衡,以及服务自动注册和发现。

    作为一个轻量级RPC框架,Dubbo的设计架构简洁清晰,主要组件包括Provider(服务提供者),Consumer(服务消费者),Registry(注册中心)三部分组成。此外还有用于服务监控的Monitor,它们之间的关系如下所示:

    AAffA0nNPuCLAAAAAElFTkSuQmCC

    在一个分布式系统中,为了做到系统的高可用,即服务宕机时不影响对外正常提供服务,需要组建负载集群,当集群中某一节点没有及时返回数据时,需要有集群容错(重试)机制。

    2 Dubbo负载均衡

    在集群负载均衡时,Dubbo 提供了以下5种均衡策略,缺省为 random 随机调用。

    Random

    随机调用

    Random LoadBalance

    随机,按权重设置随机概率。

    在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。

    RoundRobin LoadBalance

    轮循,按公约后的权重设置轮循比率。

    存在慢的提供者累积请求的问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。

    LeastActive LoadBalance

    最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。

    使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。

    ConsistentHash LoadBalance

    一致性 Hash,相同参数的请求总是发到同一提供者。

    当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。

    3 负载均衡配置

    服务端服务级别

    ;="...">

    客户端服务级别

    ="...">

    服务端方法级别

    ="...">

    ="...">

    客户端方法级别

    ="...">

    4 Dubbo集群容错(重试机制)

    在集群调用失败时,Dubbo 提供了多种容错方案,缺省为 Failover 重试。

    AAffA0nNPuCLAAAAAElFTkSuQmCC

    各节点关系:

    这里的 Invoker 是 Provider 的一个可调用 Service 的抽象,Invoker 封装了 Provider 地址及 Service 接口信息

    Directory 代表多个 Invoker,可以把它看成 List ,但与 List 不同的是,它的值可能是动态变化的,比如注册中心推送变更

    Cluster 将 Directory 中的多个 Invoker 伪装成一个 Invoker,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个

    Router 负责从多个 Invoker 中按路由规则选出子集,比如读写分离,应用隔离等

    LoadBalance 负责从多个 Invoker 中选出具体的一个用于本次调用,选的过程包含了负载均衡算法,调用失败后,需要重选

    5 重试模式及其特点

    Failover Cluster(默认)

    失败自动切换,当出现失败,重试其它服务器 [1]。通常用于读操作,但重试会带来更长延迟。可通过 retries="2" 来设置重试次数(不含第一次)。

    重试次数配置如下:

    ="2">

    ="2">

    ="findfoo">

    Failfast Cluster

    快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录。

    Failsafe Cluster

    失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作。

    Failback Cluster

    失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作。

    Forking Cluster

    并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过 forks="2" 来设置最大并行数。

    Broadcast Cluster

    广播调用所有提供者,逐个调用,任意一台报错则报错 [2]。通常用于通知所有提供者更新缓存或日志等本地资源信息。

    作者:慕容千语

    链接:https://www.jianshu.com/p/16e174573acb

    展开全文
  • Spark 容错机制任何容错机制的设计都是先考虑正常情况下是如何处理的,然后去考虑各种失败场景,失败场景可分 Crash(kill -9,掉电等),正常退出(例如抛异常,程序可以做善后处理),网络分区。Task我们先考虑最底层...

    Spark 容错机制

    任何容错机制的设计都是先考虑正常情况下是如何处理的,然后去考虑各种失败场景,失败场景可分 Crash(kill -9,掉电等),正常退出(例如抛异常,程序可以做善后处理),网络分区。

    Task

    我们先考虑最底层的失败,即某一个 Task 执行失败了。

    先来看应该如何处理:

    某 task A 因为取 shuffle 数据取失败而失败了。

    首先,确认失败前应该重试几次,以防止网络分区造成的短暂失败

    然后应该和 driver 通信,driver 负责重新跑该数据对应的 ShuffleMapTask B,driver 需要记录 A 在等待 B 的数据,并且当 B 好了之后通知 A,而 A 与 driver 通信后就睡觉,等待 B 好了之后 driver 的唤醒。

    不能取消 A,也不能认为 A 对应的 Stage 失败。不能取消 A 是因为不能浪费 A 已经完成的工作,例如 A 可能已经取了别的 shuffle 数据,不能因为一个 ShuffleMapTask 的输出丢了就丢掉之前取的数据。对于 A 对应的 Stage 可以这样处理:还未启动的 task 可以先不启动;已经启动的 task 之后可能会遇到取失败,如果遇到,则像 A 一样处理就行;已经启动的 task 可能已经取完那部分数据了,因此最后可能成功。

    其他原因的失败只涉及 task 内部,因此重启该 task 就行。

    上面是我能想到的最优的处理,我们来看 spark 是如何做的。

    Task 失败由 executor 感知到,通过 statusUpdate 层层传递到 driver 端的 TaskSetManager.handleFailedTask,其基本逻辑是:

    如果是 FetchFailed,即 reduce task 拿不到 shuffle 数据,那么上一个 Stage 需要重跑。(FetchFailed 的处理之后会详讲)

    如果不是 FetchFailed,并且该 Task 的其他尝试(spark 可能会启动 task 的多个副本)还未成功,会重新启动该 task。

    当然,重启次数超过 spark.task.maxFailures,taskSet 会失败,这意味着一个 stage 失败了。stage 失败整个任务就失败了,spark 会取消该 stage 对应的 job 包含的所有 task,并返回用户任务执行失败。

    FetchFailed

    我们来考虑 FetchFailed 的处理,值得注意的是 FetchFailed 的处理会在未来有比较大的变化,见 SPARK-20178。

    FetchFailed 会在 ShuffleReader 取数据失败 N 次后抛出,然后由 executor 通过 statusUpdate 传到 driver 端,实际的处理会在 DAGScheduler.handleTaskCompletion,它会重新提交该 Stage 和该 Stage 对应的 ShuffleMapStage,重试次数超过 spark.stage.maxConsecutiveAttempts 时会退出。

    Speculative task

    有时候 task 不是失败了,而是太慢了,这时 spark 会多启动几个实例(spark 中一个实例叫 attempt),有任何一个实例成功了该 task 就成功了。spark 会周期性检查是否有 task 符合 speculation 的条件,检查逻辑在 TaskSetManager.checkSpeculatableTasks。

    那么多个实例如何防止其输出不干扰呢?例如一个实例 A成功了,输出,另一个实例 B成功了,其输出覆盖了 A 的输出,甚至更糟糕的是两者同时写一个文件,导致输出错乱。

    有几个层面:

    首先,输出到文件是通过 ShuffleWriter.write 写的。以 SortShuffleWriter 为例,最后在 IndexShuffleBlockResolver.writeIndexFileAndCommit 里会先写数据到临时文件,然后检查文件是否存在,如果文件不存在,就重命名临时文件为正式文件。

    如果两个 task attempt 在同一个 executor 执行,这种情况是不可能的,调度时保证两个 attempt 不会运行在同一台机器上,详情见 TaskSetManager.dequeueSpeculativeTask(IndexShuffleBlockResolver中,检查文件是否存在和文件重命名是在一个 synchronized 块里的,这个 synchronized 看起来没什么必要)

    如果两个 task attempt 在两个 executor 执行,但这两个 executor 在同一台机器上。这种情况在 speculative task 是不可能出现的,这在 speculative task 调度时保证。(不过好像也没事,就是重命名文件时,后面的覆盖前面的,反正内容都一样)

    两个 task attempt 在不同机器上运行,这时两台机器上都有输出文件

    一个 attempt 成功了,会在 TaskSetManager.handleSuccessfulTask 中取消其他正在运行的 task 实例。如果在取消前,有另一个 attempt 成功了,那么另一个 attempt 的输出会注册到 mapOutputTracker 里,从而覆盖之前的实例的注册,这会导致不同 task 会读不同位置的输出,可能会有问题。

    Executor

    Executor crash

    executor crash 是指 executor 异常退出了,比如 executor JVM 崩溃了。

    先来看应该如何处理。

    可以通过心跳,RPC 通信失败,外部的 executor 管理程序来检测 executor

    executor 所运行的 task 应该重新被调度到其他 executor

    如果没有 external shuffle server,那么 shuffle 数据也丢了,需要重新启动生成 shuffle 数据的 task

    重启 executor

    我们来看 spark 是如何做的。在 deploy mode 为 spark standalone 情况下,系统实际运行的进程是 CoarseGrainedExecutorBackend(粗粒度的excutor监控,比如Executor的启停等),所以 Eexecutor crash 就等于该进程 crash。有几个检测 crash 的方式:

    CoarseGrainedSchedulerBackend.onDisconnected 这是 RPC 系统发现 executor 连不上了,该函数会调用 removeExecutor,进而调用 TaskSchedulerImpl.removeExecutor。

    Worker 里的 ExecutorRunner 会监控 executor 进程,其结束时会发送 ExecutorStateChanged 给 Worker,从而到达 Master,最后到达 CoarseGrainedSchedulerBackend.removeExecutor。Master 会重启 executor,一个 app 的 executor 重启超过 spark.deploy.maxExecutorRetries,app 会被终止。

    sparkContext 里有个 HeartbeatReceiver,如果一段时间内没收到 executor 的心跳,会触发 TaskSchedulerImpl.executorLost。该函数也会在 CoarseGrainedSchedulerBackend.removeExecutor 中被调用。

    如果 deploy mode 为 mesos,如果运行模式是 Coarse-Grained(Fine-Grained 已经是 deprecated 的了),实际运行的也是 CoarseGrainedExecutorBackend。在该情况下 executor crash 会通过 MesosCoarseGrainedSchedulerBackend.statusUpdate 处理,它会从totalCoresAcquired 中减去已使用的 cpu,从而在下次调度时,由于 totalCoresAcquired < spark.cores.max,会启动一个新的 executor,没有重启次数限制。

    executor 挂了,task 会在 TaskSchedulerImpl.removeExecutor 里被标记为失败,而 task 失败的处理上一节介绍过了。

    Executor 网络分区

    Executor 网络分区指的是 executor 没有挂,但是和 driver 的网络连接断了,之后可能会恢复。

    先来应该如何处理。系统是无法区分网络分区和 crash 的,因此,系统会把网络分区当做 Crash 处理,关键是在 executor 重新连上时该如何做。重新连上时,driver 端应该忽略, executor 发出的任何消息,并且通知 executor 自杀。

    我们来看 spark 如何做的。executor 是如何自杀的呢:

    如果 executor 向 driver 发送 Heartbeat 的失败次数超过 spark.executor.heartbeat.maxFailures,executor 会自杀,默认设置下,10 分钟连不上,executor 就自杀了,所以这个机制起的是保底的作用,防止在任何情况下 executor 的泄露。

    CoarseGrainedExecutorBackend.onDisconnected 会杀掉 executor

    driver 端 CoarseGrainedSchedulerBackend 在清理 executor 状态时会向 executor 发送 StopExecutor

    driver 如何忽略旧 executor 发出的消息:

    现在的实现里,driver 并没有忽略旧 executor 发出的消息,这可能会造成问题,可以得到改进。具体来说,在 CoarseGainedSchedulerBackend.receive StatusUpdate,并没有看 executorId 是否依旧在 driver 的表中。

    Driver

    Driver crash

    还是先看应该如何做。

    如果想尽量保留 driver 已完成的工作,driver 挂了,应该依赖持久化存储和与 executor 的通信,恢复 driver 的状态,重现建立对正在运行的 executor,task 等的管理。而 executor 与 driver 通信失败时应该有重试机制,并且当重试失败时不应该退出,而应该等待 driver 恢复后重连,并把自己的状态告诉 driver,只有在超过 driver 恢复时间时才自行退出。

    spark 的实现。不管什么模式,driver 挂了,所有相关的 executor 和 task 都会被清理,不尝试恢复,在 yarn-cluster 模式下还会重启 driver,重跑所有 task。这种方式浪费了已有的工作,但实现起来是最简单的。

    Driver 网络分区

    应该忽略旧 driver 的所有消息,并且让旧 driver 退出。

    在 spark 的实现下,例如在 yarn-cluster 模式,如果 driver 网络分区,yarn 会重启 driver,旧 driver 重连时,可能会有点问题,因为两个 driver 的 ID 是一样的,spark 分不清两者。

    主机

    主机 Crash

    主机 Crash,只需要把主机上运行的 driver,executor,task 都按失败处理,同时上面的 shuffle 数据对应的 task 也得重跑。driver 等失败的处理已经介绍过了。

    主机网络分区

    按主机 Crash 处理,上面的旧 driver 的处理也按 driver 网络分区一样处理,其他对象以此类推。

    总结

    任何容错机制的设计都是先考虑正常情况下是如何处理的,然后去考虑各种失败场景,失败场景可分 Crash(kill -9,掉电等),正常退出(例如抛异常,程序可以做善后处理),网络分区。本文介绍了 spark 的 driver,executor,task 失败时的处理,同时也对各种情况应该如何处理表达了一些看法,希望有所帮助。

    展开全文
  • IntroduceApache Flink 提供了可以恢复数据流应用到一致状态的容错机制。确保在发生故障时,程序的每条记录只会作用于状态一次(exactly-once),当然也可以降级为至少一次(at-least-once)。容错机制通过持续创建...
  • Resilience4j是一个轻量级、易于使用的容错库,其灵感来自Netflix Hystrix,但专为Java 8和函数式编程设计。轻量级,因为库只使用Vavr,它没有任何其他外部库依赖项。相比之下,Netflix Hystrix对Archaius有一个编译...
  • 1 Dubbo简介Dubbo是一款高性能、轻量级的开源Java RPC框架,它提供了三年夜核心能力:面向接口的远程体例挪用,智能容错和负载均衡,以及办事自动注册和发现。作为一个轻量级RPC框架,Dubbo的设计架构简洁清晰,主要...
  • Java容错机制——try—catch机制

    千次阅读 2016-10-08 21:40:59
    try{ ……………… }catch( Exception e ){ System.err.println( "发生I/O错误!!!" ); e.printStackTrace(); }
  • 它是基于Netflix的开源框架,该框架的目标在于控制那些访问远程系统、服务和第三方库的节点,从而对延迟和故障提供更强大的容错能力。Hystrix具备服务熔断,服务降级,线程和信号隔离。 请求缓存,请求合并以及服务...
  • } } } 测试类: import java.util.Scanner; /** * @author caojie * @version 1.0 * @date 2020/11/2 9:53 */ public class Test { private static Scanner input = new Scanner(System.in); public static void ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 503
精华内容 201
关键字:

java容错机制

java 订阅