Tag Archives: zookeeper

[Solved] ZooKeeper znode delete error: authentication is not valid: /hbase/tokenauth

Problem background:

When building an HDP cluster, after Kerberos is enabled, HBase's zookeeper.znode.parent becomes /hbase-secure. I wanted to change it back to /hbase.

After changing the value, however, HBase could no longer connect to ZooKeeper, presumably because /hbase already existed in ZooKeeper. I wanted to delete /hbase in ZK and let HBase reinitialize it,

but the delete failed with the error: authentication is not valid: /hbase/tokenauth

Solution:

This happens because ZooKeeper has ACLs enabled, and I could not use ZooKeeper's getAcl command to inspect them.

So I simply disabled ZooKeeper's ACL checking and restarted ZooKeeper.

[Disable ACLs] Add a ZooKeeper configuration entry; here I demonstrate on HDP:

skipACL = yes

Now the znode can be deleted. This was a newly built cluster, so deleting at will was safe; on a production cluster, be very careful!!
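With ACL checks skipped, removing the znode is an ordinary zkCli operation. An illustrative session (prompt and listing are examples, not captured from the cluster in this post; on ZooKeeper 3.5+ use deleteall instead of rmr):

```
[zk: localhost:2181(CONNECTED) 0] rmr /hbase
[zk: localhost:2181(CONNECTED) 1] ls /
[hbase-secure, zookeeper]
```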

At this point, I changed my HBase zookeeper.znode.parent to /hbase, restarted, and entered the HBase shell to verify.

After all of the above, I re-enabled ZooKeeper's ACL checking, i.e. removed the skipACL entry and restarted ZooKeeper.

[Solved] Error contacting service. It is probably not running

1. An error is reported when ZooKeeper is started, as shown in the figure below

2. Solutions

Since ZooKeeper version 3.5.3 there is an embedded management console (AdminServer), started through Jetty, which also occupies port 8080.

So add the following to the zoo.cfg configuration:

admin.serverPort=<an unoccupied port number>
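For example, either move the AdminServer to a free port or disable it outright (8888 below is just an arbitrary unused port, not a recommended value):

```
admin.serverPort=8888
# or switch the embedded console off entirely:
admin.enableServer=false
```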

ZooKeeper Startup Error: Error contacting service. It is probably not running

ZooKeeper starts, but checking the server status shows that leader election has not taken place:

Possible problems and solutions:

If you configured the ensemble step by step, a myid file should have been created in the directory specified by dataDir on each node. This myid matches the server ID and uniquely identifies a machine within the ZooKeeper cluster; no two machines may share a value.

Recheck the myid files to make sure no value is repeated across servers.
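The check above can be scripted. A minimal sketch using three local dataDirs to stand in for a three-node ensemble (/tmp/zk-demo is a hypothetical path, not from this post):

```shell
# Create one dataDir per server and write its myid, as the setup steps require.
for i in 1 2 3; do
  mkdir -p "/tmp/zk-demo/server$i/data"
  echo "$i" > "/tmp/zk-demo/server$i/data/myid"
done
# A duplicate myid anywhere in the ensemble breaks leader election,
# so verify that every value is unique:
dup=$(sort /tmp/zk-demo/server*/data/myid | uniq -d)
if [ -z "$dup" ]; then
  echo "myid values are unique"
else
  echo "duplicate myid found: $dup"
fi
```

On a real cluster you would run `cat $dataDir/myid` on each host and compare against the `server.N=` lines in zoo.cfg.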

ZooKeeper connection error: KeeperErrorCode = NodeExists for /test_node

ZooKeeper connection error:

CONNECTING
Receive watched event:WatchedEvent state:SyncConnected type:None path:null
ZooKeeper session established.
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /test_node
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:119)
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
	at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:783)
	at ZKTest.doSomething(ZKTest.java:27)
	at ZKTest.process(ZKTest.java:21)
	at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:522)
	at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:498) 

Solution: delete the version-2 folder under the dataDir path and restart. (Note that this wipes all data stored in that ZooKeeper instance.)
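If only that one znode is in the way, deleting it from the zkCli shell is a gentler alternative than wiping version-2. An illustrative session (rmr is the 3.4.x command, renamed deleteall in 3.5+; the path is the one from the stack trace above):

```
[zk: localhost:2181(CONNECTED) 0] rmr /test_node
```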

https://blog.csdn.net/agony_sun/article/details/77480109

Error "already running as process" when starting ZooKeeper

An error was reported when starting ZooKeeper today:

[root@hadoop-one zookeeper-3.4.5]# bin/zkServer.sh start
JMX enabled by default
Using config: /root/zookeeper/zookeeper-3.4.5/bin/../conf/zoo.cfg
Starting zookeeper … already running as process 947..

Looking at process 947, it turned out to be a Linux system process, so there should be no conflict. I also checked the port numbers ZooKeeper uses; none were occupied.

I then opened the bin/zkServer.sh startup script and searched for the place where the message above is printed:

    echo  -n "Starting zookeeper ... "
    if [ -f $ZOOPIDFILE ]; then
      if kill -0 `cat $ZOOPIDFILE` > /dev/null 2>&1; then
         echo $command already running as process `cat $ZOOPIDFILE`.
         exit 0
      fi
    fi

Seeing $ZOOPIDFILE, I realized the process ID file already existed, which is what prevented startup.

In dataDir there was indeed a zookeeper_server.pid, last modified a few days earlier. The machine had previously shut down abnormally due to a power failure, which probably left this stale PID file behind.

Delete it, restart, OK.
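The zkServer.sh guard shown above only blocks startup when the PID in the file belongs to a live process; a stale file alone should not. A self-contained sketch of that logic (the file path is hypothetical, and the stale PID is manufactured by reaping a short-lived child):

```shell
ZOOPIDFILE=/tmp/zk-demo-zookeeper_server.pid   # demo path, not a real install
( exit 0 ) & deadpid=$!
wait "$deadpid"                 # the process has exited, so its PID is now stale
echo "$deadpid" > "$ZOOPIDFILE"
# Same check zkServer.sh performs: kill -0 succeeds only for a live process.
if [ -f "$ZOOPIDFILE" ] && kill -0 "$(cat "$ZOOPIDFILE")" 2>/dev/null; then
  echo "already running as process $(cat "$ZOOPIDFILE")"
else
  echo "stale pid file; removing it"
  rm -f "$ZOOPIDFILE"
fi
```

This mirrors the fix in the post: once the leftover PID file is removed, the `kill -0` guard no longer fires and ZooKeeper starts normally.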

An error is reported when ZooKeeper starts (data directory permissions are incorrect)

ZooKeeper startup error log:

2016-11-16 11:19:43,880 [myid:3] - INFO  [WorkerReceiver[myid=3]:FastLeaderElection@542] - Notification: 3 (n.leader), 0x200111a88 (n.zxid), 0x1 (n.round), LOOKING (n.state), 3 (n.sid), 0x2 (n.peerEPoch), LOOKING (my state)
2016-11-16 11:19:43,883 [myid:3] - INFO  [WorkerReceiver[myid=3]:FastLeaderElection@542] - Notification: 2 (n.leader), 0x200111a88 (n.zxid), 0x1 (n.round), LOOKING (n.state), 2 (n.sid), 0x2 (n.peerEPoch), LOOKING (my state)
2016-11-16 11:19:43,886 [myid:3] - INFO  [WorkerReceiver[myid=3]:FastLeaderElection@542] - Notification: 3 (n.leader), 0x200111a88 (n.zxid), 0x1 (n.round), LOOKING (n.state), 2 (n.sid), 0x2 (n.peerEPoch), LOOKING (my state)
2016-11-16 11:19:44,089 [myid:3] - INFO  [QuorumPeer[myid=3]/0:0:0:0:0:0:0:0:2181:QuorumPeer@750] - LEADING
2016-11-16 11:19:44,093 [myid:3] - INFO  [QuorumPeer[myid=3]/0:0:0:0:0:0:0:0:2181:Leader@59] - TCP NoDelay set to: true
2016-11-16 11:19:44,097 [myid:3] - INFO  [node03/172.16.145.113:3888:QuorumCnxManager$Listener@493] - Received connection request /172.16.145.111:43715
2016-11-16 11:19:44,101 [myid:3] - INFO  [QuorumPeer[myid=3]/0:0:0:0:0:0:0:0:2181:Environment@100] - Server environment:zookeeper.version=3.4.5-cdh5.3.0--1, built on 12/17/2014 02:55 GMT
2016-11-16 11:19:44,101 [myid:3] - INFO  [QuorumPeer[myid=3]/0:0:0:0:0:0:0:0:2181:Environment@100] - Server environment:host.name=node03
2016-11-16 11:19:44,101 [myid:3] - INFO  [WorkerReceiver[myid=3]:FastLeaderElection@542] - Notification: 1 (n.leader), 0x200111a88 (n.zxid), 0x1 (n.round), LOOKING (n.state), 1 (n.sid), 0x2 (n.peerEPoch), LEADING (my state)
2016-11-16 11:19:44,101 [myid:3] - INFO  [QuorumPeer[myid=3]/0:0:0:0:0:0:0:0:2181:Environment@100] - Server environment:java.version=1.7.0_71
2016-11-16 11:19:44,101 [myid:3] - INFO  [QuorumPeer[myid=3]/0:0:0:0:0:0:0:0:2181:Environment@100] - Server environment:java.vendor=Oracle Corporation
2016-11-16 11:19:44,101 [myid:3] - INFO  [QuorumPeer[myid=3]/0:0:0:0:0:0:0:0:2181:Environment@100] - Server environment:java.home=/usr/java/jdk1.7.0_71/jre
2016-11-16 11:19:44,102 [myid:3] - INFO  [QuorumPeer[myid=3]/0:0:0:0:0:0:0:0:2181:Environment@100] - Server environment:java.class.path=/usr/lib/zookeeper/bin/../build/classes:/usr/lib/zookeeper/bin/../build/lib/*.jar:/usr/lib/zookeeper/bin/../lib/slf4j-log4j12.jar:/usr/lib/zookeeper/bin/../lib/slf4j-log4j12-1.7.5.jar:/usr/lib/zookeeper/bin/../lib/slf4j-api-1.7.5.jar:/usr/lib/zookeeper/bin/../lib/netty-3.2.2.Final.jar:/usr/lib/zookeeper/bin/../lib/log4j-1.2.16.jar:/usr/lib/zookeeper/bin/../lib/jline-0.9.94.jar:/usr/lib/zookeeper/bin/../zookeeper-3.4.5-cdh5.3.0.jar:/usr/lib/zookeeper/bin/../src/java/lib/*.jar:/etc/zookeeper/conf::/etc/zookeeper/conf:/usr/lib/zookeeper/zookeeper-3.4.5-cdh5.3.0.jar:/usr/lib/zookeeper/zookeeper.jar:/usr/lib/zookeeper/lib/netty-3.2.2.Final.jar:/usr/lib/zookeeper/lib/log4j-1.2.16.jar:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar:/usr/lib/zookeeper/lib/slf4j-api-1.7.5.jar:/usr/lib/zookeeper/lib/jline-0.9.94.jar:/usr/lib/zookeeper/lib/slf4j-log4j12.jar
2016-11-16 11:19:44,102 [myid:3] - INFO  [QuorumPeer[myid=3]/0:0:0:0:0:0:0:0:2181:Environment@100] - Server environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2016-11-16 11:19:44,102 [myid:3] - INFO  [QuorumPeer[myid=3]/0:0:0:0:0:0:0:0:2181:Environment@100] - Server environment:java.io.tmpdir=/tmp
2016-11-16 11:19:44,102 [myid:3] - INFO  [QuorumPeer[myid=3]/0:0:0:0:0:0:0:0:2181:Environment@100] - Server environment:java.compiler=<NA>
2016-11-16 11:19:44,102 [myid:3] - INFO  [QuorumPeer[myid=3]/0:0:0:0:0:0:0:0:2181:Environment@100] - Server environment:os.name=Linux
2016-11-16 11:19:44,102 [myid:3] - INFO  [WorkerReceiver[myid=3]:FastLeaderElection@542] - Notification: 3 (n.leader), 0x200111a88 (n.zxid), 0x1 (n.round), LOOKING (n.state), 1 (n.sid), 0x2 (n.peerEPoch), LEADING (my state)
2016-11-16 11:19:44,102 [myid:3] - INFO  [QuorumPeer[myid=3]/0:0:0:0:0:0:0:0:2181:Environment@100] - Server environment:os.arch=amd64
2016-11-16 11:19:44,103 [myid:3] - INFO  [QuorumPeer[myid=3]/0:0:0:0:0:0:0:0:2181:Environment@100] - Server environment:os.version=2.6.32-504.el6.x86_64
2016-11-16 11:19:44,103 [myid:3] - INFO  [QuorumPeer[myid=3]/0:0:0:0:0:0:0:0:2181:Environment@100] - Server environment:user.name=zookeeper
2016-11-16 11:19:44,103 [myid:3] - INFO  [QuorumPeer[myid=3]/0:0:0:0:0:0:0:0:2181:Environment@100] - Server environment:user.home=/var/lib/zookeeper
2016-11-16 11:19:44,103 [myid:3] - INFO  [QuorumPeer[myid=3]/0:0:0:0:0:0:0:0:2181:Environment@100] - Server environment:user.dir=/
2016-11-16 11:19:44,105 [myid:3] - INFO  [QuorumPeer[myid=3]/0:0:0:0:0:0:0:0:2181:ZooKeeperServer@162] - Created server with tickTime 2000 minSessionTimeout 4000 maxSessionTimeout 40000 datadir /var/lib/zookeeper/version-2 snapdir /var/lib/zookeeper/version-2
2016-11-16 11:19:44,106 [myid:3] - INFO  [QuorumPeer[myid=3]/0:0:0:0:0:0:0:0:2181:Leader@348] - LEADING - LEADER ELECTION TOOK - 243
2016-11-16 11:19:44,120 [myid:3] - INFO  [QuorumPeer[myid=3]/0:0:0:0:0:0:0:0:2181:FileSnap@83] - Reading snapshot /var/lib/zookeeper/version-2/snapshot.200106cdb
2016-11-16 11:19:44,511 [myid:3] - INFO  [QuorumPeer[myid=3]/0:0:0:0:0:0:0:0:2181:FileTxnSnapLog@273] - Snapshotting: 0x200111a88 to /var/lib/zookeeper/version-2/snapshot.200111a88
2016-11-16 11:19:44,511 [myid:3] - ERROR [QuorumPeer[myid=3]/0:0:0:0:0:0:0:0:2181:ZooKeeperServer@272] - Severe unrecoverable error, exiting
java.io.FileNotFoundException: /var/lib/zookeeper/version-2/snapshot.200111a88 (Permission denied)
       at java.io.FileOutputStream.open(Native Method)
       at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
       at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
       at org.apache.zookeeper.server.persistence.FileSnap.serialize(FileSnap.java:225)
       at org.apache.zookeeper.server.persistence.FileTxnSnapLog.save(FileTxnSnapLog.java:275)
       at org.apache.zookeeper.server.ZooKeeperServer.takeSnapshot(ZooKeeperServer.java:270)
       at org.apache.zookeeper.server.ZooKeeperServer.loadData(ZooKeeperServer.java:265)
       at org.apache.zookeeper.server.quorum.Leader.lead(Leader.java:357)
       at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:753)
2016-11-16 11:58:48,309 [myid:] - INFO  [main:QuorumPeerConfig@101] - Reading configuration from: /etc/zookeeper/conf/zoo.cfg
2016-11-16 11:58:48,316 [myid:] - INFO  [main:QuorumPeerConfig@334] - Defaulting to majority quorums
2016-11-16 11:58:48,322 [myid:3] - INFO  [main:DatadirCleanupManager@78] - autopurge.snapRetainCount set to 3
2016-11-16 11:58:48,322 [myid:3] - INFO  [main:DatadirCleanupManager@79] - autopurge.purgeInterval set to 0
2016-11-16 11:58:48,323 [myid:3] - INFO  [main:DatadirCleanupManager@101] - Purge task is not scheduled.
2016-11-16 11:58:48,338 [myid:3] - INFO  [main:QuorumPeerMain@132] - Starting quorum peer
2016-11-16 11:58:48,352 [myid:3] - INFO  [main:NIOServerCnxnFactory@94] - binding to port 0.0.0.0/0.0.0.0:2181
2016-11-16 11:58:48,371 [myid:3] - INFO  [main:QuorumPeer@913] - tickTime set to 2000
2016-11-16 11:58:48,371 [myid:3] - INFO  [main:QuorumPeer@933] - minSessionTimeout set to -1
2016-11-16 11:58:48,371 [myid:3] - INFO  [main:QuorumPeer@944] - maxSessionTimeout set to -1
2016-11-16 11:58:48,372 [myid:3] - INFO  [main:QuorumPeer@959] - initLimit set to 10
2016-11-16 11:58:48,392 [myid:3] - INFO  [main:FileSnap@83] - Reading snapshot /var/lib/zookeeper/version-2/snapshot.200106cdb
2016-11-16 11:58:49,556 [myid:3] - INFO  [Thread-1:QuorumCnxManager$Listener@486] - My election bind port: 0.0.0.0/0.0.0.0:3888
2016-11-16 11:58:49,570 [myid:3] - INFO  [QuorumPeer[myid=3]/0:0:0:0:0:0:0:0:2181:QuorumPeer@670] - LOOKING
2016-11-16 11:58:49,572 [myid:3] - INFO  [QuorumPeer[myid=3]/0:0:0:0:0:0:0:0:2181:FastLeaderElection@740] - New election. My id =  3, proposed zxid=0x200111a88
2016-11-16 11:58:49,578 [myid:3] - WARN  [WorkerSender[myid=3]:QuorumCnxManager@368] - Cannot open channel to 1 at election address node01/172.16.145.111:3888
java.net.ConnectException: Connection refused
       at java.net.PlainSocketImpl.socketConnect(Native Method)
       at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
       at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
       at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
       at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
       at java.net.Socket.connect(Socket.java:579)
       at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:354)
       at org.apache.zookeeper.server.quorum.QuorumCnxManager.toSend(QuorumCnxManager.java:327)
       at org.apache.zookeeper.server.quorum.FastLeaderElection$Messenger$WorkerSender.process(FastLeaderElection.java:393)
       at org.apache.zookeeper.server.quorum.FastLeaderElection$Messenger$WorkerSender.run(FastLeaderElection.java:365)
       at java.lang.Thread.run(Thread.java:745)

 

Solution: change the owner of the dataDir directory and the files in it to the zookeeper user and zookeeper group.

Execute the command: chown -R zookeeper:zookeeper /var/lib/zookeeper

[root@node03 zookeeper]# chown -R  zookeeper:zookeeper /var/lib/zookeeper
[root@node03 zookeeper]# ls -l /var/lib/zookeeper
total 16
-rw-r--r--. 1 zookeeper zookeeper    2 Nov 13 21:20 myid
drwxr-xr-x. 2 zookeeper zookeeper 4096 Nov 16 12:23 version-2
-rw-r--r--. 1 zookeeper zookeeper 1055 Nov 13 21:20 zookeeper.out
-rw-r--r--. 1 zookeeper zookeeper    4 Nov 13 21:20 zookeeper_server.pid
[root@node03 zookeeper]# ls -l /var/lib/zookeeper/version-2/
total 114016
-rw-r--r--. 1 zookeeper zookeeper        1 Nov 16 12:22 acceptedEpoch
-rw-r--r--. 1 zookeeper zookeeper        1 Nov 16 12:22 currentEpoch
-rw-r--r--. 1 zookeeper zookeeper 67108880 Nov 15 15:57 log.200000001
-rw-r--r--. 1 zookeeper zookeeper 67108880 Nov 15 17:24 log.20000d912
-rw-r--r--. 1 zookeeper zookeeper 67108880 Nov 15 18:38 log.200021e5b
-rw-r--r--. 1 zookeeper zookeeper 67108880 Nov 15 19:39 log.200032bd2
-rw-r--r--. 1 zookeeper zookeeper 67108880 Nov 15 21:02 log.200040925
-rw-r--r--. 1 zookeeper zookeeper 67108880 Nov 15 22:03 log.200053776
-rw-r--r--. 1 zookeeper zookeeper 67108880 Nov 15 23:39 log.200061578
-rw-r--r--. 1 zookeeper zookeeper 67108880 Nov 16 00:34 log.2000771e7
-rw-r--r--. 1 zookeeper zookeeper 67108880 Nov 16 01:57 log.200083bd3
-rw-r--r--. 1 zookeeper zookeeper 67108880 Nov 16 03:14 log.200096a85
-rw-r--r--. 1 zookeeper zookeeper 67108880 Nov 16 04:53 log.2000a84e4
-rw-r--r--. 1 zookeeper zookeeper 67108880 Nov 16 06:12 log.2000becbc
-rw-r--r--. 1 zookeeper zookeeper 67108880 Nov 16 07:45 log.2000d0c68
-rw-r--r--. 1 zookeeper zookeeper 67108880 Nov 16 08:58 log.2000e60a1
-rw-r--r--. 1 zookeeper zookeeper 67108880 Nov 16 10:09 log.2000f6a29
-rw-r--r--. 1 zookeeper zookeeper 67108880 Nov 16 11:01 log.200106cdd
-rw-r--r--. 1 zookeeper zookeeper 67108880 Nov 16 12:32 log.300000001
-rw-r--r--. 1 zookeeper zookeeper      296 Nov 13 22:34 snapshot.100000000
-rw-r--r--. 1 zookeeper zookeeper     1419 Nov 15 15:57 snapshot.20000d910
-rw-r--r--. 1 zookeeper zookeeper     1594 Nov 15 17:24 snapshot.200021e5c
-rw-r--r--. 1 zookeeper zookeeper     1594 Nov 15 18:38 snapshot.200032bd3
-rw-r--r--. 1 zookeeper zookeeper     1606 Nov 15 19:39 snapshot.200040928
-rw-r--r--. 1 zookeeper zookeeper     1606 Nov 15 21:02 snapshot.200053774
-rw-r--r--. 1 zookeeper zookeeper     1771 Nov 15 22:03 snapshot.200061576
-rw-r--r--. 1 zookeeper zookeeper     1594 Nov 15 23:39 snapshot.2000771e7
-rw-r--r--. 1 zookeeper zookeeper     1594 Nov 16 00:34 snapshot.200083bd2
-rw-r--r--. 1 zookeeper zookeeper     1606 Nov 16 01:57 snapshot.200096a83
-rw-r--r--. 1 zookeeper zookeeper     1594 Nov 16 03:14 snapshot.2000a84e4
-rw-r--r--. 1 zookeeper zookeeper     1771 Nov 16 04:53 snapshot.2000becbb
-rw-r--r--. 1 zookeeper zookeeper     1771 Nov 16 06:12 snapshot.2000d0c66
-rw-r--r--. 1 zookeeper zookeeper     1771 Nov 16 07:45 snapshot.2000e60a1
-rw-r--r--. 1 zookeeper zookeeper     1771 Nov 16 08:58 snapshot.2000f6a29
-rw-r--r--. 1 zookeeper zookeeper     1606 Nov 16 10:09 snapshot.200106cdb
-rw-r--r--. 1 zookeeper zookeeper     1395 Nov 16 12:22 snapshot.200111a88

Restart and Done!

[Solved] ZooKeeper java.net.ConnectException: Connection refused: no further information

ZooKeeper error: java.net.ConnectException: Connection refused: no further information

 

Error Message:
java.net.ConnectException: Connection refused: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
2018-08-23 14:29:30,700 [localhost-startStop-1-SendThread(192.168.43.31:2181)] [org.apache.zookeeper.ClientCnxnSocketNIO] - [DEBUG] Ignoring exception during shutdown input
java.nio.channels.ClosedChannelException
	at sun.nio.ch.SocketChanne

 

Solution:
Start Tomcat again, and the connection succeeds.

Cause analysis of a YARN ResourceManager crash caused by the ZooKeeper znode data-size limit (2)

Five months later (click to read the previous article), the problem in the title occurred again. Thanks to improvements in our big-data monitoring system, I was able to study the problem further. Below is the whole investigation and the solution:

1. Problem description

The first ResourceManager service exception alarm arrived at 8:12 a.m. on August 8. Through 8:00 a.m. on August 11, the exception recurred frequently between 8:00 and 8:12 a.m. every day, and occasionally around 8:00 p.m. and between 1 and 3 p.m. Below are the statistics from SpaceX of abnormal ResourceManager status counts:

2. Causes of the exception

1. Exception information

The following excerpt is the log between 20:00 and 20:12 on August 8; the exception information in other periods is identical:

2019-08-08 20:12:18,681 INFO org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore: Retrying operation on ZK. Retry no. 544
2019-08-08 20:12:18,886 INFO org.apache.zookeeper.ClientCnxn: Opening socket connection to server 10.204.245.44/10.204.245.44:5181. Will not attempt to authenticate using SASL (unknown error)
2019-08-08 20:12:18,887 INFO org.apache.zookeeper.ClientCnxn: Socket connection established to 10.204.245.44/10.204.245.44:5181, initiating session
2019-08-08 20:12:18,887 INFO org.apache.zookeeper.ClientCnxn: Session establishment complete on server 10.204.245.44/10.204.245.44:5181, sessionid = 0x26c00dfd48e9068, negotiated timeout = 60000
2019-08-08 20:12:20,850 WARN org.apache.zookeeper.ClientCnxn: Session 0x26c00dfd48e9068 for server 10.204.245.44/10.204.245.44:5181, unexpected error, closing socket connection and attempting reconnect
java.lang.OutOfMemoryError: Java heap space
2019-08-08 20:12:20,951 INFO org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore: Exception while executing a ZK operation.
org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:99)
	at org.apache.zookeeper.ZooKeeper.multiInternal(ZooKeeper.java:935)
	at org.apache.zookeeper.ZooKeeper.multi(ZooKeeper.java:915)
	at org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore$5.run(ZKRMStateStore.java:989)
	at org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore$5.run(ZKRMStateStore.java:986)
	at org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore$ZKAction.runWithCheck(ZKRMStateStore.java:1128)
	at org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore$ZKAction.runWithRetries(ZKRMStateStore.java:1161)
	at org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore.doMultiWithRetries(ZKRMStateStore.java:986)
	at org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore.doMultiWithRetries(ZKRMStateStore.java:1000)
	at org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore.setDataWithRetries(ZKRMStateStore.java:1017)
	at org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore.updateApplicationAttemptStateInternal(ZKRMStateStore.java:713)
	at org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore$UpdateAppAttemptTransition.transition(RMStateStore.java:243)
	at org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore$UpdateAppAttemptTransition.transition(RMStateStore.java:226)
	at org.apache.hadoop.yarn.state.StateMachineFactory$SingleInternalArc.doTransition(StateMachineFactory.java:362)
	at org.apache.hadoop.yarn.state.StateMachineFactory.doTransition(StateMachineFactory.java:302)
	at org.apache.hadoop.yarn.state.StateMachineFactory.access$300(StateMachineFactory.java:46)
	at org.apache.hadoop.yarn.state.StateMachineFactory$InternalStateMachine.doTransition(StateMachineFactory.java:448)
	at org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.handleStoreEvent(RMStateStore.java:812)
	at org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore$ForwardingEventHandler.handle(RMStateStore.java:872)
	at org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore$ForwardingEventHandler.handle(RMStateStore.java:867)
	at org.apache.hadoop.yarn.event.AsyncDispatcher.dispatch(AsyncDispatcher.java:182)
	at org.apache.hadoop.yarn.event.AsyncDispatcher$1.run(AsyncDispatcher.java:109)
	at java.lang.Thread.run(Thread.java:745)

2. Cause of the exception

The root cause is that the ZK server limits the data of a single znode to at most 1 MB. When the data submitted by a client exceeds 1 MB, the ZK server throws the following exception:

Exception causing close of session 0x2690d678e98ae8b due to java.io.IOException: Len error 1788046

After the exception is thrown, YARN keeps retrying the ZK operation. The retry interval is short and the retry count high, so YARN eventually runs out of heap memory and can no longer provide service.

3. The YARN code involved

The following is the method in org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore where the exception originates:

   /**
     * Update Information
     *
     * @param appAttemptId
     * @param attemptStateDataPB
     * @throws Exception
     */
    @Override
    public synchronized void updateApplicationAttemptStateInternal(
            ApplicationAttemptId appAttemptId,
            ApplicationAttemptStateData attemptStateDataPB)
            throws Exception {
        String appIdStr = appAttemptId.getApplicationId().toString();
        String appAttemptIdStr = appAttemptId.toString();
        String appDirPath = getNodePath(rmAppRoot, appIdStr);
        String nodeUpdatePath = getNodePath(appDirPath, appAttemptIdStr);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Storing final state info for attempt: " + appAttemptIdStr
                    + " at: " + nodeUpdatePath);
        }
        byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();

        if (existsWithRetries(nodeUpdatePath, true) != null) {
            setDataWithRetries(nodeUpdatePath, attemptStateData, -1);
        } else {
            createWithRetries(nodeUpdatePath, attemptStateData, zkAcl,
                    CreateMode.PERSISTENT);
            LOG.debug(appAttemptId + " znode didn't exist. Created a new znode to"
                    + " update the application attempt state.");
        }
    }

This code updates (or creates) a task attempt's state information in ZK. During scheduling, a task may be retried many times, mainly due to network, hardware, and resource issues. If saving the attempt state to ZK fails, org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore.ZKAction.runWithRetries is called to try again. By default the operation is retried 1000 times, and the retry interval depends on whether YARN high availability is enabled, i.e. whether yarn.resourcemanager.ha.enabled is true in yarn-site.xml. The official explanation of the retry interval is as follows:

Retry interval in milliseconds when connecting to ZooKeeper. When HA is enabled, the value here is NOT used. It is generated automatically from yarn.resourcemanager.zk-timeout-ms and yarn.resourcemanager.zk-num-retries.

Depending on whether YARN high availability is enabled, the retry-interval mechanism works as follows:

(1) YARN high availability not enabled:

Controlled by yarn.resourcemanager.zk-retry-interval-ms; in the BI production environment this parameter keeps its default value of 1000, in milliseconds.

(2) YARN high availability enabled:

Controlled by yarn.resourcemanager.zk-timeout-ms (the ZK session timeout) and yarn.resourcemanager.zk-num-retries (the number of retries after a failed operation); the formula is:

retry interval (yarn.resourcemanager.zk-retry-interval-ms) = yarn.resourcemanager.zk-timeout-ms (ZK session timeout) / yarn.resourcemanager.zk-num-retries (retry count)
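Plugging in representative values, a quick sketch of this arithmetic (60000 ms is the timeout quoted for this cluster; 1000 and 100 are the retry counts discussed in this post):

```shell
# Retry interval when YARN HA is enabled: session timeout / number of retries.
zk_timeout_ms=60000
for retries in 1000 100; do
  echo "zk-num-retries=$retries -> retry every $((zk_timeout_ms / retries)) ms"
done
```

With the default 1000 retries this gives a 60 ms interval; lowering the retry count to 100 stretches it to a far less aggressive 600 ms.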

The retry interval is computed in org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore.initInternal. The relevant source:

// Calculate the time interval to retry the connection ZK, expressed in milliseconds
if (HAUtil.isHAEnabled(conf)) { // In the case of high availability: retry interval = session timeout / number of retries to ZK
    zkRetryInterval = zkSessionTimeout/numRetries;
} else {
    zkRetryInterval =
            conf.getLong(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
                    YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
}

BI production-environment configuration:

yarn.resourcemanager.zk-timeout-ms: 60000, in milliseconds

yarn.resourcemanager.zk-num-retries: default value of 1000 (retries)

Therefore the retry interval in the BI production environment is 60000/1000 = 60 ms. If saving the task state keeps failing, the operation is retried 1000 times at 60 ms intervals. That is brutal, and it eventually overflows the YARN heap (10 GB = 4 GB young generation + 6 GB old generation). Below is the JVM monitoring data captured by SpaceX while these two parameter values drove the high-frequency retries:

(1) Heap memory usage:

(2) GC times:

(3) full GC time:

3. Solutions

1. Reduce the number of completed applications YARN keeps in ZK (default 10000). This also fixes YARN registering too many useless watchers in ZK as a side effect of storing too much completed-task information there. The main adjustments are to yarn.resourcemanager.state-store.max-completed-applications and yarn.resourcemanager.max-completed-applications, as follows:

<!-- Maximum number of completed applications kept in ZK -->
<property>
  <name>yarn.resourcemanager.state-store.max-completed-applications</name>
  <value>2000</value>
</property>

<!-- Maximum number of completed applications kept in RM memory; adjusted mainly so the task information in RM memory stays consistent with ZK -->
<property>
  <name>yarn.resourcemanager.max-completed-applications</name>
  <value>2000</value>
</property>

The structure of the task state information saved in ZK (under RM_APP_ROOT) is as follows:

    ROOT_DIR_PATH
      |--- VERSION_INFO
      |--- EPOCH_NODE
      |--- RM_ZK_FENCING_LOCK
      |--- RM_APP_ROOT
      |     |----- (#ApplicationId1)
      |     |        |----- (#ApplicationAttemptIds)
      |     |
      |     |----- (#ApplicationId2)
      |     |       |----- (#ApplicationAttemptIds)
      |     ....
      |
      |--- RM_DT_SECRET_MANAGER_ROOT
      |----- RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME
      |----- RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME
      |       |----- Token_1
      |       |----- Token_2
      |       ....
      |
      |----- RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME
      |      |----- Key_1
      |      |----- Key_2
      ....
      |--- AMRMTOKEN_SECRET_MANAGER_ROOT
      |----- currentMasterKey
      |----- nextMasterKey

The data structure determines the algorithm. As the structure above shows, one application ID (ApplicationId) maps to multiple attempt IDs (ApplicationAttemptId), and ZKRMStateStore registers a watcher on each of these nodes, so too many nodes inflate the watcher count and consume too much ZK heap memory. The BI production environment runs roughly 7,000 applications per day, so both parameters were lowered to 2000. The adjustment does not affect the state of running applications, for the following reasons:

(1) Judging from the operations on the member variables completedAppsInStateStore and completedApps in the org.apache.hadoop.yarn.server.resourcemanager.RMAppManager class, the two settings above govern completed-application information. The relevant code:

protected int completedAppsInStateStore = 0; // records completed-application info; incremented by 1 when a task completes
private LinkedList<ApplicationId> completedApps = new LinkedList<ApplicationId>(); // IDs of completed applications; entries are removed once expired

 /**
   * Save completed task information
   * @param applicationId
   */
  protected synchronized void finishApplication(ApplicationId applicationId) {
    if (applicationId == null) {
      LOG.error("RMAppManager received completed appId of null, skipping");
    } else {
      // Inform the DelegationTokenRenewer
      if (UserGroupInformation.isSecurityEnabled()) {
        rmContext.getDelegationTokenRenewer().applicationFinished(applicationId);
      }
      
      completedApps.add(applicationId);
      completedAppsInStateStore++;
      writeAuditLog(applicationId);
    }
  }

  /*
   * check to see if hit the limit for max # completed apps kept
   *
   * Check if the number of completed applications stored in memory and ZK exceeds the maximum limit, and perform a remove completed task information operation if the limit is exceeded
   */
  protected synchronized void checkAppNumCompletedLimit() {
    // check apps kept in state store.
    while (completedAppsInStateStore > this.maxCompletedAppsInStateStore) {
      ApplicationId removeId =
          completedApps.get(completedApps.size() - completedAppsInStateStore);
      RMApp removeApp = rmContext.getRMApps().get(removeId);
      LOG.info("Max number of completed apps kept in state store met:"
          + " maxCompletedAppsInStateStore = " + maxCompletedAppsInStateStore
          + ", removing app " + removeApp.getApplicationId()
          + " from state store.");
      rmContext.getStateStore().removeApplication(removeApp);
      completedAppsInStateStore--;
    }

    // check apps kept in memory.
    while (completedApps.size() > this.maxCompletedAppsInMemory) {
      ApplicationId removeId = completedApps.remove();
      LOG.info("Application should be expired, max number of completed apps"
          + " kept in memory met: maxCompletedAppsInMemory = "
          + this.maxCompletedAppsInMemory + ", removing app " + removeId
          + " from memory: ");
      rmContext.getRMApps().remove(removeId);
      this.applicationACLsManager.removeApplication(removeId);
    }
  }

(2) Before the change, YARN used the default of 10000 for completed applications kept in ZK, and zkdoc showed 10000+ children under /bi-rmstore-20190811-1/ZKRMStateRoot/RMAppRoot. After the reduction, zkdoc showed 2015 children under the same path, while YARN's real-time monitoring page showed 15 applications running at that moment; in other words, YARN keeps the state of both running and completed applications under this node. The zkdoc monitoring data are as follows:

From this, we can conclude the mechanism by which YARN saves and removes application state:

when there is a new application, YARN uses the storeApplicationStateInternal method of ZKRMStateStore to save the state of the new application;

when the limit set by the yarn.resourcemanager.state-store.max-completed-applications parameter is exceeded, YARN uses the removeApplication method of RMStateStore to delete the state of completed applications.

RMStateStore is the parent class of ZKRMStateStore. Both of the above methods are synchronized. The two operations are independent and do not interfere with each other, so they do not affect applications running in YARN.
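The two trimming loops above can be reduced to a standalone sketch (a hypothetical class, not the real YARN code): completed applications form a FIFO, and the state-store copy and the in-memory copy are trimmed against two independent limits, oldest first.

```java
import java.util.ArrayDeque;

// Simplified model of RMAppManager.checkAppNumCompletedLimit: two
// independent limits, each enforced by its own while-loop.
class CompletedAppLimits {
    final ArrayDeque<String> completedApps = new ArrayDeque<>();
    int completedAppsInStateStore = 0;

    void onAppCompleted(String appId) {
        completedApps.addLast(appId);
        completedAppsInStateStore++;
    }

    void checkAppNumCompletedLimit(int maxInStateStore, int maxInMemory) {
        // Trim the state-store view: in the real code this calls
        // rmContext.getStateStore().removeApplication(...) for the oldest apps.
        while (completedAppsInStateStore > maxInStateStore) {
            completedAppsInStateStore--;
        }
        // Trim the in-memory view: the real code also removes the app from
        // the RMApps map and the ApplicationACLsManager.
        while (completedApps.size() > maxInMemory) {
            completedApps.removeFirst();
        }
    }
}
```

With 10 completed apps and limits of 4 (state store) and 6 (memory), the state-store count drops to 4 and memory keeps only the 6 newest apps.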

2. Fixing the retry interval being too short, which caused YARN heap memory shortage and frequent GC:

<!-- Default is 1000; it is set to 100 here to control the frequency of retry connections to ZK. In the high-availability case, retry interval (yarn.resourcemanager.zk-retry-interval-ms) = yarn.resourcemanager.zk-timeout-ms (ZK session timeout) / yarn.resourcemanager.zk-num-retries (number of retries) -->
<property>
  <name>yarn.resourcemanager.zk-num-retries</name>
  <value>100</value>
</property>

After the adjustment, the retry interval for the BI production environment's YARN-to-ZK connection is 60000/100 = 600 ms. The JVM data monitored by SpaceX is as follows:

(1) Heap memory usage:

(2) GC times:

(3) full GC time:

The monitoring data shows that the JVM heap memory usage, GC count, and GC time all improved after the retry interval was increased.
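The interval arithmetic described above can be sketched as follows (a hypothetical helper, mirroring the formula quoted in the configuration comment):

```java
// When yarn.resourcemanager.zk-retry-interval-ms is not set explicitly in HA
// mode, the effective retry interval is session timeout / number of retries.
class ZkRetryInterval {
    static long retryIntervalMs(long zkTimeoutMs, int numRetries) {
        return zkTimeoutMs / numRetries;
    }
}
```

With the default of 1000 retries the interval would be 60000/1000 = 60 ms; lowering the retry count to 100 stretches it to 600 ms.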

3. Fixing the problem of application retry state data exceeding 1 MB:

Modifying the related YARN logic would affect YARN's task recovery mechanism, so we can only change the ZK server configuration and client configuration to solve this problem.

(1) Increase the ZK server's jute.maxbuffer parameter to 3 MB.

(2) Modify yarn-env.sh, adding -Djute.maxbuffer=3145728 to the YARN_OPTS and YARN_RESOURCEMANAGER_OPTS configuration. This parameter sets the maximum amount of data the ZK client may submit to the ZK server to 3 MB. The modified configuration is as follows:

YARN_OPTS="$YARN_OPTS -Dyarn.policy.file=$YARN_POLICYFILE -Djute.maxbuffer=3145728"

YARN_RESOURCEMANAGER_OPTS="-server -Xms10240m -Xmx10240m -Xmn4048m -Xss512k -verbose:gc -Xloggc:$YARN_LOG_DIR/gc_resourcemanager.log-`date +'%Y%m%d%H%M'` -XX:+PrintGCDateStamps -XX:+PrintGCDetails -XX:SurvivorRatio=8 -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSCompactAtFullCollection -XX:CMSFullGCsBeforeCompaction=0 -XX:+CMSClassUnloadingEnabled -XX:+CMSParallelRemarkEnabled -XX:+UseCMSInitiatingOccupancyOnly -XX:+DisableExplicitGC -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$YARN_LOG_DIR -Djute.maxbuffer=3145728 $YARN_RESOURCEMANAGER_OPTS"

After modification, restart ResourceManager service and ZK service to make the configuration effective.
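As a sanity check of the figure used in both configurations above (a hypothetical helper): jute.maxbuffer is specified in bytes, and 3145728 bytes is exactly 3 MiB.

```java
// jute.maxbuffer is given in bytes; 3145728 = 3 * 1024 * 1024.
class JuteMaxBuffer {
    static int mib(int n) {
        return n * 1024 * 1024;
    }
}
```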

4、 Summary

1. Hadoop's logging is thorough, and the log information forms a complete event flow. When you encounter problems, read the Hadoop logs carefully to find clues.

2. At present, the ZK cluster used by YARN is shared with HBase and other services. As the cluster scale and data volume grow, this will have a performance impact on ZK. It is therefore recommended to build a separate ZK cluster for YARN rather than sharing a common ZK cluster and putting a high load on it.

3. Raising ZK's maximum node data size to 3 MB will have some performance impact on ZK, such as on cluster synchronization and request processing. Therefore, monitoring of the ZK base service must be improved to ensure high availability.

5、 References

Troubleshooting frequent active-ResourceManager failover in YARN

ResourceManager HA application state storage and recovery

Official YARN issues:

(1) About the issue: limit application resource reservation on nodes for non node/rack specific requests

(2) ZKRMStateStore data-exceeds-1MB issue: ResourceManager failed when ZKRMStateStore tries to update znode data larger than 1MB

On the high availability of the Flink JobManager

Preface

This article mainly studies the high availability of the Flink JobManager.

Configuration

flink-conf.yaml

high-availability: zookeeper
high-availability.zookeeper.quorum: zookeeper:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_one # important: customize per cluster
high-availability.storageDir: file:///share

The optional values of high-availability are none or zookeeper; high-availability.zookeeper.quorum specifies the ZooKeeper peers; high-availability.zookeeper.path.root specifies the root node path in ZooKeeper; high-availability.cluster-id specifies the name of the current cluster's node, located under the root node; high-availability.storageDir specifies the storage path of the JobManager metadata.
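As an illustration of how these options compose (a hypothetical helper, not Flink code): with path.root set to /flink and cluster-id set to /cluster_one, the cluster's znodes live under /flink/cluster_one.

```java
// Both configured values already carry a leading '/', so the cluster path is
// a plain concatenation of the root path and the cluster-id.
class HaPaths {
    static String clusterPath(String root, String clusterId) {
        return root + clusterId;
    }
}
```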

Masters file

localhost:8081
localhost:8082

The masters file specifies the addresses of the JobManagers.

HighAvailabilityMode

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java

public enum HighAvailabilityMode {
	NONE(false),
	ZOOKEEPER(true),
	FACTORY_CLASS(true);

	private final boolean haActive;

	HighAvailabilityMode(boolean haActive) {
		this.haActive = haActive;
	}

	/**
	 * Return the configured {@link HighAvailabilityMode}.
	 *
	 * @param config The config to parse
	 * @return Configured recovery mode or {@link HighAvailabilityMode#NONE} if not
	 * configured.
	 */
	public static HighAvailabilityMode fromConfig(Configuration config) {
		String haMode = config.getValue(HighAvailabilityOptions.HA_MODE);

		if (haMode == null) {
			return HighAvailabilityMode.NONE;
		} else if (haMode.equalsIgnoreCase(ConfigConstants.DEFAULT_RECOVERY_MODE)) {
			// Map old default to new default
			return HighAvailabilityMode.NONE;
		} else {
			try {
				return HighAvailabilityMode.valueOf(haMode.toUpperCase());
			} catch (IllegalArgumentException e) {
				return FACTORY_CLASS;
			}
		}
	}

	/**
	 * Returns true if the defined recovery mode supports high availability.
	 *
	 * @param configuration Configuration which contains the recovery mode
	 * @return true if high availability is supported by the recovery mode, otherwise false
	 */
	public static boolean isHighAvailabilityModeActivated(Configuration configuration) {
		HighAvailabilityMode mode = fromConfig(configuration);
		return mode.haActive;
	}
}

HighAvailabilityMode has three enum values: NONE, ZOOKEEPER, and FACTORY_CLASS; each carries a haActive property indicating whether high availability is supported. fromConfig reads the high-availability key: an unset value or the old default maps to NONE, a known mode name maps to that mode, and any other value is treated as a factory class name (FACTORY_CLASS).
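The resolution order of fromConfig can be reproduced in a self-contained sketch (using a plain String instead of Flink's Configuration; "standalone" is assumed to be the value of ConfigConstants.DEFAULT_RECOVERY_MODE):

```java
// Sketch of HighAvailabilityMode.fromConfig: unset -> NONE, old default ->
// NONE, known enum name -> that mode, anything else -> FACTORY_CLASS.
class HaModeResolver {
    enum Mode { NONE, ZOOKEEPER, FACTORY_CLASS }

    static Mode fromConfig(String haMode) {
        if (haMode == null) {
            return Mode.NONE;
        } else if (haMode.equalsIgnoreCase("standalone")) {
            // Map the old default value to the new default.
            return Mode.NONE;
        } else {
            try {
                return Mode.valueOf(haMode.toUpperCase());
            } catch (IllegalArgumentException e) {
                // Assume the value is the FQN of a factory class.
                return Mode.FACTORY_CLASS;
            }
        }
    }
}
```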

HighAvailabilityOptions

flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/HighAvailabilityOptions.java

@PublicEvolving
@ConfigGroups(groups = {
	@ConfigGroup(name = "HighAvailabilityZookeeper", keyPrefix = "high-availability.zookeeper")
})
public class HighAvailabilityOptions {

	// ------------------------------------------------------------------------
	//  Required High Availability Options
	// ------------------------------------------------------------------------

	/**
	 * Defines high-availability mode used for the cluster execution.
	 * A value of "NONE" signals no highly available setup.
	 * To enable high-availability, set this mode to "ZOOKEEPER".
	 * Can also be set to FQN of HighAvailability factory class.
	 */
	@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_HIGH_AVAILABILITY)
	public static final ConfigOption<String> HA_MODE =
			key("high-availability")
			.defaultValue("NONE")
			.withDeprecatedKeys("recovery.mode")
			.withDescription("Defines high-availability mode used for the cluster execution." +
				" To enable high-availability, set this mode to \"ZOOKEEPER\" or specify FQN of factory class.");

	/**
	 * The ID of the Flink cluster, used to separate multiple Flink clusters
	 * Needs to be set for standalone clusters, is automatically inferred in YARN and Mesos.
	 */
	public static final ConfigOption<String> HA_CLUSTER_ID =
			key("high-availability.cluster-id")
			.defaultValue("/default")
			.withDeprecatedKeys("high-availability.zookeeper.path.namespace", "recovery.zookeeper.path.namespace")
			.withDescription("The ID of the Flink cluster, used to separate multiple Flink clusters from each other." +
				" Needs to be set for standalone clusters but is automatically inferred in YARN and Mesos.");

	/**
	 * File system path (URI) where Flink persists metadata in high-availability setups.
	 */
	@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_HIGH_AVAILABILITY)
	public static final ConfigOption<String> HA_STORAGE_PATH =
			key("high-availability.storageDir")
			.noDefaultValue()
			.withDeprecatedKeys("high-availability.zookeeper.storageDir", "recovery.zookeeper.storageDir")
			.withDescription("File system path (URI) where Flink persists metadata in high-availability setups.");

	// ------------------------------------------------------------------------
	//  Recovery Options
	// ------------------------------------------------------------------------

	/**
	 * Optional port (range) used by the job manager in high-availability mode.
	 */
	public static final ConfigOption<String> HA_JOB_MANAGER_PORT_RANGE =
			key("high-availability.jobmanager.port")
			.defaultValue("0")
			.withDeprecatedKeys("recovery.jobmanager.port")
			.withDescription("Optional port (range) used by the job manager in high-availability mode.");

	/**
	 * The time before a JobManager after a fail over recovers the current jobs.
	 */
	public static final ConfigOption<String> HA_JOB_DELAY =
			key("high-availability.job.delay")
			.noDefaultValue()
			.withDeprecatedKeys("recovery.job.delay")
			.withDescription("The time before a JobManager after a fail over recovers the current jobs.");

	// ------------------------------------------------------------------------
	//  ZooKeeper Options
	// ------------------------------------------------------------------------

	/**
	 * The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper.
	 */
	public static final ConfigOption<String> HA_ZOOKEEPER_QUORUM =
			key("high-availability.zookeeper.quorum")
			.noDefaultValue()
			.withDeprecatedKeys("recovery.zookeeper.quorum")
			.withDescription("The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper.");

	/**
	 * The root path under which Flink stores its entries in ZooKeeper.
	 */
	public static final ConfigOption<String> HA_ZOOKEEPER_ROOT =
			key("high-availability.zookeeper.path.root")
			.defaultValue("/flink")
			.withDeprecatedKeys("recovery.zookeeper.path.root")
			.withDescription("The root path under which Flink stores its entries in ZooKeeper.");

	public static final ConfigOption<String> HA_ZOOKEEPER_LATCH_PATH =
			key("high-availability.zookeeper.path.latch")
			.defaultValue("/leaderlatch")
			.withDeprecatedKeys("recovery.zookeeper.path.latch")
			.withDescription("Defines the znode of the leader latch which is used to elect the leader.");

	/** ZooKeeper root path (ZNode) for job graphs. */
	public static final ConfigOption<String> HA_ZOOKEEPER_JOBGRAPHS_PATH =
			key("high-availability.zookeeper.path.jobgraphs")
			.defaultValue("/jobgraphs")
			.withDeprecatedKeys("recovery.zookeeper.path.jobgraphs")
			.withDescription("ZooKeeper root path (ZNode) for job graphs");

	public static final ConfigOption<String> HA_ZOOKEEPER_LEADER_PATH =
			key("high-availability.zookeeper.path.leader")
			.defaultValue("/leader")
			.withDeprecatedKeys("recovery.zookeeper.path.leader")
			.withDescription("Defines the znode of the leader which contains the URL to the leader and the current" +
				" leader session ID.");

	/** ZooKeeper root path (ZNode) for completed checkpoints. */
	public static final ConfigOption<String> HA_ZOOKEEPER_CHECKPOINTS_PATH =
			key("high-availability.zookeeper.path.checkpoints")
			.defaultValue("/checkpoints")
			.withDeprecatedKeys("recovery.zookeeper.path.checkpoints")
			.withDescription("ZooKeeper root path (ZNode) for completed checkpoints.");

	/** ZooKeeper root path (ZNode) for checkpoint counters. */
	public static final ConfigOption<String> HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH =
			key("high-availability.zookeeper.path.checkpoint-counter")
			.defaultValue("/checkpoint-counter")
			.withDeprecatedKeys("recovery.zookeeper.path.checkpoint-counter")
			.withDescription("ZooKeeper root path (ZNode) for checkpoint counters.");

	/** ZooKeeper root path (ZNode) for Mesos workers. */
	@PublicEvolving
	public static final ConfigOption<String> HA_ZOOKEEPER_MESOS_WORKERS_PATH =
			key("high-availability.zookeeper.path.mesos-workers")
			.defaultValue("/mesos-workers")
			.withDeprecatedKeys("recovery.zookeeper.path.mesos-workers")
			.withDescription(Description.builder()
				.text("The ZooKeeper root path for persisting the Mesos worker information.")
				.build());

	// ------------------------------------------------------------------------
	//  ZooKeeper Client Settings
	// ------------------------------------------------------------------------

	public static final ConfigOption<Integer> ZOOKEEPER_SESSION_TIMEOUT =
			key("high-availability.zookeeper.client.session-timeout")
			.defaultValue(60000)
			.withDeprecatedKeys("recovery.zookeeper.client.session-timeout")
			.withDescription("Defines the session timeout for the ZooKeeper session in ms.");

	public static final ConfigOption<Integer> ZOOKEEPER_CONNECTION_TIMEOUT =
			key("high-availability.zookeeper.client.connection-timeout")
			.defaultValue(15000)
			.withDeprecatedKeys("recovery.zookeeper.client.connection-timeout")
			.withDescription("Defines the connection timeout for ZooKeeper in ms.");

	public static final ConfigOption<Integer> ZOOKEEPER_RETRY_WAIT =
			key("high-availability.zookeeper.client.retry-wait")
			.defaultValue(5000)
			.withDeprecatedKeys("recovery.zookeeper.client.retry-wait")
			.withDescription("Defines the pause between consecutive retries in ms.");

	public static final ConfigOption<Integer> ZOOKEEPER_MAX_RETRY_ATTEMPTS =
			key("high-availability.zookeeper.client.max-retry-attempts")
			.defaultValue(3)
			.withDeprecatedKeys("recovery.zookeeper.client.max-retry-attempts")
			.withDescription("Defines the number of connection retries before the client gives up.");

	public static final ConfigOption<String> ZOOKEEPER_RUNNING_JOB_REGISTRY_PATH =
			key("high-availability.zookeeper.path.running-registry")
			.defaultValue("/running_job_registry/");

	public static final ConfigOption<String> ZOOKEEPER_CLIENT_ACL =
			key("high-availability.zookeeper.client.acl")
			.defaultValue("open")
			.withDescription("Defines the ACL (open|creator) to be configured on ZK node. The configuration value can be" +
				" set to “creator” if the ZooKeeper server configuration has the “authProvider” property mapped to use" +
				" SASLAuthenticationProvider and the cluster is configured to run in secure mode (Kerberos).");

	// ------------------------------------------------------------------------

	/** Not intended to be instantiated. */
	private HighAvailabilityOptions() {}
}

HighAvailabilityOptions defines the high-availability configuration options, including those under the high-availability.zookeeper prefix.
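The withDeprecatedKeys fallback used throughout the class can be sketched with a hypothetical lookup helper (not Flink's actual ConfigOption resolution): the current key wins, then the first deprecated key that is present, then the default value.

```java
import java.util.Map;

// Hypothetical illustration of deprecated-key fallback for a config option.
class DeprecatedKeyLookup {
    static String get(Map<String, String> conf, String defaultValue,
                      String key, String... deprecatedKeys) {
        if (conf.containsKey(key)) {
            return conf.get(key);  // the current key always wins
        }
        for (String deprecated : deprecatedKeys) {
            if (conf.containsKey(deprecated)) {
                return conf.get(deprecated);  // fall back to old key names
            }
        }
        return defaultValue;
    }
}
```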

HighAvailabilityServicesUtils

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java

public class HighAvailabilityServicesUtils {

	public static HighAvailabilityServices createAvailableOrEmbeddedServices(
		Configuration config,
		Executor executor) throws Exception {
		HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(config);

		switch (highAvailabilityMode) {
			case NONE:
				return new EmbeddedHaServices(executor);

			case ZOOKEEPER:
				BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config);

				return new ZooKeeperHaServices(
					ZooKeeperUtils.startCuratorFramework(config),
					executor,
					config,
					blobStoreService);

			case FACTORY_CLASS:
				return createCustomHAServices(config, executor);

			default:
				throw new Exception("High availability mode " + highAvailabilityMode + " is not supported.");
		}
	}

	public static HighAvailabilityServices createHighAvailabilityServices(
		Configuration configuration,
		Executor executor,
		AddressResolution addressResolution) throws Exception {

		HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(configuration);

		switch (highAvailabilityMode) {
			case NONE:
				final Tuple2<String, Integer> hostnamePort = getJobManagerAddress(configuration);

				final String jobManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
					hostnamePort.f0,
					hostnamePort.f1,
					JobMaster.JOB_MANAGER_NAME,
					addressResolution,
					configuration);
				final String resourceManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
					hostnamePort.f0,
					hostnamePort.f1,
					ResourceManager.RESOURCE_MANAGER_NAME,
					addressResolution,
					configuration);
				final String dispatcherRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
					hostnamePort.f0,
					hostnamePort.f1,
					Dispatcher.DISPATCHER_NAME,
					addressResolution,
					configuration);

				final String address = checkNotNull(configuration.getString(RestOptions.ADDRESS),
					"%s must be set",
					RestOptions.ADDRESS.key());
				final int port = configuration.getInteger(RestOptions.PORT);
				final boolean enableSSL = SSLUtils.isRestSSLEnabled(configuration);
				final String protocol = enableSSL ? "https://" : "http://";

				return new StandaloneHaServices(
					resourceManagerRpcUrl,
					dispatcherRpcUrl,
					jobManagerRpcUrl,
					String.format("%s%s:%s", protocol, address, port));
			case ZOOKEEPER:
				BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);

				return new ZooKeeperHaServices(
					ZooKeeperUtils.startCuratorFramework(configuration),
					executor,
					configuration,
					blobStoreService);

			case FACTORY_CLASS:
				return createCustomHAServices(configuration, executor);

			default:
				throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
		}
	}

	/**
	 * Returns the JobManager's hostname and port extracted from the given
	 * {@link Configuration}.
	 *
	 * @param configuration Configuration to extract the JobManager's address from
	 * @return The JobManager's hostname and port
	 * @throws ConfigurationException if the JobManager's address cannot be extracted from the configuration
	 */
	public static Tuple2<String, Integer> getJobManagerAddress(Configuration configuration) throws ConfigurationException {

		final String hostname = configuration.getString(JobManagerOptions.ADDRESS);
		final int port = configuration.getInteger(JobManagerOptions.PORT);

		if (hostname == null) {
			throw new ConfigurationException("Config parameter '" + JobManagerOptions.ADDRESS +
				"' is missing (hostname/address of JobManager to connect to).");
		}

		if (port <= 0 || port >= 65536) {
			throw new ConfigurationException("Invalid value for '" + JobManagerOptions.PORT +
				"' (port of the JobManager actor system) : " + port +
				".  it must be greater than 0 and less than 65536.");
		}

		return Tuple2.of(hostname, port);
	}

	private static HighAvailabilityServices createCustomHAServices(Configuration config, Executor executor) throws FlinkException {
		final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
		final String haServicesClassName = config.getString(HighAvailabilityOptions.HA_MODE);

		final HighAvailabilityServicesFactory highAvailabilityServicesFactory;

		try {
			highAvailabilityServicesFactory = InstantiationUtil.instantiate(
				haServicesClassName,
				HighAvailabilityServicesFactory.class,
				classLoader);
		} catch (Exception e) {
			throw new FlinkException(
				String.format(
					"Could not instantiate the HighAvailabilityServicesFactory '%s'. Please make sure that this class is on your class path.",
					haServicesClassName),
				e);
		}

		try {
			return highAvailabilityServicesFactory.createHAServices(config, executor);
		} catch (Exception e) {
			throw new FlinkException(
				String.format(
					"Could not create the ha services from the instantiated HighAvailabilityServicesFactory %s.",
					haServicesClassName),
				e);
		}
	}

	/**
	 * Enum specifying whether address resolution should be tried or not when creating the
	 * {@link HighAvailabilityServices}.
	 */
	public enum AddressResolution {
		TRY_ADDRESS_RESOLUTION,
		NO_ADDRESS_RESOLUTION
	}
}

HighAvailabilityServicesUtils provides static methods for creating HighAvailabilityServices: createAvailableOrEmbeddedServices, createHighAvailabilityServices, and createCustomHAServices.

createAvailableOrEmbeddedServices is mainly used by FlinkMiniCluster; createHighAvailabilityServices is mainly used by ClusterEntrypoint. The latter creates StandaloneHaServices when HighAvailabilityMode is NONE, ZooKeeperHaServices when it is ZOOKEEPER, and delegates to createCustomHAServices when it is FACTORY_CLASS.

HighAvailabilityServicesUtils also provides the static method getJobManagerAddress to get the hostname and port of the JobManager.
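The port validation performed by getJobManagerAddress can be isolated into a small sketch (a hypothetical helper, not Flink code):

```java
// A JobManager port must be greater than 0 and less than 65536;
// anything outside that range triggers a ConfigurationException above.
class PortCheck {
    static boolean isValidPort(int port) {
        return port > 0 && port < 65536;
    }
}
```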

HighAvailabilityServices

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java

/**
 * The HighAvailabilityServices give access to all services needed for a highly-available
 * setup. In particular, the services provide access to highly available storage and
 * registries, as well as distributed counters and leader election.
 * 
 * <ul>
 *     <li>ResourceManager leader election and leader retrieval</li>
 *     <li>JobManager leader election and leader retrieval</li>
 *     <li>Persistence for checkpoint metadata</li>
 *     <li>Registering the latest completed checkpoint(s)</li>
 *     <li>Persistence for the BLOB store</li>
 *     <li>Registry that marks a job's status</li>
 *     <li>Naming of RPC endpoints</li>
 * </ul>
 */
public interface HighAvailabilityServices extends AutoCloseable {

	// ------------------------------------------------------------------------
	//  Constants
	// ------------------------------------------------------------------------

	/**
	 * This UUID should be used when no proper leader election happens, but a simple
	 * pre-configured leader is used. That is for example the case in non-highly-available
	 * standalone setups.
	 */
	UUID DEFAULT_LEADER_ID = new UUID(0, 0);

	/**
	 * This JobID should be used to identify the old JobManager when using the
	 * {@link HighAvailabilityServices}. With the new mode every JobMaster will have a
	 * distinct JobID assigned.
	 */
	JobID DEFAULT_JOB_ID = new JobID(0L, 0L);

	// ------------------------------------------------------------------------
	//  Services
	// ------------------------------------------------------------------------

	/**
	 * Gets the leader retriever for the cluster's resource manager.
	 */
	LeaderRetrievalService getResourceManagerLeaderRetriever();

	/**
	 * Gets the leader retriever for the dispatcher. This leader retrieval service
	 * is not always accessible.
	 */
	LeaderRetrievalService getDispatcherLeaderRetriever();

	/**
	 * Gets the leader retriever for the job JobMaster which is responsible for the given job
	 *
	 * @param jobID The identifier of the job.
	 * @return Leader retrieval service to retrieve the job manager for the given job
	 * @deprecated This method should only be used by the legacy code where the JobManager acts as the master.
	 */
	@Deprecated
	LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);

	/**
	 * Gets the leader retriever for the job JobMaster which is responsible for the given job
	 *
	 * @param jobID The identifier of the job.
	 * @param defaultJobManagerAddress JobManager address which will be returned by
	 *                              a static leader retrieval service.
	 * @return Leader retrieval service to retrieve the job manager for the given job
	 */
	LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress);

	LeaderRetrievalService getWebMonitorLeaderRetriever();

	/**
	 * Gets the leader election service for the cluster's resource manager.
	 *
	 * @return Leader election service for the resource manager leader election
	 */
	LeaderElectionService getResourceManagerLeaderElectionService();

	/**
	 * Gets the leader election service for the cluster's dispatcher.
	 *
	 * @return Leader election service for the dispatcher leader election
	 */
	LeaderElectionService getDispatcherLeaderElectionService();

	/**
	 * Gets the leader election service for the given job.
	 *
	 * @param jobID The identifier of the job running the election.
	 * @return Leader election service for the job manager leader election
	 */
	LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);

	LeaderElectionService getWebMonitorLeaderElectionService();

	/**
	 * Gets the checkpoint recovery factory for the job manager
	 *
	 * @return Checkpoint recovery factory
	 */
	CheckpointRecoveryFactory getCheckpointRecoveryFactory();

	/**
	 * Gets the submitted job graph store for the job manager
	 *
	 * @return Submitted job graph store
	 * @throws Exception if the submitted job graph store could not be created
	 */
	SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception;

	/**
	 * Gets the registry that holds information about whether jobs are currently running.
	 *
	 * @return Running job registry to retrieve running jobs
	 */
	RunningJobsRegistry getRunningJobsRegistry() throws Exception;

	/**
	 * Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
	 *
	 * @return Blob store
	 * @throws IOException if the blob store could not be created
	 */
	BlobStore createBlobStore() throws IOException;

	// ------------------------------------------------------------------------
	//  Shutdown and Cleanup
	// ------------------------------------------------------------------------

	/**
	 * Closes the high availability services, releasing all resources.
	 * 
	 * <p>This method <b>does not delete or clean up</b> any data stored in external stores
	 * (file systems, ZooKeeper, etc). Another instance of the high availability
	 * services will be able to recover the job.
	 * 
	 * <p>If an exception occurs during closing services, this method will attempt to
	 * continue closing other services and report exceptions only after all services
	 * have been attempted to be closed.
	 *
	 * @throws Exception Thrown, if an exception occurred while closing these services.
	 */
	@Override
	void close() throws Exception;

	/**
	 * Closes the high availability services (releasing all resources) and deletes
	 * all data stored by these services in external stores.
	 * 
	 * <p>After this method was called, any job or session that was managed by
	 * these high availability services will be unrecoverable.
	 * 
	 * <p>If an exception occurs during cleanup, this method will attempt to
	 * continue the cleanup and report exceptions only after all cleanup steps have
	 * been attempted.
	 * 
	 * @throws Exception Thrown, if an exception occurred while closing these services
	 *                   or cleaning up data stored by them.
	 */
	void closeAndCleanupAllData() throws Exception;
}

HighAvailabilityServices defines the getters for the various services required in a highly available setup.

ZooKeeperHaServices

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java

/**
 * An implementation of the {@link HighAvailabilityServices} using Apache ZooKeeper.
 * The services store data in ZooKeeper's nodes as illustrated by the following tree structure:
 * 
 * <pre>
 * /flink
 *      +/cluster_id_1/resource_manager_lock
 *      |            |
 *      |            +/job-id-1/job_manager_lock
 *      |            |         /checkpoints/latest
 *      |            |                     /latest-1
 *      |            |                     /latest-2
 *      |            |
 *      |            +/job-id-2/job_manager_lock
 *      |      
 *      +/cluster_id_2/resource_manager_lock
 *                   |
 *                   +/job-id-1/job_manager_lock
 *                            |/checkpoints/latest
 *                            |            /latest-1
 *                            |/persisted_job_graph
 * </pre>
 * 
 * <p>The root path "/flink" is configurable via the option {@link HighAvailabilityOptions#HA_ZOOKEEPER_ROOT}.
 * This makes sure Flink stores its data under specific subtrees in ZooKeeper, for example to
 * accommodate specific permission.
 * 
 * <p>The "cluster_id" part identifies the data stored for a specific Flink "cluster".
 * This "cluster" can be either a standalone or containerized Flink cluster, or it can be a job
 * on a framework like YARN or Mesos (in a "per-job-cluster" mode).
 * 
 * <p>In case of a "per-job-cluster" on YARN or Mesos, the cluster-id is generated and configured
 * automatically by the client or dispatcher that submits the Job to YARN or Mesos.
 * 
 * <p>In the case of a standalone cluster, that cluster-id needs to be configured via
 * {@link HighAvailabilityOptions#HA_CLUSTER_ID}. All nodes with the same cluster id will join the same
 * cluster and participate in the execution of the same set of jobs.
 */
public class ZooKeeperHaServices implements HighAvailabilityServices {

	private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperHaServices.class);

	private static final String RESOURCE_MANAGER_LEADER_PATH = "/resource_manager_lock";

	private static final String DISPATCHER_LEADER_PATH = "/dispatcher_lock";

	private static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock";

	private static final String REST_SERVER_LEADER_PATH = "/rest_server_lock";

	// ------------------------------------------------------------------------
	
	
	/** The ZooKeeper client to use */
	private final CuratorFramework client;

	/** The executor to run ZooKeeper callbacks on */
	private final Executor executor;

	/** The runtime configuration */
	private final Configuration configuration;

	/** The zookeeper based running jobs registry */
	private final RunningJobsRegistry runningJobsRegistry;

	/** Store for arbitrary blobs */
	private final BlobStoreService blobStoreService;

	public ZooKeeperHaServices(
			CuratorFramework client,
			Executor executor,
			Configuration configuration,
			BlobStoreService blobStoreService) {
		this.client = checkNotNull(client);
		this.executor = checkNotNull(executor);
		this.configuration = checkNotNull(configuration);
		this.runningJobsRegistry = new ZooKeeperRunningJobsRegistry(client, configuration);

		this.blobStoreService = checkNotNull(blobStoreService);
	}

	// ------------------------------------------------------------------------
	//  Services
	// ------------------------------------------------------------------------

	@Override
	public LeaderRetrievalService getResourceManagerLeaderRetriever() {
		return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
	}

	@Override
	public LeaderRetrievalService getDispatcherLeaderRetriever() {
		return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, DISPATCHER_LEADER_PATH);
	}

	@Override
	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
		return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathForJobManager(jobID));
	}

	@Override
	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) {
		return getJobManagerLeaderRetriever(jobID);
	}

	@Override
	public LeaderRetrievalService getWebMonitorLeaderRetriever() {
		return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, REST_SERVER_LEADER_PATH);
	}

	@Override
	public LeaderElectionService getResourceManagerLeaderElectionService() {
		return ZooKeeperUtils.createLeaderElectionService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
	}

	@Override
	public LeaderElectionService getDispatcherLeaderElectionService() {
		return ZooKeeperUtils.createLeaderElectionService(client, configuration, DISPATCHER_LEADER_PATH);
	}

	@Override
	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
		return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathForJobManager(jobID));
	}

	@Override
	public LeaderElectionService getWebMonitorLeaderElectionService() {
		return ZooKeeperUtils.createLeaderElectionService(client, configuration, REST_SERVER_LEADER_PATH);
	}

	@Override
	public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
		return new ZooKeeperCheckpointRecoveryFactory(client, configuration, executor);
	}

	@Override
	public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
		return ZooKeeperUtils.createSubmittedJobGraphs(client, configuration);
	}

	@Override
	public RunningJobsRegistry getRunningJobsRegistry() {
		return runningJobsRegistry;
	}

	@Override
	public BlobStore createBlobStore() throws IOException {
		return blobStoreService;
	}

	// ------------------------------------------------------------------------
	//  Shutdown
	// ------------------------------------------------------------------------

	@Override
	public void close() throws Exception {
		Throwable exception = null;

		try {
			blobStoreService.close();
		} catch (Throwable t) {
			exception = t;
		}

		internalClose();

		if (exception != null) {
			ExceptionUtils.rethrowException(exception, "Could not properly close the ZooKeeperHaServices.");
		}
	}

	@Override
	public void closeAndCleanupAllData() throws Exception {
		LOG.info("Close and clean up all data for ZooKeeperHaServices.");

		Throwable exception = null;

		try {
			blobStoreService.closeAndCleanupAllData();
		} catch (Throwable t) {
			exception = t;
		}

		internalClose();

		if (exception != null) {
			ExceptionUtils.rethrowException(exception, "Could not properly close and clean up all data of ZooKeeperHaServices.");
		}
	}

	/**
	 * Closes components which don't distinguish between close and closeAndCleanupAllData
	 */
	private void internalClose() {
		client.close();
	}

	// ------------------------------------------------------------------------
	//  Utilities
	// ------------------------------------------------------------------------

	private static String getPathForJobManager(final JobID jobID) {
		return "/" + jobID + JOB_MANAGER_LEADER_PATH;
	}
}

ZooKeeperHaServices implements the HighAvailabilityServices interface and creates the required services through the various factory methods of ZooKeeperUtils, such as ZooKeeperUtils.createLeaderRetrievalService, ZooKeeperUtils.createLeaderElectionService, and ZooKeeperUtils.createSubmittedJobGraphs
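To illustrate how the per-job leader path from getPathForJobManager above ends up as a full znode path, here is a minimal sketch. The root "/flink" and cluster-id "default" used below are the Flink defaults (high-availability.zookeeper.path.root and high-availability.cluster-id) and are assumptions of this sketch; ZooKeeperUtils applies them as a Curator namespace rather than by string concatenation:

```java
public class JobManagerPathSketch {

	private static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock";

	// Mirrors ZooKeeperHaServices#getPathForJobManager: the returned path is
	// relative to the namespace derived from the configured root and cluster-id.
	static String getPathForJobManager(String jobID) {
		return "/" + jobID + JOB_MANAGER_LEADER_PATH;
	}

	public static void main(String[] args) {
		String jobID = "2f4bd9a7644ab4ee0ad4a4c387b8e922"; // hypothetical JobID
		String relative = getPathForJobManager(jobID);
		// With the default root "/flink" and cluster-id "default", the full
		// znode path the leader latch uses would look like this:
		System.out.println("/flink/default" + relative);
	}
}
```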

JobClient.submitJob

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/client/JobClient.java

public class JobClient {

	private static final Logger LOG = LoggerFactory.getLogger(JobClient.class);

	//......

	/**
	 * Submits a job to a Flink cluster (non-blocking) and returns a JobListeningContext which can be
	 * passed to {@code awaitJobResult} to get the result of the submission.
	 * @return JobListeningContext which may be used to retrieve the JobExecutionResult via
	 * 			{@code awaitJobResult(JobListeningContext context)}.
	 */
	public static JobListeningContext submitJob(
			ActorSystem actorSystem,
			Configuration config,
			HighAvailabilityServices highAvailabilityServices,
			JobGraph jobGraph,
			FiniteDuration timeout,
			boolean sysoutLogUpdates,
			ClassLoader classLoader) {

		checkNotNull(actorSystem, "The actorSystem must not be null.");
		checkNotNull(highAvailabilityServices, "The high availability services must not be null.");
		checkNotNull(jobGraph, "The jobGraph must not be null.");
		checkNotNull(timeout, "The timeout must not be null.");

		// for this job, we create a proxy JobClientActor that deals with all communication with
		// the JobManager. It forwards the job submission, checks the success/failure responses, logs
		// update messages, watches for disconnect between client and JobManager, ...

		Props jobClientActorProps = JobSubmissionClientActor.createActorProps(
			highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
			timeout,
			sysoutLogUpdates,
			config);

		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);

		Future<Object> submissionFuture = Patterns.ask(
				jobClientActor,
				new JobClientMessages.SubmitJobAndWait(jobGraph),
				new Timeout(AkkaUtils.INF_TIMEOUT()));

		return new JobListeningContext(
			jobGraph.getJobID(),
			submissionFuture,
			jobClientActor,
			timeout,
			classLoader,
			highAvailabilityServices);
	}

	//......
}

The JobClient.submitJob method uses HighAvailabilityServices.getJobManagerLeaderRetriever to obtain the address of the JobManager leader, which is then used to submit the job

Summary

HighAvailabilityMode has three enumeration values: NONE, ZOOKEEPER, and FACTORY_CLASS; each value has a haActive property indicating whether high availability is supported. HighAvailabilityOptions defines the configuration items prefixed with high-availability.zookeeper

HighAvailabilityServicesUtils provides static methods for creating HighAvailabilityServices, including createAvailableOrEmbeddedServices, createHighAvailabilityServices, and createCustomHAServices. The createAvailableOrEmbeddedServices method is mainly used for FlinkMiniCluster, while createHighAvailabilityServices is mainly used by ClusterEntrypoint: when the high availability mode is NONE, a StandaloneHaServices is created; when it is ZOOKEEPER, a ZooKeeperHaServices is created; and when it is FACTORY_CLASS, the services are created via the createCustomHAServices method

HighAvailabilityServices defines the getter methods for all the services required by high availability; ZooKeeperHaServices implements this interface and creates the required services through the various factory methods of ZooKeeperUtils, such as ZooKeeperUtils.createLeaderRetrievalService, ZooKeeperUtils.createLeaderElectionService, and ZooKeeperUtils.createSubmittedJobGraphs. Methods like JobClient.submitJob use HighAvailabilityServices.getJobManagerLeaderRetriever to obtain the address of the JobManager leader, which is then used to submit the job

doc

JobManager High Availability (HA)

Comparison of service registration models: Consul vs ZooKeeper vs etcd vs Eureka

ZooKeeper is based on Zab, a simplified variant of Paxos; etcd is based on the Raft algorithm, and Consul is also based on Raft. As rising stars, etcd and Consul did not hold back just because ZooKeeper already existed, but adopted the more direct Raft algorithm.

The number one goal of the Raft algorithm is to be easy to understand, which is evident from the title of its paper. Of course, Raft's improved comprehensibility comes at no cost in performance, reliability, or availability compared to Paxos.

Raft more understandable than Paxos and also provides a better foundation for building practical systems

In order to achieve the goal of being easy to understand, Raft makes a great deal of effort, the most important being two things:

problem decomposition

state simplification

Problem decomposition means dividing the complex problem of "consensus among the nodes of a replica set" into several subproblems that can be explained, understood, and solved independently. In Raft, the subproblems are leader election, log replication, safety, and membership changes. State simplification is best understood as placing restrictions on the algorithm to reduce the number of states that must be considered, making the algorithm clearer and less non-deterministic (for example, guaranteeing that a newly elected leader already contains all committed log entries).

Raft implements consensus by first electing a distinguished leader, then giving the leader complete responsibility for managing the replicated log. The leader accepts log entries from clients, replicates them on other servers, and tells servers when it is safe to apply log entries to their state machines. A leader can fail or become disconnected from the other servers, in which case a new leader is elected.

The quotation above concisely summarizes how the Raft protocol works: Raft first elects a leader, which is fully responsible for managing the replicated log. The leader accepts all client update requests, replicates them to the follower nodes, and applies them when it is "safe" to do so. If the leader fails, the followers re-elect a new leader.

This involves two of Raft's subproblems: leader election and log replication

leader election

log replication
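The term-based voting at the heart of Raft leader election can be sketched in a few lines. This is a simplified in-memory illustration, not Raft's actual RPC definitions: a real implementation adds randomized election timeouts, the log-completeness check on votes, and networking. All names below are illustrative:

```java
import java.util.Arrays;
import java.util.List;

public class RaftElectionSketch {

	// A node grants its vote for a given term at most once (first come, first served).
	static class Node {
		int currentTerm = 0;
		Integer votedFor = null; // candidate id voted for in currentTerm, or null

		boolean requestVote(int candidateTerm, int candidateId) {
			if (candidateTerm < currentTerm) {
				return false; // stale candidate, reject
			}
			if (candidateTerm > currentTerm) {
				currentTerm = candidateTerm; // step into the newer term
				votedFor = null;
			}
			if (votedFor == null || votedFor == candidateId) {
				votedFor = candidateId;
				return true;
			}
			return false; // vote already granted to someone else this term
		}
	}

	// A candidate wins the term if it collects votes from a majority of nodes.
	static boolean runElection(List<Node> cluster, int candidateId, int term) {
		int votes = 0;
		for (Node n : cluster) {
			if (n.requestVote(term, candidateId)) {
				votes++;
			}
		}
		return votes > cluster.size() / 2;
	}

	public static void main(String[] args) {
		List<Node> cluster = Arrays.asList(new Node(), new Node(), new Node());
		// Candidate 1 starts an election for term 1 and wins a majority.
		System.out.println(runElection(cluster, 1, 1)); // true
		// Candidate 2 asks for votes in the same term: votes are already spent.
		System.out.println(runElection(cluster, 2, 1)); // false
	}
}
```

The "one vote per term" rule is what guarantees at most one leader per term: two candidates cannot both reach a majority, since majorities overlap.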

Below is a comparison of the features of several commonly used service discovery products. First, the conclusions:

Health check of service

Eureka requires health check support to be explicitly configured when it is used; ZooKeeper and etcd consider a service unhealthy when the connection to the service process is lost, while Consul is more fine-grained, checking, for example, whether memory usage has reached 90% or whether the file system is running out of space.

Multi data center support

Consul achieves synchronization across data centers through its WAN gossip protocol; the other products require additional development work

KV storage service

Apart from Eureka, the other products can all expose a K-V storage service, which is an important reason why these products pursue strong consistency (discussed later). Providing a storage service also makes it easier to build a dynamic configuration service on top.

The choice of cap theory in product design

Eureka is a typical AP system, which makes it well suited to service discovery in distributed scenarios: in service discovery, availability takes priority, and temporary inconsistency is not fatal. Next, Consul can also provide high availability while guaranteeing consistency of its K-V store. ZooKeeper and etcd are CP systems, which sacrifice availability and therefore have little advantage in service discovery scenarios

Multilingual capability and access protocol for external services

ZooKeeper's cross-language support is weak; the other products provide access over HTTP. Eureka generally provides multi-language client access through sidecars; etcd also offers gRPC support; and Consul, in addition to its standard REST API, provides DNS support.

Watch support (clients observe changes in service providers)

ZooKeeper supports server-side push of changes, and Eureka 2.0 (under development) also plans to support it. Eureka 1, Consul, and etcd all implement change awareness through long polling
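The long-polling approach used by Eureka 1, Consul, and etcd (as opposed to ZooKeeper's push-style watches) can be illustrated with a tiny in-memory sketch. The registry, version counter, and method names below are all hypothetical; real systems add HTTP transport, per-key versions, and reconnection logic:

```java
import java.util.concurrent.TimeUnit;

public class LongPollSketch {

	// A tiny in-memory "registry": clients block waiting for a version newer
	// than the one they have seen, instead of receiving server-side pushes.
	static class Registry {
		private long version = 0;
		private String value = "";

		synchronized void publish(String newValue) {
			value = newValue;
			version++;
			notifyAll(); // wake up any blocked pollers
		}

		// Blocks until the version advances past knownVersion, or times out.
		// On timeout the client simply re-issues the poll.
		synchronized String poll(long knownVersion, long timeoutMs) throws InterruptedException {
			long deadline = System.currentTimeMillis() + timeoutMs;
			while (version <= knownVersion) {
				long remaining = deadline - System.currentTimeMillis();
				if (remaining <= 0) {
					return null; // timed out with no change
				}
				wait(remaining);
			}
			return value;
		}
	}

	public static void main(String[] args) throws Exception {
		Registry registry = new Registry();
		Thread publisher = new Thread(() -> {
			try {
				TimeUnit.MILLISECONDS.sleep(100);
			} catch (InterruptedException ignored) {
			}
			registry.publish("provider-list-v1");
		});
		publisher.start();
		// The client's poll parks until the publisher announces a change.
		System.out.println(registry.poll(0, 5000));
		publisher.join();
	}
}
```

The trade-off versus ZooKeeper's watches is that the server holds an open request per client instead of tracking watch registrations, which is simpler to implement over plain HTTP.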

Monitoring of self cluster

Apart from ZooKeeper, the other products support metrics by default. Operators can collect these metrics and alert on them for monitoring purposes

Security

Consul and ZooKeeper support ACLs, and Consul and etcd support secure channels over HTTPS

Integration of spring cloud

At present, there are corresponding boot starters, which provide integration capabilities.

Overall, Consul's features and Spring Cloud's integration support for it are relatively complete, and its operational complexity is relatively low (not discussed in detail here). Eureka's design is a better fit for the service discovery scenario, but it still needs continuous improvement.

etcd and ZooKeeper provide very similar capabilities; they occupy almost the same position in the software ecosystem and can substitute for each other.

Both are general-purpose consistent metadata stores

Both provide a watch mechanism for change notification and distribution

They are also used by distributed systems as shared information storage

In addition to the differences in implementation details, language, consistency and protocol, the biggest difference lies in the surrounding ecosystem.

ZooKeeper is an Apache project written in Java that exposes an RPC interface. It was originally incubated within the Hadoop project and is widely used in distributed systems (Hadoop, Solr, Kafka, Mesos, etc.).

etcd is an open-source product from CoreOS and is comparatively new. With its easy-to-use REST interface and active community, etcd has captured a group of users and is used in some newer clusters (such as Kubernetes).

Although its v3 API switched to a binary RPC interface for performance, its usability remains better than ZooKeeper's.

Consul's goal is more specific. etcd and ZooKeeper provide distributed consistent storage; concrete business scenarios, such as service discovery and configuration changes, must be implemented by users themselves.

Consul targets service discovery and configuration changes directly, with K-V storage included.

In a software ecosystem, the more abstract a component is, the wider its range of application, but it will inevitably fall short of fully meeting the needs of a specific business scenario.

