我正在尝试使用PySpark将分数应用到Spark DataFrame中。假设我在Spark之外构建了一个简单的回归模型,并且希望将模型中创建的系数值映射到DataFrame中的各个列,以创建一个新的列,该列是各个源列乘以各自系数的总和。我知道Spark mllib中有许多用于建模的工具,但我希望了解这种“蛮力”方法是如何实现的。我也知道DataFrame/RDD是不可变的,因此需要创建一个新的DataFrame。
这里提供一些伪代码供参考:
#加载示例数据
df = sqlContext.createDataFrame(data)
df.show(5)
dfmappd.select("age", "parch", "pclass").show(5)
+----+-----+------+
| age|parch|pclass|
+----+-----+------+
|22.0| 0| 3|
|38.0| 0| 1|
|26.0| 0| 3|
|35.0| 0| 1|
|35.0| 0| 3|
+----+-----+------+
仅显示前5行
在Spark之外创建的模型是一个基于二元响应的逻辑回归模型。因此,我基本上希望将logit函数映射到这三个列,以生成第四个评分列。以下是模型中的系数:
截距:3.435222
年龄:-0.039841
parch:0.176439
pclass:-1.239452
供参考,这里是logit函数的描述:https://en.wikipedia.org/wiki/Logistic_regression
为了比较,这里是如何在R中使用tidyr和dplyr做同样的事情
library(dplyr)
library(tidyr)
#示例数据
Age <- c(22, 38, 26, 35, 35)
Parch <- c(0,0,0,0,0)
Pclass <- c(3, 1, 3, 1, 3)
#包装在一个数据框中
mydf <- data.frame(Age, Parch, Pclass)
#使用dplyr创建一个带有变异列的新数据框
scoredf = mydf %>%
mutate(score = round(1/(1 + exp(-(3.435 + -0.040 * Age + 0.176 * Parch + -1.239 * Pclass))),2))
scoredf
回答:
如果我正确理解了您的问题,您希望根据您离线计算的系数“手动”计算每个样本的类条件概率。
像这样的代码是否可行:
def myLogisticFunc(age, parch, pclass):
intercept = 3.435222
betaAge = -0.039841
betaParch = 0.176439
betaPclass = -1.239452
z = intercept + betaAge * age + betaParch * parch + betaPclass * pclass
return 1.0 / (1.0 + math.exp(-z))
myLogisticFuncUDF = udf(myLogisticFunc)
df.withColumn("score", myLogisticFuncUDF(col("age"), col("parch"), col("pclass"))).show()