7.13-7.16技术培训实记
Published in:2023-07-12 | category: Computer Technology
Words: 11.8k | Reading time: 52min | reading:

培训报告

[学院组织]

[7.13-7.16]

培训目的

本次技术培训的目的是提升参与人员在特定技术领域的知识和技能,以应对公司在该领域面临的挑战和需求。通过培训,参与人员将能够掌握相关技术,并将其应用于日常工作中,从而提高工作效率和质量。

07-13 基础环境搭建

基础环境

搭建步骤

  1. 修改主机名,便于识别节点;

    1
    2
    3
    4
    #  修改主机名
    hostnamectl set-hostname <hostname>
    # 刷新
    bash
  2. 修改hosts文件,添加集群节点映射,按照给出的节点IP和对应的主机名进行设置;

    1
    2
    # 修改hosts文件内容
    vim /etc/hosts
    1
    2
    # 【局域网ip】 【主机名】
    [ip] [hostname]
  3. 要求各节点时区修改为中国时区( 中国标准时间CST+8)

    1
    2
    # 修改为中国时区
    timedatectl set-timezone Asia/Shanghai
  4. 安装ntp服务,要求主节点master为本地时钟源,从节点设置定时任务同步本地时间;

    1
    2
    # 修改master节点NTP配置
    vim /etc/ntp.conf

    设置master为本地时间服务器,屏蔽默认server,服务器层级设为10

    1
    2
    3
    # server <默认服务器>
    server 127.127.1.0
    fudge 127.127.1.0 stratum 10

    启动ntp服务

    1
    2
    systemctl start ntp
    systemctl restart ntpd.service

    添加定时任务–在早十-晚五时间段内每隔半个小时同步一次本地服务器时间(24小时制、使用用户root任务调度crontab,服务器地址使用主机名)

    1
    sudo crontab -e
    1
    */30 10-17 * * * /usr/sbin/ntpdate -u <主机名>
  5. 集群中数据传输需要节点之间免密访问,要求设置主节点之间到从节点的免密访问;
    主节点生成公钥文件id_rsa.pub(数字签名RSA,用户root,主机名master)

    1
    ssh-keygen

    建⽴master⾃身使⽤root⽤户ssh访问localhost免密登录
    若没有authorized_keys则使用下面命令创建

    1
    cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys

    建⽴master使⽤root⽤户到slave的ssh免密登录访问

    1
    ssh-copy-id root@<从节点IP或主机名>

    测试

    1
    ssh-copy-id root@<从节点IP或主机名>
  6. 配置Java环境
    解压下载的java JDK

    1
    tar -zxvf [jdk]

    添加系统环境变量

    1
    vim /etc/profile

    添加内容

    1
    2
    export JAVA_HOME=<you_java_jdk_path>
    export PATH="$JAVA_HOME/bin:$PATH"

    配置生效

    1
    source /etc/profile
  7. 分发

    1
    2
    scp -r /etc/profile root@slave:/etc/
    scp -r /usr/java root@slave:/usr/

Zookeeper

概念

ZooKeeper是一个开源的分布式协调服务,它为分布式应用程序提供了高度可靠的协调功能。它旨在解决分布式系统中的一些常见问题,如配置管理、命名服务、分布式锁、分布式协调等。

ZooKeeper的设计目标是提供一个简单而高效的分布式协调服务,它采用了基于观察者模式的数据模型。在ZooKeeper中,数据被组织为一个分层的命名空间(类似于文件系统的目录结构),称为ZooKeeper树(ZooKeeper
tree)或ZooKeeper命名空间(ZooKeeper namespace)。每个节点在树中都有一个唯一的路径标识,并可以存储一个小的数据块。

ZooKeeper提供了临时节点、顺序节点、观察者机制等特性,可以用于实现分布式锁、选主(Leader
Election)、集群管理、分布式队列等场景。它的核心是一个高可用的、基于主从架构的协调服务器集群,通过选举机制确保了服务的高可用性和可靠性。

在分布式系统中,ZooKeeper被广泛应用于各种场景,如大数据、分布式数据库、消息队列、分布式应用程序等。它为分布式应用程序提供了一个可靠的、高性能的基础设施,帮助开发人员简化了分布式系统的设计和实现。

Zookeeper是一个分布式服务框架,是Apache Hadoop 的一个子项目,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,如:统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等。
预装的配置文件zoo_sample.cfg下面默认有五个属性,分别是:

  1. tickTime
    心跳间隔,单位是毫秒,系统默认是2000毫秒,也就是间隔两秒心跳一次。
    tickTime的意义:客户端与服务器或者服务器与服务器之间维持心跳,也就是每个tickTime时间就会发送一次心跳。通过心跳不仅能够用来监听机器的工作状态,还可以通过心跳来控制Flower跟Leader的通信时间,默认情况下FL的会话时常是心跳间隔的两倍。

  2. initLimit
    集群中的follower服务器(F)与leader服务器(L)之间初始连接时能容忍的最多心跳数(tickTime的数量)。

  3. syncLimit
    集群中flower服务器(F)跟leader(L)服务器之间的请求和答应最多能容忍的心跳数。

  4. clientPort
    客户端连接的接口,客户端连接zookeeper服务器的端口,zookeeper会监听这个端口,接收客户端的请求访问,端口默认是2181。

  5. dataDir
    该属性对应的目录是用来存放myid信息跟一些版本,日志,跟服务器唯一的ID信息等。
    在集群Zookeeper服务在启动的时候,会回去读取zoo.cfg这个文件,从这个文件中找到这个属性然后获取它的值也就是dataDir
    的路径,它会从这个路径下面读取myid这个文件,从这个文件中获取要启动的当前服务器的地址。

集群信息的配置:
在配置文件中,配置集群信息是存在一定的格式:service.N =YYY: A:B
N:代表服务器编号(准确对应对应服务器中myid里面的值)
YYY:服务器地址
A:表示 Flower 跟 Leader的通信端口,简称服务端内部通信的端口(默认2888)
B:表示是选举端口(默认是3888)

参考配置文件

1
2
3
4
5
6
7
8
9
10
11
12
tickTime=2000
initLimit=10
syncLimit=5
clientPort=2181
# 配置数据存储路径
????
# 配置日志文件路径
????
# 配置集群列表
server.1=????
server.2=????
server.3=????

