k8s1.26.5 安装 flink1.17.1
  ArsYYS04oliO 2023年11月14日 47 0

标签(空格分隔): kubernetes系列


一: 系统环境介绍

系统:
   centos7.9x64
k8s 集群版本:
   k8s1.26.5 采用kubesphere 做页面

calico 版本:
    calico v3.26.1

containerd版本:
     containerd://1.6.24

hadoop 版本:
     hadoop 3.3.6 

helm 版本:
   helm 3.9.0


二:编译得到flink 的镜像

2.1 准备文件(下载后放到 /root/images 目录,即镜像的构建上下文):

1. jdk 软件
   jdk-8u381-linux-x64.tar.gz
   
   https://www.oracle.com/sg/java/technologies/javase/javase8u381-later-archive-downloads.html
   
2. flink 软件:
   flink-1.17.1-bin-scala_2.12.tgz
  https://dlcdn.apache.org/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz

2.2 编写 Dockerfile 和 docker-entrypoint.sh 文件

cat > /root/images/Dockerfile << 'EOF'
FROM centos:7.9.2009

USER root

# Common tools. gettext provides envsubst, which docker-entrypoint.sh calls
# when it rewrites flink-conf.yaml. The yum cache is cleaned in the same layer
# so it does not end up in the image.
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof gettext \
    && yum clean all \
    && rm -rf /var/cache/yum

# Set the time zone (the base image defaults to UTC)
RUN rm -f /etc/localtime \
    && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \
    && echo "Asia/Shanghai" > /etc/timezone

RUN mkdir -p /opt/apache

# ADD unpacks local tar archives; both tarballs must sit next to this
# Dockerfile in the build context.
ADD jdk-8u381-linux-x64.tar.gz /opt/apache/
ADD flink-1.17.1-bin-scala_2.12.tgz /opt/apache/

ENV FLINK_HOME=/opt/apache/flink-1.17.1
ENV JAVA_HOME=/opt/apache/jdk1.8.0_381
ENV PATH=$JAVA_HOME/bin:$PATH

COPY docker-entrypoint.sh $FLINK_HOME/

# usrlib holds user application jars (the Kubernetes manifests mount the
# NFS-backed PVC there). An unprivileged "admin" user (uid/gid 9999) owns
# FLINK_HOME and runs Flink.
RUN mkdir -p $FLINK_HOME/usrlib \
    && groupadd --system --gid=9999 admin \
    && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin \
    && chown -R admin:admin $FLINK_HOME \
    && chmod +x $FLINK_HOME/docker-entrypoint.sh

# Working directory for the entrypoint and for interactive shells
WORKDIR $FLINK_HOME

# Run as the unprivileged user from here on
USER admin

# JobManager RPC port and web UI / REST port
EXPOSE 6123 8081

# The entrypoint runs when a container starts, not at build time.
# Without arguments, CMD makes it print its usage text.
ENTRYPOINT ["./docker-entrypoint.sh"]
CMD ["help"]

EOF

cat > /root/images/docker-entrypoint.sh << 'EOF'
#!/usr/bin/env bash

###############################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################

# Extra sub-command names accepted by this entrypoint (besides
# jobmanager, taskmanager and help).
COMMAND_STANDALONE='standalone-job'
COMMAND_HISTORY_SERVER='history-server'

# JobManager address to write into the config. When the environment does not
# supply one, fall back to this container's fully-qualified hostname.
: "${JOB_MANAGER_RPC_ADDRESS:=$(hostname -f)}"
# Flink configuration file edited in place by the helper functions below.
CONF_FILE=${FLINK_HOME}/conf/flink-conf.yaml

