A Look at Flink JobManager High Availability

Preface

This article looks at how Flink implements high availability for the JobManager.

Configuration

flink-conf.yaml

high-availability: zookeeper
high-availability.zookeeper.quorum: zookeeper:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_one # important: customize per cluster
high-availability.storageDir: file:///share

The high-availability option accepts NONE or ZOOKEEPER (a fully qualified factory class name is also accepted, as the source below shows). high-availability.zookeeper.quorum specifies the ZooKeeper quorum peers; high-availability.zookeeper.path.root specifies the root node path in ZooKeeper; high-availability.cluster-id specifies the node name for the current cluster, which sits under the root node; and high-availability.storageDir specifies where the JobManager metadata is stored.
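As a rough illustration of how the root path and the cluster ID combine, the sketch below joins the two values into the znode under which one cluster's data lives. It uses a plain Map in place of Flink's Configuration, and the names HaPathSketch, clusterNamespace, and normalize are made up for this example; Flink's own path handling may normalize slashes differently.

```java
import java.util.HashMap;
import java.util.Map;

public class HaPathSketch {

    // Defaults as declared in HighAvailabilityOptions: "/flink" and "/default".
    static final String DEFAULT_ROOT = "/flink";
    static final String DEFAULT_CLUSTER_ID = "/default";

    /** Joins root and cluster ID into a single znode path (hypothetical helper). */
    static String clusterNamespace(Map<String, String> conf) {
        String root = conf.getOrDefault("high-availability.zookeeper.path.root", DEFAULT_ROOT);
        String clusterId = conf.getOrDefault("high-availability.cluster-id", DEFAULT_CLUSTER_ID);
        return normalize(root) + normalize(clusterId);
    }

    /** Ensures exactly one leading slash and no trailing slash. */
    static String normalize(String path) {
        String trimmed = path.trim();
        while (trimmed.startsWith("/")) {
            trimmed = trimmed.substring(1);
        }
        while (trimmed.endsWith("/")) {
            trimmed = trimmed.substring(0, trimmed.length() - 1);
        }
        return trimmed.isEmpty() ? "" : "/" + trimmed;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("high-availability.zookeeper.path.root", "/flink");
        conf.put("high-availability.cluster-id", "/cluster_one");
        System.out.println(clusterNamespace(conf));
    }
}
```

With the configuration shown above, every standalone cluster that shares a ZooKeeper quorum needs its own cluster-id so that the resulting paths do not collide.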

Masters file

localhost:8081
localhost:8082

The masters file lists the JobManager addresses, one host:port entry per line.
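To make the file format concrete, here is a small sketch that parses masters-style lines into host and port pairs. MastersFileSketch and its HostPort type are invented for this example; this is not how Flink itself reads the file.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MastersFileSketch {

    /** Simple host/port pair (hypothetical type for this example). */
    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Parses "host:port" lines, skipping blank lines. */
    static List<HostPort> parse(List<String> lines) {
        List<HostPort> result = new ArrayList<>();
        for (String line : lines) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int idx = trimmed.lastIndexOf(':');
            if (idx <= 0) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, idx);
            int port = Integer.parseInt(trimmed.substring(idx + 1));
            result.add(new HostPort(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        for (HostPort hp : parse(Arrays.asList("localhost:8081", "localhost:8082"))) {
            System.out.println(hp.host + " -> " + hp.port);
        }
    }
}
```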

HighAvailabilityMode

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/jobmanager/HighAvailabilityMode.java

public enum HighAvailabilityMode {
	NONE(false),
	ZOOKEEPER(true),
	FACTORY_CLASS(true);

	private final boolean haActive;

	HighAvailabilityMode(boolean haActive) {
		this.haActive = haActive;
	}

	/**
	 * Return the configured {@link HighAvailabilityMode}.
	 *
	 * @param config The config to parse
	 * @return Configured recovery mode or {@link HighAvailabilityMode#NONE} if not
	 * configured.
	 */
	public static HighAvailabilityMode fromConfig(Configuration config) {
		String haMode = config.getValue(HighAvailabilityOptions.HA_MODE);

		if (haMode == null) {
			return HighAvailabilityMode.NONE;
		} else if (haMode.equalsIgnoreCase(ConfigConstants.DEFAULT_RECOVERY_MODE)) {
			// Map old default to new default
			return HighAvailabilityMode.NONE;
		} else {
			try {
				return HighAvailabilityMode.valueOf(haMode.toUpperCase());
			} catch (IllegalArgumentException e) {
				return FACTORY_CLASS;
			}
		}
	}

	/**
	 * Returns true if the defined recovery mode supports high availability.
	 *
	 * @param configuration Configuration which contains the recovery mode
	 * @return true if high availability is supported by the recovery mode, otherwise false
	 */
	public static boolean isHighAvailabilityModeActivated(Configuration configuration) {
		HighAvailabilityMode mode = fromConfig(configuration);
		return mode.haActive;
	}
}

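HighAvailabilityMode has three enum constants: NONE, ZOOKEEPER, and FACTORY_CLASS. Each carries an haActive flag that indicates whether the mode supports high availability. fromConfig falls back to FACTORY_CLASS for any value that is not an enum name, treating it as a factory class name.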
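The resolution rules in fromConfig can be tried out without a Flink dependency. The sketch below mirrors them on plain strings; HaModeSketch and resolve are names chosen for this example, and the value "standalone" for the legacy default recovery mode is an assumption about ConfigConstants.DEFAULT_RECOVERY_MODE rather than something shown in the listing above.

```java
import java.util.Locale;

public class HaModeSketch {

    enum Mode { NONE, ZOOKEEPER, FACTORY_CLASS }

    // Assumed value of ConfigConstants.DEFAULT_RECOVERY_MODE (the old default).
    static final String LEGACY_DEFAULT = "standalone";

    /** Maps a raw "high-availability" value to a mode, following the branches of fromConfig. */
    static Mode resolve(String haMode) {
        if (haMode == null || haMode.equalsIgnoreCase(LEGACY_DEFAULT)) {
            return Mode.NONE;
        }
        try {
            return Mode.valueOf(haMode.toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            // Anything that is not an enum name is treated as a factory class name.
            return Mode.FACTORY_CLASS;
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(null));
        System.out.println(resolve("zookeeper"));
        System.out.println(resolve("com.example.MyHaServicesFactory"));
    }
}
```

One consequence of this design is that a misspelled mode such as "zookeper" is not rejected here; it is treated as a class name and only fails later, when the factory cannot be instantiated.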

HighAvailabilityOptions

flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/HighAvailabilityOptions.java

@PublicEvolving
@ConfigGroups(groups = {
	@ConfigGroup(name = "HighAvailabilityZookeeper", keyPrefix = "high-availability.zookeeper")
})
public class HighAvailabilityOptions {

	// ------------------------------------------------------------------------
	//  Required High Availability Options
	// ------------------------------------------------------------------------

	/**
	 * Defines high-availability mode used for the cluster execution.
	 * A value of "NONE" signals no highly available setup.
	 * To enable high-availability, set this mode to "ZOOKEEPER".
	 * Can also be set to FQN of HighAvailability factory class.
	 */
	@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_HIGH_AVAILABILITY)
	public static final ConfigOption<String> HA_MODE =
			key("high-availability")
			.defaultValue("NONE")
			.withDeprecatedKeys("recovery.mode")
			.withDescription("Defines high-availability mode used for the cluster execution." +
				" To enable high-availability, set this mode to \"ZOOKEEPER\" or specify FQN of factory class.");

	/**
	 * The ID of the Flink cluster, used to separate multiple Flink clusters
	 * Needs to be set for standalone clusters, is automatically inferred in YARN and Mesos.
	 */
	public static final ConfigOption<String> HA_CLUSTER_ID =
			key("high-availability.cluster-id")
			.defaultValue("/default")
			.withDeprecatedKeys("high-availability.zookeeper.path.namespace", "recovery.zookeeper.path.namespace")
			.withDescription("The ID of the Flink cluster, used to separate multiple Flink clusters from each other." +
				" Needs to be set for standalone clusters but is automatically inferred in YARN and Mesos.");

	/**
	 * File system path (URI) where Flink persists metadata in high-availability setups.
	 */
	@Documentation.CommonOption(position = Documentation.CommonOption.POSITION_HIGH_AVAILABILITY)
	public static final ConfigOption<String> HA_STORAGE_PATH =
			key("high-availability.storageDir")
			.noDefaultValue()
			.withDeprecatedKeys("high-availability.zookeeper.storageDir", "recovery.zookeeper.storageDir")
			.withDescription("File system path (URI) where Flink persists metadata in high-availability setups.");

	// ------------------------------------------------------------------------
	//  Recovery Options
	// ------------------------------------------------------------------------

	/**
	 * Optional port (range) used by the job manager in high-availability mode.
	 */
	public static final ConfigOption<String> HA_JOB_MANAGER_PORT_RANGE =
			key("high-availability.jobmanager.port")
			.defaultValue("0")
			.withDeprecatedKeys("recovery.jobmanager.port")
			.withDescription("Optional port (range) used by the job manager in high-availability mode.");

	/**
	 * The time before a JobManager after a fail over recovers the current jobs.
	 */
	public static final ConfigOption<String> HA_JOB_DELAY =
			key("high-availability.job.delay")
			.noDefaultValue()
			.withDeprecatedKeys("recovery.job.delay")
			.withDescription("The time before a JobManager after a fail over recovers the current jobs.");

	// ------------------------------------------------------------------------
	//  ZooKeeper Options
	// ------------------------------------------------------------------------

	/**
	 * The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper.
	 */
	public static final ConfigOption<String> HA_ZOOKEEPER_QUORUM =
			key("high-availability.zookeeper.quorum")
			.noDefaultValue()
			.withDeprecatedKeys("recovery.zookeeper.quorum")
			.withDescription("The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper.");

	/**
	 * The root path under which Flink stores its entries in ZooKeeper.
	 */
	public static final ConfigOption<String> HA_ZOOKEEPER_ROOT =
			key("high-availability.zookeeper.path.root")
			.defaultValue("/flink")
			.withDeprecatedKeys("recovery.zookeeper.path.root")
			.withDescription("The root path under which Flink stores its entries in ZooKeeper.");

	public static final ConfigOption<String> HA_ZOOKEEPER_LATCH_PATH =
			key("high-availability.zookeeper.path.latch")
			.defaultValue("/leaderlatch")
			.withDeprecatedKeys("recovery.zookeeper.path.latch")
			.withDescription("Defines the znode of the leader latch which is used to elect the leader.");

	/** ZooKeeper root path (ZNode) for job graphs. */
	public static final ConfigOption<String> HA_ZOOKEEPER_JOBGRAPHS_PATH =
			key("high-availability.zookeeper.path.jobgraphs")
			.defaultValue("/jobgraphs")
			.withDeprecatedKeys("recovery.zookeeper.path.jobgraphs")
			.withDescription("ZooKeeper root path (ZNode) for job graphs");

	public static final ConfigOption<String> HA_ZOOKEEPER_LEADER_PATH =
			key("high-availability.zookeeper.path.leader")
			.defaultValue("/leader")
			.withDeprecatedKeys("recovery.zookeeper.path.leader")
			.withDescription("Defines the znode of the leader which contains the URL to the leader and the current" +
				" leader session ID.");

	/** ZooKeeper root path (ZNode) for completed checkpoints. */
	public static final ConfigOption<String> HA_ZOOKEEPER_CHECKPOINTS_PATH =
			key("high-availability.zookeeper.path.checkpoints")
			.defaultValue("/checkpoints")
			.withDeprecatedKeys("recovery.zookeeper.path.checkpoints")
			.withDescription("ZooKeeper root path (ZNode) for completed checkpoints.");

	/** ZooKeeper root path (ZNode) for checkpoint counters. */
	public static final ConfigOption<String> HA_ZOOKEEPER_CHECKPOINT_COUNTER_PATH =
			key("high-availability.zookeeper.path.checkpoint-counter")
			.defaultValue("/checkpoint-counter")
			.withDeprecatedKeys("recovery.zookeeper.path.checkpoint-counter")
			.withDescription("ZooKeeper root path (ZNode) for checkpoint counters.");

	/** ZooKeeper root path (ZNode) for Mesos workers. */
	@PublicEvolving
	public static final ConfigOption<String> HA_ZOOKEEPER_MESOS_WORKERS_PATH =
			key("high-availability.zookeeper.path.mesos-workers")
			.defaultValue("/mesos-workers")
			.withDeprecatedKeys("recovery.zookeeper.path.mesos-workers")
			.withDescription(Description.builder()
				.text("The ZooKeeper root path for persisting the Mesos worker information.")
				.build());

	// ------------------------------------------------------------------------
	//  ZooKeeper Client Settings
	// ------------------------------------------------------------------------

	public static final ConfigOption<Integer> ZOOKEEPER_SESSION_TIMEOUT =
			key("high-availability.zookeeper.client.session-timeout")
			.defaultValue(60000)
			.withDeprecatedKeys("recovery.zookeeper.client.session-timeout")
			.withDescription("Defines the session timeout for the ZooKeeper session in ms.");

	public static final ConfigOption<Integer> ZOOKEEPER_CONNECTION_TIMEOUT =
			key("high-availability.zookeeper.client.connection-timeout")
			.defaultValue(15000)
			.withDeprecatedKeys("recovery.zookeeper.client.connection-timeout")
			.withDescription("Defines the connection timeout for ZooKeeper in ms.");

	public static final ConfigOption<Integer> ZOOKEEPER_RETRY_WAIT =
			key("high-availability.zookeeper.client.retry-wait")
			.defaultValue(5000)
			.withDeprecatedKeys("recovery.zookeeper.client.retry-wait")
			.withDescription("Defines the pause between consecutive retries in ms.");

	public static final ConfigOption<Integer> ZOOKEEPER_MAX_RETRY_ATTEMPTS =
			key("high-availability.zookeeper.client.max-retry-attempts")
			.defaultValue(3)
			.withDeprecatedKeys("recovery.zookeeper.client.max-retry-attempts")
			.withDescription("Defines the number of connection retries before the client gives up.");

	public static final ConfigOption<String> ZOOKEEPER_RUNNING_JOB_REGISTRY_PATH =
			key("high-availability.zookeeper.path.running-registry")
			.defaultValue("/running_job_registry/");

	public static final ConfigOption<String> ZOOKEEPER_CLIENT_ACL =
			key("high-availability.zookeeper.client.acl")
			.defaultValue("open")
			.withDescription("Defines the ACL (open|creator) to be configured on ZK node. The configuration value can be" +
				" set to “creator” if the ZooKeeper server configuration has the “authProvider” property mapped to use" +
				" SASLAuthenticationProvider and the cluster is configured to run in secure mode (Kerberos).");

	// ------------------------------------------------------------------------

	/** Not intended to be instantiated. */
	private HighAvailabilityOptions() {}
}

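HighAvailabilityOptions defines the high-availability configuration options, most of them under the high-availability.zookeeper prefix, along with their default values and their deprecated recovery.* keys.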
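The withDeprecatedKeys declarations imply a lookup order: the current key first, then each deprecated key, then the default. The following sketch mimics that order with a plain Map; DeprecatedKeySketch and lookup are hypothetical names, and Flink's Configuration class performs the real resolution internally.

```java
import java.util.HashMap;
import java.util.Map;

public class DeprecatedKeySketch {

    /** Looks up the current key first, then each deprecated key in order, then the default. */
    static String lookup(
            Map<String, String> conf,
            String defaultValue,
            String key,
            String... deprecatedKeys) {
        if (conf.containsKey(key)) {
            return conf.get(key);
        }
        for (String oldKey : deprecatedKeys) {
            if (conf.containsKey(oldKey)) {
                return conf.get(oldKey);
            }
        }
        return defaultValue;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        // Only the old key is set, as in a configuration file written for an older release.
        conf.put("recovery.zookeeper.path.root", "/legacy-flink");
        System.out.println(lookup(
                conf,
                "/flink",
                "high-availability.zookeeper.path.root",
                "recovery.zookeeper.path.root"));
    }
}
```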

HighAvailabilityServicesUtils

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java

public class HighAvailabilityServicesUtils {

	public static HighAvailabilityServices createAvailableOrEmbeddedServices(
		Configuration config,
		Executor executor) throws Exception {
		HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(config);

		switch (highAvailabilityMode) {
			case NONE:
				return new EmbeddedHaServices(executor);

			case ZOOKEEPER:
				BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(config);

				return new ZooKeeperHaServices(
					ZooKeeperUtils.startCuratorFramework(config),
					executor,
					config,
					blobStoreService);

			case FACTORY_CLASS:
				return createCustomHAServices(config, executor);

			default:
				throw new Exception("High availability mode " + highAvailabilityMode + " is not supported.");
		}
	}

	public static HighAvailabilityServices createHighAvailabilityServices(
		Configuration configuration,
		Executor executor,
		AddressResolution addressResolution) throws Exception {

		HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(configuration);

		switch (highAvailabilityMode) {
			case NONE:
				final Tuple2<String, Integer> hostnamePort = getJobManagerAddress(configuration);

				final String jobManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
					hostnamePort.f0,
					hostnamePort.f1,
					JobMaster.JOB_MANAGER_NAME,
					addressResolution,
					configuration);
				final String resourceManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
					hostnamePort.f0,
					hostnamePort.f1,
					ResourceManager.RESOURCE_MANAGER_NAME,
					addressResolution,
					configuration);
				final String dispatcherRpcUrl = AkkaRpcServiceUtils.getRpcUrl(
					hostnamePort.f0,
					hostnamePort.f1,
					Dispatcher.DISPATCHER_NAME,
					addressResolution,
					configuration);

				final String address = checkNotNull(configuration.getString(RestOptions.ADDRESS),
					"%s must be set",
					RestOptions.ADDRESS.key());
				final int port = configuration.getInteger(RestOptions.PORT);
				final boolean enableSSL = SSLUtils.isRestSSLEnabled(configuration);
				final String protocol = enableSSL ? "https://" : "http://";

				return new StandaloneHaServices(
					resourceManagerRpcUrl,
					dispatcherRpcUrl,
					jobManagerRpcUrl,
					String.format("%s%s:%s", protocol, address, port));
			case ZOOKEEPER:
				BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);

				return new ZooKeeperHaServices(
					ZooKeeperUtils.startCuratorFramework(configuration),
					executor,
					configuration,
					blobStoreService);

			case FACTORY_CLASS:
				return createCustomHAServices(configuration, executor);

			default:
				throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
		}
	}

	/**
	 * Returns the JobManager's hostname and port extracted from the given
	 * {@link Configuration}.
	 *
	 * @param configuration Configuration to extract the JobManager's address from
	 * @return The JobManager's hostname and port
	 * @throws ConfigurationException if the JobManager's address cannot be extracted from the configuration
	 */
	public static Tuple2<String, Integer> getJobManagerAddress(Configuration configuration) throws ConfigurationException {

		final String hostname = configuration.getString(JobManagerOptions.ADDRESS);
		final int port = configuration.getInteger(JobManagerOptions.PORT);

		if (hostname == null) {
			throw new ConfigurationException("Config parameter '" + JobManagerOptions.ADDRESS +
				"' is missing (hostname/address of JobManager to connect to).");
		}

		if (port <= 0 || port >= 65536) {
			throw new ConfigurationException("Invalid value for '" + JobManagerOptions.PORT +
				"' (port of the JobManager actor system) : " + port +
				".  it must be greater than 0 and less than 65536.");
		}

		return Tuple2.of(hostname, port);
	}

	private static HighAvailabilityServices createCustomHAServices(Configuration config, Executor executor) throws FlinkException {
		final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
		final String haServicesClassName = config.getString(HighAvailabilityOptions.HA_MODE);

		final HighAvailabilityServicesFactory highAvailabilityServicesFactory;

		try {
			highAvailabilityServicesFactory = InstantiationUtil.instantiate(
				haServicesClassName,
				HighAvailabilityServicesFactory.class,
				classLoader);
		} catch (Exception e) {
			throw new FlinkException(
				String.format(
					"Could not instantiate the HighAvailabilityServicesFactory '%s'. Please make sure that this class is on your class path.",
					haServicesClassName),
				e);
		}

		try {
			return highAvailabilityServicesFactory.createHAServices(config, executor);
		} catch (Exception e) {
			throw new FlinkException(
				String.format(
					"Could not create the ha services from the instantiated HighAvailabilityServicesFactory %s.",
					haServicesClassName),
				e);
		}
	}

	/**
	 * Enum specifying whether address resolution should be tried or not when creating the
	 * {@link HighAvailabilityServices}.
	 */
	public enum AddressResolution {
		TRY_ADDRESS_RESOLUTION,
		NO_ADDRESS_RESOLUTION
	}
}

HighAvailabilityServicesUtils provides static methods for creating HighAvailabilityServices: createAvailableOrEmbeddedServices, createHighAvailabilityServices, and createCustomHAServices.

createAvailableOrEmbeddedServices is mainly used by FlinkMiniCluster and returns EmbeddedHaServices when the mode is NONE. createHighAvailabilityServices is mainly used by ClusterEntrypoint: it creates StandaloneHaServices when the mode is NONE and ZooKeeperHaServices when it is ZOOKEEPER. When the mode is FACTORY_CLASS, both methods delegate to createCustomHAServices, which instantiates the configured factory class.
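The FACTORY_CLASS branch boils down to loading a class by name and checking that it implements the expected factory interface. A minimal sketch of that pattern, using only the JDK, follows. ServicesFactory and DemoFactory are simplified stand-ins invented for this example, not Flink's HighAvailabilityServicesFactory, and the sketch throws an unchecked exception where Flink throws FlinkException.

```java
public class FactoryLoadingSketch {

    /** Stand-in for a services factory interface (hypothetical, simplified). */
    public interface ServicesFactory {
        String createServices();
    }

    /** Demo implementation whose class name would be put into the "high-availability" option. */
    public static class DemoFactory implements ServicesFactory {
        @Override
        public String createServices() {
            return "demo-ha-services";
        }
    }

    /** Loads the named class, verifies its type, and calls its no-argument constructor. */
    static ServicesFactory instantiate(String className, ClassLoader classLoader) {
        try {
            return Class.forName(className, true, classLoader)
                    .asSubclass(ServicesFactory.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalStateException(
                    "Could not instantiate the factory '" + className
                            + "'. Please make sure that this class is on your class path.",
                    e);
        }
    }

    public static void main(String[] args) {
        ClassLoader classLoader = FactoryLoadingSketch.class.getClassLoader();
        ServicesFactory factory = instantiate(DemoFactory.class.getName(), classLoader);
        System.out.println(factory.createServices());
    }
}
```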

HighAvailabilityServicesUtils also provides a static getJobManagerAddress method that reads the JobManager's hostname and port from the configuration and validates the port range.
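The port check and the REST URL formatting from the NONE branch are easy to reproduce in isolation. The sketch below does so with hypothetical names (AddressSketch, checkPort, restUrl) and throws IllegalArgumentException instead of Flink's ConfigurationException.

```java
public class AddressSketch {

    /** Mirrors the range check in getJobManagerAddress: valid ports are 1 to 65535. */
    static int checkPort(int port) {
        if (port <= 0 || port >= 65536) {
            throw new IllegalArgumentException(
                    "Invalid port " + port + ": it must be greater than 0 and less than 65536.");
        }
        return port;
    }

    /** Builds the REST endpoint URL with the same format string as the NONE branch. */
    static String restUrl(boolean enableSsl, String address, int port) {
        String protocol = enableSsl ? "https://" : "http://";
        return String.format("%s%s:%s", protocol, address, checkPort(port));
    }

    public static void main(String[] args) {
        System.out.println(restUrl(false, "localhost", 8081));
    }
}
```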

HighAvailabilityServices

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java

/**
 * The HighAvailabilityServices give access to all services needed for a highly-available
 * setup. In particular, the services provide access to highly available storage and
 * registries, as well as distributed counters and leader election.
 * 
 * <ul>
 *     <li>ResourceManager leader election and leader retrieval</li>
 *     <li>JobManager leader election and leader retrieval</li>
 *     <li>Persistence for checkpoint metadata</li>
 *     <li>Registering the latest completed checkpoint(s)</li>
 *     <li>Persistence for the BLOB store</li>
 *     <li>Registry that marks a job's status</li>
 *     <li>Naming of RPC endpoints</li>
 * </ul>
 */
public interface HighAvailabilityServices extends AutoCloseable {

	// ------------------------------------------------------------------------
	//  Constants
	// ------------------------------------------------------------------------

	/**
	 * This UUID should be used when no proper leader election happens, but a simple
	 * pre-configured leader is used. That is for example the case in non-highly-available
	 * standalone setups.
	 */
	UUID DEFAULT_LEADER_ID = new UUID(0, 0);

	/**
	 * This JobID should be used to identify the old JobManager when using the
	 * {@link HighAvailabilityServices}. With the new mode every JobMaster will have a
	 * distinct JobID assigned.
	 */
	JobID DEFAULT_JOB_ID = new JobID(0L, 0L);

	// ------------------------------------------------------------------------
	//  Services
	// ------------------------------------------------------------------------

	/**
	 * Gets the leader retriever for the cluster's resource manager.
	 */
	LeaderRetrievalService getResourceManagerLeaderRetriever();

	/**
	 * Gets the leader retriever for the dispatcher. This leader retrieval service
	 * is not always accessible.
	 */
	LeaderRetrievalService getDispatcherLeaderRetriever();

	/**
	 * Gets the leader retriever for the job JobMaster which is responsible for the given job
	 *
	 * @param jobID The identifier of the job.
	 * @return Leader retrieval service to retrieve the job manager for the given job
	 * @deprecated This method should only be used by the legacy code where the JobManager acts as the master.
	 */
	@Deprecated
	LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID);

	/**
	 * Gets the leader retriever for the job JobMaster which is responsible for the given job
	 *
	 * @param jobID The identifier of the job.
	 * @param defaultJobManagerAddress JobManager address which will be returned by
	 *                              a static leader retrieval service.
	 * @return Leader retrieval service to retrieve the job manager for the given job
	 */
	LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress);

	LeaderRetrievalService getWebMonitorLeaderRetriever();

	/**
	 * Gets the leader election service for the cluster's resource manager.
	 *
	 * @return Leader election service for the resource manager leader election
	 */
	LeaderElectionService getResourceManagerLeaderElectionService();

	/**
	 * Gets the leader election service for the cluster's dispatcher.
	 *
	 * @return Leader election service for the dispatcher leader election
	 */
	LeaderElectionService getDispatcherLeaderElectionService();

	/**
	 * Gets the leader election service for the given job.
	 *
	 * @param jobID The identifier of the job running the election.
	 * @return Leader election service for the job manager leader election
	 */
	LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);

	LeaderElectionService getWebMonitorLeaderElectionService();

	/**
	 * Gets the checkpoint recovery factory for the job manager
	 *
	 * @return Checkpoint recovery factory
	 */
	CheckpointRecoveryFactory getCheckpointRecoveryFactory();

	/**
	 * Gets the submitted job graph store for the job manager
	 *
	 * @return Submitted job graph store
	 * @throws Exception if the submitted job graph store could not be created
	 */
	SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception;

	/**
	 * Gets the registry that holds information about whether jobs are currently running.
	 *
	 * @return Running job registry to retrieve running jobs
	 */
	RunningJobsRegistry getRunningJobsRegistry() throws Exception;

	/**
	 * Creates the BLOB store in which BLOBs are stored in a highly-available fashion.
	 *
	 * @return Blob store
	 * @throws IOException if the blob store could not be created
	 */
	BlobStore createBlobStore() throws IOException;

	// ------------------------------------------------------------------------
	//  Shutdown and Cleanup
	// ------------------------------------------------------------------------

	/**
	 * Closes the high availability services, releasing all resources.
	 * 
	 * <p>This method <b>does not delete or clean up</b> any data stored in external stores
	 * (file systems, ZooKeeper, etc). Another instance of the high availability
	 * services will be able to recover the job.
	 * 
	 * <p>If an exception occurs during closing services, this method will attempt to
	 * continue closing other services and report exceptions only after all services
	 * have been attempted to be closed.
	 *
	 * @throws Exception Thrown, if an exception occurred while closing these services.
	 */
	@Override
	void close() throws Exception;

	/**
	 * Closes the high availability services (releasing all resources) and deletes
	 * all data stored by these services in external stores.
	 * 
	 * <p>After this method was called, the any job or session that was managed by
	 * these high availability services will be unrecoverable.
	 * 
	 * <p>If an exception occurs during cleanup, this method will attempt to
	 * continue the cleanup and report exceptions only after all cleanup steps have
	 * been attempted.
	 * 
	 * @throws Exception Thrown, if an exception occurred while closing these services
	 *                   or cleaning up data stored by them.
	 */
	void closeAndCleanupAllData() throws Exception;
}

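HighAvailabilityServices defines getters for the services a highly available setup needs: leader retrieval and leader election, checkpoint recovery, the submitted job graph store, the running jobs registry, and the BLOB store. It also distinguishes between close, which keeps externally stored data, and closeAndCleanupAllData, which deletes it.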
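The Javadoc of close describes a common pattern: keep closing the remaining services even if one fails, and report the failure only at the end. One way to write that pattern in plain Java is sketched below; CloseAllSketch and closeAll are names made up for this example, and attaching later failures as suppressed exceptions is a choice of this sketch, not something the interface prescribes.

```java
import java.util.Arrays;
import java.util.List;

public class CloseAllSketch {

    /** Closes every resource, then rethrows the first failure with later ones attached as suppressed. */
    static void closeAll(List<? extends AutoCloseable> resources) throws Exception {
        Exception first = null;
        for (AutoCloseable resource : resources) {
            try {
                resource.close();
            } catch (Exception e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }

    public static void main(String[] args) {
        int[] closed = {0};
        AutoCloseable ok = () -> closed[0]++;
        AutoCloseable failing = () -> {
            closed[0]++;
            throw new IllegalStateException("boom");
        };
        try {
            closeAll(Arrays.asList(ok, failing, ok));
        } catch (Exception e) {
            System.out.println("closed " + closed[0] + " resources, first error: " + e.getMessage());
        }
    }
}
```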

ZooKeeperHaServices

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java

/**
 * An implementation of the {@link HighAvailabilityServices} using Apache ZooKeeper.
 * The services store data in ZooKeeper's nodes as illustrated by teh following tree structure:
 * 
 * <pre>
 * /flink
 *      +/cluster_id_1/resource_manager_lock
 *      |            |
 *      |            +/job-id-1/job_manager_lock
 *      |            |         /checkpoints/latest
 *      |            |                     /latest-1
 *      |            |                     /latest-2
 *      |            |
 *      |            +/job-id-2/job_manager_lock
 *      |      
 *      +/cluster_id_2/resource_manager_lock
 *                   |
 *                   +/job-id-1/job_manager_lock
 *                            |/checkpoints/latest
 *                            |            /latest-1
 *                            |/persisted_job_graph
 * </pre>
 * 
 * <p>The root path "/flink" is configurable via the option {@link HighAvailabilityOptions#HA_ZOOKEEPER_ROOT}.
 * This makes sure Flink stores its data under specific subtrees in ZooKeeper, for example to
 * accommodate specific permission.
 * 
 * <p>The "cluster_id" part identifies the data stored for a specific Flink "cluster".
 * This "cluster" can be either a standalone or containerized Flink cluster, or it can be job
 * on a framework like YARN or Mesos (in a "per-job-cluster" mode).
 * 
 * <p>In case of a "per-job-cluster" on YARN or Mesos, the cluster-id is generated and configured
 * automatically by the client or dispatcher that submits the Job to YARN or Mesos.
 * 
 * <p>In the case of a standalone cluster, that cluster-id needs to be configured via
 * {@link HighAvailabilityOptions#HA_CLUSTER_ID}. All nodes with the same cluster id will join the same
 * cluster and participate in the execution of the same set of jobs.
 */
public class ZooKeeperHaServices implements HighAvailabilityServices {

	private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperHaServices.class);

	private static final String RESOURCE_MANAGER_LEADER_PATH = "/resource_manager_lock";

	private static final String DISPATCHER_LEADER_PATH = "/dispatcher_lock";

	private static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock";

	private static final String REST_SERVER_LEADER_PATH = "/rest_server_lock";

	// ------------------------------------------------------------------------
	
	
	/** The ZooKeeper client to use */
	private final CuratorFramework client;

	/** The executor to run ZooKeeper callbacks on */
	private final Executor executor;

	/** The runtime configuration */
	private final Configuration configuration;

	/** The zookeeper based running jobs registry */
	private final RunningJobsRegistry runningJobsRegistry;

	/** Store for arbitrary blobs */
	private final BlobStoreService blobStoreService;

	public ZooKeeperHaServices(
			CuratorFramework client,
			Executor executor,
			Configuration configuration,
			BlobStoreService blobStoreService) {
		this.client = checkNotNull(client);
		this.executor = checkNotNull(executor);
		this.configuration = checkNotNull(configuration);
		this.runningJobsRegistry = new ZooKeeperRunningJobsRegistry(client, configuration);

		this.blobStoreService = checkNotNull(blobStoreService);
	}

	// ------------------------------------------------------------------------
	//  Services
	// ------------------------------------------------------------------------

	@Override
	public LeaderRetrievalService getResourceManagerLeaderRetriever() {
		return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
	}

	@Override
	public LeaderRetrievalService getDispatcherLeaderRetriever() {
		return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, DISPATCHER_LEADER_PATH);
	}

	@Override
	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
		return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, getPathForJobManager(jobID));
	}

	@Override
	public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) {
		return getJobManagerLeaderRetriever(jobID);
	}

	@Override
	public LeaderRetrievalService getWebMonitorLeaderRetriever() {
		return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, REST_SERVER_LEADER_PATH);
	}

	@Override
	public LeaderElectionService getResourceManagerLeaderElectionService() {
		return ZooKeeperUtils.createLeaderElectionService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
	}

	@Override
	public LeaderElectionService getDispatcherLeaderElectionService() {
		return ZooKeeperUtils.createLeaderElectionService(client, configuration, DISPATCHER_LEADER_PATH);
	}

	@Override
	public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
		return ZooKeeperUtils.createLeaderElectionService(client, configuration, getPathForJobManager(jobID));
	}

	@Override
	public LeaderElectionService getWebMonitorLeaderElectionService() {
		return ZooKeeperUtils.createLeaderElectionService(client, configuration, REST_SERVER_LEADER_PATH);
	}

	@Override
	public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
		return new ZooKeeperCheckpointRecoveryFactory(client, configuration, executor);
	}

	@Override
	public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception {
		return ZooKeeperUtils.createSubmittedJobGraphs(client, configuration);
	}

	@Override
	public RunningJobsRegistry getRunningJobsRegistry() {
		return runningJobsRegistry;
	}

	@Override
	public BlobStore createBlobStore() throws IOException {
		return blobStoreService;
	}

	// ------------------------------------------------------------------------
	//  Shutdown
	// ------------------------------------------------------------------------

	@Override
	public void close() throws Exception {
		Throwable exception = null;

		try {
			blobStoreService.close();
		} catch (Throwable t) {
			exception = t;
		}

		internalClose();

		if (exception != null) {
			ExceptionUtils.rethrowException(exception, "Could not properly close the ZooKeeperHaServices.");
		}
	}

	@Override
	public void closeAndCleanupAllData() throws Exception {
		LOG.info("Close and clean up all data for ZooKeeperHaServices.");

		Throwable exception = null;

		try {
			blobStoreService.closeAndCleanupAllData();
		} catch (Throwable t) {
			exception = t;
		}

		internalClose();

		if (exception != null) {
			ExceptionUtils.rethrowException(exception, "Could not properly close and clean up all data of ZooKeeperHaServices.");
		}
	}

	/**
	 * Closes components which don't distinguish between close and closeAndCleanupAllData
	 */
	private void internalClose() {
		client.close();
	}

	// ------------------------------------------------------------------------
	//  Utilities
	// ------------------------------------------------------------------------

	private static String getPathForJobManager(final JobID jobID) {
		return "/" + jobID + JOB_MANAGER_LEADER_PATH;
	}
}

ZooKeeperHaServices implements the HighAvailabilityServices interface. It creates the required services through the create methods of ZooKeeperUtils, such as ZooKeeperUtils.createLeaderRetrievalService, ZooKeeperUtils.createLeaderElectionService, and ZooKeeperUtils.createSubmittedJobGraphs.
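The per-job lock path is a simple string concatenation, which the sketch below reproduces with the job ID as a plain string. JobManagerPathSketch and fullPath are hypothetical; fullPath follows the simplified tree drawn in the class Javadoc, and the znodes ZooKeeperUtils actually creates may carry additional configured prefixes such as the leader and latch paths from HighAvailabilityOptions.

```java
public class JobManagerPathSketch {

    static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock";

    /** Same concatenation as getPathForJobManager, with the job ID given as a string. */
    static String pathForJobManager(String jobId) {
        return "/" + jobId + JOB_MANAGER_LEADER_PATH;
    }

    /** Prefixes the per-job path with root and cluster ID, following the Javadoc tree (an assumption). */
    static String fullPath(String root, String clusterId, String jobId) {
        return root + clusterId + pathForJobManager(jobId);
    }

    public static void main(String[] args) {
        System.out.println(pathForJobManager("job-id-1"));
        System.out.println(fullPath("/flink", "/cluster_one", "job-id-1"));
    }
}
```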

JobClient.submitJob

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/client/JobClient.java

public class JobClient {

	private static final Logger LOG = LoggerFactory.getLogger(JobClient.class);

	//......

	/**
	 * Submits a job to a Flink cluster (non-blocking) and returns a JobListeningContext which can be
	 * passed to {@code awaitJobResult} to get the result of the submission.
	 * @return JobListeningContext which may be used to retrieve the JobExecutionResult via
	 * 			{@code awaitJobResult(JobListeningContext context)}.
	 */
	public static JobListeningContext submitJob(
			ActorSystem actorSystem,
			Configuration config,
			HighAvailabilityServices highAvailabilityServices,
			JobGraph jobGraph,
			FiniteDuration timeout,
			boolean sysoutLogUpdates,
			ClassLoader classLoader) {

		checkNotNull(actorSystem, "The actorSystem must not be null.");
		checkNotNull(highAvailabilityServices, "The high availability services must not be null.");
		checkNotNull(jobGraph, "The jobGraph must not be null.");
		checkNotNull(timeout, "The timeout must not be null.");

		// for this job, we create a proxy JobClientActor that deals with all communication with
		// the JobManager. It forwards the job submission, checks the success/failure responses, logs
		// update messages, watches for disconnect between client and JobManager, ...

		Props jobClientActorProps = JobSubmissionClientActor.createActorProps(
			highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
			timeout,
			sysoutLogUpdates,
			config);

		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);

		Future<Object> submissionFuture = Patterns.ask(
				jobClientActor,
				new JobClientMessages.SubmitJobAndWait(jobGraph),
				new Timeout(AkkaUtils.INF_TIMEOUT()));

		return new JobListeningContext(
			jobGraph.getJobID(),
			submissionFuture,
			jobClientActor,
			timeout,
			classLoader,
			highAvailabilityServices);
	}

	//......
}

JobClient.submitJob, for example, calls HighAvailabilityServices.getJobManagerLeaderRetriever to obtain the address of the JobManager leader, which it then uses to submit the job.
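The client side of leader retrieval follows a listener pattern: the client registers a listener and is told the leader's address once it is known. The sketch below shows that flow with a fixed, pre-configured leader. LeaderListener and StaticLeaderRetriever are simplified stand-ins written for this example, not Flink's LeaderRetrievalListener or LeaderRetrievalService, and the address string is purely illustrative.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

public class LeaderRetrievalSketch {

    /** Simplified stand-in for a leader retrieval listener (hypothetical). */
    interface LeaderListener {
        void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId);
    }

    /** Simplified stand-in for a retrieval service with a fixed, pre-configured leader. */
    static final class StaticLeaderRetriever {
        private final String address;
        private final UUID sessionId;

        StaticLeaderRetriever(String address, UUID sessionId) {
            this.address = address;
            this.sessionId = sessionId;
        }

        void start(LeaderListener listener) {
            // A ZooKeeper-backed service would call the listener whenever the leader changes.
            listener.notifyLeaderAddress(address, sessionId);
        }
    }

    /** Starts retrieval and exposes the first announced leader address as a future. */
    static CompletableFuture<String> firstLeaderAddress(StaticLeaderRetriever retriever) {
        CompletableFuture<String> future = new CompletableFuture<>();
        retriever.start((leaderAddress, leaderSessionId) -> future.complete(leaderAddress));
        return future;
    }

    public static void main(String[] args) {
        // new UUID(0, 0) matches DEFAULT_LEADER_ID, used when no real election happens.
        StaticLeaderRetriever retriever =
                new StaticLeaderRetriever("akka.tcp://flink@localhost:6123/user/jobmanager", new UUID(0, 0));
        System.out.println(firstLeaderAddress(retriever).join());
    }
}
```

In a highly available setup the listener may be notified more than once, so a real client has to handle a leader change after the first notification; the future in this sketch only captures the first announcement.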

Summary

HighAvailabilityMode has three enum constants: NONE, ZOOKEEPER, and FACTORY_CLASS; each carries an haActive flag that indicates whether the mode supports high availability. HighAvailabilityOptions defines the configuration options, most of them under the high-availability.zookeeper prefix.

HighAvailabilityServicesUtils provides static methods for creating HighAvailabilityServices: createAvailableOrEmbeddedServices, createHighAvailabilityServices, and createCustomHAServices. createAvailableOrEmbeddedServices is mainly used by FlinkMiniCluster, and createHighAvailabilityServices is mainly used by ClusterEntrypoint. The latter creates StandaloneHaServices when the mode is NONE and ZooKeeperHaServices when it is ZOOKEEPER, and delegates to createCustomHAServices when the mode is FACTORY_CLASS.

HighAvailabilityServices defines getters for the services that high availability requires. ZooKeeperHaServices implements this interface and creates the required services through the create methods of ZooKeeperUtils, such as ZooKeeperUtils.createLeaderRetrievalService, ZooKeeperUtils.createLeaderElectionService, and ZooKeeperUtils.createSubmittedJobGraphs. JobClient.submitJob, for example, calls HighAvailabilityServices.getJobManagerLeaderRetriever to obtain the address of the JobManager leader, which it then uses to submit the job.

doc

JobManager High Availability (HA)