搭建步骤

  1. 将zookeeper安装包解压

    1
    tar -zxvf [zookeeper]
  2. 配置系统变量ZOOKEEPER_HOME,同时将Zookeeper安装路径中bin目录加入PATH系统变量

    1
    vim /etc/profile
    1
    2
    3
    # zookeeper
    export ZOOKEEPER_HOME=/usr/zookeeper/zookeeper-3.4.14
    export PATH="$ZOOKEEPER_HOME/bin:$PATH"
    1
    source /etc/profile
  3. Zookeeper的默认配置文件为Zookeeper安装路径下conf/zoo_sample.cfg,将其修改为zoo.cfg

    1
    2
    cd zookeeper/zookeeper-3.4.14/conf/
    mv zoo_sample.cfg zoo.cfg
  4. 设置数据存储路径(dataDir)为/usr/zookeeper/zookeeper-3.4.14/zkdata

    1
    vim zoo.cfg
    1
    dataDir=/usr/zookeeper/zookeeper-3.4.14/zkdata
  5. 设置日志文件路径(dataLogDir)为/usr/zookeeper/zookeeper-3.4.14/zkdatalog

    1
    vim zoo.cfg
    1
    dataLogDir=/usr/zookeeper/zookeeper-3.4.14/zkdatalog
  6. 设置集群列表(要求master为1号服务器,slave1为2号服务器,slave2为3号服务器)

    1
    vim zoo.cfg
    1
    2
    3
    server.1=master:2888:3888
    server.2=slave1:2888:3888
    server.3=slave2:2888:3888
  7. 创建所需数据存储文件夹、日志存储文件夹

    1
    mkdir -p /usr/zookeeper/zookeeper-3.4.14/{zkdata,zkdatalog}
  8. 数据存储路径下创建myid,写入对应的标识主机服务器序号

    1
    2
    cd /usr/zookeeper/zookeeper-3.4.14/zkdata
    vim myid

    server.1 中的 1 就是主机服务器的序号

    1
    1
  9. 分发

    1
    2
    scp -r /usr/zookeeper/ root@slave:/usr
    scp /etc/profile root@slave:/etc
  10. 启动服务,查看进程QuorumPeerMain是否存在

    1
    zkServer.sh start
  11. 查看各节点服务器角色是否正常(leader/follower)

    1
    zkServer.sh status

Hadoop

概述

Hadoop是一个开源的分布式计算框架,旨在处理大规模数据集的存储和处理。它提供了可靠性、可扩展性和容错性,适用于在集群中并行处理大量数据的场景。

Hadoop的核心组件包括:

  1. Hadoop Distributed File System (HDFS):这是一个分布式文件系统,用于存储大规模数据集。它将数据分散存储在集群中的多个节点上,并提供高容错性和高可靠性,以支持大规模数据的存储和处理。

  2. MapReduce:这是一个分布式数据处理模型,用于在Hadoop集群上并行处理大规模数据。MapReduce将计算任务分解为可并行执行的“映射”(Map)和“归约”(Reduce)阶段,允许在分布式环境中处理海量数据。

除了核心组件之外,Hadoop生态系统还包括许多其他工具和组件,如:

  • YARN (Yet Another Resource Negotiator):作为Hadoop的资源管理器,负责集群资源的管理和任务调度。
  • Hive:一个基于Hadoop的数据仓库和查询工具,提供类似于SQL的查询语言,用于分析和处理存储在Hadoop上的数据。
  • Spark:一个快速、通用的大数据处理引擎,提供高级别的API,支持内存计算和更复杂的数据处理模式。
  • HBase:一个分布式、可扩展的列式数据库,用于存储和访问结构化数据。
  • Pig:一种高级数据流脚本语言,用于编写复杂的数据转换和分析任务。
  • ZooKeeper:一个分布式协调服务,用于管理和协调分布式系统中的各种任务。

Hadoop被广泛应用于大数据领域,它可以处理海量数据并提供可靠的数据存储和分析能力。它的设计理念和架构使得它适用于构建可扩展的分布式系统,以满足日益增长的大数据需求。

Hadoop是由Java语言编写的,在分布式服务器集群上存储海量数据并运行分布式分析应用的开源框架,其核心部件是HDFS与MapReduce。

  1. HDFS是一个分布式文件系统:引入存放文件元数据信息的服务器Namenode和实际存放数据的服务器Datanode,对数据进行分布式储存和读取。
  2. MapReduce是一个计算框架:MapReduce的核心思想是把计算任务分配给集群内的服务器里执行。通过对计算任务的拆分(Map计算/Reduce计算)再根据任务调度器(JobTracker)对任务进行分布式计算。
配置文件 配置对象 主要内容
hadoop-env.sh hadoop运行环境 用来定义Hadoop运行环境相关的配置信息;
core-site.xml 集群全局参数 定义系统级别的参数,包括HDFS URL、Hadoop临时目录等;
hdfs-site.xml HDFS参数 定义名称节点、数据节点的存放位置、文本副本的个数、文件读取权限等;
mapred-site.xml MapReduce参数 包括JobHistory Server 和应用程序参数两部分,如reduce任务的默认个数、任务所能够使用内存的默认上下限等;
yarn-site.xml 集群资源管理系统参数 配置ResourceManager ,nodeManager的通信端口,web监控端口等;

Hadoop的配置类是由资源指定的,资源可以由一个String或Path来指定,资源以XML形式的数据表示,由一系列的键值对组成。资源可以用String或path命名(示例如下),

  1. String:指示hadoop在classpath中查找该资源;
  2. Path:指示hadoop在本地文件系统中查找该资源。

配置示例

1
2
3
4
5
6
<configuration>
<property>
<name>fs.default.name</name>
<value>????</value>
</property>
</configuration>
  1. core-site.xml

    配置参数 说明
    fs.default.name 用于指定NameNode的地址
    hadoop.tmp.dir Hadoop运行时产生文件的临时存储目录
  2. hdfs-site.xml

    配置参数 说明
    dfs.replication 指定备份数
    dfs.namenode.name.dir NameNode在本地文件系统中持久存储命名空间和事务日志的路径
    dfs.datanode.data.dir DataNode在本地文件系统中存放块的路径
    dfs.permissions 集群权限系统校验
    dfs.datanode.use.datanode.hostname datanode之间通过域名方式通信

    注意:外域机器通信需要用外网IP,未配置hostname访问会访问异常。可以在Java api客户端使用conf.set(“
    fs.client.use.datanode.hostname”,”true”);。

  3. mapreduce-site.xml

    配置参数 说明
    mapreduce.framework.name 指定执行MapReduce作业的运行时框架。属性值可以是local,classic或yarn
  4. yarn-site.xml

    配置参数 说明
    yarn.resourcemanager.admin.address 用于指定RM管理界面的地址(主机:端口)
    yarn.nodemanager.aux-services mapreduce 获取数据的方式,指定在进行mapreduce作业时,yarn使用mapreduce_shuffle混洗技术。这个混洗技术是hadoop的一个核心技术,非常重要。
    yarn.nodemanager.auxservices.mapreduce.shuffle.class 用于指定混洗技术对应的字节码文件,值为org.apache.hadoop.mapred.ShuffleHandler

