重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
这篇文章主要介绍了zk工厂方法如何实现NIOServerCnxnFactory,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。
创新互联是一家集网站建设,杞县企业网站建设,杞县品牌网站建设,网站定制,杞县网站建设报价,网络营销,网络优化,杞县网站推广为一体的创新建站企业,帮助传统企业提升企业形象加强企业竞争力。可充分满足这一群体相比中小企业更为丰富、高端、多元的互联网需求。同时我们时刻保持专业、时尚、前沿,时刻以成就客户成长自我,坚持不断学习、思考、沉淀、净化自己,让我们为更多的企业打造出实用型网站。
NIOServerCnxnFactory类
内部类
AbstractSelectThread
AcceptThread
SelectorThread
属性
ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT | 10s session过期时间 |
ZOOKEEPER_NIO_NUM_SELECTOR_THREADS | selector 线程数 |
ZOOKEEPER_NIO_NUM_WORKER_THREADS | worker 线程数 |
directBuffer | buffer用来线程间数据交互 |
ipMap | 限制ip上连接数 |
cnxnExpiryQueue | 连接失效时间分桶队列 |
workerPool | WorkerService worker执行服务 |
acceptThread | 接收新连接,simple round-robin 分配到选择线程 |
selectorThreads | |
方法
停止接收
private void pauseAccept(long millisecs) { acceptKey.interestOps(0); try { selector.select(millisecs); } catch (IOException e) { // ignore } finally { acceptKey.interestOps(SelectionKey.OP_ACCEPT); } } private boolean doAccept() { boolean accepted = false; SocketChannel sc = null; try { sc = acceptSocket.accept(); accepted = true; InetAddress ia = sc.socket().getInetAddress(); int cnxncount = getClientCnxnCount(ia); if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) { throw new IOException("Too many connections from " + ia + " - max is " + maxClientCnxns); } LOG.debug("Accepted socket connection from {}", sc.socket().getRemoteSocketAddress()); sc.configureBlocking(false); // Round-robin assign this connection to a selector thread if (!selectorIterator.hasNext()) { selectorIterator = selectorThreads.iterator(); } SelectorThread selectorThread = selectorIterator.next(); if (!selectorThread.addAcceptedConnection(sc)) { throw new IOException("Unable to add connection to selector queue" + (stopped ? " (shutdown in progress)" : "")); } acceptErrorLogger.flush(); } catch (IOException e) { // accept, maxClientCnxns, configureBlocking ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1); acceptErrorLogger.rateLimitLog("Error accepting new connection: " + e.getMessage()); fastCloseSock(sc); } return accepted; } private void processAcceptedConnections() { SocketChannel accepted; while (!stopped && (accepted = acceptedQueue.poll()) != null) { SelectionKey key = null; try { key = accepted.register(selector, SelectionKey.OP_READ); NIOServerCnxn cnxn = createConnection(accepted, key, this); key.attach(cnxn); addCnxn(cnxn); } catch (IOException e) { // register, createConnection cleanupSelectionKey(key); fastCloseSock(accepted); } } } configure 获取客户端连接数 private int getClientCnxnCount(InetAddress cl) { Sets = ipMap.get(cl); if (s == null) { return 0; } return s.size(); } 创建连接 protected NIOServerCnxn createConnection(SocketChannel sock, SelectionKey sk, SelectorThread selectorThread) throws IOException { return new NIOServerCnxn(zkServer, sock, sk, this, selectorThread); } 创建连接 private void addCnxn(NIOServerCnxn cnxn) throws IOException { InetAddress addr = cnxn.getSocketAddress(); if (addr == null) { throw new IOException("Socket of " + cnxn + " has been closed"); } Set set = ipMap.get(addr); if (set == null) { // in general we will see 1 connection from each // host, setting the initial cap to 2 allows us // to minimize mem usage in the common case // of 1 entry -- we need to set the initial cap // to 2 to avoid rehash when the first entry is added // Construct a ConcurrentHashSet using a ConcurrentHashMap set = Collections.newSetFromMap(new ConcurrentHashMap (2)); // Put the new set in the map, but only if another thread // hasn't beaten us to it Set existingSet = ipMap.putIfAbsent(addr, set); if (existingSet != null) { set = existingSet; } } set.add(cnxn); cnxns.add(cnxn); touchCnxn(cnxn); } 思考: 为什么单机和集群模式启动不一样 单机可以直接从日志,快照恢复数据 集群根据角色划分,涉及到数据同步
感谢你能够认真阅读完这篇文章,希望小编分享的“zk工厂方法如何实现NIOServerCnxnFactory”这篇文章对大家有帮助,同时也希望大家多多支持创新互联,关注创新互联行业资讯频道,更多相关知识等着你来学习!