在本文中,我们将演示计算机视觉问题,它具有结合两种最先进技术的能力:深度学习和Apache Spark。我们将利用深度学习管道的强大功能来解决多类图像分类问题。
PySpark 是 Spark 为 Python 开发者提供的 API。
PySpark 提供的类
1、pyspark.SparkConf
2、pyspark.SparkContext
3、pyspark.SparkFiles
4、pyspark.RDD
5、pyspark.Accumulator
6、pyspark.Broadcast
7、pyspark.Accumulator
迁移学习
1、 开发模型的方法 2、 预训练模型的方法
开发模型的方法
预训练模型方法
深度学习管道是一个高级深度学习框架,通过Spark MLlib Pipelines API 促进常见的深度学习工作流程。它目前支持TensorFlow和Keras以及TensorFlow后端。
该库来自Databricks,并利用Spark的两个最强大的方面:
本着Spark和Spark MLlib的精神,它提供了易于使用的API,可以在极少数代码行中实现深度学习。
它使用Spark强大的分布式引擎来扩展大规模数据集的深度学习。
转移学习
转移学习一般是机器学习中的一种技术,侧重于在解决一个问题时保存所获得的知识(权重和偏见),并进一步将其应用于不同但相关的问题。
深度学习管道提供实用程序来对图像执行传输学习,这是开始使用深度学习的最快方法之一。借助Featurizer的概念, Deep Learning Pipelines可以在Spark-Cluster上实现快速转移学习。现在,它为转移学习提供了以下神经网络:
InceptionV3
Xception
ResNet50
VGG16
VGG19
出于演示目的,我们将仅使用InceptionV3模型。您可以从此处阅读此模型的技术细节。
以下示例将Spark中的InceptionV3模型和多项逻辑回归组合在一起。
Deep Learning Pipelines中的一个名为DeepImageFeaturizer的效用函数会自动剥离预训练神经网络的最后一层,并使用所有前一层的输出作为逻辑回归算法的特征。
数据集
孟加拉语脚本有十个数字(字母或符号表示从0到9的数字)。使用位置基数为10的数字系统在孟加拉语中写入大于9的数字。


图2:孟加拉语手写数字
加载图片
# necessary import
from pyspark.sql import SparkSession
from pyspark.ml.image import ImageSchema
from pyspark.sql.functions import lit
from functools import reduce
# create a spark session
spark = SparkSession.builder.appName(‘DigitRecog’).getOrCreate()
# loaded image
zero = ImageSchema.readImages("0").withColumn("label", lit(0))
one = ImageSchema.readImages("1").withColumn("label", lit(1))
two = ImageSchema.readImages("2").withColumn("label", lit(2))
three = ImageSchema.readImages("3").withColumn("label", lit(3))
four = ImageSchema.readImages("4").withColumn("label", lit(4))
five = ImageSchema.readImages("5").withColumn("label", lit(5))
six = ImageSchema.readImages("6").withColumn("label", lit(6))
seven = ImageSchema.readImages("7").withColumn("label", lit(7))
eight = ImageSchema.readImages("8").withColumn("label", lit(8))
nine = ImageSchema.readImages("9").withColumn("label", lit(9))
dataframes = [zero, one, two, three,four,
five, six, seven, eight, nine]
# merge data frame
df = reduce(lambda first, second: first.union(second), dataframes)
# repartition dataframe
df = df.repartition(200)
# split the data-frame
train, test = df.randomSplit([0.8, 0.2], 42)
df.printSchema()
root
|-- image: struct (nullable = true)
| |-- origin: string (nullable = true)
| |-- height: integer (nullable = false)
| |-- width: integer (nullable = false)
| |-- nChannels: integer (nullable = false)
| |-- mode: integer (nullable = false)
| |-- data: binary (nullable = false)
|-- label: integer (nullable = false)
模型训练
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from sparkdl import DeepImageFeaturizer
# model: InceptionV3
# extracting feature from images
featurizer = DeepImageFeaturizer(inputCol="image",
outputCol="features",
modelName="InceptionV3")
# used as a multi class classifier
lr = LogisticRegression(maxIter=5, regParam=0.03,
elasticNetParam=0.5, labelCol="label")
# define a pipeline model
sparkdn = Pipeline(stages=[featurizer, lr])
spark_model = sparkdn.fit(train) # start fitting or training评估
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# evaluate the model with test set
evaluator = MulticlassClassificationEvaluator()
tx_test = spark_model.transform(test)
print('F1-Score ', evaluator.evaluate(tx_test,
{evaluator.metricName: 'f1'}))
print('Precision ', evaluator.evaluate(tx_test,
{evaluator.metricName: 'weightedPrecision'}))
print('Recall ', evaluator.evaluate(tx_test,
{evaluator.metricName: 'weightedRecall'}))
print('Accuracy ', evaluator.evaluate(tx_test,
{evaluator.metricName: 'accuracy'}))F1-Score 0.8111782234361806
Precision 0.8422058244785519
Recall 0.8090909090909091
Accuracy 0.8090909090909091混淆矩阵
在这里,我们将使用混淆矩阵总结分类模型的性能。
import matplotlib.pyplot as plt
import numpy as np
import itertools
def plot_confusion_matrix(cm, classes,
normalize=False,
title='Confusion matrix',
cmap=plt.cm.GnBu):
plt.imshow(cm, interpolation='nearest', cmap=cmap)
plt.title(title)
tick_marks = np.arange(len(classes))
plt.xticks(tick_marks, classes, rotation=45)
plt.yticks(tick_marks, classes)
fmt = '.2f' if normalize else 'd'
thresh = cm.max() / 2.
for i, j in itertools.product(range(cm.shape[0]),
range(cm.shape[1])):
plt.text(j, i, format(cm[i, j], fmt),
horizontalalignment="center",
color="white" if cm[i, j] > thresh else "black")
plt.tight_layout()
plt.ylabel('True label')
plt.xlabel('Predicted label')
为此,我们需要先将Spark-DataFrame转换为Pandas-DataFrame,然后使用true和predict标签调用Confusion Matrix。
from sklearn.metrics import confusion_matrix
y_true = tx_test.select("label")
y_true = y_true.toPandas()
y_pred = tx_test.select("prediction")
y_pred = y_pred.toPandas()
cnf_matrix = confusion_matrix(y_true, y_pred,labels=range(10))
让我们想象一下混淆矩阵
import seaborn as sns
sns.set_style("darkgrid")
plt.figure(figsize=(7,7))
plt.grid(False)
# call pre defined function
plot_confusion_matrix(cnf_matrix, classes=range(10))