配置步骤

  1. Hadoop安装包解压

    1
    tar -zxvf [hadoop]
  2. 配置环境变量HADOOP_HOME,将Hadoop安装路径中bin目录和sbin目录加入PATH系统变量

    1
    vim /etc/profile
    1
    2
    3
    # hadoop
    export HADOOP_HOME=/usr/hadoop/hadoop-2.7.7
    export PATH="$HADOOP_HOME/sbin:$HADOOP_HOME/bin:$PATH"
    1
    source /etc/profile
  3. 配置Hadoop运行环境JAVA_HOME

    1
    2
    cd /usr/hadoop/hadoop-2.7.7/etc/hadoop
    vim hadoop-env.sh
    1
    export JAVA_HOME=/usr/java/jdk1.8.0_221
  4. 设置全局参数,指定HDFS上NameNode地址为master,端口默认为9000;指定临时存储目录为本地/root/hadoopData/tmp(要求为绝对路径,下同)

    1
    vim core-site.xml
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    <configuration>
    <property>
    <name>fs.default.name</name>
    <value>hdfs://master:9000</value>
    </property>
    <property>
    <name>hadoop.tmp.dir</name>
    <value>/root/hadoopData/tmp</value>
    </property>
    </configuration>
  5. 设置HDFS参数,指定备份文本数量为2;设置HDFS参数,指定NN存放元数据信息路径为本地/root/hadoopData/name;指定DN存放元数据信息路径为本地/root/hadoopData/data(要求为绝对路径);设置HDFS参数,关闭hadoop集群权限校验(安全配置),允许其他用户连接集群;指定datanode之间通过域名方式进行通信

    1
    vim hdfs-site.xml
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    <configuration>
    <property>
    <name>dfs.replication</name>
    <value>2</value>
    </property>
    <property>
    <name>dfs.namenode.name.dir</name>
    <value>/root/hadoopData/name</value>
    </property>
    <property>
    <name>dfs.datanode.data.dir</name>
    <value>/root/hadoopData/data</value>
    </property>
    <property>
    <name>dfs.permissions</name>
    <value>false</value>
    </property>
    <property>
    <name>dfs.datanode.use.datanode.hostname</name>
    <value>true</value>
    </property>
    </configuration>
  6. 设置YARN运行环境$JAVA_HOME参数

    1
    vim yarn-env.sh
    1
    export JAVA_HOME=/usr/java/jdk1.8.0_221
  7. 设置YARN核心参数,指定ResourceManager进程所在主机为master,端口为18141;指定mapreduce 获取数据的方式为mapreduce_shuffle

    1
    vim yarn-site.xml
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    <configuration>
    <property>
    <name>yarn.resourcemanager.admin.address</name>
    <value>master:18141</value>
    </property>
    <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
    </property>
    <property>
    <name>yarn.nodemanager.auxservices.mapreduce.shuffle.class</name>
    <value>org.apache.hadoop.mapred.shuffleHandler</value>
    </property>
    </configuration>
  8. 设置计算框架参数,指定MR运行在yarn上

    1
    2
    cp mapred-site.xml.template mapred-site.xml
    vim mapred-site.xml
    1
    2
    3
    4
    5
    6
    <configuration>
    <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
    </property>
    </configuration>
  9. 设置节点文件,要求master为主节点; slave1、slave2为子节点
    这里填入从节点

    1
    vim slaves
    1
    slave

    这里填入主节点

    1
    vim master
    1
    master
  10. 分发

    1
    2
    scp -r /usr/hadoop/ root@slave:/usr/
    scp /etc/profile root@slave:/etc/
  11. 对文件系统进行格式化

    1
    hdfs namenode -format

    出现一下内容就是成功了

    1
    INFO common.Storage: Storage directory /root/hadoopData/name has been successfully formatted.
  12. 启动Hadoop集群查看各节点服务

    1
    2
    3
    start-all.sh 
    hadoop-daemon.sh start datanode
    jps
  13. 查看集群运行状态是否正常

    1
    hadoop dfsadmin -report

Hive

概念

Hive是基于Hadoop的数据仓库基础架构,它提供了一种类似于SQL的查询语言(HiveQL)和用于处理大规模数据集的数据处理能力。Hive使得用户可以使用SQL语言进行数据查询、转换和分析,而无需编写复杂的MapReduce程序。

Hive的核心思想是将结构化查询语言(SQL)映射到Hadoop分布式文件系统(HDFS)上的大规模数据集。它将SQL查询转换为一系列MapReduce作业,从而能够在Hadoop集群上并行处理数据。

Hive的特性包括:

  1. 数据模型:Hive提供了类似于关系型数据库的表结构,支持基于模式的数据存储和查询。它可以将数据映射到表和分区,并支持复杂的数据类型和数据模型。

  2. 查询语言:Hive的查询语言HiveQL类似于SQL,允许用户使用SQL语句来查询和分析数据。HiveQL支持常见的查询操作,如SELECT、JOIN、GROUP
    BY、ORDER BY等。

  3. 数据转换和ETL:Hive支持数据转换和ETL(抽取、转换、加载)操作,可以通过HiveQL进行数据清洗、转换和提取。

  4. 扩展性和集成:Hive可以与其他Hadoop生态系统组件集成,如HDFS、HBase、Spark等。它还支持自定义函数(UDF)和扩展插件,允许用户编写自定义逻辑和扩展功能。

  5. 元数据管理:Hive维护表和分区的元数据信息,包括表的结构、存储位置、分区信息等。这使得Hive能够提供更高级别的查询优化和查询计划生成。

Hive广泛应用于大数据领域,特别是数据仓库、数据分析和数据处理场景。它提供了一种简化的方式来使用SQL语言进行大数据查询和处理,使得更多的人可以轻松地利用Hadoop集群进行数据分析和数据挖掘。

  1. 环境中已经安装mysql-community-server,注意mysql5.7默认安装后为root用户随机生成一个密码;
    直接查看密码:grep “temporary password” /var/log/mysqld.log
    登入数据库:mysql -uroot -p
    输入随机密码即可登录

  2. 根据要求设置密码,注意对应的安全策略修改;
    设置密码强度为低级:set global validate_password_policy=????;
    设置密码长度:set global validate_password_length=????;
    修改本地密码:alter user ‘root‘@’localhost’ identified by ‘????’;

  3. 根据要求满足任意主机节点root的远程访问权限(否则后续hive无法连接mysql);

    1
    GRANT ALL PRIVILEGES ON *.* TO '????'@'%' IDENTIFIED BY '????' WITH GRANT OPTION;
  4. 注意刷新权限;

    1
    flush privileges;
  5. 参考命令

    1. 启动mysql服务:systemctl start mysqld.service
    2. 关闭mysql服务:systemctl stop mysqld.service
    3. 查看mysql服务:systemctl status mysqld.service