drop_privs_cmd() {
    # Prints the command prefix that drops root privileges to the "admin" user
    # before exec'ing Flink. Prints nothing when the container already runs as
    # a non-root user, so "exec $(drop_privs_cmd) ..." then runs the command directly.
    # NOTE(review): the Dockerfile above installs neither su-exec nor gosu, so
    # starting this image as root would make the final exec fail.
    local current_uid
    current_uid=$(id -u)
    [ "$current_uid" != 0 ] && return

    if [ -x /sbin/su-exec ]; then
        echo su-exec admin   # Alpine-based images
    else
        echo gosu admin      # every other base image
    fi
}
copy_plugins_if_required() {
  # Links the built-in plugins listed in ENABLE_BUILT_IN_PLUGINS (jar file
  # names from $FLINK_HOME/opt, separated by ';' or whitespace) into
  # $FLINK_HOME/plugins/<jar name without .jar>/.
  # Returns 0 immediately when ENABLE_BUILT_IN_PLUGINS is empty; exits the
  # script with status 1 when a listed jar does not exist.
  if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then
    return 0
  fi
  echo "Enabling required built-in plugins"
  local target_plugin plugin_name
  local -a plugins=()
  # Split into an array instead of an unquoted for-list, so plugin names are
  # never glob-expanded. read returns 1 at end of input (no NUL), which is expected.
  IFS=$'; \t\n' read -r -d '' -a plugins <<< "$ENABLE_BUILT_IN_PLUGINS" || true
  for target_plugin in "${plugins[@]}"; do
    # Skip empty entries produced by ";;"
    [ -n "$target_plugin" ] || continue
    echo "Linking ${target_plugin} to plugin directory"
    plugin_name=${target_plugin%.jar}
    # Validate before creating anything, so a typo does not leave an empty
    # plugin directory behind.
    if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then
      echo "Plugin ${target_plugin} does not exist. Exiting." >&2
      exit 1
    fi
    mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"
    ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"
    echo "Successfully enabled ${target_plugin}"
  done
}
set_config_option() {
  # Sets "<option>: <value>" in $CONF_FILE. It rewrites every line that starts
  # with "<option>:", or appends a new line when there is none.
  #   $1 - Flink option key, e.g. jobmanager.rpc.address
  #   $2 - value to store (may contain '/', '&' or '\', e.g. URIs)
  local option=$1
  local value=$2
  local escaped_option escaped_value
  # Escape periods so the key matches literally in the regular expressions below.
  escaped_option=$(printf '%s' "${option}" | sed -e 's/\./\\./g')
  # Escape what sed treats specially in a replacement: '\', '&' and the '/' delimiter.
  escaped_value=$(printf '%s' "${value}" | sed -e 's|[\/&]|\\&|g')
  # Either override an existing entry or append a new one. The sed pattern is
  # anchored like the grep, so e.g. "port" does not rewrite "rest.port".
  if grep -qE "^${escaped_option}:" "${CONF_FILE}"; then
        sed -i -e "s/^${escaped_option}:.*/${option}: ${escaped_value}/" "${CONF_FILE}"
  else
        echo "${option}: ${value}" >> "${CONF_FILE}"
  fi
}
prepare_configuration() {
    # Writes container-specific settings into $CONF_FILE, appends any
    # FLINK_PROPERTIES, then expands ${VAR} references in the file with envsubst.
    #
    # The step is skipped, with a message on stderr, when the existing file or
    # its directory is not writable. One example is conf/ mounted from a
    # Kubernetes ConfigMap, which is always read-only. Skipping avoids a burst
    # of "Read-only file system" errors from sed, >> and mv.
    local conf_dir
    conf_dir=$(dirname "${CONF_FILE}")
    if { [ -e "${CONF_FILE}" ] && [ ! -w "${CONF_FILE}" ]; } || [ ! -w "${conf_dir}" ]; then
        echo "${CONF_FILE} is not writable; using the configuration as-is." >&2
        return 0
    fi

    set_config_option jobmanager.rpc.address "${JOB_MANAGER_RPC_ADDRESS}"
    set_config_option blob.server.port 6124
    set_config_option query.server.port 6125
    if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then
        set_config_option taskmanager.numberOfTaskSlots "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}"
    fi
    if [ -n "${FLINK_PROPERTIES}" ]; then
        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
    fi

    # envsubst ships with gettext. If it is missing, or fails, keep the original
    # file and do not leave a stray .tmp behind.
    if ! command -v envsubst > /dev/null 2>&1; then
        echo "envsubst not found; skipping variable substitution in ${CONF_FILE}." >&2
        return 0
    fi
    if envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp"; then
        mv "${CONF_FILE}.tmp" "${CONF_FILE}"
    else
        rm -f "${CONF_FILE}.tmp"
    fi
}
maybe_enable_jemalloc() {
    # Preloads jemalloc as the memory allocator unless DISABLE_JEMALLOC is set
    # to anything other than "false". The first existing library wins:
    #   - $JEMALLOC_LIB, if set (explicit override)
    #   - the Debian/Ubuntu multiarch paths the upstream Flink image uses
    #   - /usr/lib64 paths, where RHEL/CentOS packages put the library
    # NOTE(review): the Dockerfile above does not install jemalloc, so on this
    # image expect the glibc fallback warning unless it is added.
    if [ "${DISABLE_JEMALLOC:-false}" != "false" ]; then
        return 0
    fi

    local candidate arch_path msg_path
    local -a candidates=()
    if [ -n "${JEMALLOC_LIB:-}" ]; then
        candidates+=("${JEMALLOC_LIB}")
    fi
    arch_path="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"
    candidates+=("${arch_path}")
    if [ "${arch_path}" != "/usr/lib/x86_64-linux-gnu/libjemalloc.so" ]; then
        candidates+=("/usr/lib/x86_64-linux-gnu/libjemalloc.so")
    fi
    candidates+=("/usr/lib64/libjemalloc.so.2" "/usr/lib64/libjemalloc.so")

    for candidate in "${candidates[@]}"; do
        if [ -f "${candidate}" ]; then
            # Append to an existing LD_PRELOAD without creating an empty leading entry.
            export LD_PRELOAD="${LD_PRELOAD:+${LD_PRELOAD}:}${candidate}"
            return 0
        fi
    done

    printf -v msg_path '%s, ' "${candidates[@]}"
    msg_path=${msg_path%, }
    echo "WARNING: attempted to load jemalloc from ${msg_path} but the library couldn't be found. glibc will be used instead." >&2
}
# Main flow: prepare allocator, plugins and configuration, then start the
# requested Flink component, or any other command in pass-through mode.
maybe_enable_jemalloc
copy_plugins_if_required
prepare_configuration

