pyspark使用笔记,含1.jupyter-docker环境搭建 2.dataframe2jdbc,jdbc2dataframe 3.模型训练及保存 4.模型导入及使用 等。
一 环境部署
执行以下命令,访问 ip:18804即可,密码为:my-password
镜像使用参考:https://jupyter-docker-stacks.readthedocs.io/en/latest
# Create the host directory mounted as the notebook home and open its
# permissions so the container user (jovyan) can write to it.
# NOTE: options go before the mode — `chmod 777 -R` only works as a GNU
# extension; `chmod -R 777` is the portable form.
mkdir -p /data/pyspark && chmod -R 777 /data/pyspark

# Run the all-spark-notebook image with host networking.
# The password hash below corresponds to "my-password".
# Set the port explicitly so JupyterLab actually listens on 18804
# (the default would be 8888, contradicting the URL above).
docker run -d \
--name my-spark-notebook \
--restart=always \
-e GRANT_SUDO=yes \
-v /data/pyspark:/home/jovyan \
--network host \
jupyter/all-spark-notebook \
start.sh jupyter lab \
--NotebookApp.port=18804 \
--NotebookApp.password='sha1:7cca89c48283:e3c1f9fbc06d1d2aa59555dfd5beed925e30dd2c'
二 dataframe导出数据库
参考:
- https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/from_to_dbms.html
- https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrameWriter.jdbc.html?highlight=dataframewriter
在jupyter打开的终端界面中执行以下命令,下载mysql驱动包
# Download the MySQL JDBC driver into the current working directory; the
# Spark sessions below add this jar via spark.jars and the driver classpath.
curl -O https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
完整示例代码如下:
# Section 2: build the Boston housing dataset as a pandas DataFrame,
# convert it to a Spark DataFrame, and export it to MySQL over JDBC.
from pyspark.sql import SparkSession
import os
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from sklearn.datasets import load_boston  # NOTE(review): removed in scikit-learn >= 1.2
import pandas as pd
from pyspark.ml import Pipeline

# Local Spark session; the MySQL driver jar (downloaded above into the
# working directory) must be on both spark.jars and the driver classpath.
spark = SparkSession.builder.master('local') \
    .appName("pyspark-lian") \
    .config("spark.jars", "{}/mysql-connector-java-8.0.28.jar".format(os.getcwd())) \
    .config("spark.driver.extraClassPath", "{}/mysql-connector-java-8.0.28.jar".format(os.getcwd())) \
    .getOrCreate()
sc = spark.sparkContext

# 13 feature columns plus the regression target.
boston = load_boston()
df_boston = pd.DataFrame(boston.data, columns=boston.feature_names)
df_boston['target'] = pd.Series(boston.target)
print(df_boston.head())

# SparkSession.createDataFrame replaces the deprecated SQLContext API
# and behaves identically for this conversion.
data = spark.createDataFrame(df_boston)
print(type(data))
data.printSchema()  # printSchema() prints directly and returns None — don't wrap it in print()

# 'overwrite' drops and recreates the table on each run.
data.write.jdbc(table='df_boston', mode='overwrite',
                url='jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&useSSL=false',
                properties={'user': 'root', 'password': '123456'})
三 读取mysql数据完成线性回归并导出模型文件
参考:https://www.datatechnotes.com/2021/05/mllib-linear-regression-example-with.html
# Section 3: read df_boston back from MySQL, fit an MLlib linear
# regression, report its metrics, and save the fitted model to disk.
from pyspark.sql import SparkSession
import pyspark.pandas as ps
import os
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
import matplotlib.pyplot as plt
from sklearn.datasets import load_boston
import pandas as pd

spark = SparkSession.builder.master('local') \
    .appName("pyspark-lian") \
    .config("spark.jars", "{}/mysql-connector-java-8.0.28.jar".format(os.getcwd())) \
    .config("spark.driver.extraClassPath", "{}/mysql-connector-java-8.0.28.jar".format(os.getcwd())) \
    .getOrCreate()
sc = spark.sparkContext

# pandas-on-Spark reads the JDBC table; to_spark() yields a plain Spark DataFrame.
df_boston = ps.read_sql('df_boston',
                        'jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&useSSL=false&user=root&password=123456')
print(type(df_boston))
print(df_boston.head())
data = df_boston.to_spark()
print(type(data))
data.printSchema()  # printSchema() prints the schema itself and returns None

# Every column except the label becomes part of the feature vector.
feature_cols = [c for c in data.columns if c != 'target']
print(feature_cols)
va = VectorAssembler(inputCols=feature_cols, outputCol='features')
print(data.count())

# Train on the first 200 rows only, to keep the demo fast.
va_df = va.transform(data.limit(200)).select(['features', 'target'])
va_df.show(3)