配置步骤

  1. 环境中已经安装mysql-community-server,关闭mysql开机自启服务

    1
    systemctl disable mysqld
  2. 开启MySQL服务

    1
    systemctl start mysqld
  3. 判断mysqld.log日志下是否生成初临时密码

    1
    grep "temporary password" /var/log/mysqld.log 

    观察初始密码并复制下来

  4. 设置mysql数据库本地root用户密码为123456
    登录mysql,使用临时密码

    1
    mysql -uroot -p

    sql中执行

    1
    2
    3
    set global validate_password_policy=0;
    set global validate_password_length=4;
    alter user root@localhost identified by123456’;

    退出

    1
    quit
  5. Hive安装包解压

    1
    tar -zxvf [hive]
  6. 配置环境变量HIVE_HOME,将Hive安装路径中的bin目录加入PATH系统变量

    1
    vim /etc/profile
    1
    2
    export HIVE_HOME=/usr/hive/apache-hive-2.3.4-bin
    export PATH=PATH : PATH:PATH:HIVE_HOME/bin
    1
    source /etc/profile
  7. 修改HIVE运行环境,配置Hadoop安装路径HADOOP_HOME;修改HIVE运行环境,配置Hive配置文件存放路径HIVE_CONF_DIR;修改HIVE运行环境,配置Hive运行资源库路径HIVE_AUX_JARS_PATH

    1
    2
    3
    cd /usr/hive/apache-hive-2.3.4-bin/conf/
    cp hive-env.sh.template hive-env.sh
    vim hive-env.sh
    1
    2
    3
    export HADOOP_HOME=/usr/hadoop/hadoop-2.7.7
    export HIVE_CONF_DIR=/usr/hive/apache-hive-2.3.4-bin/conf
    export HIVE_AUX_JARS_PATH=/usr/hive/apache-hive-2.3.4-bin/lib
  8. 解决jline的版本冲突,将$HIVE_HOME/lib/jline-2.12.jar同步至$HADOOP_HOME/share/hadoop/yarn/lib/下

    1
    cp /usr/package277/mysql-connector-java-5.1.47-bin.jar /usr/hive/apache-hive-2.3.4-bin/lib/
  9. 分发

    1
    2
    scp -r /usr/hive root@slave:/usr
    scp /etc/profile root@slave:/etc

    接下来的操作请选择刚才进行分发的slave从节点进行操作

  10. 驱动JDBC拷贝至hive安装目录对应lib下(依赖包存放于/usr/package277/)

    1
    cp /usr/package277/mysql-connector-java-5.1.47-bin.jar /usr/hive/apache-hive-2.3.4-bin/lib/
  11. 配置元数据数据存储位置为/user/hive_remote/warehouse;配置连接JDBC的URL地址主机名及默认端口号3306,数据库为hive,如不存在自行创建,ssl连接方式为false;配置数据库连接用户;配置数据库连接密码

    1
    vim /usr/hive/apache-hive-2.3.4-bin/conf/hive-site.xml
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    <configuration>
    <!-- Hive产生的元数据存放位置-->
    <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/user/hive_remote/warehouse</value>
    </property>
    <!-- 数据库连接driver,即MySQL驱动-->
    <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>root</value>
    </property>
    <!-- 数据库连接JDBC的URL地址-->
    <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://slave2:3306/hive?createDatabaseIfNotExist=true&amp;useSSL=false</value>
    </property>
    <!-- MySQL数据库用户名-->
    <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>com.mysql.jdbc.Driver</value>
    </property>
    <!-- MySQL数据库密码-->
    <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>123456</value>
    </property>
    </configuration>

    接下来的操作请选择master节点进行操作

  12. 配置元数据存储位置为/user/hive_remote/warehouse;关闭本地metastore模式;配置指向metastore服务的主机为slave1,端口为9083

    1
    vim /usr/hive/apache-hive-2.3.4-bin/conf/hive-site.xml
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    <configuration>
    <!-- Hive产生的元数据存放位置-->
    <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/user/hive_remote/warehouse</value>
    </property>
    <!--- 使用本地服务连接Hive,默认为true-->
    <property>
    <name>hive.metastore.local</name>
    <value>false</value>
    </property>
    <!-- 连接服务器-->
    <property>
    <name>hive.metastore.uris</name>
    <value>thrift://slave1:9083</value>
    </property>
    </configuration>
  13. 服务器端初始化数据库,并启动metastore服务;

    1
    2
    schematool -dbType mysql -initSchema
    hive --service metastore
  14. 客户端开启进入hive,创建hive数据库

    1
    hive
    1
    create database hive;

Spark

Hive是基于Hadoop的数据仓库基础架构,它提供了一种类似于SQL的查询语言(HiveQL)和用于处理大规模数据集的数据处理能力。Hive使得用户可以使用SQL语言进行数据查询、转换和分析,而无需编写复杂的MapReduce程序。

Hive的核心思想是将结构化查询语言(SQL)映射到Hadoop分布式文件系统(HDFS)上的大规模数据集。它将SQL查询转换为一系列MapReduce作业,从而能够在Hadoop集群上并行处理数据。

Hive的特性包括:

  1. 数据模型:Hive提供了类似于关系型数据库的表结构,支持基于模式的数据存储和查询。它可以将数据映射到表和分区,并支持复杂的数据类型和数据模型。

  2. 查询语言:Hive的查询语言HiveQL类似于SQL,允许用户使用SQL语句来查询和分析数据。HiveQL支持常见的查询操作,如SELECT、JOIN、GROUP
    BY、ORDER BY等。

  3. 数据转换和ETL:Hive支持数据转换和ETL(抽取、转换、加载)操作,可以通过HiveQL进行数据清洗、转换和提取。

  4. 扩展性和集成:Hive可以与其他Hadoop生态系统组件集成,如HDFS、HBase、Spark等。它还支持自定义函数(UDF)和扩展插件,允许用户编写自定义逻辑和扩展功能。

  5. 元数据管理:Hive维护表和分区的元数据信息,包括表的结构、存储位置、分区信息等。这使得Hive能够提供更高级别的查询优化和查询计划生成。

Hive广泛应用于大数据领域,特别是数据仓库、数据分析和数据处理场景。它提供了一种简化的方式来使用SQL语言进行大数据查询和处理,使得更多的人可以轻松地利用Hadoop集群进行数据分析和数据挖掘。

Spark是Hadoop的子项目。 环境中将Spark安装到基于Linux的系统中。

相关配置变量如下:

  1. JAVA_HOME:Java安装目录
  2. HADOOP_HOME:Hadoop安装目录
  3. HADOOP_CONF_DIR:Hadoop集群的配置文件的目录
  4. SPARK_MASTER_IP:Spark集群的Master节点的ip地址
  5. SPARK_WORKER_MEMORY:每个worker节点能够最大分配给exectors的内存大小

配置步骤

  1. 将Spark安装包解压

    1
    tar -zvxf [spark]
  2. 文件/etc/profile中配置环境变量SPARK_HOME,将Spark安装路径中的bin目录加入PATH系统变量

    1
    vim /etc/profile
    1
    2
    export SPARK_HOME=/usr/spark/spark-2.4.3-bin-hadoop2.7
    export PATH=$SPARK_HOME/bin:$SPARK_HOME/sbin:$PATH
    1
    source /etc/profile
  3. 修改配置文件spark-env.sh,设置主机节点为master,设置java安装路径,设置节点内存为8g,设置hadoop安装目录、hadoop集群的配置文件的目录,添加spark从节点

    1
    2
    3
    4
    cd /usr/spark/spark-2.4.3-bin-hadoop2.7/conf
    cp spark-defaults.sh.template spark-env.sh

    vim spark-env.sh
    1
    2
    3
    4
    5
    export JAVA_HOME=/usr/java/jdk1.8.0_221
    export SPARK_MASTER_IP=master
    export HADOOP_CONF_DIR=/usr/hadoop/hadoop-2.7.7/etc/hadoop
    export HADOOP_HOME=/usr/hadoop/hadoop-2.7.7
    export SPARK_WORKER_MEMORY=8g

    添加从节点

    1
    2
    cp slaves.template slaves
    vim slaves
    1
    slave
  4. 分发

    1
    2
    scp -r /usr/spark root@slave:/usr
    scp /etc/profile root@slave:/etc
  5. 开启集群,查看各节点进程(主节点进程为Master,子节点进程为Worker)

    1
    2
    cd /usr/spark/spark-2.4.3-bin-hadoop2.7/sbin
    ./start-all.sh

07-14 Hadoop MapReduce

