当前位置: 首页 > news >正文

网站建设小图标站长工具权重查询

网站建设小图标,站长工具权重查询,四川城乡和建设厅网站首页,网站正在建设中的图片素材Spark 的介绍与搭建:从理论到实践_spark环境搭建-CSDN博客 Spark 的Standalone集群环境安装与测试-CSDN博客 PySpark 本地开发环境搭建与实践-CSDN博客 Spark 程序开发与提交:本地与集群模式全解析-CSDN博客 Spark on YARN:Spark集群模式…

Spark 的介绍与搭建:从理论到实践_spark环境搭建-CSDN博客

Spark 的Standalone集群环境安装与测试-CSDN博客

PySpark 本地开发环境搭建与实践-CSDN博客

Spark 程序开发与提交:本地与集群模式全解析-CSDN博客

Spark on YARN:Spark集群模式之Yarn模式的原理、搭建与实践-CSDN博客

Spark 中 RDD 的诞生:原理、操作与分区规则-CSDN博客

Spark 中的 RDD 分区的设定规则与高阶函数、Lambda 表达式详解-CSDN博客

RDD 算子全面解析:从基础到进阶与面试要点-CSDN博客

目录

一、手机号码流量统计案例

(一)需求分析

(二)代码实现

(三)代码解析

二、合同数据分析案例

(一)需求分析

(二)代码实现

(三)代码解析

三、日志分析案例

(一)需求分析

(二)jieba分词器

安装一下

使用

测试

(四)代码实现

(三)代码解析

四、常见错误及解决方法

五、总结


        在大数据处理领域,PySpark 作为强大的工具,能够高效地处理大规模数据。本文将通过几个实际案例,详细介绍 PySpark 在数据处理中的应用,包括数据清洗、统计分析等操作,帮助读者深入理解 PySpark 的使用方法和数据处理流程。

一、手机号码流量统计案例

(一)需求分析

        给定一组数据,要求计算每个手机号码的总流量(上行 + 下行),但需排除手机号码不正确以及数据长度不够的数据。数据长度不一致的数据指的是一行数据切割后的列数与其他数据列数不同的数据。

(二)代码实现

以下是实现该功能的 PySpark 代码:

import math
import os
import re
from collections.abc import Iterable# 导入pyspark模块
from pyspark import SparkContext, SparkConfif __name__ == '__main__':# 配置环境os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 获取 conf 对象# setMaster  按照什么模式运行,local  bigdata01:7077  yarn#  local[2]  使用2核CPU   * 你本地资源有多少核就用多少核#  appName 任务的名字conf = SparkConf().setMaster("local[*]").setAppName("rdd的创建方式")sc = SparkContext(conf=conf)fileRdd = sc.textFile("../../datas/zuoye/HTTP_20130313143750.dat")print(fileRdd.count())filterRdd = fileRdd.filter(lambda line: len(re.split("\t+",line)) == 11 and re.fullmatch(r"1[3-9]\d{9}",re.split("\t+",line)[1]) is not None )print(filterRdd.count())mapRdd = filterRdd.map(lambda line:(re.split("\t+",line)[1],int(re.split("\t+",line)[-3])+int(re.split("\t+",line)[-2])))rsRdd = mapRdd.reduceByKey(lambda sum,num:sum+num)rsRdd.foreach(lambda x:print(x[0],str(round(x[1]/1024,2))+"MB"))# 使用完后,记得关闭sc.stop()

(三)代码解析

  1. 首先,配置了 PySpark 运行所需的环境变量,包括 JAVA_HOMEHADOOP_HOME 以及 Python 解析器路径。
  2. 通过 SparkConf 设置运行模式为本地(local[*])并指定应用名称,然后创建 SparkContext 对象。
  3. 使用 textFile 读取数据文件,得到 fileRdd
  4. 利用 filter 操作过滤数据,先检查数据长度是否为 11,再通过正则表达式验证手机号码格式是否正确,得到 filterRdd
  5. 对 filterRdd 进行 map 操作,提取手机号码和总流量。
  6. 通过 reduceByKey 按手机号码分组并计算总流量。
  7. 最后,使用 foreach 输出每个手机号码及其对应的总流量(转换为 MB 并保留两位小数)。

