精华内容
下载资源
问答
  • 主要介绍了java ThreadPoolExecutor 并发调用实例详解的相关资料,需要的朋友可以参考下
  • 主要介绍了Go并发调用的超时处理的方法,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
  • rpc接口并发调用实例

    2020-10-21 09:32:11
    问题背景 需要根据id通过rpc调用查询具体信息,因为没有提供批量查询的接口,所以做法是...方案二:rpc服务的调用由顺序调用修改为并行调用,采用线程池实现rpc的并发调用。 具体实现如下: 1)创建线程的类 public

    问题背景
    需要根据id通过rpc调用查询具体信息,因为没有提供批量查询的接口,所以做法是挨个遍历查询,那意味着:
    如果有100个id,就需要顺序进行100次rpc调用,假设每次rpc接口的调用时间是50ms(这个速度很快了),那单单rpc调用就要占用5s,所以接口的响应会非常慢。下面进行优化。

    优化方案:
    方案一:让服务方提供批量查询接口,需要服务提供方配合,这里暂不采用。
    方案二:rpc服务的调用由顺序调用修改为并行调用,采用线程池实现rpc的并发调用。

    具体实现如下:
    1)创建线程的类
    public class MyThreadFactory implements ThreadFactory {
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r);
        }
    }

    2)创建线程处理类,因为需要获取rpc调用的结果所以是实现callable类
    说明:这里需要获取spring提供的bean,获取方式是通过参数传入
    public class RegisterTask implements Callable {

        private String  registerId;
        private String  cityCode;
        // 这里需要获取spring提供的bean,获取方式是通过参数传入
        private RegisterClient registerClient;
        public RegisterTask(String registerId, RegisterClient registerClient, String  cityCode){
            this.registerId = registerId;
            this.cityCode = cityCode;
            this.registerClient = registerClient;
        }

        @Override
        public List<StudentLessonDto> call() throws Exception {
            // 这里就是进行rpc调用
            return registerClient.queryLessonsByRegistId(registerId, cityCode);
        }
    }

    3)方法getDetail()内部创建线程池
            ThreadFactory namedThreadFactory = new MyThreadFactory();
            int queueCapacity = 10000, corePoolSize = 10, maximumPoolSize = 10;
            ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(queueCapacity);
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 10, TimeUnit.SECONDS, arrayBlockingQueue, namedThreadFactory, new ThreadPoolExecutor.CallerRunsPolicy());

    4) 方法getDetail()内部线程池提交线程,并获取执行的结果
            // 线程池进行rpc调用
            ArrayList<Future<List<StudentLessonDto>>> futures = new ArrayList<>();
            for (String id : registerIdList) {
                Future future = threadPoolExecutor.submit(new RegisterTask(id, registerClient, cityCode));
                futures.add(future);
            }

            // 获取线程调用的结果
            for (Future future : futures) {
                try {
                    List<StudentLessonDto> list = (List<StudentLessonDto>) future.get();
                    if (!CollectionUtils.isEmpty(list)) {
                        result.put(list.get(0).getRegistId(), list);
                    }
                } catch (InterruptedException e) {
                    log.error("并发获取报名的讲次异常", e);
                } catch (ExecutionException e) {
                    log.error("并发获取报名的讲次异常", e);
                }
            }

    之后经过测试和上线。

    结果:
    第一天没啥问题,第二天开始有问题,主要现象如下:
    1) 服务调用异常,异常信息大致如下:
    org.springframework.web.util.NestedServletException: Handler dispatch failed; nested exception is java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached
        at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1055)
        at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943)
        at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
        at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:503)
        at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:590)
        at io.undertow.servlet.handlers.ServletHandler.handleRequest(ServletHandler.java:74)
        at io.undertow.servlet.handlers.FilterHandler$FilterChainImpl.doFilter(
        .......
    Caused by: java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached
        at java.base/java.lang.Thread.start0(Native Method)

    2)机器无法登陆,异常如下:
    root:fork failed: Cannot allocate memory

    内存耗用的一干二净,root都没有内存了。最后重启机器才行。

    下面定位问题:
    最近新上的代码,肯定是我的新增的多线程部分除了内存,耗费了大量内存,但是怎么解释呢??
    说明肯定是线程资源没有得到释放,但是我是在方法内部创建的线程池,方法执行后按道理对应的线程池和线程资源应该会释放的(我个人错误的理解)。

    但是看到一篇博客(https://blog.csdn.net/lbh199466/article/details/104934207/),
    因为核心线程一直没有释放,所以对应的线程池和线程资源并没有释放。

    栈中的引用对象就是一种GcRoots, 所以如果核心线程一直不被回收,那么对应的线程与对象资源都不会被回收。线程栈和线程中的对象占用的对象都不会释放。引用关系:ThreadPoolExecutor->Worker->thread,然后因为thread一直不释放,所以对应的worker和池资源也都会不会释放。
    所以:
    方法内部定义线程池,核心线程数不为零,核心线程不会被回收,导致相关内存资源都不会被释放。
    也可以参考:https://zhuanlan.zhihu.com/p/72515308

    查看java进程中的线程的个数
    验证上面的结论,参考https://blog.csdn.net/blueheart20/article/details/78905267(获取当前进程数的方法)
    获取当前服务器上java进程的线程个数。下面挺一种实例
    jps 获取java进程pid
    top -Hp pid 获取进程中的线程个数 如 Threds:1190 total,代表进程中有1190个线程,可以的确看到当前线程数很多

    最终解决方案:
    1)参考 https://www.cnblogs.com/qxynotebook/p/7398882.html
    在线程池中,有核心线程,对于核心线程超时也回收,所以,需要确保核心线程超时之后也被回收。
    解决办法:在结果返回之前设置核心线程也回收:
    threadPoolExecutor.allowCoreThreadTimeOut(true);
    2)参考:https://blog.csdn.net/wchgogo/article/details/78185643(unable to create new native thread)
    栈对内存的消耗:目前没有像堆那样指定最大占用内存,设置Xss指点单个线程的占用内存大小,默认线程占用空间时1M,可以设置为512k。


    服务器中影响最多线程数的因素:
    1)内存,线程肯定是占用内存的,如果内存耗尽,那自然不能继续创建线程。
    单个线程占用内存大小可通过-Xss设置,现在默认1M,一般建议512k就够了。
    如果Xss设置过大,则浪费内存空间;
    如果Xss设置过小,代码中有遍历或递归导致调用太深的时候,就有可能耗尽StackSpace,爆出StackOverflow的错误;

    2)机器设置的最大线程数
    操作系统会限制进程允许创建的线程数,使用ulimit -u命令查看限制。某些服务器上此阈值设置的过小,比如1024。

    展开全文
  • 并发调用下结果获取的原理 Dubbo协议在客户端针对每个Service调用,默认是使用单一Netty长连接来处理RPC调用请求的,而在客户端,如在web环境中,任何一个时刻,可能存在多个线程并发对该Service进行并发调用,这些...

    RPC并发调用的结果获取原理

    • Dubbo协议在客户端针对所有的Service类,默认是使用单一Netty长连接来处理对这些Service类的方法的RPC调用请求的,即所有Service共享这个单一netty长连接。而在客户端,如在web环境中,任何一个时刻,可能存在多个线程并发对该Service进行并发调用,这些请求都是通过该单一Channel发送和获取结果的,而Netty所有请求都是异步,故dubbo如何保证这些并发线程能正确获取到自己的请求结果,而不会造成数据混乱呢?核心实现为:
    1. 客户端Request通过AtomicLong生成的当前进程全局唯一id,服务端响应回传该id;
    2. 客户端通过FUTURES静态ConcurrentHashMap保存调用id和异步结果DefaultFuture之间的关系,服务端响应时,查询根据Response的回传请求id,获取该response对应的DefaultFuture,通过await和signal机制实现请求发起线程和结果获取线程之间的通信,最终请求发起线程得到最终的结果。

    源码实现

    • 当客户端发起对服务端的RPC调用时,使用的是DubboInvoker的doInvoker方法:
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isAsyncFuture = RpcUtils.isGeneratedFuture(inv) || RpcUtils.isFutureReturnType(inv);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout);
                // For compatibility
                FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);
                RpcContext.getContext().setFuture(futureAdapter);
                Result result;
                if (isAsyncFuture) {
                    // register resultCallback, sometimes we need the asyn result being processed by the filter chain.
                    result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                } else {
                    result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                }
                return result;
            } else {
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
    

    核心关注:调用HeaderExchangeClient发送请求,获取future,这个future是DefaultFuture类,然后封装成FutureAdapter,构造AsyncRpcResult的result:

    // 代码1
    ResponseFuture future = currentClient.request(inv, timeout);
    // For compatibility
    FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);
    RpcContext.getContext().setFuture(futureAdapter);
    Result result;
    if (isAsyncFuture) {
        // register resultCallback, sometimes we need the asyn result being processed by the filter chain.
        result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
    } else {
    
    AsyncRpcResult的getRpcResult实现:
    public Result getRpcResult() {
        Result result;
        try {
            result = resultFuture.get();
        } catch (Exception e) {
            // This should never happen;
            logger.error("", e);
            result = new RpcResult();
        }
        return result;
    }
    // 即调用了DefaultFuture的get()方法来获取结果,get中会通过DefaultFuture的done,调用done.await进行等待,这里是实现的关键,具体看下面的分析。
    
    // 代码2
    // currentClient.request底层最终调用HeaderExchangeChannel的request方法:通过DefaultFuture.newFuture(channel, req, timeout)创建DefaultFuture实例future并返回。
    
     public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }
    // 其中Request如下:
    public Request() {
        mId = newId();
    }
    private static long newId() {
        // getAndIncrement() When it grows to MAX_VALUE, it will grow to MIN_VALUE, and the negative can be used as ID
        return INVOKE_ID.getAndIncrement();
    }
    private static final AtomicLong INVOKE_ID = new AtomicLong(0);
    // 这里是关键:INVOKE_ID是静态递增的AtomicLong,即客户端的每次请求都每个请求都是有一个递增唯一的id的,这个id用于在客户端唯一确定一个请求。
    
    // 代码3
    DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout)的实现如下:
    private DefaultFuture(Channel channel, Request request, int timeout) {
        this.channel = channel;
        this.request = request;
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // put into waiting map.
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    }
    // 其中FUTURES.put(id, this);的FUTURES:
    private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
    // 即为静态常量,存放请求的id和DefaultFuture。
    
    
    • 客户端接收到服务端的RPC调用响应,从底层到顶层依次是NettyClient获取NettyServer的响应,NettyClient将响应向上传递给HeaderExchangeHandler的received方法:
    // 代码1
    // NettyClient将底层的netty bootstrap交给构造函数传进来的handler处理,这个handler就是HeaderExchangeHandler:
    public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
        super(url, wrapChannelHandler(url, handler));
    }
    
    @Override
    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        bootstrap = new ClientBootstrap(channelFactory);
        // config
        // @see org.jboss.netty.channel.socket.SocketChannelConfig
        bootstrap.setOption("keepAlive", true);
        bootstrap.setOption("tcpNoDelay", true);
        bootstrap.setOption("connectTimeoutMillis", getTimeout());
        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
    }
    
    // 代码2
    // HeaderExchangeHandler的received实现:对于服务端的响应调用handleResponse方法处理
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    if (request.isTwoWay()) {
                        handleRequest(exchangeChannel, request);
                    } else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                if (isClientSide(channel)) {
                    Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                    logger.error(e.getMessage(), e);
                } else {
                    String echo = handler.telnet(channel, (String) message);
                    if (echo != null && echo.length() > 0) {
                        channel.send(echo);
                    }
                }
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }
    
    // handleResponse的实现:静态方法,通过局部变量,即参数传入的方式保证线程安全,调用DefaultFuture.received方法。
    static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            DefaultFuture.received(channel, response);
        }
    }
    
    DefaultFuture.received的实现:
    public static void received(Channel channel, Response response) {
        try {
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at "
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                        + ", response " + response
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                        + " -> " + channel.getRemoteAddress()));
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }
    
    // response将客户端的request的id原样返回了,客户端接收结果线程从FUTURES中移除该请求的id和DefaultFuture实例future,调用future的doReceived处理:调用done的signal通知在done中等待的线程。
    private void doReceived(Response res) {
        lock.lock();
        try {
            response = res;
            if (done != null) {
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }
    
    // 由上面的分析可知,客户端请求时,调用了DefaultFuture的get()方法在请求线程异步来获取结果,get的实现如下:在done调用await等待结果,从而通过await和signal实现线程之间的通信,客户端请求线程得到通知最终获取到了结果。
    @Override
    public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }
    
    
    • 服务端从Netty Server接收请求,然后向上传给HeaderExchangeHandler处理:
    void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        if (req.isBroken()) {
            Object data = req.getData();
    
            String msg;
            if (data == null) msg = null;
            else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
            else msg = data.toString();
            res.setErrorMessage("Fail to decode request due to: " + msg);
            res.setStatus(Response.BAD_REQUEST);
    
            channel.send(res);
            return;
        }
        // find handler by message class.
        Object msg = req.getData();
        try {
            // handle data.
            CompletableFuture<Object> future = handler.reply(channel, msg);
            if (future.isDone()) {
                res.setStatus(Response.OK);
                res.setResult(future.get());
                channel.send(res);
                return;
            }
            future.whenComplete((result, t) -> {
                try {
                    if (t == null) {
                        res.setStatus(Response.OK);
                        res.setResult(result);
                    } else {
                        res.setStatus(Response.SERVICE_ERROR);
                        res.setErrorMessage(StringUtils.toString(t));
                    }
                    channel.send(res);
                } catch (RemotingException e) {
                    logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
                } finally {
                    // HeaderExchangeChannel.removeChannelIfDisconnected(channel);
                }
            });
        } catch (Throwable e) {
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(e));
            channel.send(res);
        }
    }
    
    // 构造response,获取客户端请求req的id,进行回传:Response res = new Response(req.getId(), req.getVersion());
    
    // handle data.
    CompletableFuture<Object> future = handler.reply(channel, msg);
    if (future.isDone()) {
        res.setStatus(Response.OK);
        res.setResult(future.get());
        channel.send(res);
        return;
    }
    // 调用handler.reply,最终调用本地的Service,进行方法调用,即我们在配置文件中指定的dubbo:service的ref参数对应的bean。
    // handler是DubboProtocol中的requestHandler:
    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
        @Override
        public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                // need to consider backward-compatibility if it's a callback
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || !methodsStr.contains(",")) {
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods) {
                            if (inv.getMethodName().equals(method)) {
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod) {
                        logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                                + " not found in callback service interface ,invoke will be ignored."
                                + " please update the api interface. url is:"
                                + invoker.getUrl()) + " ,invocation is :" + inv);
                        return null;
                    }
                }
                RpcContext rpcContext = RpcContext.getContext();
                boolean supportServerAsync = invoker.getUrl().getMethodParameter(inv.getMethodName(), Constants.ASYNC_KEY, false);
                if (supportServerAsync) {
                    CompletableFuture<Object> future = new CompletableFuture<>();
                    rpcContext.setAsyncContext(new AsyncContextImpl(future));
                }
                rpcContext.setRemoteAddress(channel.getRemoteAddress());
                Result result = invoker.invoke(inv);
    
                if (result instanceof AsyncRpcResult) {
                    return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r);
                } else {
                    return CompletableFuture.completedFuture(result);
                }
            }
            throw new RemotingException(channel, "Unsupported request: "
                    + (message == null ? null : (message.getClass().getName() + ": " + message))
                    + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }
     ...
    }
    // 核心为:invoker在ServiceConfig的export时,封装了实际Server的ref,invoke最终交给ref进行方法调用。
    Invoker<?> invoker = getInvoker(channel, inv);
    Result result = invoker.invoke(inv);
    
    
    展开全文
  • hive并发调用的运行方式

    千次阅读 2016-07-14 16:05:03
    使用hive,我们很多情况下会并发调用hive程序,将sql任务转换成mapreuce提交到hadoop集群中,而在本人使用hive的过程中,发现并发调用hive有几个问题,在这个和大家分享下. 正文 默认安装hive,hive是使用derby...

    前言

    使用hive,我们很多情况下会并发调用hive程序,将sql任务转换成mapreuce提交到hadoop集群中,而在本人使用hive的过程中,发现并发调用hive有几个问题,在这个和大家分享下.

    正文

    默认安装hive,hive是使用derby内存数据库保存hive的元数据,这样是不可以并发调用hive的,需要配置为使用mysql保存hive的元数据。

     

    运行hive,可以有以下访问方式:

    1.hiveserver:hive以thrift服务的服务器形式运行,允许不同的语言编写客户端进行访问,通过thrift,jdbc,odbc连接器和hive服务器与hive通信,这种方式很适合java编程人员通过jdbc接口去访问hive,但是在实践中,发现并发调用时,很容易出现hiveserver无故宕机,没有jvm的dump文件,hiveserver的程序也毫无输出。

    2.metastore:

    2.1.内嵌metastore:默认情况下,metastore和hive是运行在同一个进程里,这种方式经过测试,在并发中是最稳定的,使用这种方式,暂时没有出现问题。

    2.2.远程metastore:通过配置hive.metastore.local为false,让metastore作为一个单独的进程运行,hive客户端都要连接远程metastore才能执行任务,但是在实践中,一样很容易出现远程metastore无故宕机,同样没有jvm的dump文件,远程metastore程序也毫无输出。

     

    如果你是多个机器安装了hive,多个机器共享同一个mysql元数据,那么默认情况下,在并发调用时,会偶尔发生hive报DELETEME找不到的错误,异常可以参考http://mail-archives.apache.org/mod_mbox/hive-user/201107.mbox/%3C4F6B25AFFFCAFE44B6259A412D5F9B1033183876@ExchMBX104.netflix.com%3E

    这个错误是hive使用的datanucleus框架的bug,在hive中会去取schme name和catalog,是第三方的库datanucleus在操作,可以看到它创建DELETEME123213一些随机数字的表,然后删掉。。目的就为了去获取schme name和catalog。而多个机器在并发过程中,datanucleus发现有DELETEME表,会删除,这个会导致创建了DELETEME的hive进程在访问mysql过程中报错。

    要解决这个问题,需要做以下配置:

    复制代码
    <property> 
      <name>datanucleus.fixedDatastore</name>          
      <value>true</value> 
    </property>
    <property>
     <name>datanucleus.autoCreateSchema</name>
     <value>false</value>
    </property>
    复制代码

    但是要注意:这个配置需要在让hive在第一次初始化好后,才能启动,因为第一次会自动创建元数据。

    但是,这个配置会导致hive不会自动创建元数据,而第一次初始化时,不是全部的元数据会建好的,所以这个配置需要折中平衡,建议是没有并发调用不启动这个配置,如果有并发调用启动,但是最好配置两种hive实例,一种不启动这个配置,作为日常的建表维护,一种作为定时任务,并发调用hive。

    hive的并发调用,是很容易遇到问题的,要小心处理。

     

    总结

    个人经验,如果需要并发调用hive,首先要配置hive的元数据为mysql数据库,最好是通过内嵌metastore的方式去调用hive,通过执行 $HIVE_HOME/bin/hive -S -e "<hive sql>",再从管道获取hive的输出,是本人觉得最稳定安全的方式。

    备注:执行sql做分析计算,以local的shell方式调用是没啥问题,但是在load data的时候,并发调用依然有问题,hive stats设置为mysql依然无果,只能将load data的代码以synchroize的方式调用,规避并发的问题。

    展开全文
  • 我在java代码中并发调用该存储过程,并用log4j打印调用日志,日志显示成功调用2566次,并成功插入到目标表中,但是在oracle实际查询时,数据量却少了100多条,而且每次执行程序,数据量少的数目都不一样。...
  • ThreadPoolExecutor 并发调用

    千次阅读 2017-05-05 21:22:45
    通常为了提供任务的处理速度,会使用一些并发模型,ThreadPoolExecutor中的invokeAll便是一种。 代码 package test.current;import java.util.ArrayList; import java.util.Arrays; import java.util.List; import ...

    概述


    通常为了提供任务的处理速度,会使用一些并发模型,ThreadPoolExecutor中的invokeAll便是一种。


    代码


    package test.current;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    
    public class TestCallable {
    
        public static void main(String[] args) throws InterruptedException, ExecutionException {
    
            List<Callable<List<Long>>> tasks = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                Callable<List<Long>> task = new Callable<List<Long>>() {
                    @Override
                    public List<Long> call() throws Exception {
                        return Arrays.asList(1L,2L);
                    }
                };
    
                tasks.add(task);
            }
    
            List<Long> finalResults = new ArrayList<>(10);
            List<Future<List<Long>>> results = ThreadPool.getThreadPool().invokeAll(tasks);
            for(Future<List<Long>> ele : results) {
                List<Long> list = ele.get();
                finalResults.addAll(list);
            }
    
            System.out.println(finalResults);
        }
    }
    
    package test.current;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class ThreadPool {
        private static final int CORE_SIZE = 8;
    
        private static final int MAX_SIZE = 12;
    
        private static final long KEEP_ALIVE_TIME = 30;
    
        private static final int QUEUE_SIZE = 50000;
    
        private static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CORE_SIZE, MAX_SIZE, KEEP_ALIVE_TIME,
                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(QUEUE_SIZE), new ThreadPoolExecutor.AbortPolicy());
    
        public static ThreadPoolExecutor getThreadPool() {
            return threadPool;
        }
    }
    

    可以把需要执行的任务创建一个Callable task,利用线程池中的线程并发的执行这些task,从而提高任务的执行效率。

    展开全文
  • 单例对象的并发调用需要同步

    千次阅读 2016-07-02 16:12:00
    为什么单例对象的并发调用需要同步?最近在阅读 《Inside theJVM》 这本书,结合一些日常工作学习中的感想,随便写一些东西,蜻蜓点水,不必有章法。 关于“单例同步”: 一直有人在问单例对象的并发调用是否需要...
  • 控制并发调用接口数

    千次阅读 2015-08-29 18:23:05
    控制客户端每秒调用某个远程服务不超过N次,客户端是会多线程并发调用,需要一个轻量简洁的实现,大家看看下面的一个实现,然后可以自己写一个实现。 import java.util.Date; import java.util.concurrent....
  • Jemeter 测试存储过程的并发调用

    千次阅读 2017-08-03 17:08:36
    第一次使用Jemeter测试项目,测试的是多个存储过程的并发调用 准备工作: 先熟悉存储过程的脚本内容,确定了测试方案,需要编写两部分的脚本 1、创建压测数据的脚本 2、清理压测数据的脚本,目的是为了方便压测可以...
  • postman并发调用接口

    千次阅读 2019-07-10 17:43:09
    1.新增测试项目 2.配置好接口内容 配置完后要点击保存按钮 3.并发 并发结果返回 POSTman并发请求完成
  • 对于高并发调用TOP的回答

    千次阅读 热门讨论 2010-06-07 21:49:00
    对于高并发调用TOP的回答
  • 限制接口高并发调用

    2020-07-26 17:49:56
    场景:当某个接口提供第三方获取数据时候,如果第三方平台通过线程去轮询调用该接口,则会造成接口的压力过大,并且会对数据库的连接数造成负担,因此限制接口调用次数是有必要的。 解决方式:1.将调用的 用户id+...
  • Spring Cloud feign并发调用异常

    千次阅读 2020-08-14 18:23:34
    前言:今天在做压力测试的时候,1秒钟发出100个请求,发现调用用户服务的时候出现部分请求进入熔断。但是用户服务正常,未出现报错异常,经排查是由于Hystrix机制,当并发请求到达一定数量时,会剩余请求会进入熔断...
  • 概述 对consumer而言,Dubbo协议对每个Service默认是基于Netty单一长连接和NIO...客户端基于单一长连接的并发调用结果,请参加我的另外一篇文章: Dubbo源码分析:Dubbo协议客户端单一长连接下并发调用的结果获取
  • func calHandler(c *gin.Context) { ...}
  • 本地和测试环境上,同一时间只有一个线程在执行settleLoan的方法,也就是多线程同时调用的情况下,线程是顺序调用settleLoan方法的,即A执行完,B才执行等等。但是生产环境上,居然是多个线程同时在执行settleLoan...
  • 线程类: /** * 类名称:CallableTest.java * 类描述: * 作 者:why * 时 间:2016年11月7日 */ public class CallableTest implements ...正常情况下,执行4次所用时间应给是4000ms,而并发调用,用时1003ms。
  • from sklearn.externals import joblib ## 保存到test目录 joblib.dump(scaler,'/home/test/scaler_test') ## 读取scaler scaler_test = joblib.load('/home/test/scaler_test')

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 692,041
精华内容 276,816
关键字:

并发调用