培训报告
[学院组织]
[7.13-7.16]
培训目的
本次技术培训的目的是提升参与人员在特定技术领域的知识和技能,以应对公司在该领域面临的挑战和需求。通过培训,参与人员将能够掌握相关技术,并将其应用于日常工作中,从而提高工作效率和质量。
07-13 基础环境搭建
基础环境
搭建步骤
修改主机名,便于识别节点;
1
2
3
4修改主机名
hostnamectl set-hostname <hostname>
刷新
bash修改hosts文件,添加集群节点映射,按照给出的节点IP和对应的主机名进行设置;
1
2修改hosts文件内容
vim /etc/hosts1
2# 【局域网ip】 【主机名】
[ip] [hostname]要求各节点时区修改为中国时区( 中国标准时间CST+8)
1
2修改为中国时区
timedatectl set-timezone Asia/Shanghai安装ntp服务,要求主节点master为本地时钟源,从节点设置定时任务同步本地时间;
1
2修改master节点NTP配置
vim /etc/ntp.conf设置master为本地时间服务器,屏蔽默认server,服务器层级设为10
1
2
3# server <默认服务器>
server 127.127.1.0
fudge 127.127.1.0 stratum 10启动ntp服务
1
2systemctl start ntp
systemctl restart ntpd.service添加定时任务–在早十-晚五时间段内每隔半个小时同步一次本地服务器时间(24小时制、使用用户root任务调度crontab,服务器地址使用主机名)
1
sudo crontab -e
1
*/30 10-17 * * * /usr/sbin/ntpdate -u <主机名>
集群中数据传输需要节点之间免密访问,要求设置主节点之间到从节点的免密访问;
主节点生成公钥文件id_rsa.pub(数字签名RSA,用户root,主机名master)1
ssh-keygen
建⽴master⾃身使⽤root⽤户ssh访问localhost免密登录
若没有authorized_keys则使用下面命令创建1
cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys
建⽴master使⽤root⽤户到slave的ssh免密登录访问
1
ssh-copy-id root@<从节点IP或主机名>
测试
1
ssh-copy-id root@<从节点IP或主机名>
配置Java环境
解压下载的java JDK1
tar -zxvf [jdk]
添加系统环境变量
1
vim /etc/profile
添加内容
1
2export JAVA_HOME=<you_java_jdk_path>
export PATH="$JAVA_HOME/bin:$PATH"配置生效
1
source /etc/profile
分发
1
2scp -r /etc/profile root@slave:/etc/
scp -r /usr/java root@slave:/usr/
Zookeeper
概念
ZooKeeper是一个开源的分布式协调服务,它为分布式应用程序提供了高度可靠的协调功能。它旨在解决分布式系统中的一些常见问题,如配置管理、命名服务、分布式锁、分布式协调等。
ZooKeeper的设计目标是提供一个简单而高效的分布式协调服务,它采用了基于观察者模式的数据模型。在ZooKeeper中,数据被组织为一个分层的命名空间(类似于文件系统的目录结构),称为ZooKeeper树(ZooKeeper
tree)或ZooKeeper命名空间(ZooKeeper namespace)。每个节点在树中都有一个唯一的路径标识,并可以存储一个小的数据块。
ZooKeeper提供了临时节点、顺序节点、观察者机制等特性,可以用于实现分布式锁、选主(Leader
Election)、集群管理、分布式队列等场景。它的核心是一个高可用的、基于主从架构的协调服务器集群,通过选举机制确保了服务的高可用性和可靠性。
在分布式系统中,ZooKeeper被广泛应用于各种场景,如大数据、分布式数据库、消息队列、分布式应用程序等。它为分布式应用程序提供了一个可靠的、高性能的基础设施,帮助开发人员简化了分布式系统的设计和实现。
Zookeeper是一个分布式服务框架,是Apache Hadoop 的一个子项目,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,如:统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等。
预装的配置文件zoo_sample.cfg下面默认有五个属性,分别是:
tickTime
心跳间隔,单位是毫秒,系统默认是2000毫秒,也就是间隔两秒心跳一次。
tickTime的意义:客户端与服务器或者服务器与服务器之间维持心跳,也就是每个tickTime时间就会发送一次心跳。通过心跳不仅能够用来监听机器的工作状态,还可以通过心跳来控制Flower跟Leader的通信时间,默认情况下FL的会话时常是心跳间隔的两倍。initLimit
集群中的follower服务器(F)与leader服务器(L)之间初始连接时能容忍的最多心跳数(tickTime的数量)。syncLimit
集群中flower服务器(F)跟leader(L)服务器之间的请求和答应最多能容忍的心跳数。clientPort
客户端连接的接口,客户端连接zookeeper服务器的端口,zookeeper会监听这个端口,接收客户端的请求访问,端口默认是2181。dataDir
该属性对应的目录是用来存放myid信息跟一些版本,日志,跟服务器唯一的ID信息等。
在集群Zookeeper服务在启动的时候,会回去读取zoo.cfg这个文件,从这个文件中找到这个属性然后获取它的值也就是dataDir
的路径,它会从这个路径下面读取myid这个文件,从这个文件中获取要启动的当前服务器的地址。
集群信息的配置:
在配置文件中,配置集群信息是存在一定的格式:service.N =YYY: A:B
N:代表服务器编号(准确对应对应服务器中myid里面的值)
YYY:服务器地址
A:表示 Flower 跟 Leader的通信端口,简称服务端内部通信的端口(默认2888)
B:表示是选举端口(默认是3888)
参考配置文件
1 | tickTime=2000 |
搭建步骤
将zookeeper安装包解压
1
tar -zxvf [zookeeper]
配置系统变量ZOOKEEPER_HOME,同时将Zookeeper安装路径中bin目录加入PATH系统变量
1
vim /etc/profile
1
2
3# zookeeper
export ZOOKEEPER_HOME=/usr/zookeeper/zookeeper-3.4.14
export PATH="$ZOOKEEPER_HOME/bin:$PATH"1
source /etc/profile
Zookeeper的默认配置文件为Zookeeper安装路径下conf/zoo_sample.cfg,将其修改为zoo.cfg
1
2cd zookeeper/zookeeper-3.4.14/conf/
mv zoo_sample.cfg zoo.cfg设置数据存储路径(dataDir)为/usr/zookeeper/zookeeper-3.4.14/zkdata
1
vim zoo.cfg
1
dataDir=/usr/zookeeper/zookeeper-3.4.14/zkdata
设置日志文件路径(dataLogDir)为/usr/zookeeper/zookeeper-3.4.14/zkdatalog
1
vim zoo.cfg
1
dataLogDir=/usr/zookeeper/zookeeper-3.4.14/zkdatalog
设置集群列表(要求master为1号服务器,slave1为2号服务器,slave2为3号服务器)
1
vim zoo.cfg
1
2
3server.1=master:2888:3888
server.2=slave1:2888:3888
server.3=slave2:2888:3888创建所需数据存储文件夹、日志存储文件夹
1
mkdir -p /usr/zookeeper/zookeeper-3.4.14/{zkdata,zkdatalog}
数据存储路径下创建myid,写入对应的标识主机服务器序号
1
2cd /usr/zookeeper/zookeeper-3.4.14/zkdata
vim myidserver.1 中的 1 就是主机服务器的序号
1
1
分发
1
2scp -r /usr/zookeeper/ root@slave:/usr
scp /etc/profile root@slave:/etc启动服务,查看进程QuorumPeerMain是否存在
1
zkServer.sh start
查看各节点服务器角色是否正常(leader/follower)
1
zkServer.sh status
Hadoop
概述
Hadoop是一个开源的分布式计算框架,旨在处理大规模数据集的存储和处理。它提供了可靠性、可扩展性和容错性,适用于在集群中并行处理大量数据的场景。
Hadoop的核心组件包括:
Hadoop Distributed File System (HDFS):这是一个分布式文件系统,用于存储大规模数据集。它将数据分散存储在集群中的多个节点上,并提供高容错性和高可靠性,以支持大规模数据的存储和处理。
MapReduce:这是一个分布式数据处理模型,用于在Hadoop集群上并行处理大规模数据。MapReduce将计算任务分解为可并行执行的“映射”(Map)和“归约”(Reduce)阶段,允许在分布式环境中处理海量数据。
除了核心组件之外,Hadoop生态系统还包括许多其他工具和组件,如:
- YARN (Yet Another Resource Negotiator):作为Hadoop的资源管理器,负责集群资源的管理和任务调度。
- Hive:一个基于Hadoop的数据仓库和查询工具,提供类似于SQL的查询语言,用于分析和处理存储在Hadoop上的数据。
- Spark:一个快速、通用的大数据处理引擎,提供高级别的API,支持内存计算和更复杂的数据处理模式。
- HBase:一个分布式、可扩展的列式数据库,用于存储和访问结构化数据。
- Pig:一种高级数据流脚本语言,用于编写复杂的数据转换和分析任务。
- ZooKeeper:一个分布式协调服务,用于管理和协调分布式系统中的各种任务。
Hadoop被广泛应用于大数据领域,它可以处理海量数据并提供可靠的数据存储和分析能力。它的设计理念和架构使得它适用于构建可扩展的分布式系统,以满足日益增长的大数据需求。
Hadoop是由Java语言编写的,在分布式服务器集群上存储海量数据并运行分布式分析应用的开源框架,其核心部件是HDFS与MapReduce。
- HDFS是一个分布式文件系统:引入存放文件元数据信息的服务器Namenode和实际存放数据的服务器Datanode,对数据进行分布式储存和读取。
- MapReduce是一个计算框架:MapReduce的核心思想是把计算任务分配给集群内的服务器里执行。通过对计算任务的拆分(Map计算/Reduce计算)再根据任务调度器(JobTracker)对任务进行分布式计算。
配置文件 | 配置对象 | 主要内容 |
---|---|---|
hadoop-env.sh | hadoop运行环境 | 用来定义Hadoop运行环境相关的配置信息; |
core-site.xml | 集群全局参数 | 定义系统级别的参数,包括HDFS URL、Hadoop临时目录等; |
hdfs-site.xml | HDFS参数 | 定义名称节点、数据节点的存放位置、文本副本的个数、文件读取权限等; |
mapred-site.xml | MapReduce参数 | 包括JobHistory Server 和应用程序参数两部分,如reduce任务的默认个数、任务所能够使用内存的默认上下限等; |
yarn-site.xml | 集群资源管理系统参数 | 配置ResourceManager ,nodeManager的通信端口,web监控端口等; |
Hadoop的配置类是由资源指定的,资源可以由一个String或Path来指定,资源以XML形式的数据表示,由一系列的键值对组成。资源可以用String或path命名(示例如下),
- String:指示hadoop在classpath中查找该资源;
- Path:指示hadoop在本地文件系统中查找该资源。
配置示例
1 | <configuration> |
core-site.xml
配置参数 说明 fs.default.name 用于指定NameNode的地址 hadoop.tmp.dir Hadoop运行时产生文件的临时存储目录 hdfs-site.xml
配置参数 说明 dfs.replication 指定备份数 dfs.namenode.name.dir NameNode在本地文件系统中持久存储命名空间和事务日志的路径 dfs.datanode.data.dir DataNode在本地文件系统中存放块的路径 dfs.permissions 集群权限系统校验 dfs.datanode.use.datanode.hostname datanode之间通过域名方式通信 注意:外域机器通信需要用外网IP,未配置hostname访问会访问异常。可以在Java api客户端使用conf.set(“
fs.client.use.datanode.hostname”,”true”);。mapreduce-site.xml
配置参数 说明 mapreduce.framework.name 指定执行MapReduce作业的运行时框架。属性值可以是local,classic或yarn yarn-site.xml
配置参数 说明 yarn.resourcemanager.admin.address 用于指定RM管理界面的地址(主机:端口) yarn.nodemanager.aux-services mapreduce 获取数据的方式,指定在进行mapreduce作业时,yarn使用mapreduce_shuffle混洗技术。这个混洗技术是hadoop的一个核心技术,非常重要。 yarn.nodemanager.auxservices.mapreduce.shuffle.class 用于指定混洗技术对应的字节码文件,值为org.apache.hadoop.mapred.ShuffleHandler
配置步骤
Hadoop安装包解压
1
tar -zxvf [hadoop]
配置环境变量HADOOP_HOME,将Hadoop安装路径中bin目录和sbin目录加入PATH系统变量
1
vim /etc/profile
1
2
3# hadoop
export HADOOP_HOME=/usr/hadoop/hadoop-2.7.7
export PATH="$HADOOP_HOME/sbin:$HADOOP_HOME/bin:$PATH"1
source /etc/profile
配置Hadoop运行环境JAVA_HOME
1
2cd /usr/hadoop/hadoop-2.7.7/etc/hadoop
vim hadoop-env.sh1
export JAVA_HOME=/usr/java/jdk1.8.0_221
设置全局参数,指定HDFS上NameNode地址为master,端口默认为9000;指定临时存储目录为本地/root/hadoopData/tmp(要求为绝对路径,下同)
1
vim core-site.xml
1
2
3
4
5
6
7
8
9
10<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://master:9000</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/root/hadoopData/tmp</value>
</property>
</configuration>设置HDFS参数,指定备份文本数量为2;设置HDFS参数,指定NN存放元数据信息路径为本地/root/hadoopData/name;指定DN存放元数据信息路径为本地/root/hadoopData/data(要求为绝对路径);设置HDFS参数,关闭hadoop集群权限校验(安全配置),允许其他用户连接集群;指定datanode之间通过域名方式进行通信
1
vim hdfs-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22<configuration>
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>/root/hadoopData/name</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>/root/hadoopData/data</value>
</property>
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
<property>
<name>dfs.datanode.use.datanode.hostname</name>
<value>true</value>
</property>
</configuration>设置YARN运行环境$JAVA_HOME参数
1
vim yarn-env.sh
1
export JAVA_HOME=/usr/java/jdk1.8.0_221
设置YARN核心参数,指定ResourceManager进程所在主机为master,端口为18141;指定mapreduce 获取数据的方式为mapreduce_shuffle
1
vim yarn-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14<configuration>
<property>
<name>yarn.resourcemanager.admin.address</name>
<value>master:18141</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.auxservices.mapreduce.shuffle.class</name>
<value>org.apache.hadoop.mapred.shuffleHandler</value>
</property>
</configuration>设置计算框架参数,指定MR运行在yarn上
1
2cp mapred-site.xml.template mapred-site.xml
vim mapred-site.xml1
2
3
4
5
6<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>设置节点文件,要求master为主节点; slave1、slave2为子节点
这里填入从节点1
vim slaves
1
slave
这里填入主节点
1
vim master
1
master
分发
1
2scp -r /usr/hadoop/ root@slave:/usr/
scp /etc/profile root@slave:/etc/对文件系统进行格式化
1
hdfs namenode -format
出现一下内容就是成功了
1
INFO common.Storage: Storage directory /root/hadoopData/name has been successfully formatted.
启动Hadoop集群查看各节点服务
1
2
3start-all.sh
hadoop-daemon.sh start datanode
jps查看集群运行状态是否正常
1
hadoop dfsadmin -report
Hive
概念
Hive是基于Hadoop的数据仓库基础架构,它提供了一种类似于SQL的查询语言(HiveQL)和用于处理大规模数据集的数据处理能力。Hive使得用户可以使用SQL语言进行数据查询、转换和分析,而无需编写复杂的MapReduce程序。
Hive的核心思想是将结构化查询语言(SQL)映射到Hadoop分布式文件系统(HDFS)上的大规模数据集。它将SQL查询转换为一系列MapReduce作业,从而能够在Hadoop集群上并行处理数据。
Hive的特性包括:
数据模型:Hive提供了类似于关系型数据库的表结构,支持基于模式的数据存储和查询。它可以将数据映射到表和分区,并支持复杂的数据类型和数据模型。
查询语言:Hive的查询语言HiveQL类似于SQL,允许用户使用SQL语句来查询和分析数据。HiveQL支持常见的查询操作,如SELECT、JOIN、GROUP
BY、ORDER BY等。数据转换和ETL:Hive支持数据转换和ETL(抽取、转换、加载)操作,可以通过HiveQL进行数据清洗、转换和提取。
扩展性和集成:Hive可以与其他Hadoop生态系统组件集成,如HDFS、HBase、Spark等。它还支持自定义函数(UDF)和扩展插件,允许用户编写自定义逻辑和扩展功能。
元数据管理:Hive维护表和分区的元数据信息,包括表的结构、存储位置、分区信息等。这使得Hive能够提供更高级别的查询优化和查询计划生成。
Hive广泛应用于大数据领域,特别是数据仓库、数据分析和数据处理场景。它提供了一种简化的方式来使用SQL语言进行大数据查询和处理,使得更多的人可以轻松地利用Hadoop集群进行数据分析和数据挖掘。
环境中已经安装mysql-community-server,注意mysql5.7默认安装后为root用户随机生成一个密码;
直接查看密码:grep “temporary password” /var/log/mysqld.log
登入数据库:mysql -uroot -p
输入随机密码即可登录根据要求设置密码,注意对应的安全策略修改;
设置密码强度为低级:set global validate_password_policy=????;
设置密码长度:set global validate_password_length=????;
修改本地密码:alter user ‘root‘@’localhost’ identified by ‘????’;根据要求满足任意主机节点root的远程访问权限(否则后续hive无法连接mysql);
1
GRANT ALL PRIVILEGES ON *.* TO '????'@'%' IDENTIFIED BY '????' WITH GRANT OPTION;
注意刷新权限;
1
flush privileges;
参考命令
- 启动mysql服务:systemctl start mysqld.service
- 关闭mysql服务:systemctl stop mysqld.service
- 查看mysql服务:systemctl status mysqld.service
配置步骤
环境中已经安装mysql-community-server,关闭mysql开机自启服务
1
systemctl disable mysqld
开启MySQL服务
1
systemctl start mysqld
判断mysqld.log日志下是否生成初临时密码
1
grep "temporary password" /var/log/mysqld.log
观察初始密码并复制下来
设置mysql数据库本地root用户密码为123456
登录mysql,使用临时密码1
mysql -uroot -p
sql中执行
1
2
3set global validate_password_policy=0;
set global validate_password_length=4;
alter user root@localhost identified by ‘123456’;退出
1
quit
Hive安装包解压
1
tar -zxvf [hive]
配置环境变量HIVE_HOME,将Hive安装路径中的bin目录加入PATH系统变量
1
vim /etc/profile
1
2export HIVE_HOME=/usr/hive/apache-hive-2.3.4-bin
export PATH=PATH : PATH:PATH:HIVE_HOME/bin1
source /etc/profile
修改HIVE运行环境,配置Hadoop安装路径HADOOP_HOME;修改HIVE运行环境,配置Hive配置文件存放路径HIVE_CONF_DIR;修改HIVE运行环境,配置Hive运行资源库路径HIVE_AUX_JARS_PATH
1
2
3cd /usr/hive/apache-hive-2.3.4-bin/conf/
cp hive-env.sh.template hive-env.sh
vim hive-env.sh1
2
3export HADOOP_HOME=/usr/hadoop/hadoop-2.7.7
export HIVE_CONF_DIR=/usr/hive/apache-hive-2.3.4-bin/conf
export HIVE_AUX_JARS_PATH=/usr/hive/apache-hive-2.3.4-bin/lib解决jline的版本冲突,将$HIVE_HOME/lib/jline-2.12.jar同步至$HADOOP_HOME/share/hadoop/yarn/lib/下
1
cp /usr/package277/mysql-connector-java-5.1.47-bin.jar /usr/hive/apache-hive-2.3.4-bin/lib/
分发
1
2scp -r /usr/hive root@slave:/usr
scp /etc/profile root@slave:/etc接下来的操作请选择刚才进行分发的slave从节点进行操作
驱动JDBC拷贝至hive安装目录对应lib下(依赖包存放于/usr/package277/)
1
cp /usr/package277/mysql-connector-java-5.1.47-bin.jar /usr/hive/apache-hive-2.3.4-bin/lib/
配置元数据数据存储位置为/user/hive_remote/warehouse;配置连接JDBC的URL地址主机名及默认端口号3306,数据库为hive,如不存在自行创建,ssl连接方式为false;配置数据库连接用户;配置数据库连接密码
1
vim /usr/hive/apache-hive-2.3.4-bin/conf/hive-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27<configuration>
<!-- Hive产生的元数据存放位置-->
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive_remote/warehouse</value>
</property>
<!-- 数据库连接driver,即MySQL驱动-->
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>root</value>
</property>
<!-- 数据库连接JDBC的URL地址-->
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://slave2:3306/hive?createDatabaseIfNotExist=true&useSSL=false</value>
</property>
<!-- MySQL数据库用户名-->
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<!-- MySQL数据库密码-->
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
</property>
</configuration>接下来的操作请选择master节点进行操作
配置元数据存储位置为/user/hive_remote/warehouse;关闭本地metastore模式;配置指向metastore服务的主机为slave1,端口为9083
1
vim /usr/hive/apache-hive-2.3.4-bin/conf/hive-site.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17<configuration>
<!-- Hive产生的元数据存放位置-->
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive_remote/warehouse</value>
</property>
<!--- 使用本地服务连接Hive,默认为true-->
<property>
<name>hive.metastore.local</name>
<value>false</value>
</property>
<!-- 连接服务器-->
<property>
<name>hive.metastore.uris</name>
<value>thrift://slave1:9083</value>
</property>
</configuration>服务器端初始化数据库,并启动metastore服务;
1
2schematool -dbType mysql -initSchema
hive --service metastore客户端开启进入hive,创建hive数据库
1
hive
1
create database hive;
Spark
Hive是基于Hadoop的数据仓库基础架构,它提供了一种类似于SQL的查询语言(HiveQL)和用于处理大规模数据集的数据处理能力。Hive使得用户可以使用SQL语言进行数据查询、转换和分析,而无需编写复杂的MapReduce程序。
Hive的核心思想是将结构化查询语言(SQL)映射到Hadoop分布式文件系统(HDFS)上的大规模数据集。它将SQL查询转换为一系列MapReduce作业,从而能够在Hadoop集群上并行处理数据。
Hive的特性包括:
数据模型:Hive提供了类似于关系型数据库的表结构,支持基于模式的数据存储和查询。它可以将数据映射到表和分区,并支持复杂的数据类型和数据模型。
查询语言:Hive的查询语言HiveQL类似于SQL,允许用户使用SQL语句来查询和分析数据。HiveQL支持常见的查询操作,如SELECT、JOIN、GROUP
BY、ORDER BY等。数据转换和ETL:Hive支持数据转换和ETL(抽取、转换、加载)操作,可以通过HiveQL进行数据清洗、转换和提取。
扩展性和集成:Hive可以与其他Hadoop生态系统组件集成,如HDFS、HBase、Spark等。它还支持自定义函数(UDF)和扩展插件,允许用户编写自定义逻辑和扩展功能。
元数据管理:Hive维护表和分区的元数据信息,包括表的结构、存储位置、分区信息等。这使得Hive能够提供更高级别的查询优化和查询计划生成。
Hive广泛应用于大数据领域,特别是数据仓库、数据分析和数据处理场景。它提供了一种简化的方式来使用SQL语言进行大数据查询和处理,使得更多的人可以轻松地利用Hadoop集群进行数据分析和数据挖掘。
Spark是Hadoop的子项目。 环境中将Spark安装到基于Linux的系统中。
相关配置变量如下:
- JAVA_HOME:Java安装目录
- HADOOP_HOME:Hadoop安装目录
- HADOOP_CONF_DIR:Hadoop集群的配置文件的目录
- SPARK_MASTER_IP:Spark集群的Master节点的ip地址
- SPARK_WORKER_MEMORY:每个worker节点能够最大分配给exectors的内存大小
配置步骤
将Spark安装包解压
1
tar -zvxf [spark]
文件/etc/profile中配置环境变量SPARK_HOME,将Spark安装路径中的bin目录加入PATH系统变量
1
vim /etc/profile
1
2export SPARK_HOME=/usr/spark/spark-2.4.3-bin-hadoop2.7
export PATH=$SPARK_HOME/bin:$SPARK_HOME/sbin:$PATH1
source /etc/profile
修改配置文件spark-env.sh,设置主机节点为master,设置java安装路径,设置节点内存为8g,设置hadoop安装目录、hadoop集群的配置文件的目录,添加spark从节点
1
2
3
4cd /usr/spark/spark-2.4.3-bin-hadoop2.7/conf
cp spark-defaults.sh.template spark-env.sh
vim spark-env.sh1
2
3
4
5export JAVA_HOME=/usr/java/jdk1.8.0_221
export SPARK_MASTER_IP=master
export HADOOP_CONF_DIR=/usr/hadoop/hadoop-2.7.7/etc/hadoop
export HADOOP_HOME=/usr/hadoop/hadoop-2.7.7
export SPARK_WORKER_MEMORY=8g添加从节点
1
2cp slaves.template slaves
vim slaves1
slave
分发
1
2scp -r /usr/spark root@slave:/usr
scp /etc/profile root@slave:/etc开启集群,查看各节点进程(主节点进程为Master,子节点进程为Worker)
1
2cd /usr/spark/spark-2.4.3-bin-hadoop2.7/sbin
./start-all.sh
07-14 Hadoop MapReduce
概念
Hadoop MapReduce是Hadoop生态系统中的一个计算模型和编程框架,用于处理大规模数据集的分布式计算任务。
MapReduce模型旨在解决大规模数据处理的并行化和分布式计算问题。它将计算任务分解为两个主要阶段:Map阶段和Reduce阶段。
在Map阶段,数据被分割成小的数据块,并由多个Map任务并行处理。每个Map任务将输入数据块转换为一系列键值对,其中键表示中间结果的标识,值表示相关数据。
在Reduce阶段,通过对Map阶段输出的中间结果进行整理和归并,将具有相同键的值聚合在一起。这样,Reduce任务可以对每个键的值进行处理,生成最终的计算结果。
Hadoop
MapReduce提供了一个编程模型和框架,使开发人员能够编写并行计算任务,并将其自动分布到Hadoop集群中的多个节点上执行。开发人员可以使用Java等编程语言编写MapReduce作业,利用Hadoop框架提供的API来处理输入数据、定义Map和Reduce函数,并设置作业的配置参数。
通过利用Hadoop MapReduce的分布式计算能力,可以有效地处理大规模数据集,实现数据处理、数据挖掘、日志分析等各种应用场景。
WordCount程序
数据说明
1 | Shakespeare Sonnet |
程序实现
1 | import org.apache.hadoop.conf.Configuration; |
1 | import org.apache.hadoop.io.IntWritable; |
1 | import org.apache.hadoop.io.IntWritable; |
07-15 Hive数据分析
概述
Hive是基于Hadoop的数据仓库基础设施,它提供了一种类SQL的查询语言(HiveQL)来进行数据分析。Hive使得使用类似于传统关系型数据库的查询语言进行大规模数据处理和分析变得更加容易。
以下是使用Hive进行数据分析的一般步骤:
创建数据表:首先,你需要在Hive中创建适合你数据的表。使用Hive的数据定义语言(DDL),你可以定义表的结构、字段和数据类型。你可以选择将数据加载到表中,或者使用外部表引用现有的数据。
导入数据:如果你的数据尚未在Hive中可用,你可以使用Hive的数据导入功能将数据加载到表中。Hive支持从各种数据源导入数据,如文本文件、CSV文件、Parquet文件、HBase等。
执行查询:使用HiveQL编写查询语句来执行数据分析操作。HiveQL类似于传统的SQL语言,你可以使用SELECT语句、WHERE子句、GROUP BY子句、JOIN操作等来进行数据查询、过滤、聚合等操作。
存储结果:根据需要,你可以将查询结果存储到新的表中,以供后续分析使用。Hive支持将查询结果存储为新表或以其他格式导出到文件系统中。
优化查询性能:为了提高查询性能,你可以使用Hive的优化技术,如分区、分桶、索引等。这些技术可以减少数据扫描量和提高查询效率。
定期维护和管理:作为数据仓库,你需要定期维护和管理Hive环境。这包括数据清理、分区维护、表结构变更等。
请注意,Hive适用于批处理和离线分析场景,对于实时查询和低延迟要求较高的场景,可能需要考虑其他技术或框架。
使用Hive进行数据分析需要一定的学习和了解,特别是HiveQL查询语言和Hive的表现行为。根据你的具体需求和数据情况,你可能需要深入学习和使用Hive的高级功能和技术。
案例:房屋数据分析
1 | # 标题 ,户型 ,区县,商圈,小区 ,房租,地铁,亮点1,亮点2,亮点3 |
07-16 Spark数据分析
案例:WordCount
1 | import org.apache.log4j.{Level, Logger} |
案例:搜索引擎日志分析
封装数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16/*
queryTime 访问时间,格式为:HH:mm:ss
userId 用户ID
queryWords 查询词
resultRank 该URL在返回结果中的排名
clickRank 用户点击的顺序号
clickUrl 用户点击的URL
*/
case class SogouRecord(
queryTime:String,
userId:String,
queryWords:String,
resultRank:Int,
clickRank:Int,
clickUrl:String
)使用case class反射,适用于简单分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72import com.hankcs.hanlp.HanLP
import com.hankcs.hanlp.seg.common.Term
import org.apache.log4j.{Level, Logger}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import java.util
object SearchEngineLogAnalysis {
def main(args: Array[String]): Unit = {
Logger
.getLogger("org.apache.spark")
.setLevel(Level.ERROR)
// 新建SparkConf对象
val conf = new SparkConf()
.setAppName("SearchEngineLogAnalysis")
.setMaster("local[*]")
// 创建SparkContext对象
val sparkContext = new SparkContext(config = conf)
val file = "/Volumes/KeQing/Documents/IntelliJ IDEA Ultimate/SparkTraning/src/main/resources/reduced.txt"
val outputPath1 = "/Volumes/KeQing/Documents/IntelliJ IDEA Ultimate/SparkTraning/src/main/resources/output/queryWordsWordCountRDD"
val outputPath2 = "/Volumes/KeQing/Documents/IntelliJ IDEA Ultimate/SparkTraning/src/main/resources/output/userQueryAnalysisRDD"
val outputPath3 = "/Volumes/KeQing/Documents/IntelliJ IDEA Ultimate/SparkTraning/src/main/resources/output/timeQueryCountAnalysisRDD"
val rddSougouRecord = sparkContext.textFile(file)
// 过滤空行数据、过滤字段数量不等于6
.filter(line => line != null && line.trim.split("\\s+").length == 6)
// 将RDD[String]=>RDD[SogouRecord]
.mapPartitions(it => {
it.map(line => {
val contents = line.trim.split("\\s+")
// 注意字段格式
SogouRecord(
contents(0),
contents(1),
// [360安全卫士]
contents(2).replaceAll("[\\[|\\]]", ""),
contents(3).toInt,
contents(4).toInt,
contents(5)
)
})
})
rddSougouRecord.persist(StorageLevel.MEMORY_AND_DISK).count()
val queryWordsWordCountRDD = rddSougouRecord.mapPartitions(it => {
it.flatMap(record => {
val terms: util.List[Term] = HanLP.segment(record.queryWords.trim)
import scala.collection.JavaConverters._
terms.asScala.map(_.word)
})
}).map((_, 1)).reduceByKey(_ + _).sortBy(_._2, ascending = false)
queryWordsWordCountRDD.coalesce(numPartitions = 1).saveAsTextFile(outputPath1)
val userQueryAnalysisRDD = rddSougouRecord.mapPartitions(it => {
it.map(record => {
((record.userId, record.queryWords), 1)
})
}).reduceByKey(_ + _).sortBy(_._2, ascending = false)
userQueryAnalysisRDD.coalesce(1).saveAsTextFile(outputPath2)
val timeQueryCountAnalysisRDD = rddSougouRecord.mapPartitions(it => {
it.map(record => {
(record.queryTime.substring(0, 2), 1)
})
}).reduceByKey(_ + _).sortBy(_._2, ascending = false)
timeQueryCountAnalysisRDD.coalesce(1).saveAsTextFile(outputPath3)
rddSougouRecord.unpersist()
sparkContext.stop()
}
}使用StructType编程,适用于复杂分析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
object SogouAnalysisSQL {
def main(args: Array[String]): Unit = {
// 设置输出的日志级别
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
val spark = SparkSession.builder()
.appName("SogouAnalysisSQL")
.master("local[*]")
.getOrCreate()
val file = "/Volumes/KeQing/Documents/IntelliJ IDEA Ultimate/SparkTraning/src/main/resources/reduced.txt"
val fileRDD = spark.sparkContext.textFile(file)
.filter(line => line != null && line.trim.split("\\s+").length == 6)
val schema = StructType(Array(
StructField("queryTime", StringType, false),
StructField("userId", StringType, false),
StructField("queryWords", StringType, false),
StructField("resultRank", IntegerType, false),
StructField("clickRank", IntegerType, false),
StructField("clickUrl", StringType, false)
))
// RDD[String] => RDD[Row]
val rddRow = fileRDD.mapPartitions(it => {
it.map(line => {
val contents = line.trim.split("\\s+")
try {
// 封装的字段的时候,顺序要与定义的schema顺序完全一致
Row(
contents(0),
contents(1),
contents(2).replaceAll("[\\[|\\]]", ""),
contents(3).toInt,
contents(4).toInt,
contents(5)
)
} catch {
case e: Exception => Row("error", "error", "error", 0, 0, "error")
}
})
})
// RDD[Row] ==> DataFrame
val df = spark.createDataFrame(rddRow, schema)
// 将DataFrame注册为临时视图
df.createOrReplaceTempView("sogou_view")
// 定义查询的sql
val sql =
"""
|select * from sogou_view where clickUrl like 'www%'
|limit 3
|""".stripMargin
// 执行sql
val sqlResult = spark.sql(sql)
// sqlResult.show(truncate = false)
val querySQL =
"""
|select userId,queryWords,count(*) as query_count from sogou_view
|group by userId,queryWords
|order by userid,query_count desc
|""".stripMargin
val queryResult = spark.sql(querySQL)
// queryResult.show(truncate = false)
val timeSQL =
"""
|select substring(queryTime,0,5) as query_time,count(*) as query_count from sogou_view
|group by substring(queryTime,0,5)
|order by query_time,query_count desc
|""".stripMargin
val timeResult = spark.sql(timeSQL)
timeResult.show(truncate = false)
timeResult.rdd.coalesce(1).saveAsTextFile("/Volumes/KeQing/Documents/IntelliJ IDEA Ultimate/SparkTraning/src/main/resources/output/timeResult")
spark.stop()
}
}示例数据
1
2
3
4
5
6
700:00:00 2982199073774412 [360安全卫士] 8 3 download.it.com.cn/softweb/software/firewall/antivirus/20067/17938.html
00:00:00 07594220010824798 [哄抢救灾物资] 1 1 news.21cn.com/social/daqian/2008/05/29/4777194_1.shtml
00:00:00 5228056822071097 [75810部队] 14 5 www.greatoo.com/greatoo_cn/list.asp?link_id=276&title=%BE%DE%C2%D6%D0%C2%CE%C5
00:00:00 6140463203615646 [绳艺] 62 36 www.jd-cd.com/jd_opus/xx/200607/706.html
00:00:00 8561366108033201 [汶川地震原因] 3 2 www.big38.net/
00:00:00 23908140386148713 [莫衷一是的意思] 1 2 www.chinabaike.com/article/81/82/110/2007/2007020724490.html
00:00:00 1797943298449139 [星梦缘全集在线观看] 8 5 www.6wei.net/dianshiju/????\xa1\xe9|????do=index
案例:手机基站日志分析
示例数据
基站信息数据(基站ID 经度 纬度 信号辐射类型)1
2
39F36407EAD0629FC166F14DDE7970F68,116.304864,40.050645,6
CC0710CC94ECC657A8561DE549D940E0,116.303955,40.041935,6
16030401EAFB68F1E3CDF819735E1C66,116.296302,40.032296,6用户信息数据(手机号 时间(时间戳) 基站ID 连接状态(“1”为连接,“0”为断开))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2013554756349,20211202082000,16030401EAFB68F1E3CDF819735E1C66,1
13236723613,20211202082500,16030401EAFB68F1E3CDF819735E1C66,1
13554756349,20211202180000,16030401EAFB68F1E3CDF819735E1C66,0
13236723613,20211202180000,16030401EAFB68F1E3CDF819735E1C66,0
13236723613,20211202075000,9F36407EAD0629FC166F14DDE7970F68,1
13554756349,20211202075100,9F36407EAD0629FC166F14DDE7970F68,1
13236723613,20211202081000,9F36407EAD0629FC166F14DDE7970F68,0
13554756349,20211202081300,9F36407EAD0629FC166F14DDE7970F68,0
13554756349,20211202175000,9F36407EAD0629FC166F14DDE7970F68,1
13236723613,20211202182000,9F36407EAD0629FC166F14DDE7970F68,1
13554756349,20211202220000,9F36407EAD0629FC166F14DDE7970F68,0
13236723613,20211202230000,9F36407EAD0629FC166F14DDE7970F68,0
13236723613,20211202081100,CC0710CC94ECC657A8561DE549D940E0,1
13554756349,20211202081200,CC0710CC94ECC657A8561DE549D940E0,1
13554756349,20211202081900,CC0710CC94ECC657A8561DE549D940E0,0
13236723613,20211202082000,CC0710CC94ECC657A8561DE549D940E0,0
13554756349,20211202171000,CC0710CC94ECC657A8561DE549D940E0,1
13554756349,20211202171600,CC0710CC94ECC657A8561DE549D940E0,0
13236723613,20211202180500,CC0710CC94ECC657A8561DE549D940E0,1
13236723613,20211202181500,CC0710CC94ECC657A8561DE549D940E0,0case Class
1
2
3
4
5
6case class BaseStation(
baseStationId: String,
longitude: Double,
latitude: Double,
signalRadiationType: Int
)1
2
3
4
5
6case class UserRecord(
phone: String,
time: Long,
baseStationId: String,
connectionStatus: Int
)main
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
/*
需求一:要求使用Sparkcore对分析数据进行分割处理获取业务需求所使用到的数据,根据不同用户在基站停留的时长数据统计出用户与停留地点的关系。
(1)根据用户信息中"手机号,基站ID"做为唯一标识,和时间戳构成新的元组-->((手机号, 站点), 时间戳)
(2)以(手机号,基站ID)作为key,通过reduceByKey算子进行聚合,计算出在基站的停留时间,构成新的数据结构,以便和坐标数据进行join,生成元组-->(基站ID,(手机号,停留时间))
(3)将基站坐标数据信息进行拆分,通过map,构建成包含基站ID、经纬度的元组 -->(基站ID,(经度,纬度))
(4)将2、3结果根据基站ID进行join操作,构成新的数据类型-->(手机号,基站ID,停留时间,(经度,纬度))
(5)按手机号进行分组-->(手机号,(手机号,基站ID,停留时间,(经度,纬度)))
(6)对时间进行排序取出停留时间最长的基站ID(top1),则可能是居住地点和工作地点。
(7)格式结果:(手机号,List(手机号,停留时间,基站ID,(经度,纬度)))
(8)注意:按照手机号进行分组时,运算结果按照顺序为part-00000,part-00001...
*/
object MobileLocation {
def main(args: Array[String]): Unit = {
Logger
.getLogger("org.apache.spark")
.setLevel(Level.ERROR)
val sparkConf = new SparkConf().setAppName("MobileLocation").setMaster("local[*]")
val sparkContext = new SparkContext(config = sparkConf)
val infoFile = "/Volumes/KeQing/Documents/IntelliJ IDEA Ultimate/SparkTraning/src/main/resources/mobilebasestationloganalysis/info.txt"
val userFile = "/Volumes/KeQing/Documents/IntelliJ IDEA Ultimate/SparkTraning/src/main/resources/mobilebasestationloganalysis/user.txt"
val outputPath = "/Volumes/KeQing/Documents/IntelliJ IDEA Ultimate/SparkTraning/src/main/resources/output/mobilebasestationloganalysis"
val rddUserRecord = sparkContext.textFile(userFile)
.filter(line => line != null)
.mapPartitions(it => {
it.map(line => {
val contents = line.trim.split(",")
// 注意字段格式
UserRecord(
contents(0),
contents(1).toLong,
contents(2),
contents(3).toInt
)
})
})
rddUserRecord.persist(StorageLevel.MEMORY_AND_DISK).count()
val basePhoneTime = rddUserRecord.mapPartitions(it => {
it.map(record => {
((record.phone, record.baseStationId, record.connectionStatus), record.time)
})
}).reduceByKey(_ + _).sortBy(_._2, ascending = false)
.mapPartitions(line => {
line.map(
line => {
((line._1._1, line._1._2), line._2)
}
)
}).reduceByKey(_ - _)
.mapPartitions(it => {
it.map(line => {
(line._1._2, (line._1._1, line._2))
})
}).sortBy(_._2._2, ascending = false)
val rddInfoRecord = sparkContext.textFile(infoFile)
.filter(line => line != null)
.mapPartitions(it => {
it.map(line => {
val contents = line.trim.split(",")
BaseStation(
contents(0),
contents(1).toDouble,
contents(2).toDouble,
contents(3).toInt
)
})
})
val baseLongLat = rddInfoRecord.mapPartitions(it => {
it.map(record => {
(record.baseStationId, (record.longitude, record.latitude))
})
})
// (手机号,基站ID,停留时间,(经度,纬度))
val basePhoneTimeLongLat = basePhoneTime.join(baseLongLat).mapPartitions(it => {
it.map(line => {
(line._2._1._1, line._1, line._2._1._2, (line._2._2._1, line._2._2._2))
})
// (手机号,(手机号,基站ID,停留时间,(经度,纬度))
.map(line => {
(line._1, (line._2, line._3, line._4))
})
}).groupByKey()
// 假设dataRDD是一个RDD,包含了键值对(手机号,数据记录迭代器)
// (手机号,停留时间,基站ID,(经度,纬度)
val dataRDD: RDD[(String, Iterable[(String, Long, (Double, Double))])] = basePhoneTimeLongLat
// 转换成指定格式的RDD,并按照停留时间进行排序
val resultRDD: RDD[(String, List[(String, Long, String, (Double, Double))])] = dataRDD.map { case (phoneNumber, records) =>
val resultData = records.map(record => (phoneNumber, record._2, record._1, (record._3._1, record._3._2))).toList
val sortedResultData = resultData.sortBy(_._2)(Ordering.Long.reverse) // 按照停留时间降序排序
(phoneNumber, sortedResultData)
}
// 获取每个手机号的top1基站ID
val top1BaseStationRDD: RDD[(String, String)] = resultRDD.map { case (phoneNumber, sortedResultData) =>
if (sortedResultData.nonEmpty) {
val top1BaseStationID = sortedResultData.head._3
(phoneNumber, top1BaseStationID)
} else {
(phoneNumber, "Unknown") // 若没有停留记录,则标记为Unknown
}
}
// 输出结果
top1BaseStationRDD.foreach { case (phoneNumber, top1BaseStationID) =>
println(s"手机号 $phoneNumber 的停留时间最长的基站ID为 $top1BaseStationID")
}
resultRDD.saveAsTextFile(outputPath)
sparkContext.stop()
}
}
学习目标评估
在本次培训开始之前,我们明确了学习目标,并与参与人员共享,以便让他们了解培训的预期成果。我们希望通过本次培训,参与人员能够:
- 理解 MapReduce 的基本概念和原理,掌握分布式计算的基本思想。
- 掌握 Hive 的使用,了解如何利用 Hive 进行数据仓库和数据分析。
- 熟悉 Spark 的核心概念和编程模型,能够使用 Spark 进行大规模数据处理和分析。
培训成果
通过本次培训,参与人员在以下方面取得了显著的进步:
MapReduce
- 参与人员深入理解了 MapReduce 的基本原理和分布式计算的概念。
- 他们学会了如何使用 Hadoop 生态系统来实现 MapReduce 任务,并在实际练习中体验了分布式计算的优势和挑战。
Hive
- 参与人员掌握了 Hive 的基本使用方法,包括创建表、数据导入导出、查询数据等操作。
- 他们了解了 Hive 的数据仓库和数据分析能力,能够使用 Hive 进行复杂的数据查询和数据处理。
Spark
- 参与人员熟悉了 Spark 的核心概念,包括 RDD、DataFrame 和 Spark SQL 等。
- 他们学会了使用 Spark 进行大规模数据处理和分析,并在实际实验中体验了 Spark 的高性能和灵活性。
专业成长
- 通过本次培训,参与人员在大数据处理和分析领域获得了实际动手经验,提升了相关技能。
- 培训中的案例和实践让他们对大数据处理技术有了更深入的理解,为未来在数据领域的工作提供了坚实的基础。
总结:
本次 MapReduce、Hive 和 Spark 的培训为参与人员提供了一次高质量的学习机会。参与人员在专业技能和知识水平方面取得了显著进步,他们对大数据处理和分析有了更深入的了解。通过持续改进培训内容和反馈机制,我们将继续提高培训的质量,帮助更多的人在大数据领域取得成功。