# Dispatch on the first argument. Components are started with exec, so the
# started script replaces this shell process. The remaining arguments are
# forwarded. $(drop_privs_cmd) is intentionally unquoted: it expands to
# nothing, or to a privilege-dropping prefix such as "gosu admin".
args=("$@")
case "$1" in
    help)
        script_name=$(basename "$0")
        # Data goes in printf arguments, never in the format string.
        printf 'Usage: %s (jobmanager|%s|taskmanager|%s)\n' "${script_name}" "${COMMAND_STANDALONE}" "${COMMAND_HISTORY_SERVER}"
        printf '    Or %s help\n\n' "${script_name}"
        printf '%s\n' "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'."
        exit 0
        ;;
    jobmanager)
        echo "Starting Job Manager"
        # shellcheck disable=SC2046
        exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]:1}"
        ;;
    "${COMMAND_STANDALONE}")
        echo "Starting Job Manager"
        # shellcheck disable=SC2046
        exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]:1}"
        ;;
    "${COMMAND_HISTORY_SERVER}")
        echo "Starting History Server"
        # shellcheck disable=SC2046
        exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]:1}"
        ;;
    taskmanager)
        echo "Starting Task Manager"
        # shellcheck disable=SC2046
        exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]:1}"
        ;;
esac

# Running command in pass-through mode
# shellcheck disable=SC2046
exec $(drop_privs_cmd) "${args[@]}"

EOF

# Build from the directory holding the Dockerfile, docker-entrypoint.sh and the
# JDK/Flink tarballs: buildctl uses "." as both build context and Dockerfile dir.
cd /root/images
# List the images already in containerd's k8s.io namespace
nerdctl -n k8s.io images
# Build the image with BuildKit and stream it straight into containerd (k8s.io namespace)
buildctl build --frontend=dockerfile.v0 --local context=. --local dockerfile=. --output type=docker,name=bigdata/flink:1.17.1 | ctr -n k8s.io image import -
# Confirm docker.io/bigdata/flink:1.17.1 is now listed. It exists only on this
# node; export/import it on every other node that will run a Flink pod.
nerdctl -n k8s.io images

image.png image.png

导出镜像:
# Export the image from containerd's k8s.io namespace into an archive that can be copied to other nodes
nerdctl -n k8s.io save -o flink.tar.gz  docker.io/bigdata/flink:1.17.1

导入镜像:
# Run on each node that hosts a Flink pod, so imagePullPolicy: IfNotPresent finds the image locally
nerdctl -n k8s.io load -i flink.tar.gz

三:部署flink

3.1 创建共享存储

# NFS server setup, run on 172.16.10.11 (the server the PV points at).
# Every Kubernetes node also needs nfs-utils so the kubelet can mount the volume.
yum install -y rpcbind nfs-utils

mkdir -p /nfs/data/flink/job-artifacts
# Let the Flink user (uid/gid 9999 in the image) write job jars to the share
chown -R 9999:9999 /nfs/data/flink