二、合同数据分析案例

(一)需求分析

        给定合同数据文件,包含合同 ID、客户 ID、合同类型、总金额、合同付款类型、注册时间、购买数量、合同签约时间、购买的产品、是否已经交货等字段。需要查询已交货和未交货的数量分别是多少、购买合同的总金额是多少以及分期付款占全部订单的比例。

(二)代码实现

以下是实现该功能的 PySpark 代码:

import os
import re# 导入pyspark模块
from pyspark import SparkContext, SparkConfclass Contract:def __init__(self,line):# 合同类型, 总金额,合同付款类型,是否已经交货tuple1 = re.split(",",line)self.contract_type=tuple1[2]self.contract_money=int(tuple1[3])self.pay_type=tuple1[4]self.isDelivery=tuple1[-1]def __repr__(self):return "合同类型:%s,总金额:%d,合同付款类型:%s,是否已经交货:%s" % (self.contract_type,self.contract_money,self.pay_type,self.isDelivery)if __name__ == '__main__':# 配置环境os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 获取 conf 对象# setMaster  按照什么模式运行,local  bigdata01:7077  yarn#  local[2]  使用2核CPU   * 你本地资源有多少核就用多少核#  appName 任务的名字conf = SparkConf().setMaster("local[*]").setAppName("合同分析")# 假如我想设置压缩# conf.set("spark.eventLog.compression.codec","snappy")# 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字sc = SparkContext(conf=conf)print(sc)mapRdd = sc.textFile("../../datas/zuoye/DEMO_CONTRACT.csv") \.filter(lambda line: line.find("合同ID") == -1) \.map(lambda line: Contract(line)) \"""1. 已交货和未交货的数量分别是多少2. 购买合同的总金额是多少3. 分期付款占全部订单的比例"""totalNum = mapRdd.count()deliverNum = mapRdd.filter(lambda contract:contract.isDelivery == '是').count()print("已交货和未交货的数量分别是:",deliverNum,totalNum-deliverNum)gouMaiMoney = mapRdd.filter(lambda contract:contract.contract_type=='购买合同') \.map(lambda contract:contract.contract_money).reduce(lambda sum,money:sum+money)gouMaiMoney2 = mapRdd.filter(lambda contract: contract.contract_type == '购买合同') \.map(lambda contract: contract.contract_money).sum()print("购买合同的总金额是:",gouMaiMoney2)# 第三问fenQiNum = mapRdd.filter(lambda contract:contract.pay_type=='分期付款').count()print("分期付款占全部订单的比例是:",fenQiNum/totalNum)# 使用完后,记得关闭sc.stop()

(三)代码解析

  1. 同样先配置环境变量并创建 SparkContext 对象。
  2. 定义了 Contract 类来封装合同数据的相关字段。
  3. 读取合同数据文件并进行过滤,排除标题行,然后将每行数据映射为 Contract 对象,得到 mapRdd
  4. 对于已交货和未交货数量的统计,先计算总订单数 totalNum,再通过过滤得到已交货订单数 deliverNum,进而得出未交货订单数。
  5. 计算购买合同总金额时,先过滤出购买合同类型的数据,然后提取金额并进行求和操作。
  6. 计算分期付款占比,先统计分期付款订单数 fenQiNum,再除以总订单数 totalNum

三、日志分析案例

(一)需求分析

  1. 统计热门搜索词 Top10,即统计用户搜索每个词出现的次数,然后降序排序取前 10。
  2. 统计所有用户搜索中最大点击次数、最小点击次数、平均点击次数,也就是计算所有用户在所有搜索过程中的最大、最小和平均点击次数。
  3. 统计一天每小时点击量并按照点击量降序排序,即统计每个小时点击的数据量并按降序排列。

(二)jieba分词器

汉语是需要分词的

python语言: Jieba 分词器

Java语言: IK 分词器(好久没更新过了)

安装一下