概念
Hadoop MapReduce是Hadoop生态系统中的一个计算模型和编程框架,用于处理大规模数据集的分布式计算任务。

MapReduce模型旨在解决大规模数据处理的并行化和分布式计算问题。它将计算任务分解为两个主要阶段:Map阶段和Reduce阶段。

在Map阶段,数据被分割成小的数据块,并由多个Map任务并行处理。每个Map任务将输入数据块转换为一系列键值对,其中键表示中间结果的标识,值表示相关数据。

在Reduce阶段,通过对Map阶段输出的中间结果进行整理和归并,将具有相同键的值聚合在一起。这样,Reduce任务可以对每个键的值进行处理,生成最终的计算结果。

Hadoop
MapReduce提供了一个编程模型和框架,使开发人员能够编写并行计算任务,并将其自动分布到Hadoop集群中的多个节点上执行。开发人员可以使用Java等编程语言编写MapReduce作业,利用Hadoop框架提供的API来处理输入数据、定义Map和Reduce函数,并设置作业的配置参数。

通过利用Hadoop MapReduce的分布式计算能力,可以有效地处理大规模数据集,实现数据处理、数据挖掘、日志分析等各种应用场景。

WordCount程序

数据说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Shakespeare Sonnet
Shall I compare thee to a summer's day?
Thou art more lovely and more temperate:
Rough winds do shake the darling buds of May,
And summer's lease hath all too short a date:
Sometime too hot the eye of heaven shines,
And often is his gold complexion dimm'd;
And every fair from fair sometime declines,
By chance or nature's changing course untrimm'd
But thy eternal summer shall not fade
Nor lose possession of that fair thou owest;
Nor shall Death brag thou wander'st in his shade,
When in eternal lines to time thou growest:
So long as men can breathe or eyes can see,
So long lives this and this gives life to thee.

程序实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//1、获取Job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//设置jar包路径
job.setJarByClass(WordCountDriver.class);
//3、关联map和reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//4、设置map的输出kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//5、设置最终输出的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//6、设置输入输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//7、提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private Text k = new Text();
private IntWritable v = new IntWritable(1);

@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(" ");
for (String word : words) {
k.set(word);
context.write(k, v);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable v = new IntWritable();

@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
v.set(sum);
context.write(key, v);
}
}

07-15 Hive数据分析

概述

Hive是基于Hadoop的数据仓库基础设施,它提供了一种类SQL的查询语言(HiveQL)来进行数据分析。Hive使得使用类似于传统关系型数据库的查询语言进行大规模数据处理和分析变得更加容易。

以下是使用Hive进行数据分析的一般步骤:

  1. 创建数据表:首先,你需要在Hive中创建适合你数据的表。使用Hive的数据定义语言(DDL),你可以定义表的结构、字段和数据类型。你可以选择将数据加载到表中,或者使用外部表引用现有的数据。

  2. 导入数据:如果你的数据尚未在Hive中可用,你可以使用Hive的数据导入功能将数据加载到表中。Hive支持从各种数据源导入数据,如文本文件、CSV文件、Parquet文件、HBase等。

  3. 执行查询:使用HiveQL编写查询语句来执行数据分析操作。HiveQL类似于传统的SQL语言,你可以使用SELECT语句、WHERE子句、GROUP BY子句、JOIN操作等来进行数据查询、过滤、聚合等操作。

  4. 存储结果:根据需要,你可以将查询结果存储到新的表中,以供后续分析使用。Hive支持将查询结果存储为新表或以其他格式导出到文件系统中。

  5. 优化查询性能:为了提高查询性能,你可以使用Hive的优化技术,如分区、分桶、索引等。这些技术可以减少数据扫描量和提高查询效率。

  6. 定期维护和管理:作为数据仓库,你需要定期维护和管理Hive环境。这包括数据清理、分区维护、表结构变更等。

请注意,Hive适用于批处理和离线分析场景,对于实时查询和低延迟要求较高的场景,可能需要考虑其他技术或框架。

使用Hive进行数据分析需要一定的学习和了解,特别是HiveQL查询语言和Hive的表现行为。根据你的具体需求和数据情况,你可能需要深入学习和使用Hive的高级功能和技术。

案例:房屋数据分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# 标题                                           ,户型               ,区县,商圈,小区        ,房租,地铁,亮点1,亮点2,亮点3
# 绿地 精装套二 随时看房 实图拍摄 临千盛 欧尚 优博 穿巷子,整租|2室2厅|84㎡|朝南,武侯,簇桥,绿地圣路易名邸,2400,,家电齐全,,
# 警官公寓 合租 次卧103平米2室1厅1卫限女生,合租次卧|2户合租|103㎡,高新区,中和镇,警官公寓,860,,交通便利,合租女生,
# 高新区大源嘉祥瑞庭南城 35平米1室1厅1卫,整租|1室1厅|35㎡|朝南北,高新区,大源,嘉祥瑞庭南城,2300,,采光好,,
# 国光一环大厦1室1厅1卫,整租|1室1厅|60㎡|朝南,成华,建设路伊藤,国光一环大厦,1300,红星桥站,随时入住,,
# 双流蛟龙港世纪华都高层 93平米3室1厅1卫,整租|3室1厅|93㎡|朝南北,双流,蛟龙港,世纪华都高层,1400,,家电齐全,,
# 浣花苑1室1厅1卫,整租|1室1厅|64㎡|朝南,青羊,杜甫草堂,浣花苑,600,草堂北路站,随时入住,,
# 华宇阳光水岸 合租 次卧80平米2室1厅1卫性别不限,合租次卧|2户合租|80㎡|朝南,成华,驷马桥,华宇阳光水岸,700,昭觉寺南路站,性别不限,随时入住,
# 润扬川大河畔一期1室1厅1卫,整租|1室1厅|60㎡|朝南,双流,航空港,润扬川大河畔一期,1300,,交通便利,随时入住,
# 东立光华杏林 整租 1室1厅1卫 50平米(个人),整租|1室1厅|50㎡|朝南,青羊,万家湾,东立光华杏林,1600,中坝站,精装修,,

# 在hive数据库下构建数据表house
create table if not exists house
(
title string comment '标题',
layout string comment '户型',
district string comment '区县',
area string comment '商圈',
estate string comment '楼盘',
rent int comment '房租',
station string comment '地铁',
merit1 string comment '亮点1',
merit2 string comment '亮点2',
merit3 string comment '亮点3'
) row format delimited
fields terminated by ','
tblproperties('skip.header.line.count'='1');

# 导入数据house.csv
load data local inpath '/root/house/house.csv'overwrite into table house;

# 计算房屋出租量前十的楼盘排名,结果写入本地/college2020/01/000000_0 (二维数组:楼盘 数量,不计入空值)
insert overwrite local directory '/college2020/01'
row format delimited fields terminated by '\t'
select estate, count(*) as row_count
from house
where estate is not null
group by estate
order by row_count desc, estate
limit 10;

# 计算房屋出租量前十的商圈排名,结果写入本地/college2020/02/000000_0 (二维数组:商圈 数量,不计入空值)
insert overwrite local directory '/college2020/02'
row format delimited fields terminated by '\t'
select area, count(*) as area_count
from house
where area is not null
group by area
order by area_count desc, area
limit 10;