# Exports are read from /etc/exports (not /etc/exportfs), and each entry needs
# a client spec and options. Append once, so re-running neither duplicates the
# entry nor wipes other exports.
# NOTE: "*" exports to every host; narrow it to the cluster's node subnet if needed.
grep -qs '^/nfs/data[[:space:]]' /etc/exports || echo "/nfs/data *(rw,sync,no_subtree_check)" >> /etc/exports

systemctl enable rpcbind nfs-server
systemctl start rpcbind
systemctl restart nfs-server
# Re-export everything and print the active export list
exportfs -rav

tree /nfs/data/flink

image.png

创建 flink 的命名空间(namespace)
  # Namespace that holds all Flink resources created below
  kubectl create ns flink 

image.png


创建 PV 

# Directory for the Flink manifests. The kubectl commands in this section use
# relative file names (pv.yaml, pvc.yaml, ...), so work from inside it.
mkdir -p /root/images/flink/
cd /root/images/flink/

# Statically provisioned NFS PersistentVolume. storageClassName must match the
# PVC's so the two bind to each other.
cat > /root/images/flink/pv.yaml << 'EOF'
apiVersion: v1
kind: PersistentVolume
metadata:
  name: flink-pv
spec:
  capacity:
    storage: 5Gi
  accessModes:
    - ReadWriteMany
  persistentVolumeReclaimPolicy: Retain
  storageClassName: flink-storage
  nfs:
    path: /nfs/data/flink/job-artifacts
    server: 172.16.10.11
EOF

kubectl apply -f pv.yaml
kubectl get pv

image.png

创建PVC 

# PVC in the flink namespace. It binds to flink-pv via the shared
# storageClassName "flink-storage" and the matching ReadWriteMany / 5Gi request.
cat > /root/images/flink/pvc.yaml << 'EOF'
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-pvc
  namespace: flink
spec:
  storageClassName: "flink-storage"
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 5Gi
EOF

kubectl apply -f pvc.yaml
# STATUS should show Bound
kubectl get pvc -n flink

image.png image.png

# StorageClass named by the PV and PVC. It has no provisioner (volumes are
# created by hand, like the NFS PV above); it only provides the class name.
cat > /root/images/flink/sc.yaml << 'EOF'
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  name: flink-storage
provisioner: kubernetes.io/no-provisioner
volumeBindingMode: Immediate
EOF

kubectl apply -f sc.yaml
kubectl get sc

image.png image.png

3.2 安装部署flink

创建CM 

# ConfigMap mounted over $FLINK_HOME/conf in the Flink pods: flink-conf.yaml plus
# log4j-console.properties. jobmanager.rpc.address is the name of the
# flink-jobmanager Service, through which TaskManagers reach the JobManager.
cat > /root/images/flink/cm.yaml << 'EOF'
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  namespace: flink
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 3200m
    taskmanager.memory.process.size: 2728m
    taskmanager.memory.flink.size: 2280m
    parallelism.default: 2    
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
EOF

kubectl apply -f cm.yaml
# Show the stored keys and their content
kubectl describe cm flink-config -n flink
kubectl get cm flink-config -n flink

image.png

创建 SVC

# Three Services:
#   flink-jobmanager              - ClusterIP; in-cluster access to the JobManager
#                                   (rpc 6123, blob server 6124, web UI 8081)
#   flink-jobmanager-rest         - NodePort 30081 -> REST / web UI on port 8081
#   flink-taskmanager-query-state - NodePort 30025 -> port 6125 on the TaskManagers
cat > /root/images/flink/svc.yaml << 'EOF'
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
  namespace: flink
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
---
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
  namespace: flink
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager
---
apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
  namespace: flink
spec:
  type: NodePort
  ports:
  - name: query-state
    port: 6125
    targetPort: 6125
    nodePort: 30025
  selector:
    app: flink
    component: taskmanager
EOF

kubectl apply -f svc.yaml
kubectl get svc -n flink

image.png

