
Wormhole + Flink Best Practices

Notice


Preliminary Notes


Goals of This Article

Walk through an end-to-end Wormhole + Flink setup, from bare CentOS servers to a running flow, using JSON test records of the following shape:

{
   "id": 1,
   "name": "test",
   "phone": "18074546423",
   "city": "Beijing",
   "time": "2017-12-22 10:00:00"
}

Basic Server Environment Setup

Special Notes


Basic Server Configuration

# Set the hostname, one command per machine:
hostnamectl --static set-hostname linux01
hostnamectl --static set-hostname linux02
hostnamectl --static set-hostname linux03
hostnamectl --static set-hostname linux04

# IP-to-hostname mappings for /etc/hosts:
172.16.0.55       linux01
172.16.0.92       linux02
172.16.0.133      linux03
172.16.0.159      linux04
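
One way to append these mappings on every node (passwordless SSH is not yet configured, so run it on each machine):

cat >> /etc/hosts <<'EOF'
172.16.0.55       linux01
172.16.0.92       linux02
172.16.0.133      linux03
172.16.0.159      linux04
EOF
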
Generate a key pair (on linux01):
ssh-keygen -t rsa


Append the public key to authorized_keys:
cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys

Test:
ssh localhost

Copy the public key to the other machines:
ssh-copy-id -i ~/.ssh/id_rsa.pub -p 22 root@linux02   # enter the linux02 password when prompted

ssh-copy-id -i ~/.ssh/id_rsa.pub -p 22 root@linux03   # enter the linux03 password when prompted

ssh-copy-id -i ~/.ssh/id_rsa.pub -p 22 root@linux04   # enter the linux04 password when prompted


Test from linux01; each login should now succeed without a password:
ssh linux01

ssh linux02

ssh linux03

ssh linux04

Ansible inventory (install Ansible first, e.g. yum install -y ansible from EPEL; the default inventory location is /etc/ansible/hosts):

[hadoop-host]
linux01
linux02
linux03

[kafka-host]
linux04
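
With the inventory in place, run a quick connectivity check. The exact ad-hoc command is an assumption, but it matches the process listings below:

ansible all -m shell -a 'ps'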

linux01 | CHANGED | rc=0 >>
  PID TTY          TIME CMD
11088 pts/7    00:00:00 sh
11101 pts/7    00:00:00 python
11102 pts/7    00:00:00 ps

linux02 | CHANGED | rc=0 >>
  PID TTY          TIME CMD
10590 pts/1    00:00:00 sh
10603 pts/1    00:00:00 python
10604 pts/1    00:00:00 ps

linux03 | CHANGED | rc=0 >>
  PID TTY          TIME CMD
10586 pts/1    00:00:00 sh
10599 pts/1    00:00:00 python
10600 pts/1    00:00:00 ps

linux04 | CHANGED | rc=0 >>
  PID TTY          TIME CMD
10574 pts/1    00:00:00 sh
10587 pts/1    00:00:00 python
10588 pts/1    00:00:00 ps

Basic Server Components (CentOS 7.x)

The following playbook disables SELinux and firewalld and installs base packages, vim, Docker, and docker-compose on all hosts:

- hosts: all
  remote_user: root
  tasks:
    - name: Disable SELinux at next reboot
      selinux:
        state: disabled
        
    - name: disable firewalld
      command: "{{ item }}"
      with_items:
         - systemctl stop firewalld
         - systemctl disable firewalld
         
    - name: install-basic
      command: "{{ item }}"
      with_items:
         - yum install -y zip unzip lrzsz git epel-release wget htop deltarpm
         
    - name: install-vim
      shell: "{{ item }}"
      with_items:
         - yum install -y vim
         - curl https://raw.githubusercontent.com/wklken/vim-for-server/master/vimrc > ~/.vimrc
         
    - name: install-docker
      shell: "{{ item }}"
      with_items:
         - yum install -y yum-utils device-mapper-persistent-data lvm2
         - yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
         - yum makecache fast
         - yum install -y docker-ce
         - systemctl start docker.service
         - docker run hello-world
         
    - name: install-docker-compose
      shell: "{{ item }}"
      with_items:
         - curl -L https://github.com/docker/compose/releases/download/1.18.0/docker-compose-`uname -s`-`uname -m` -o /usr/local/bin/docker-compose
         - chmod +x /usr/local/bin/docker-compose
         - docker-compose --version
         - systemctl restart docker.service
         - systemctl enable docker.service
         

Installing the Components Wormhole Requires

About Version Numbers and Ports

JDK Installation

# Distribute the JDK tarball from linux01 to the other nodes:
scp -r /opt/jdk-8u191-linux-x64.tar.gz root@linux02:/opt

scp -r /opt/jdk-8u191-linux-x64.tar.gz root@linux03:/opt

scp -r /opt/jdk-8u191-linux-x64.tar.gz root@linux04:/opt

Install the JDK and set JAVA_HOME on every node with a playbook:

- hosts: all
  remote_user: root
  tasks:
    - name: copy jdk
      copy: src=/opt/jdk-8u191-linux-x64.tar.gz dest=/usr/local
      
    - name: tar jdk
      shell: cd /usr/local && tar zxf jdk-8u191-linux-x64.tar.gz
      
    - name: set JAVA_HOME
      blockinfile: 
        path: /etc/profile
        marker: "#{mark} JDK ENV"
        block: |
          JAVA_HOME=/usr/local/jdk1.8.0_191
          JRE_HOME=$JAVA_HOME/jre
          PATH=$PATH:$JAVA_HOME/bin
          CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
          export JAVA_HOME
          export JRE_HOME
          export PATH
          export CLASSPATH
    
    - name: source profile
      shell: source /etc/profile
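
Note that sourcing /etc/profile inside a task only affects that task's own shell. A quick way to verify the JDK is usable on every node (path per the block above):

ansible all -m shell -a 'source /etc/profile && java -version'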

Hadoop Cluster (HDFS, YARN)

This playbook only creates the data directories and environment variables; the Hadoop XML configuration files are not reproduced here:

- hosts: hadoop-host
  remote_user: root
  tasks:
    - name: Create HDFS name directory
      file:
        path: /data/hadoop/hdfs/name
        state: directory
    - name: Create HDFS data directory
      file:
        path: /data/hadoop/hdfs/data
        state: directory
    - name: Create HDFS tmp directory
      file:
        path: /data/hadoop/hdfs/tmp
        state: directory

    - name: set HADOOP_HOME
      blockinfile: 
        path: /etc/profile
        marker: "#{mark} HADOOP ENV"
        block: |
          HADOOP_HOME=/usr/local/hadoop
          HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
          YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
          PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
          export HADOOP_CONF_DIR
          export YARN_CONF_DIR
          export HADOOP_HOME
          export PATH
    
    - name: source profile
      shell: source /etc/profile
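
Once the XML configs are in place, format the NameNode and bring HDFS and YARN up on linux01 (standard Hadoop commands; they resolve via the PATH set above):

hdfs namenode -format
start-dfs.sh
start-yarn.sh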

Zookeeper
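
ZooKeeper setup is not detailed here; Kafka below expects it at linux04:2181. A minimal start sketch, assuming a standalone ZooKeeper unpacked at /usr/local/zookeeper on linux04 (the path is an assumption):

cd /usr/local/zookeeper && bin/zkServer.sh start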

Kafka

Edit config/server.properties:

Line 34 : listeners=PLAINTEXT://0.0.0.0:9092
Line 39 : advertised.listeners=PLAINTEXT://linux04:9092
Line 119: zookeeper.connect=linux04:2181
Append  : auto.create.topics.enable=true
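
Then start the broker on linux04 (the install path matches the topic commands later in this article):

cd /usr/local/kafka && bin/kafka-server-start.sh -daemon config/server.properties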

MySQL

Configuration (my.cnf), enforcing UTF-8 and InnoDB defaults:

[mysql]
default-character-set = utf8

[mysqld]
pid-file = /var/run/mysqld/mysqld.pid
socket = /var/run/mysqld/mysqld.sock
datadir = /var/lib/mysql
symbolic-links=0

log-error=/var/log/mysql/error.log
default-storage-engine = InnoDB
collation-server = utf8_unicode_ci
init_connect = 'SET NAMES utf8'
character-set-server = utf8
lower_case_table_names = 1
max_allowed_packet = 50M
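
Docker was installed earlier, so one convenient way to run MySQL on linux04 is a container. A sketch; the container name and config mount path are assumptions, and the root password matches the Wormhole application.conf below:

docker run -d --name wormhole-mysql -p 3306:3306 \
  -v /opt/mysql/conf:/etc/mysql/conf.d \
  -e MYSQL_ROOT_PASSWORD=aaabbb123456 \
  mysql:5.7

# Create the database Wormhole expects (name taken from the JDBC URL below):
docker exec -i wormhole-mysql mysql -uroot -paaabbb123456 \
  -e 'CREATE DATABASE IF NOT EXISTS wormhole DEFAULT CHARACTER SET utf8;'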

Spark

vim /etc/profile

SPARK_HOME=/usr/local/spark
PATH=$PATH:${SPARK_HOME}/bin:${SPARK_HOME}/sbin
export SPARK_HOME
export PATH

source /etc/profile

Spark on YARN also needs to find the Hadoop configuration; add this to Spark's environment as well (conf/spark-env.sh is the usual place):

export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
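
A quick smoke test that Spark can reach YARN (the SparkPi example jar ships with the distribution; the exact jar name is version-dependent):

spark-submit --master yarn --deploy-mode cluster \
  --class org.apache.spark.examples.SparkPi \
  /usr/local/spark/examples/jars/spark-examples_*.jar 10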

Optional Components


Wormhole Installation + Configuration

Create the Kafka topics Wormhole needs; the first command just lists what already exists:

cd /usr/local/kafka && bin/kafka-topics.sh --list --zookeeper linux04:2181
cd /usr/local/kafka && bin/kafka-topics.sh --create --zookeeper linux04:2181 --replication-factor 1 --partitions 1 --topic source
cd /usr/local/kafka && bin/kafka-topics.sh --create --zookeeper linux04:2181 --replication-factor 1 --partitions 1 --topic wormhole_feedback
cd /usr/local/kafka && bin/kafka-topics.sh --create --zookeeper linux04:2181 --replication-factor 1 --partitions 1 --topic wormhole_heartbeat

In conf/application.conf, raise the Akka HTTP request timeout and adjust the sections that follow:

akka.http.server.request-timeout = 120s

wormholeServer {
  cluster.id = "" #optional global uuid
  host = "linux01"
  port = 8989
  ui.default.language = "Chinese"
  token.timeout = 1
  token.secret.key = "iytr174395lclkb?lgj~8u;[=L:ljg"
  admin.username = "admin"    #default admin user name
  admin.password = "admin"    #default admin user password
}

mysql = {
  driver = "slick.driver.MySQLDriver$"
  db = {
    driver = "com.mysql.jdbc.Driver"
    user = "root"
    password = "aaabbb123456"
    url = "jdbc:mysql://linux04:3306/wormhole?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
    numThreads = 4
    minConnections = 4
    maxConnections = 10
    connectionTimeout = 3000
  }
}

#ldap = {
#  enabled = false
#  user = ""
#  pwd = ""
#  url = ""
#  dc = ""
#  read.timeout = 5000
#  connect = {
#    timeout = 5000
#    pool = true
#  }
#}

spark = {
  wormholeServer.user = "root"   #the linux user WormholeServer runs as
  wormholeServer.ssh.port = 22       #ssh port; this user must be able to ssh to the server itself without a password
  spark.home = "/usr/local/spark"
  yarn.queue.name = "default"        #WormholeServer submit spark streaming/job queue
  wormhole.hdfs.root.path = "hdfs://linux01/wormhole"   #WormholeServer hdfslog data default hdfs root path
  yarn.rm1.http.url = "linux01:8088"    #Yarn ActiveResourceManager address
  yarn.rm2.http.url = "linux01:8088"   #Yarn StandbyResourceManager address
}

flink = {
  home = "/usr/local/flink"
  yarn.queue.name = "default"
  feedback.state.count=100
  checkpoint.enable=false
  checkpoint.interval=60000
  stateBackend="hdfs://linux01/flink-checkpoints"
  feedback.interval=30
}

zookeeper = {
  connection.url = "linux04:2181"  #WormholeServer stream and flow interaction channel
  wormhole.root.path = "/wormhole"   #zookeeper
}

kafka = {
  brokers.url = "linux04:9092"
  zookeeper.url = "linux04:2181"
  topic.refactor = 1
  using.cluster.suffix = false #if true, _${cluster.id} will be concatenated to consumer.feedback.topic
  consumer = {
    feedback.topic = "wormhole_feedback"
    poll-interval = 20ms
    poll-timeout = 1s
    stop-timeout = 30s
    close-timeout = 20s
    commit-timeout = 70s
    wakeup-timeout = 60s
    max-wakeups = 10
    session.timeout.ms = 60000
    heartbeat.interval.ms = 50000
    max.poll.records = 1000
    request.timeout.ms = 80000
    max.partition.fetch.bytes = 10485760
  }
}

#kerberos = {
#  keyTab=""      #the keyTab will be used on yarn
#  spark.principal=""   #the principal of spark
#  spark.keyTab=""      #the keyTab of spark
#  server.config=""     #the path of krb5.conf
#  jaas.startShell.config="" #the path of jaas config file which should be used by start.sh
#  jaas.yarn.config=""     #the path of jaas config file which will be uploaded to yarn
#  server.enabled=false   #enable wormhole connect to Kerberized cluster
#}

# choose monitor method between ES and MYSQL
monitor ={
   database.type="MYSQL"
}

#Wormhole feedback data store; if not configured, you will not see Wormhole processing delay and throughput
#if not used, leave it commented out

#elasticSearch.http = {
#  url = "http://localhost:9200"
#  user = ""
#  password = ""
#}

#display wormhole processing delay and throughput data; get an admin user token from grafana
#grafana should be set to anonymous login, so you can access the dashboard through wormhole directly
#if not used, leave it commented out

#grafana = {
#  url = "http://localhost:3000"
#  admin.token = "jihefouglokoj"
#}

#periodically delete feedback history data
maintenance = {
  mysql.feedback.remain.maxDays = 7
  elasticSearch.feedback.remain.maxDays = 7
}


#DBus integration; supports several DBus services. If not used, leave it commented out

#dbus = {
#  api = [
#    {
#      login = {
#        url = "http://localhost:8080/keeper/login"
#        email = ""
#        password = ""
#      }
#      synchronization.namespace.url = "http://localhost:8080/keeper/tables/riderSearch"
#    }
#  ]
#}
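
With conf/application.conf in place, start WormholeServer and log in at http://linux01:8989 with the admin account configured above. The install path is an assumption; start.sh is the script referenced by the kerberos comment earlier:

cd /usr/local/wormhole && bin/start.sh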

Create a User


Concepts Involved in Creating a Source

Create an Instance

Create a Database

Create a Namespace

The source namespace's schema (ums_extension) is derived from a sample record of the shape used throughout this article:

{
   "id": 1,
   "name": "test",
   "phone": "18074546423",
   "city": "Beijing",
   "time": "2017-12-22 10:00:00"
}

Concepts Involved in Creating a Sink

Create an Instance

Create a Database

Create a Namespace


Create a Project



Send Test Data to Kafka
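
Each message is a UMS key and a JSON payload separated by @@@. One way to publish them is the console producer with key parsing enabled (the separator setting is inferred from the samples below):

cd /usr/local/kafka
bin/kafka-console-producer.sh --broker-list linux04:9092 --topic source \
  --property parse.key=true --property key.separator=@@@

Then paste each of the following lines as one message: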

data_increment_data.kafka.source_kafka.source.ums_extension.*.*.*@@@{"id": 1, "name": "test1", "phone":"18074546423", "city": "Beijing", "time": "2017-12-22 10:01:00"}

data_increment_data.kafka.source_kafka.source.ums_extension.*.*.*@@@{"id": 2, "name": "test2", "phone":"18074546423", "city": "Beijing", "time": "2017-12-22 10:02:00"}

data_increment_data.kafka.source_kafka.source.ums_extension.*.*.*@@@{"id": 3, "name": "test3", "phone":"18074546423", "city": "Beijing", "time": "2017-12-22 10:03:00"}

data_increment_data.kafka.source_kafka.source.ums_extension.*.*.*@@@{"id": 4, "name": "test4", "phone":"18074546423", "city": "Beijing", "time": "2017-12-22 10:04:00"}

data_increment_data.kafka.source_kafka.source.ums_extension.*.*.*@@@{"id": 5, "name": "test5", "phone":"18074546423", "city": "Beijing", "time": "2017-12-22 10:05:00"}

data_increment_data.kafka.source_kafka.source.ums_extension.*.*.*@@@{"id": 6, "name": "test6", "phone":"18074546423", "city": "Beijing", "time": "2017-12-22 10:06:00"}

data_increment_data.kafka.source_kafka.source.ums_extension.*.*.*@@@{"id": 7, "name": "test7", "phone":"18074546423", "city": "Beijing", "time": "2017-12-22 10:07:00"}

data_increment_data.kafka.source_kafka.source.ums_extension.*.*.*@@@{"id": 8, "name": "test8", "phone":"18074546423", "city": "Beijing", "time": "2017-12-22 10:08:00"}

data_increment_data.kafka.source_kafka.source.ums_extension.*.*.*@@@{"id": 9, "name": "test9", "phone":"18074546423", "city": "Beijing", "time": "2017-12-22 10:09:00"}