# 整理双流区整租三室一厅不同楼盘价格,结果写入本地/college2020/03/000000_0 (二维数组:楼盘 价格,为均价降序)
insert overwrite local directory '/college2020/03'
row format delimited fields terminated by '\t'
select estate, avg(rent) as avg_rent
from house
where district like '%双流%'
and split(layout, '\\|')[0] = '整租'
and split(layout, '\\|')[1] = '3室1厅'
group by estate
order by avg_rent desc;

# 现想在中坝站万家湾附近进行合租,试给出户型及房租信息进行参考,结果写入本地/college2020/04/000000_0 (二维数组:户型 价格降序)
insert overwrite local directory '/college2020/04'
row format delimited fields terminated by '\t'
select layout, rent
from house
where station = '中坝站'
and area = '万家湾'
and split(layout, '\\|')[0] like '%合租%'
order by rent desc;

# 现想在保利星座进行租房,预算为1000-1500,试给出参考户型信息,结果写入本地/college2020/05/000000_0 (二维数组:户型 价格降序)
insert overwrite local directory '/college2020/05'
row format delimited fields terminated by '\t'
select layout, rent
from house
where estate like '%保利星座%'
and rent between 1000 and 1500
order by rent desc;

# 试列出不同商圈整租1室1厅的价格TOP3,结果写入本地/college2020/06/000000_0 (二维数组:商圈 价格,均值降序)
insert overwrite local directory '/college2020/06'
row format delimited fields terminated by '\t'
select area, avg(rent) as avg_rent
from house
where split(layout, '\\|')[0] = '整租'
and split(layout, '\\|')[1] = '1室1厅'
group by area
order by avg_rent desc
limit 3;

# 试列出高新区不同户型的整租价格清单,结果写入本地/college2020/07/000000_0 (二维数组示例:3室2厅 4000,均值向下取整降序)
insert overwrite local directory '/college2020/07'
row format delimited fields terminated by '\t'
select split(layout, '\\|')[1], floor(avg(rent)) as avg_rent
from house
where district like '%高新区%'
and split(layout, '\\|')[0] like '%整租%'
group by split(layout, '\\|')[1]
order by avg_rent desc;

# 合并房屋出租亮点(特点),给出常用亮点TOP5,结果写入本地/college2020/08/000000_0 (二维数组:亮点 数量,不计入空值,降序)
insert overwrite local directory '/college2020/08'
row format delimited fields terminated by '\t'
select merit, count(*) as merit_count
from (select merit1 as merit
from house
union all
select merit2 as merit
from house
union all
select merit3 as merit
from house) t
group by merit
order by merit_count desc
limit 5;

07-16 Spark数据分析

案例:WordCount

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
def main(args: Array[String]): Unit = {
Logger
.getLogger("org.apache.spark")
.setLevel(Level.ERROR)
// 新建SparkConf对象
val conf = new SparkConf()
.setAppName("WordCount")
.setMaster("local[*]")
// 创建SparkContext对象
val sparkContext = new SparkContext(config = conf)
// 定义打开的文件
val fileRDD = sparkContext.textFile("/Volumes/KeQing/Documents/IntelliJ IDEA Ultimate/SparkTraning/src/main/resources/words.txt")
// 按行分割单词
var wordCountResultRDD = fileRDD.mapPartitions(it => {
it.flatMap(_.split(" ").map((_, 1)))
})
// (word1,3) (word2, 2) (word3, 5)
.reduceByKey(_ + _).sortBy(_._2, ascending = false)
// output
wordCountResultRDD.collect().foreach(println)
wordCountResultRDD.saveAsTextFile("/Volumes/KeQing/Documents/IntelliJ IDEA Ultimate/SparkTraning/src/main/resources/output/WordCountResult")
// 关闭
sparkContext.stop()
}
}