lr = LinearRegression(featuresCol='features', labelCol='target',
                      regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(va_df)
print(type(lr_model))
print("Coefficients: ", lr_model.coefficients)
print("Intercept: ", lr_model.intercept)
print("MSE: ", lr_model.summary.meanSquaredError)
print("MAE: ", lr_model.summary.meanAbsoluteError)
print("R-squared: ", lr_model.summary.r2)

# Persist the fitted model. Sections 4 and 5 reload it from this exact
# path (/home/jovyan/LinearRegressionModel_model inside the container).
lr_model.save('LinearRegressionModel_model')

# Score the training set and materialize predictions vs. originals.
mdata = lr_model.transform(va_df)
mdata.show(3)
y_pred = mdata.select("prediction").collect()
y_orig = mdata.select("target").collect()

# Stop session
sc.stop()
四 加载模型并进行预测
注意 LinearRegressionModel 和 LinearRegression 的关系。
在上一步 LinearRegression fit之后产生LinearRegressionModel并保存为文件。
现在再使用 LinearRegressionModel的load方法从文件重新加载回来进行 transform 。
参考pipeline的概念的话:
LinearRegression 是 Estimator
而 LinearRegressionModel 是 Transformer
# Section 4: reload the saved LinearRegressionModel (the Transformer
# produced by fitting the LinearRegression Estimator in section 3) and
# use it to score the MySQL table, writing predictions back to MySQL.
from pyspark.sql import SparkSession
import pyspark.pandas as ps
import os
from pyspark import SparkContext
from pyspark.sql import SQLContext
import matplotlib.pyplot as plt
from sklearn.datasets import load_boston
import pandas as pd
import pyspark.ml as ml
import pyspark as pyspark
import sys

spark = SparkSession.builder.master('local') \
    .appName("pyspark-lian") \
    .config("spark.jars", "{}/mysql-connector-java-8.0.28.jar".format(os.getcwd())) \
    .config("spark.driver.extraClassPath", "{}/mysql-connector-java-8.0.28.jar".format(os.getcwd())) \
    .getOrCreate()
sc = spark.sparkContext

result_mysqlSource = ps.read_sql('df_boston', 'jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&useSSL=false&user=root&password=123456').to_spark()

# Assemble the same 13 feature columns used at training time.
feature_cols = ['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE',
                'DIS', 'RAD', 'TAX', 'PTRATIO', 'B', 'LSTAT']
va = pyspark.ml.feature.VectorAssembler(inputCols=feature_cols, outputCol='features')
result_va = va.transform(result_mysqlSource)

# Load the model saved in section 3. The path must match the save() call
# there — it wrote 'LinearRegressionModel_model' (no '_2' suffix).
lr_model = pyspark.ml.regression.LinearRegressionModel.load('file:///home/jovyan/LinearRegressionModel_model')
result_lr_model = lr_model.transform(result_va)

# Export features + label + prediction; 'overwrite' recreates the table.
result_lr_model.select(feature_cols + ['target', 'prediction']) \
    .write.jdbc(table='df_boston_result', mode='overwrite',
                url='jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&useSSL=false',
                properties={'user': 'root', 'password': '123456'})
sc.stop()
五 使用 pipeline 改造
参考:https://spark.apache.org/docs/latest/ml-pipeline.html
部分中文翻译参考: https://zhuanlan.zhihu.com/p/33619687
# Section 5: same prediction flow as section 4, but the VectorAssembler
# and the loaded LinearRegressionModel are chained into one PipelineModel.
from pyspark.sql import SparkSession
import pyspark.pandas as ps
import os
from pyspark import SparkContext
from pyspark.sql import SQLContext
import matplotlib.pyplot as plt
from sklearn.datasets import load_boston
import pandas as pd
import pyspark.ml as ml
import pyspark as pyspark
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel
import sys

spark = SparkSession.builder.master('local') \
    .appName("pyspark-lin") \
    .config("spark.jars", "{}/mysql-connector-java-8.0.28.jar".format(os.getcwd())) \
    .config("spark.driver.extraClassPath", "{}/mysql-connector-java-8.0.28.jar".format(os.getcwd())) \
    .getOrCreate()
sc = spark.sparkContext

result_mysqlSource = ps.read_sql('df_boston', 'jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&useSSL=false&user=root&password=123456').to_spark()

# Same 13 feature columns used at training time.
feature_cols = ['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE',
                'DIS', 'RAD', 'TAX', 'PTRATIO', 'B', 'LSTAT']
va = pyspark.ml.feature.VectorAssembler(inputCols=feature_cols, outputCol='features')
lr_model = pyspark.ml.regression.LinearRegressionModel.load('file:///home/jovyan/LinearRegressionModel_model')

# Both stages are already Transformers, so we can build a PipelineModel
# directly instead of fitting a Pipeline (an Estimator) first.
pipelineModel = PipelineModel(stages=[va, lr_model])
result = pipelineModel.transform(result_mysqlSource)

result.select(feature_cols + ['target', 'prediction']) \
    .write.jdbc(table='df_boston_result2', mode='overwrite',
                url='jdbc:mysql://localhost:3306/test?createDatabaseIfNotExist=true&useSSL=false',
                properties={'user': 'root', 'password': '123456'})
sc.stop()
Q.E.D.