图 3:10个孟加拉数字的混淆矩阵(0到9)
分类报告
在这里,我们还可以通过评估矩阵获得每个类别的分类报告。
from sklearn.metrics import classification_report
target_names = ["Class {}".format(i) for i in range(10)]
print(classification_report(y_true, y_pred,
target_names = target_names))
它将证明每个类标签预测的模型性能要好得多。
precision recall f1-score support
Class 0 1.00 0.92 0.96 13
Class 1 0.57 1.00 0.73 8
Class 2 0.64 1.00 0.78 7
Class 3 0.88 0.70 0.78 10
Class 4 0.90 1.00 0.95 9
Class 5 0.67 0.83 0.74 12
Class 6 0.83 0.62 0.71 8
Class 7 1.00 0.80 0.89 10
Class 8 1.00 0.80 0.89 20
Class 9 0.70 0.54 0.61 13
micro avg 0.81 0.81 0.81 110
macro avg 0.82 0.82 0.80 110
weighted avg 0.84 0.81 0.81 110
ROC AUC Score
让我们也找到这个模型的ROC AUC得分点。我从这里使用了以下代码。
from sklearn.metrics import roc_curve, auc, roc_auc_score
from sklearn.preprocessing import LabelBinarizer
def multiclass_roc_auc_score(y_test, y_pred, average="macro"):
lb = LabelBinarizer()
lb.fit(y_test)
y_test = lb.transform(y_test)
y_pred = lb.transform(y_pred)
return roc_auc_score(y_test, y_pred, average=average)
print('ROC AUC score:', multiclass_roc_auc_score(y_true,y_pred))
它得分0.901.
预测样本
让我们看看它的一些预测,与真实标签的比较。
# all columns after transformations
print(tx_test.columns)
# see some predicted output
tx_test.select('image', "prediction", "label").show()
结果如下
['image', 'label', 'features', 'rawPrediction',
'probability', 'prediction']
+------------------+----------+--------+
| image |prediction| label |
+------------------+----------+--------+
|[file:/home/i...| 1.0| 1|
|[file:/home/i...| 8.0| 8|
|[file:/home/i...| 9.0| 9|
|[file:/home/i...| 1.0| 8|
|[file:/home/i...| 1.0| 1|
|[file:/home/i...| 1.0| 9|
|[file:/home/i...| 0.0| 0|
|[file:/home/i...| 2.0| 9|
|[file:/home/i...| 8.0| 8|
|[file:/home/i...| 9.0| 9|
|[file:/home/i...| 0.0| 0|
|[file:/home/i...| 4.0| 0|
|[file:/home/i...| 5.0| 9|
|[file:/home/i...| 1.0| 1|
|[file:/home/i...| 9.0| 9|
|[file:/home/i...| 9.0| 9|
|[file:/home/i...| 1.0| 1|
|[file:/home/i...| 1.0| 1|
|[file:/home/i...| 9.0| 9|
|[file:/home/i...| 3.0| 6|
+--------------------+----------+-----+
only showing top 20 rows
结论
参考:
2、Apache Spark:https://spark.apache.org/docs/latest/api/python/index.html?source=post_page
3、本文中演示的源码:
https://github.com/iphton/Transfer-Learning-PySpark
长按订阅更多精彩▼

如有收获,点个在看,诚挚感谢