案例:搜索引擎日志分析

  1. 封装数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    /*
    queryTime 访问时间,格式为:HH:mm:ss
    userId 用户ID
    queryWords 查询词
    resultRank 该URL在返回结果中的排名
    clickRank 用户点击的顺序号
    clickUrl 用户点击的URL
    */
    case class SogouRecord(
    queryTime:String,
    userId:String,
    queryWords:String,
    resultRank:Int,
    clickRank:Int,
    clickUrl:String
    )
  2. 使用case class反射,适用于简单分析

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    import com.hankcs.hanlp.HanLP
    import com.hankcs.hanlp.seg.common.Term
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.{SparkConf, SparkContext}

    import java.util

    object SearchEngineLogAnalysis {
    def main(args: Array[String]): Unit = {
    Logger
    .getLogger("org.apache.spark")
    .setLevel(Level.ERROR)
    // 新建SparkConf对象
    val conf = new SparkConf()
    .setAppName("SearchEngineLogAnalysis")
    .setMaster("local[*]")
    // 创建SparkContext对象
    val sparkContext = new SparkContext(config = conf)
    val file = "/Volumes/KeQing/Documents/IntelliJ IDEA Ultimate/SparkTraning/src/main/resources/reduced.txt"
    val outputPath1 = "/Volumes/KeQing/Documents/IntelliJ IDEA Ultimate/SparkTraning/src/main/resources/output/queryWordsWordCountRDD"
    val outputPath2 = "/Volumes/KeQing/Documents/IntelliJ IDEA Ultimate/SparkTraning/src/main/resources/output/userQueryAnalysisRDD"
    val outputPath3 = "/Volumes/KeQing/Documents/IntelliJ IDEA Ultimate/SparkTraning/src/main/resources/output/timeQueryCountAnalysisRDD"
    val rddSougouRecord = sparkContext.textFile(file)
    // 过滤空行数据、过滤字段数量不等于6
    .filter(line => line != null && line.trim.split("\\s+").length == 6)
    // 将RDD[String]=>RDD[SogouRecord]
    .mapPartitions(it => {
    it.map(line => {
    val contents = line.trim.split("\\s+")
    // 注意字段格式
    SogouRecord(
    contents(0),
    contents(1),
    // [360安全卫士]
    contents(2).replaceAll("[\\[|\\]]", ""),
    contents(3).toInt,
    contents(4).toInt,
    contents(5)
    )
    })
    })
    rddSougouRecord.persist(StorageLevel.MEMORY_AND_DISK).count()

    val queryWordsWordCountRDD = rddSougouRecord.mapPartitions(it => {
    it.flatMap(record => {
    val terms: util.List[Term] = HanLP.segment(record.queryWords.trim)
    import scala.collection.JavaConverters._
    terms.asScala.map(_.word)
    })
    }).map((_, 1)).reduceByKey(_ + _).sortBy(_._2, ascending = false)

    queryWordsWordCountRDD.coalesce(numPartitions = 1).saveAsTextFile(outputPath1)

    val userQueryAnalysisRDD = rddSougouRecord.mapPartitions(it => {
    it.map(record => {
    ((record.userId, record.queryWords), 1)
    })
    }).reduceByKey(_ + _).sortBy(_._2, ascending = false)
    userQueryAnalysisRDD.coalesce(1).saveAsTextFile(outputPath2)

    val timeQueryCountAnalysisRDD = rddSougouRecord.mapPartitions(it => {
    it.map(record => {
    (record.queryTime.substring(0, 2), 1)
    })
    }).reduceByKey(_ + _).sortBy(_._2, ascending = false)
    timeQueryCountAnalysisRDD.coalesce(1).saveAsTextFile(outputPath3)

    rddSougouRecord.unpersist()
    sparkContext.stop()
    }
    }
  3. 使用StructType编程,适用于复杂分析

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    object SogouAnalysisSQL {
    def main(args: Array[String]): Unit = {
    // 设置输出的日志级别
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    val spark = SparkSession.builder()
    .appName("SogouAnalysisSQL")
    .master("local[*]")
    .getOrCreate()

    val file = "/Volumes/KeQing/Documents/IntelliJ IDEA Ultimate/SparkTraning/src/main/resources/reduced.txt"

    val fileRDD = spark.sparkContext.textFile(file)
    .filter(line => line != null && line.trim.split("\\s+").length == 6)

    val schema = StructType(Array(
    StructField("queryTime", StringType, false),
    StructField("userId", StringType, false),
    StructField("queryWords", StringType, false),
    StructField("resultRank", IntegerType, false),
    StructField("clickRank", IntegerType, false),
    StructField("clickUrl", StringType, false)
    ))

    // RDD[String] => RDD[Row]
    val rddRow = fileRDD.mapPartitions(it => {
    it.map(line => {
    val contents = line.trim.split("\\s+")
    try {
    // 封装的字段的时候,顺序要与定义的schema顺序完全一致
    Row(
    contents(0),
    contents(1),
    contents(2).replaceAll("[\\[|\\]]", ""),
    contents(3).toInt,
    contents(4).toInt,
    contents(5)
    )
    } catch {
    case e: Exception => Row("error", "error", "error", 0, 0, "error")
    }
    })
    })

    // RDD[Row] ==> DataFrame
    val df = spark.createDataFrame(rddRow, schema)

    // 将DataFrame注册为临时视图
    df.createOrReplaceTempView("sogou_view")

    // 定义查询的sql
    val sql =
    """
    |select * from sogou_view where clickUrl like 'www%'
    |limit 3
    |""".stripMargin

    // 执行sql
    val sqlResult = spark.sql(sql)
    // sqlResult.show(truncate = false)

    val querySQL =
    """
    |select userId,queryWords,count(*) as query_count from sogou_view
    |group by userId,queryWords
    |order by userid,query_count desc
    |""".stripMargin

    val queryResult = spark.sql(querySQL)

    // queryResult.show(truncate = false)

    val timeSQL =
    """
    |select substring(queryTime,0,5) as query_time,count(*) as query_count from sogou_view
    |group by substring(queryTime,0,5)
    |order by query_time,query_count desc
    |""".stripMargin

    val timeResult = spark.sql(timeSQL)
    timeResult.show(truncate = false)
    timeResult.rdd.coalesce(1).saveAsTextFile("/Volumes/KeQing/Documents/IntelliJ IDEA Ultimate/SparkTraning/src/main/resources/output/timeResult")
    spark.stop()
    }
    }
  4. 示例数据

    1
    2
    3
    4
    5
    6
    7
    00:00:00	2982199073774412	[360安全卫士]	8 3	download.it.com.cn/softweb/software/firewall/antivirus/20067/17938.html
    00:00:00 07594220010824798 [哄抢救灾物资] 1 1 news.21cn.com/social/daqian/2008/05/29/4777194_1.shtml
    00:00:00 5228056822071097 [75810部队] 14 5 www.greatoo.com/greatoo_cn/list.asp?link_id=276&title=%BE%DE%C2%D6%D0%C2%CE%C5
    00:00:00 6140463203615646 [绳艺] 62 36 www.jd-cd.com/jd_opus/xx/200607/706.html
    00:00:00 8561366108033201 [汶川地震原因] 3 2 www.big38.net/
    00:00:00 23908140386148713 [莫衷一是的意思] 1 2 www.chinabaike.com/article/81/82/110/2007/2007020724490.html
    00:00:00 1797943298449139 [星梦缘全集在线观看] 8 5 www.6wei.net/dianshiju/????\xa1\xe9|????do=index