pip install jieba -i https://pypi.tuna.tsinghua.edu.cn/simple/

没有自定版本,安装的就是最新的版本

使用

语法:jieba.cut(“语句”) / jieba.cut_for_search(“语句”)
全模式:将句子中所有可以组成词的词语都扫描出来, 速度非常快,但可能会出现歧义
jieba.cut("语句", cut_all=True)
精确模式:将句子最精确地按照语义切开,适合文本分析,提取语义中存在的每个词
jieba.cut("语句", cut_all=False)
搜索引擎模式:在精确模式的基础上,对长词再次切分,适合用于搜索引擎分词
jieba.cut_for_search("语句")

测试

import jieba
# 测试一下结巴分词器
str = "中华人民共和国"
list01 = jieba.cut(str, cut_all=True)
# 中华,中华人民,中华人民共和国,华人,人民,人民共和国,共和,共和国
print(",".join(list01))
# 中华人民共和国
list02 = jieba.cut(str, cut_all=False)
for ele in list02:print(ele)# 中华 华人 人民 共和 共和国 中华人民共和国  比全模式少多,比精确模式多,适用于搜索引擎
list03 = jieba.cut_for_search(str)
print(*list03)

(四)代码实现

以下是实现日志分析功能的 PySpark 代码:

import os
import re# 导入pyspark模块
from pyspark import SparkContext, SparkConf
import jiebaif __name__ == '__main__':# 配置环境os.environ['JAVA_HOME'] = 'C:/Program Files/java/jdk1.8.0_181'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/Linux/hadoop/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'# 获取 conf 对象conf = SparkConf().setMaster("local[*]").setAppName("")# 根据配置文件,得到一个 SC 对象,第一个 conf 是形参的名字,第二个 conf 是实参的名字sc = SparkContext(conf=conf)# print(sc)# 清洗数据print("===========清洗数据===========")fileRdd = sc.textFile("../../datas/sogou/sogou.tsv")print(fileRdd.count())print(fileRdd.first())listRdd = fileRdd.map(lambda line: re.split("\\s+", line))filterList = listRdd.filter(lambda l1: len(l1) == 6)# 这个结果只获取而来时间 uid 以及热词,热词将左右两边的[] 去掉了tupleRdd = filterList.map(lambda l1: (l1[0], l1[1], l1[2][1:-1]))# 求热词top10print("===========求热词top10===========")wordRdd = tupleRdd.flatMap(lambda t1: jieba.cut_for_search(t1[2]))filterRdd2 = wordRdd.filter(lambda word: len(word.strip()) != 0 and word != "的").filter(lambda word: re.fullmatch("[\u4e00-\u9fa5]+", word) is not None)# filterRdd2.foreach(print)result = filterRdd2.map(lambda word: (word, 1)).reduceByKey(lambda sum, num: sum + num).sortBy(keyfunc=lambda tup: tup[1], ascending=False).take(10)for ele in result:print(ele)# 统计所有用户搜索中最大点击次数、最小点击次数、平均点击次数print("===========统计所有用户搜索中最大点击次数、最小点击次数、平均点击次数===========")def splitWord(tupl):li1 = jieba.cut_for_search(tupl[2])  # 中国 中华 共和国li2 = list()for word in li1:li2.append(((tupl[1], word), 1))return li2newRdd = tupleRdd.flatMap(splitWord)# newRdd.foreach(print)reduceByUIDAndWordRdd = newRdd.reduceByKey(lambda sum, num: sum + num)# reduceByUIDAndWordRdd.foreach(print)valList = reduceByUIDAndWordRdd.values()print(f"最大点击次数: {valList.max()}")print(f"最小点击次数: {valList.min()}")print(f"中位数: {valList.mean()}")  # 中位数print(f"平均点击次数: {valList.sum() / valList.count()}")# 统计一天每小时点击量并按照点击量降序排序print("===========统计一天每小时点击量并按照点击量降序排序===========")reductByKeyRDD = tupleRdd.map(lambda tup: (tup[0][0:2], 1)).reduceByKey(lambda sum, num: sum + num)sortRdd = reductByKeyRDD.sortBy(keyfunc=lambda tup: tup[1], ascending=False)listNum = sortRdd.take(24)for ele in listNum:print(ele)# 使用完后,记得关闭sc.stop()