# Flink standalone cluster: one JobManager Deployment and two TaskManager
# Deployments, each pinned to a node with nodeName. Each TaskManager Deployment
# has its own "instance" label in its selector, because two controllers must not
# share one selector. The Services select only app/component, so they still
# match both. Selectors are immutable: delete old Deployments before re-applying.
cat > /root/images/flink/pod.yaml << 'EOF'
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      nodeName: flyfish11
      containers:
      - name: jobmanager
        image: bigdata/flink:1.17.1
        imagePullPolicy: IfNotPresent
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.17.1/conf/
        - name: job-artifacts-volume
          mountPath: /opt/apache/flink-1.17.1/usrlib
        securityContext:
          runAsUser: 9999  # uid of the "admin" user created in the Dockerfile
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        persistentVolumeClaim:
          claimName: flink-pvc
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager-1
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: taskmanager
      instance: taskmanager-1
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
        instance: taskmanager-1
    spec:
      nodeName: flyfish12
      containers:
      - name: taskmanager
        image: bigdata/flink:1.17.1
        imagePullPolicy: IfNotPresent
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.17.1/conf/
        - name: job-artifacts-volume
          mountPath: /opt/apache/flink-1.17.1/usrlib
        securityContext:
          runAsUser: 9999  # uid of the "admin" user created in the Dockerfile
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        persistentVolumeClaim:
          claimName: flink-pvc
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager-2
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: taskmanager
      instance: taskmanager-2
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
        instance: taskmanager-2
    spec:
      nodeName: flyfish13
      containers:
      - name: taskmanager
        image: bigdata/flink:1.17.1
        imagePullPolicy: IfNotPresent
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.17.1/conf/
        - name: job-artifacts-volume
          mountPath: /opt/apache/flink-1.17.1/usrlib
        securityContext:
          runAsUser: 9999  # uid of the "admin" user created in the Dockerfile
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        persistentVolumeClaim:
          claimName: flink-pvc
---
EOF

kubectl apply -f pod.yaml
kubectl get pod -n flink -o wide

image.png image.png image.png

打开 Flink Web UI(flink-jobmanager-rest 服务的 NodePort 30081):

  http://172.16.10.11:30081

image.png image.png

3.4 测试集群

运行示例 jar 包测试(在集群外的机器上用 Flink 客户端通过 REST 端口提交):
  # Submit the bundled examples from a machine outside the cluster through the
  # REST NodePort (30081) of the flink-jobmanager-rest Service
  tar -zxvf flink-1.17.1-bin-scala_2.12.tgz
  
  cd flink-1.17.1/
  ./bin/flink run -m 172.16.10.11:30081 ./examples/batch/WordCount.jar

image.png

# Streaming variant of the WordCount example, submitted the same way
./bin/flink run -m 172.16.10.11:30081 ./examples/streaming/WordCount.jar

image.png image.png image.png

把作业 jar 拷贝到 JobManager pod 中,再进入 pod 提交作业
  kubectl get pod -n flink 
  # Look up the JobManager pod by its labels; the generated name suffix changes
  # on every rollout, so it should not be hard-coded. (kubectl cp needs tar in
  # the container, which the Dockerfile installs.)
  JM_POD=$(kubectl get pod -n flink -l app=flink,component=jobmanager -o jsonpath='{.items[0].metadata.name}')
  kubectl cp ./test1-1.0-SNAPSHOT.jar "${JM_POD}:/tmp/test1-1.0-SNAPSHOT.jar" -n flink

image.png


# Open a shell in the JobManager pod. deploy/flink-jobmanager lets kubectl pick
# the Deployment's pod, so the generated pod name is not needed.
kubectl exec -it -n flink deploy/flink-jobmanager -- bash
# Inside the pod (working directory is $FLINK_HOME, per the Dockerfile WORKDIR):
./bin/flink run -c com.zuo.test.FlinkWC /tmp/test1-1.0-SNAPSHOT.jar

image.png image.png image.png

3.5 卸载集群

卸载 flink 集群

# Uninstall: delete the resources roughly in the reverse order of creation,
# from the directory that holds the manifests
cd /root/images/flink

# Flink Deployments
kubectl delete -f pod.yaml
kubectl get pod -n flink -o wide

# Services
kubectl delete -f svc.yaml
kubectl get svc -n flink

# ConfigMap
kubectl delete -f cm.yaml
kubectl get cm flink-config -n flink

# PVC and PV. The PV's reclaim policy is Retain, so the files on NFS are kept.
kubectl delete -f pvc.yaml
kubectl get pvc -n flink

kubectl delete -f pv.yaml
kubectl get pv

# StorageClass
kubectl delete -f sc.yaml
kubectl get sc

# Finally the namespace itself
kubectl delete namespace flink
kubectl get namespace

清空 NFS 上的作业目录(删除后重新创建一个空目录)

# Wipe the job artifacts on the NFS export and recreate an empty directory for the next install.
# NOTE(review): the recreated directory is owned by root; chown it to 9999 (the
# image's admin uid) if the Flink pods need to write into it.
rm -rf /nfs/data/flink
mkdir -p /nfs/data/flink/job-artifacts
tree /nfs/data/flink
 
【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月14日 0

暂无评论

ArsYYS04oliO