案例:手机基站日志分析

  1. 示例数据
    基站信息数据(基站ID 经度 纬度 信号辐射类型)

    1
    2
    3
    9F36407EAD0629FC166F14DDE7970F68,116.304864,40.050645,6
    CC0710CC94ECC657A8561DE549D940E0,116.303955,40.041935,6
    16030401EAFB68F1E3CDF819735E1C66,116.296302,40.032296,6

    用户信息数据(手机号 时间(时间戳) 基站ID 连接状态(“1”为连接,“0”为断开))

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    13554756349,20211202082000,16030401EAFB68F1E3CDF819735E1C66,1
    13236723613,20211202082500,16030401EAFB68F1E3CDF819735E1C66,1
    13554756349,20211202180000,16030401EAFB68F1E3CDF819735E1C66,0
    13236723613,20211202180000,16030401EAFB68F1E3CDF819735E1C66,0
    13236723613,20211202075000,9F36407EAD0629FC166F14DDE7970F68,1
    13554756349,20211202075100,9F36407EAD0629FC166F14DDE7970F68,1
    13236723613,20211202081000,9F36407EAD0629FC166F14DDE7970F68,0
    13554756349,20211202081300,9F36407EAD0629FC166F14DDE7970F68,0
    13554756349,20211202175000,9F36407EAD0629FC166F14DDE7970F68,1
    13236723613,20211202182000,9F36407EAD0629FC166F14DDE7970F68,1
    13554756349,20211202220000,9F36407EAD0629FC166F14DDE7970F68,0
    13236723613,20211202230000,9F36407EAD0629FC166F14DDE7970F68,0
    13236723613,20211202081100,CC0710CC94ECC657A8561DE549D940E0,1
    13554756349,20211202081200,CC0710CC94ECC657A8561DE549D940E0,1
    13554756349,20211202081900,CC0710CC94ECC657A8561DE549D940E0,0
    13236723613,20211202082000,CC0710CC94ECC657A8561DE549D940E0,0
    13554756349,20211202171000,CC0710CC94ECC657A8561DE549D940E0,1
    13554756349,20211202171600,CC0710CC94ECC657A8561DE549D940E0,0
    13236723613,20211202180500,CC0710CC94ECC657A8561DE549D940E0,1
    13236723613,20211202181500,CC0710CC94ECC657A8561DE549D940E0,0
  2. case Class

    1
    2
    3
    4
    5
    6
    case class BaseStation(
    baseStationId: String,
    longitude: Double,
    latitude: Double,
    signalRadiationType: Int
    )
    1
    2
    3
    4
    5
    6
    case class UserRecord(
    phone: String,
    time: Long,
    baseStationId: String,
    connectionStatus: Int
    )
  3. main

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.{SparkConf, SparkContext}

    /*
    需求一:要求使用Sparkcore对分析数据进行分割处理获取业务需求所使用到的数据,根据不同用户在基站停留的时长数据统计出用户与停留地点的关系。
    (1)根据用户信息中"手机号,基站ID"做为唯一标识,和时间戳构成新的元组-->((手机号, 站点), 时间戳)
    (2)以(手机号,基站ID)作为key,通过reduceByKey算子进行聚合,计算出在基站的停留时间,构成新的数据结构,以便和坐标数据进行join,生成元组-->(基站ID,(手机号,停留时间))
    (3)将基站坐标数据信息进行拆分,通过map,构建成包含基站ID、经纬度的元组 -->(基站ID,(经度,纬度))
    (4)将2、3结果根据基站ID进行join操作,构成新的数据类型-->(手机号,基站ID,停留时间,(经度,纬度))
    (5)按手机号进行分组-->(手机号,(手机号,基站ID,停留时间,(经度,纬度)))
    (6)对时间进行排序取出停留时间最长的基站ID(top1),则可能是居住地点和工作地点。
    (7)格式结果:(手机号,List(手机号,停留时间,基站ID,(经度,纬度)))
    (8)注意:按照手机号进行分组时,运算结果按照顺序为part-00000,part-00001...
    */
    object MobileLocation {
    def main(args: Array[String]): Unit = {
    Logger
    .getLogger("org.apache.spark")
    .setLevel(Level.ERROR)
    val sparkConf = new SparkConf().setAppName("MobileLocation").setMaster("local[*]")
    val sparkContext = new SparkContext(config = sparkConf)
    val infoFile = "/Volumes/KeQing/Documents/IntelliJ IDEA Ultimate/SparkTraning/src/main/resources/mobilebasestationloganalysis/info.txt"
    val userFile = "/Volumes/KeQing/Documents/IntelliJ IDEA Ultimate/SparkTraning/src/main/resources/mobilebasestationloganalysis/user.txt"
    val outputPath = "/Volumes/KeQing/Documents/IntelliJ IDEA Ultimate/SparkTraning/src/main/resources/output/mobilebasestationloganalysis"
    val rddUserRecord = sparkContext.textFile(userFile)
    .filter(line => line != null)
    .mapPartitions(it => {
    it.map(line => {
    val contents = line.trim.split(",")
    // 注意字段格式
    UserRecord(
    contents(0),
    contents(1).toLong,
    contents(2),
    contents(3).toInt
    )
    })
    })

    rddUserRecord.persist(StorageLevel.MEMORY_AND_DISK).count()

    val basePhoneTime = rddUserRecord.mapPartitions(it => {
    it.map(record => {
    ((record.phone, record.baseStationId, record.connectionStatus), record.time)
    })
    }).reduceByKey(_ + _).sortBy(_._2, ascending = false)
    .mapPartitions(line => {
    line.map(
    line => {
    ((line._1._1, line._1._2), line._2)
    }
    )
    }).reduceByKey(_ - _)
    .mapPartitions(it => {
    it.map(line => {
    (line._1._2, (line._1._1, line._2))
    })
    }).sortBy(_._2._2, ascending = false)

    val rddInfoRecord = sparkContext.textFile(infoFile)
    .filter(line => line != null)
    .mapPartitions(it => {
    it.map(line => {
    val contents = line.trim.split(",")
    BaseStation(
    contents(0),
    contents(1).toDouble,
    contents(2).toDouble,
    contents(3).toInt
    )
    })
    })

    val baseLongLat = rddInfoRecord.mapPartitions(it => {
    it.map(record => {
    (record.baseStationId, (record.longitude, record.latitude))
    })
    })
    // (手机号,基站ID,停留时间,(经度,纬度))
    val basePhoneTimeLongLat = basePhoneTime.join(baseLongLat).mapPartitions(it => {
    it.map(line => {
    (line._2._1._1, line._1, line._2._1._2, (line._2._2._1, line._2._2._2))
    })
    // (手机号,(手机号,基站ID,停留时间,(经度,纬度))
    .map(line => {
    (line._1, (line._2, line._3, line._4))
    })
    }).groupByKey()

    // 假设dataRDD是一个RDD,包含了键值对(手机号,数据记录迭代器)
    // (手机号,停留时间,基站ID,(经度,纬度)
    val dataRDD: RDD[(String, Iterable[(String, Long, (Double, Double))])] = basePhoneTimeLongLat

    // 转换成指定格式的RDD,并按照停留时间进行排序
    val resultRDD: RDD[(String, List[(String, Long, String, (Double, Double))])] = dataRDD.map { case (phoneNumber, records) =>
    val resultData = records.map(record => (phoneNumber, record._2, record._1, (record._3._1, record._3._2))).toList
    val sortedResultData = resultData.sortBy(_._2)(Ordering.Long.reverse) // 按照停留时间降序排序
    (phoneNumber, sortedResultData)
    }

    // 获取每个手机号的top1基站ID
    val top1BaseStationRDD: RDD[(String, String)] = resultRDD.map { case (phoneNumber, sortedResultData) =>
    if (sortedResultData.nonEmpty) {
    val top1BaseStationID = sortedResultData.head._3
    (phoneNumber, top1BaseStationID)
    } else {
    (phoneNumber, "Unknown") // 若没有停留记录,则标记为Unknown
    }
    }

    // 输出结果
    top1BaseStationRDD.foreach { case (phoneNumber, top1BaseStationID) =>
    println(s"手机号 $phoneNumber 的停留时间最长的基站ID为 $top1BaseStationID")
    }

    resultRDD.saveAsTextFile(outputPath)
    sparkContext.stop()
    }
    }

学习目标评估

在本次培训开始之前,我们明确了学习目标,并与参与人员共享,以便让他们了解培训的预期成果。我们希望通过本次培训,参与人员能够:

  1. 理解 MapReduce 的基本概念和原理,掌握分布式计算的基本思想。
  2. 掌握 Hive 的使用,了解如何利用 Hive 进行数据仓库和数据分析。
  3. 熟悉 Spark 的核心概念和编程模型,能够使用 Spark 进行大规模数据处理和分析。

培训成果

通过本次培训,参与人员在以下方面取得了显著的进步:

MapReduce

  • 参与人员深入理解了 MapReduce 的基本原理和分布式计算的概念。
  • 他们学会了如何使用 Hadoop 生态系统来实现 MapReduce 任务,并在实际练习中体验了分布式计算的优势和挑战。

Hive

  • 参与人员掌握了 Hive 的基本使用方法,包括创建表、数据导入导出、查询数据等操作。
  • 他们了解了 Hive 的数据仓库和数据分析能力,能够使用 Hive 进行复杂的数据查询和数据处理。

Spark

  • 参与人员熟悉了 Spark 的核心概念,包括 RDD、DataFrame 和 Spark SQL 等。
  • 他们学会了使用 Spark 进行大规模数据处理和分析,并在实际实验中体验了 Spark 的高性能和灵活性。

专业成长

  • 通过本次培训,参与人员在大数据处理和分析领域获得了实际动手经验,提升了相关技能。
  • 培训中的案例和实践让他们对大数据处理技术有了更深入的理解,为未来在数据领域的工作提供了坚实的基础。

总结:
本次 MapReduce、Hive 和 Spark 的培训为参与人员提供了一次高质量的学习机会。参与人员在专业技能和知识水平方面取得了显著进步,他们对大数据处理和分析有了更深入的了解。通过持续改进培训内容和反馈机制,我们将继续提高培训的质量,帮助更多的人在大数据领域取得成功。

Prev:
Spark与Scala
Next:
尺度不变特征变换SIFT