(三)代码解析

  1. 配置环境变量后创建 SparkContext 对象。
  2. 定义 getWords 函数,用于将搜索词进行分词并构建 ((用户id,词), 1) 的格式。
  3. 读取日志数据文件,进行数据清洗,排除数据长度不足 6 的行和包含特定违禁词的行,然后提取相关字段得到 mapRdd
  4. 对于热词 Top10 的统计,先对热词进行分词,过滤掉特定词和非中文词,然后映射为 (词, 1) 格式,通过 reduceByKey 统计词频,最后按词频降序排序并取前 10。
  5. 统计最大、最小和平均点击次数时,先通过 flatMap 和 getWords 函数构建 ((用户id,词),点击次数) 格式的数据,过滤掉非中文词和特定词后,通过 reduceByKey 统计点击次数,再获取值并计算相关统计量。
  6. 统计一天每小时点击量时,先提取小时信息并映射为 (小时, 1) 格式,通过 reduceByKey 统计每小时点击量,最后按点击量降序排序并收集结果。

四、常见错误及解决方法

        在运行 PySpark 代码读取数据时,可能会遇到 Caused by: java.net.SocketException: Connection reset by peer: socket write error 错误。

Caused by: java.net.SocketException: Connection reset by peer: socket write error
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:477)

        原因是连接数过多,一般在本地 Windows 运行 Spark 代码且读取数据过多,或者代码中使用了 take() 算子时容易出现。解决方法有两种:一是将数据量变小一点,只截取一部分进行测试;二是避免使用 take 算子。

五、总结

        通过以上三个案例,我们详细展示了 PySpark 在不同数据处理场景下的应用。从手机号码流量统计到合同数据分析,再到日志分析,涵盖了数据过滤、映射、分组求和、排序以及特定数据统计等常见操作。同时,也指出了在实际运行代码过程中可能遇到的错误及解决方法。希望读者能够通过这些案例,深入理解 PySpark 的使用技巧,在大数据处理工作中更加得心应手。

http://www.mnyf.cn/news/39943.html

相关文章:

  • 什么软件 做短视频网站好快速排名生客seo
  • 做网站 挣广告联盟的佣金安全优化大师
  • 怎么接单做网站温州网站建设制作
  • 公司建设网站产生哪些费用外贸seo优化
  • 给公司做企业网站手游推广平台哪个好
  • 辽宁平台网站建设价位成都培训机构排名前十
  • 北京做建筑信息的网站站长工具忘忧草
  • 网络总体方案设计seo的中文含义是什么
  • 简述网站建设的流程网络营销比较好的企业
  • 郑州高新区建设环保局网站附近电脑培训速成班一个月
  • 西宁网站建设高端网站怎么接广告
  • 中华人民共和国建设厅网站爱站关键词
  • .网站建设的基本步骤制作一个网站步骤
  • 浙江网站建设费用长沙优化排名推广
  • 搭建什么网站比较赚钱sem对seo的影响有哪些
  • 仿制别人的竞价网站做竞价犯法吗怎么优化关键词
  • 靖宇东兴自助建站it菜鸡网seo
  • 番禺低价网站建设百度下载并安装最新版
  • 做公众好号的网站吗免费获客软件
  • 深圳做微信网站建设百度关键词怎么优化
  • 威海网站制作怎么样什么是软文
  • 如何给网站做右侧导航栏百度指数在线查询
  • 做网站题材网络推广方法
  • 外贸平台是做什么的河南seo网站多少钱
  • 护肤品网站建设目的广告代运营
  • 如何用域名访问网站销售课程视频免费
  • 用asp做旅游网站零基础能做网络推广吗
  • 怎么制作免费网站教程视频网站建设制作过程
  • 什么网站教你做早点百度号码认证平台官网
  • 做网站的公司是接入商吗成都网站排名 生客seo