Setting the heap size of the Flink JobManager
Published: 2019-06-21


This post looks at how the heap size of the Flink JobManager is configured, based on the Flink 1.7.1 sources and startup scripts.

JobManagerOptions

flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/JobManagerOptions.java

@PublicEvolving
public class JobManagerOptions {
    //......

    /**
     * JVM heap size for the JobManager with memory size.
     */
    @Documentation.CommonOption(position = Documentation.CommonOption.POSITION_MEMORY)
    public static final ConfigOption<String> JOB_MANAGER_HEAP_MEMORY =
        key("jobmanager.heap.size")
        .defaultValue("1024m")
        .withDescription("JVM heap size for the JobManager.");

    /**
     * JVM heap size (in megabytes) for the JobManager.
     * @deprecated use {@link #JOB_MANAGER_HEAP_MEMORY}
     */
    @Deprecated
    public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY_MB =
        key("jobmanager.heap.mb")
        .defaultValue(1024)
        .withDescription("JVM heap size (in megabytes) for the JobManager.");

    //......
}
  • jobmanager.heap.size sets the JVM heap size of the JobManager and defaults to 1024m; the older jobmanager.heap.mb key is deprecated.

ConfigurationUtils

flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/ConfigurationUtils.java

public class ConfigurationUtils {

    private static final String[] EMPTY = new String[0];

    /**
     * Get job manager's heap memory. This method will check the new key
     * {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY} and
     * the old key {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY_MB} for backwards compatibility.
     *
     * @param configuration the configuration object
     * @return the memory size of job manager's heap memory.
     */
    public static MemorySize getJobManagerHeapMemory(Configuration configuration) {
        if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key())) {
            return MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY));
        } else if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB.key())) {
            return MemorySize.parse(configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB) + "m");
        } else {
            //use default value
            return MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY));
        }
    }

    //......
}
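  • ConfigurationUtils.getJobManagerHeapMemory reads the setting from the Configuration and parses it into a MemorySize. It checks the new key jobmanager.heap.size first, falls back to the deprecated jobmanager.heap.mb (appending "m" to the integer value), and otherwise uses the default of the new key.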
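
The fallback order can be illustrated without Flink on the classpath. The following is a minimal sketch, assuming a plain Map stands in for Flink's Configuration; the class HeapKeyFallback and the method resolveHeapSetting are hypothetical names used only for illustration, and the sketch returns the text that would be handed to MemorySize.parse rather than a MemorySize.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the lookup in ConfigurationUtils.getJobManagerHeapMemory.
class HeapKeyFallback {
    static final String NEW_KEY = "jobmanager.heap.size";
    static final String OLD_KEY = "jobmanager.heap.mb";
    static final String DEFAULT_VALUE = "1024m";

    /** Returns the heap size text that would be passed on to the parser. */
    static String resolveHeapSetting(Map<String, String> conf) {
        if (conf.containsKey(NEW_KEY)) {
            // New key wins whenever it is present.
            return conf.get(NEW_KEY);
        } else if (conf.containsKey(OLD_KEY)) {
            // Deprecated key holds a plain integer in megabytes.
            return Integer.parseInt(conf.get(OLD_KEY).trim()) + "m";
        }
        // Neither key is set: fall back to the default of the new key.
        return DEFAULT_VALUE;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(resolveHeapSetting(conf)); // 1024m
        conf.put(OLD_KEY, "2048");
        System.out.println(resolveHeapSetting(conf)); // 2048m
        conf.put(NEW_KEY, "4g");
        System.out.println(resolveHeapSetting(conf)); // 4g
    }
}
```

Note that once both keys are present the deprecated one is ignored entirely, which matches the order of the if/else branches in the Flink method.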

MemorySize

flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/MemorySize.java

@PublicEvolving
public class MemorySize implements java.io.Serializable {

    private static final long serialVersionUID = 1L;

    // ------------------------------------------------------------------------

    /** The memory size, in bytes. */
    private final long bytes;

    /**
     * Constructs a new MemorySize.
     *
     * @param bytes The size, in bytes. Must be zero or larger.
     */
    public MemorySize(long bytes) {
        checkArgument(bytes >= 0, "bytes must be >= 0");
        this.bytes = bytes;
    }

    // ------------------------------------------------------------------------

    /**
     * Gets the memory size in bytes.
     */
    public long getBytes() {
        return bytes;
    }

    /**
     * Gets the memory size in Kibibytes (= 1024 bytes).
     */
    public long getKibiBytes() {
        return bytes >> 10;
    }

    /**
     * Gets the memory size in Mebibytes (= 1024 Kibibytes).
     */
    public int getMebiBytes() {
        return (int) (bytes >> 20);
    }

    /**
     * Gets the memory size in Gibibytes (= 1024 Mebibytes).
     */
    public long getGibiBytes() {
        return bytes >> 30;
    }

    /**
     * Gets the memory size in Tebibytes (= 1024 Gibibytes).
     */
    public long getTebiBytes() {
        return bytes >> 40;
    }

    // ------------------------------------------------------------------------

    @Override
    public int hashCode() {
        return (int) (bytes ^ (bytes >>> 32));
    }

    @Override
    public boolean equals(Object obj) {
        return obj == this ||
                (obj != null && obj.getClass() == this.getClass() && ((MemorySize) obj).bytes == this.bytes);
    }

    @Override
    public String toString() {
        return bytes + " bytes";
    }

    // ------------------------------------------------------------------------
    //  Parsing
    // ------------------------------------------------------------------------

    /**
     * Parses the given string as as MemorySize.
     *
     * @param text The string to parse
     * @return The parsed MemorySize
     *
     * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
     */
    public static MemorySize parse(String text) throws IllegalArgumentException {
        return new MemorySize(parseBytes(text));
    }

    /**
     * Parses the given string with a default unit.
     *
     * @param text The string to parse.
     * @param defaultUnit specify the default unit.
     * @return The parsed MemorySize.
     *
     * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
     */
    public static MemorySize parse(String text, MemoryUnit defaultUnit) throws IllegalArgumentException {
        if (!hasUnit(text)) {
            return parse(text + defaultUnit.getUnits()[0]);
        }

        return parse(text);
    }

    /**
     * Parses the given string as bytes.
     * The supported expressions are listed under {@link MemorySize}.
     *
     * @param text The string to parse
     * @return The parsed size, in bytes.
     *
     * @throws IllegalArgumentException Thrown, if the expression cannot be parsed.
     */
    public static long parseBytes(String text) throws IllegalArgumentException {
        checkNotNull(text, "text");

        final String trimmed = text.trim();
        checkArgument(!trimmed.isEmpty(), "argument is an empty- or whitespace-only string");

        final int len = trimmed.length();
        int pos = 0;

        char current;
        while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {
            pos++;
        }

        final String number = trimmed.substring(0, pos);
        final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US);

        if (number.isEmpty()) {
            throw new NumberFormatException("text does not start with a number");
        }

        final long value;
        try {
            value = Long.parseLong(number); // this throws a NumberFormatException on overflow
        }
        catch (NumberFormatException e) {
            throw new IllegalArgumentException("The value '" + number +
                    "' cannot be re represented as 64bit number (numeric overflow).");
        }

        final long multiplier;
        if (unit.isEmpty()) {
            multiplier = 1L;
        }
        else {
            if (matchesAny(unit, BYTES)) {
                multiplier = 1L;
            }
            else if (matchesAny(unit, KILO_BYTES)) {
                multiplier = 1024L;
            }
            else if (matchesAny(unit, MEGA_BYTES)) {
                multiplier = 1024L * 1024L;
            }
            else if (matchesAny(unit, GIGA_BYTES)) {
                multiplier = 1024L * 1024L * 1024L;
            }
            else if (matchesAny(unit, TERA_BYTES)) {
                multiplier = 1024L * 1024L * 1024L * 1024L;
            }
            else {
                throw new IllegalArgumentException("Memory size unit '" + unit +
                        "' does not match any of the recognized units: " + MemoryUnit.getAllUnits());
            }
        }

        final long result = value * multiplier;

        // check for overflow
        if (result / multiplier != value) {
            throw new IllegalArgumentException("The value '" + text +
                    "' cannot be re represented as 64bit number of bytes (numeric overflow).");
        }

        return result;
    }

    private static boolean matchesAny(String str, MemoryUnit unit) {
        for (String s : unit.getUnits()) {
            if (s.equals(str)) {
                return true;
            }
        }
        return false;
    }

    //......
}
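  • MemorySize stores the size in a bytes field and offers getBytes, getKibiBytes, getMebiBytes, getGibiBytes and getTebiBytes for quick conversion. The static parse methods build a MemorySize from text; the overload that takes a MemoryUnit applies that unit only when the text carries no unit of its own. Both overloads end up calling parseBytes.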
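
To make the arithmetic concrete, here is a condensed, standalone sketch of the same parsing idea. It is not the Flink class: MemorySizeSketch is a hypothetical name, the unit table is written as a switch, and overflow is detected with Math.multiplyExact (which throws ArithmeticException) instead of Flink's divide-back check.

```java
import java.util.Locale;

// Hypothetical, simplified re-implementation of the parseBytes idea.
class MemorySizeSketch {

    /** Splits the text into leading digits and a unit suffix, then scales to bytes. */
    static long parseBytes(String text) {
        String trimmed = text.trim();
        int pos = 0;
        while (pos < trimmed.length() && trimmed.charAt(pos) >= '0' && trimmed.charAt(pos) <= '9') {
            pos++;
        }
        String number = trimmed.substring(0, pos);
        String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US);
        if (number.isEmpty()) {
            throw new IllegalArgumentException("text does not start with a number");
        }
        long value = Long.parseLong(number);

        long multiplier;
        switch (unit) {
            case "": case "b": case "bytes":      multiplier = 1L;       break;
            case "k": case "kb": case "kibibytes": multiplier = 1L << 10; break;
            case "m": case "mb": case "mebibytes": multiplier = 1L << 20; break;
            case "g": case "gb": case "gibibytes": multiplier = 1L << 30; break;
            case "t": case "tb": case "tebibytes": multiplier = 1L << 40; break;
            default:
                throw new IllegalArgumentException("unrecognized unit: " + unit);
        }
        // Throws ArithmeticException if the product does not fit into a long.
        return Math.multiplyExact(value, multiplier);
    }

    /** Same shift as MemorySize.getMebiBytes, kept as a long here. */
    static long toMebiBytes(long bytes) {
        return bytes >> 20;
    }

    public static void main(String[] args) {
        long bytes = parseBytes("1024m");
        System.out.println(bytes);                            // 1073741824
        System.out.println(toMebiBytes(bytes));               // 1024
        System.out.println(parseBytes("512 MB"));             // 536870912
        System.out.println(toMebiBytes(parseBytes("2g")));    // 2048
    }
}
```

The default value 1024m therefore corresponds to 1073741824 bytes, and shifting right by 20 bits gives back the 1024 that later shows up in -Xms and -Xmx.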

MemoryUnit

flink-core-1.7.1-sources.jar!/org/apache/flink/configuration/MemorySize.java

/**
 *  Enum which defines memory unit, mostly used to parse value from configuration file.
 *
 * <p>To make larger values more compact, the common size suffixes are supported:
 *
 * <ul>
 *     <li>q or 1b or 1bytes (bytes)
 *     <li>1k or 1kb or 1kibibytes (interpreted as kibibytes = 1024 bytes)
 *     <li>1m or 1mb or 1mebibytes (interpreted as mebibytes = 1024 kibibytes)
 *     <li>1g or 1gb or 1gibibytes (interpreted as gibibytes = 1024 mebibytes)
 *     <li>1t or 1tb or 1tebibytes (interpreted as tebibytes = 1024 gibibytes)
 * </ul>
 *
 */
public enum MemoryUnit {

    BYTES(new String[] { "b", "bytes" }),
    KILO_BYTES(new String[] { "k", "kb", "kibibytes" }),
    MEGA_BYTES(new String[] { "m", "mb", "mebibytes" }),
    GIGA_BYTES(new String[] { "g", "gb", "gibibytes" }),
    TERA_BYTES(new String[] { "t", "tb", "tebibytes" });

    private String[] units;

    MemoryUnit(String[] units) {
        this.units = units;
    }

    public String[] getUnits() {
        return units;
    }

    public static String getAllUnits() {
        return concatenateUnits(BYTES.getUnits(), KILO_BYTES.getUnits(), MEGA_BYTES.getUnits(), GIGA_BYTES.getUnits(), TERA_BYTES.getUnits());
    }

    public static boolean hasUnit(String text) {
        checkNotNull(text, "text");

        final String trimmed = text.trim();
        checkArgument(!trimmed.isEmpty(), "argument is an empty- or whitespace-only string");

        final int len = trimmed.length();
        int pos = 0;

        char current;
        while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {
            pos++;
        }

        final String unit = trimmed.substring(pos).trim().toLowerCase(Locale.US);

        return unit.length() > 0;
    }

    private static String concatenateUnits(final String[]... allUnits) {
        final StringBuilder builder = new StringBuilder(128);

        for (String[] units : allUnits) {
            builder.append('(');

            for (String unit : units) {
                builder.append(unit);
                builder.append(" | ");
            }

            builder.setLength(builder.length() - 3);
            builder.append(") / ");
        }

        builder.setLength(builder.length() - 3);
        return builder.toString();
    }

}
  • The MemoryUnit enum defines BYTES, KILO_BYTES, MEGA_BYTES, GIGA_BYTES and TERA_BYTES. Each constant holds a units string array listing the text suffixes it accepts; the input is lowercased before matching.
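
The interplay between hasUnit and the default-unit overload of parse can be shown in isolation. This is a small sketch under the assumption that only the string handling matters; DefaultUnitSketch and withDefaultUnit are hypothetical names, and the default unit is passed as a plain suffix such as "m" instead of a MemoryUnit constant.

```java
// Hypothetical helper showing when a default unit gets appended.
class DefaultUnitSketch {

    /** True if anything other than whitespace follows the leading digits. */
    static boolean hasUnit(String text) {
        String trimmed = text.trim();
        int pos = 0;
        while (pos < trimmed.length() && trimmed.charAt(pos) >= '0' && trimmed.charAt(pos) <= '9') {
            pos++;
        }
        return !trimmed.substring(pos).trim().isEmpty();
    }

    /** Appends the default suffix only when the text has no unit of its own. */
    static String withDefaultUnit(String text, String defaultSuffix) {
        return hasUnit(text) ? text : text + defaultSuffix;
    }

    public static void main(String[] args) {
        System.out.println(withDefaultUnit("1024", "m")); // 1024m
        System.out.println(withDefaultUnit("1g", "m"));   // 1g
    }
}
```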

FlinkYarnSessionCli

flink-1.7.1/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java

public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId> {
    //......

    private ClusterSpecification createClusterSpecification(Configuration configuration, CommandLine cmd) {
        if (cmd.hasOption(container.getOpt())) { // number of containers is required option!
            LOG.info("The argument {} is deprecated in will be ignored.", container.getOpt());
        }

        // TODO: The number of task manager should be deprecated soon
        final int numberTaskManagers;

        if (cmd.hasOption(container.getOpt())) {
            numberTaskManagers = Integer.valueOf(cmd.getOptionValue(container.getOpt()));
        } else {
            numberTaskManagers = 1;
        }

        // JobManager Memory
        final int jobManagerMemoryMB = ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes();

        // Task Managers memory
        final int taskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes();

        int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);

        return new ClusterSpecification.ClusterSpecificationBuilder()
            .setMasterMemoryMB(jobManagerMemoryMB)
            .setTaskManagerMemoryMB(taskManagerMemoryMB)
            .setNumberTaskManagers(numberTaskManagers)
            .setSlotsPerTaskManager(slotsPerTaskManager)
            .createClusterSpecification();
    }

    //......
}
  • FlinkYarnSessionCli.createClusterSpecification calls ConfigurationUtils.getJobManagerHeapMemory(configuration) and converts the result with getMebiBytes to obtain jobManagerMemoryMB.

flink-1.7.1/flink-dist/src/main/flink-bin/bin/config.sh

# ......

DEFAULT_ENV_PID_DIR="/tmp"                          # Directory to store *.pid files to
DEFAULT_ENV_LOG_MAX=5                               # Maximum number of old log files to keep
DEFAULT_ENV_JAVA_OPTS=""                            # Optional JVM args
DEFAULT_ENV_JAVA_OPTS_JM=""                         # Optional JVM args (JobManager)
DEFAULT_ENV_JAVA_OPTS_TM=""                         # Optional JVM args (TaskManager)
DEFAULT_ENV_JAVA_OPTS_HS=""                         # Optional JVM args (HistoryServer)
DEFAULT_ENV_SSH_OPTS=""                             # Optional SSH parameters running in cluster mode
DEFAULT_YARN_CONF_DIR=""                            # YARN Configuration Directory, if necessary
DEFAULT_HADOOP_CONF_DIR=""                          # Hadoop Configuration Directory, if necessary

# ......

# Define FLINK_JM_HEAP if it is not already set
if [ -z "${FLINK_JM_HEAP}" ]; then
    FLINK_JM_HEAP=$(readFromConfig ${KEY_JOBM_MEM_SIZE} 0 "${YAML_CONF}")
fi

# Try read old config key, if new key not exists
if [ "${FLINK_JM_HEAP}" == 0 ]; then
    FLINK_JM_HEAP_MB=$(readFromConfig ${KEY_JOBM_MEM_MB} 0 "${YAML_CONF}")
fi

# ......

if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; then
    FLINK_ENV_JAVA_OPTS=$(readFromConfig ${KEY_ENV_JAVA_OPTS} "${DEFAULT_ENV_JAVA_OPTS}" "${YAML_CONF}")

    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS="$( echo "${FLINK_ENV_JAVA_OPTS}" | sed -e 's/^"//'  -e 's/"$//' )"
fi

if [ -z "${FLINK_ENV_JAVA_OPTS_JM}" ]; then
    FLINK_ENV_JAVA_OPTS_JM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_JM} "${DEFAULT_ENV_JAVA_OPTS_JM}" "${YAML_CONF}")
    # Remove leading and ending double quotes (if present) of value
    FLINK_ENV_JAVA_OPTS_JM="$( echo "${FLINK_ENV_JAVA_OPTS_JM}" | sed -e 's/^"//'  -e 's/"$//' )"
fi

# ......

# Arguments for the JVM. Used for job and task manager JVMs.
# DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
# KEY_JOBM_MEM_SIZE and KEY_TASKM_MEM_SIZE for that!
if [ -z "${JVM_ARGS}" ]; then
    JVM_ARGS=""
fi

# ......
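  • config.sh first checks whether the environment variable FLINK_JM_HEAP is set. If it is not, the script reads jobmanager.heap.size from flink-conf.yaml into FLINK_JM_HEAP, using 0 when the key is absent. If FLINK_JM_HEAP is 0, it reads jobmanager.heap.mb into FLINK_JM_HEAP_MB.
  • If FLINK_ENV_JAVA_OPTS is not set, it is read from env.java.opts in flink-conf.yaml, falling back to DEFAULT_ENV_JAVA_OPTS (empty by default). If FLINK_ENV_JAVA_OPTS_JM is not set, it is read from env.java.opts.jobmanager, falling back to DEFAULT_ENV_JAVA_OPTS_JM (also empty by default).
  • JVM_ARGS is used by both the job manager and task manager JVMs and is initialized to an empty string when unset. Do not put memory settings into JVM_ARGS; use jobmanager.heap.size and taskmanager.heap.size in flink-conf.yaml instead.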
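
The precedence between the environment variable and the two configuration keys can be sketched as follows. This is an illustration only, assuming two Maps stand in for the process environment and the parsed flink-conf.yaml; JmHeapLookup and lookup are hypothetical names, and a null second element represents FLINK_JM_HEAP_MB being left unset by config.sh.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the FLINK_JM_HEAP / FLINK_JM_HEAP_MB lookup in config.sh.
class JmHeapLookup {

    /** Returns {FLINK_JM_HEAP, FLINK_JM_HEAP_MB}; the second entry is null if never read. */
    static String[] lookup(Map<String, String> env, Map<String, String> yaml) {
        String heap = env.get("FLINK_JM_HEAP");
        if (heap == null || heap.isEmpty()) {
            // Environment variable not set: read the new key, with 0 as the "missing" marker.
            heap = yaml.getOrDefault("jobmanager.heap.size", "0");
        }
        String heapMb = null;
        if ("0".equals(heap)) {
            // New key missing: try the deprecated key, again with 0 as the marker.
            heapMb = yaml.getOrDefault("jobmanager.heap.mb", "0");
        }
        return new String[] {heap, heapMb};
    }

    public static void main(String[] args) {
        Map<String, String> env = new HashMap<>();
        Map<String, String> yaml = new HashMap<>();

        yaml.put("jobmanager.heap.mb", "2048");
        System.out.println(String.join(",", lookup(env, yaml)[0], lookup(env, yaml)[1])); // 0,2048

        yaml.put("jobmanager.heap.size", "1024m");
        System.out.println(lookup(env, yaml)[0]); // 1024m

        env.put("FLINK_JM_HEAP", "512m");
        System.out.println(lookup(env, yaml)[0]); // 512m
    }
}
```

The last case shows why exporting FLINK_JM_HEAP overrides whatever flink-conf.yaml says: the file is only consulted when the variable is empty.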

flink-1.7.1/flink-dist/src/main/flink-bin/bin/jobmanager.sh

#!/usr/bin/env bash
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Start/stop a Flink JobManager.

USAGE="Usage: jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all"

STARTSTOP=$1
HOST=$2 # optional when starting multiple instances
WEBUIPORT=$3 # optional when starting multiple instances

if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
  echo $USAGE
  exit 1
fi

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/config.sh

ENTRYPOINT=standalonesession

if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
    if [ ! -z "${FLINK_JM_HEAP_MB}" ] && [ "${FLINK_JM_HEAP}" == 0 ]; then
        echo "used deprecated key \`${KEY_JOBM_MEM_MB}\`, please replace with key \`${KEY_JOBM_MEM_SIZE}\`"
    else
        flink_jm_heap_bytes=$(parseBytes ${FLINK_JM_HEAP})
        FLINK_JM_HEAP_MB=$(getMebiBytes ${flink_jm_heap_bytes})
    fi

    if [[ ! ${FLINK_JM_HEAP_MB} =~ $IS_NUMBER ]] || [[ "${FLINK_JM_HEAP_MB}" -lt "0" ]]; then
        echo "[ERROR] Configured JobManager memory size is not a valid value. Please set '${KEY_JOBM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
        exit 1
    fi

    if [ "${FLINK_JM_HEAP_MB}" -gt "0" ]; then
        export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_JM_HEAP_MB"m -Xmx"$FLINK_JM_HEAP_MB"m"
    fi

    # Add JobManager-specific JVM options
    export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"

    # Startup parameters
    args=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster")
    if [ ! -z $HOST ]; then
        args+=("--host")
        args+=("${HOST}")
    fi

    if [ ! -z $WEBUIPORT ]; then
        args+=("--webui-port")
        args+=("${WEBUIPORT}")
    fi
fi

if [[ $STARTSTOP == "start-foreground" ]]; then
    exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${args[@]}"
else
    "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${args[@]}"
fi
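  • jobmanager.sh first sources config.sh to initialize the relevant variables (FLINK_JM_HEAP, FLINK_JM_HEAP_MB, FLINK_ENV_JAVA_OPTS, FLINK_ENV_JAVA_OPTS_JM, JVM_ARGS).
  • If FLINK_JM_HEAP_MB is set and FLINK_JM_HEAP is 0, the deprecated key is in use and a notice is printed; otherwise FLINK_JM_HEAP is parsed to bytes and converted into FLINK_JM_HEAP_MB. If FLINK_JM_HEAP_MB is greater than 0, -Xms and -Xmx with that value are appended to JVM_ARGS. FLINK_ENV_JAVA_OPTS_JM (from env.java.opts.jobmanager) is then appended to FLINK_ENV_JAVA_OPTS (from env.java.opts).
  • Finally, jobmanager.sh launches the standalonesession entry point: through flink-console.sh for start-foreground, and through flink-daemon.sh otherwise.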
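
As a rough model of what the script assembles, the sketch below turns a heap size in bytes into the -Xms/-Xmx pair and merges the two option strings. JmJvmArgs, heapArgs and mergeJavaOpts are hypothetical names, and the byte-to-MiB conversion assumes the shell helper getMebiBytes behaves like MemorySize.getMebiBytes (a right shift by 20 bits); the helper's body is not shown in the excerpt above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the JVM argument assembly in jobmanager.sh.
class JmJvmArgs {

    /** Builds the heap flags; returns an empty list when the size rounds down to 0 MiB. */
    static List<String> heapArgs(long heapBytes) {
        long heapMb = heapBytes >> 20; // assumed equivalent of getMebiBytes
        List<String> args = new ArrayList<>();
        if (heapMb > 0) {
            // Initial and maximum heap are pinned to the same value.
            args.add("-Xms" + heapMb + "m");
            args.add("-Xmx" + heapMb + "m");
        }
        return args;
    }

    /** JobManager-specific options are appended after the general ones. */
    static String mergeJavaOpts(String envJavaOpts, String envJavaOptsJm) {
        return envJavaOpts + " " + envJavaOptsJm;
    }

    public static void main(String[] args) {
        System.out.println(heapArgs(1073741824L));                     // [-Xms1024m, -Xmx1024m]
        System.out.println(heapArgs(0L));                              // []
        System.out.println(mergeJavaOpts("-XX:+UseG1GC", "-Dfoo=bar")); // -XX:+UseG1GC -Dfoo=bar
    }
}
```

Because -Xms and -Xmx receive the same value, the JobManager heap is fixed at startup rather than growing on demand.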

flink-1.7.1/flink-dist/src/main/flink-bin/bin/flink-console.sh

#!/usr/bin/env bash
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Start a Flink service as a console application. Must be stopped with Ctrl-C
# or with SIGTERM by kill or the controlling process.

USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]"

SERVICE=$1
ARGS=("${@:2}") # get remaining arguments as array

bin=`dirname "$0"`
bin=`cd "$bin"; pwd`

. "$bin"/config.sh

case $SERVICE in
    (taskexecutor)
        CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
    ;;

    (historyserver)
        CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
    ;;

    (zookeeper)
        CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
    ;;

    (standalonesession)
        CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
    ;;

    (standalonejob)
        CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint
    ;;

    (*)
        echo "Unknown service '${SERVICE}'. $USAGE."
        exit 1
    ;;
esac

FLINK_TM_CLASSPATH=`constructFlinkClassPath`

log_setting=("-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml")

JAVA_VERSION=$(${JAVA_RUN} -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')

# Only set JVM 8 arguments if we have correctly extracted the version
if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
    if [ "$JAVA_VERSION" -lt 18 ]; then
        JVM_ARGS="$JVM_ARGS -XX:MaxPermSize=256m"
    fi
fi

echo "Starting $SERVICE as a console application on host $HOSTNAME."
exec $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}"
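  • flink-console.sh appends -XX:MaxPermSize=256m to JVM_ARGS when the detected Java version is older than 8 (the extracted version number is below 18). It then launches CLASS_TO_RUN with JVM_ARGS and FLINK_ENV_JAVA_OPTS as JVM arguments.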

Summary

  • jobmanager.heap.size sets the JVM heap size of the JobManager and defaults to 1024m; jobmanager.heap.mb is deprecated. ConfigurationUtils.getJobManagerHeapMemory reads the setting from the Configuration and parses it into a MemorySize. MemorySize stores the size in a bytes field and offers getBytes, getKibiBytes, getMebiBytes, getGibiBytes and getTebiBytes for quick conversion. Its static parse methods build a MemorySize from text; the overload taking a MemoryUnit applies that unit only when the text has none, and both overloads end up calling parseBytes.
  • FlinkYarnSessionCli.createClusterSpecification calls ConfigurationUtils.getJobManagerHeapMemory(configuration) to obtain jobManagerMemoryMB.
  • config.sh first checks whether the environment variable FLINK_JM_HEAP is set; if not, it reads jobmanager.heap.size from flink-conf.yaml into FLINK_JM_HEAP. If FLINK_JM_HEAP is 0, it reads jobmanager.heap.mb into FLINK_JM_HEAP_MB. If FLINK_ENV_JAVA_OPTS is not set, it is read from env.java.opts, falling back to DEFAULT_ENV_JAVA_OPTS (empty by default); if FLINK_ENV_JAVA_OPTS_JM is not set, it is read from env.java.opts.jobmanager, falling back to DEFAULT_ENV_JAVA_OPTS_JM (empty by default). JVM_ARGS is used by both the job manager and task manager JVMs and is initialized to an empty string when unset. Do not put memory settings into JVM_ARGS; use jobmanager.heap.size and taskmanager.heap.size in flink-conf.yaml instead.
  • jobmanager.sh first sources config.sh to initialize the relevant variables (FLINK_JM_HEAP, FLINK_JM_HEAP_MB, FLINK_ENV_JAVA_OPTS, FLINK_ENV_JAVA_OPTS_JM, JVM_ARGS). Unless the deprecated key is in use, it parses FLINK_JM_HEAP into FLINK_JM_HEAP_MB; if FLINK_JM_HEAP_MB is greater than 0, it appends -Xms and -Xmx with that value to JVM_ARGS. It appends FLINK_ENV_JAVA_OPTS_JM (from env.java.opts.jobmanager) to FLINK_ENV_JAVA_OPTS (from env.java.opts), and finally launches the entry point through flink-console.sh (start-foreground) or flink-daemon.sh (otherwise).
  • flink-console.sh appends -XX:MaxPermSize=256m to JVM_ARGS when the detected Java version is older than 8; it then launches CLASS_TO_RUN with JVM_ARGS and FLINK_ENV_JAVA_OPTS as JVM arguments.

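The final JVM arguments are therefore determined by JVM_ARGS and FLINK_ENV_JAVA_OPTS. Memory settings should not be placed in JVM_ARGS by hand, because jobmanager.sh already appends -Xms and -Xmx to it whenever FLINK_JM_HEAP_MB is greater than 0, and FLINK_JM_HEAP_MB in turn comes from the FLINK_JM_HEAP environment variable or the jobmanager.heap.size setting. FLINK_ENV_JAVA_OPTS is built from env.java.opts and env.java.opts.jobmanager. To configure the JobManager heap size, either set the FLINK_JM_HEAP environment variable (for example FLINK_JM_HEAP=512m) or set jobmanager.heap.size in flink-conf.yaml.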
