无编码利用协同算法实现个性化推荐

目标

根据昨天的URL上报数据生成ALS模型。之后将模型加载到流式计算中,对实时URL的访问用户进行内容推荐。整个流程只需要你写写SQL(做解析),弄弄配置就搞定。

资源准备

模型训练

首先我们拷贝一份配置文件 als-training,我在配置文件里模拟了一些数据,假设是一些URL,大体如下,表示itemId 为2的文章被userId=1的用户访问了。

http://123.com/path?userId=1&itemId=2

之后的SQL就是抽取出userid 和itemId,然后得到一个包含label, features 的表。在StreamingPro中,所有的的算法的输入都会遵循这个规范。对于ALS算法而言,label 表示userId, features则是userId,ItemId,rating 三个按逗号拼接的字符串。对于回归类算法,则是逗号拼接的数字。

最后通过组件AlgorithmOutputCompositor 完成模型训练。

{
        "name": "streaming.core.compositor.spark.output.AlgorithmOutputCompositor",
        "params": [
          {
            "path": "/tmp/als_log",
            "algorithm": "als"
          },
          {
            "rank": 10,
            "alpha": 1.0
          }
        ]
      }

path表示输出路径。 algorithm 表示算法。目前只支持 als,lr(线性回归),lr2(逻辑回归)三种算法。后续会不断添加。

第二组参数则是对应算法的一些配置参数。你可以配置多组,算法自动回选择最优的一组参数得到模型,并且保存到对应的path路径下。

你可以直接运行得到结果:

./bin/spark-submit   \
--class streaming.core.StreamingApp \
--master local[2] \
/tmp/streamingpro-0.3.2-SNAPSHOT-online-mllib-1.6.1.jar  \
-streaming.name test \
-streaming.platform spark  \
-streaming.job.file.path file://tmp/strategy.v2.json 

推荐预测

接着我们要给指定的用户进行推荐。参看 als-predict

解析出用户的逻辑是和上面的是一样的。里面的核心模块是:

{
        "name": "streaming.core.compositor.spark.transformation.AlgorithmCompositor",
        "params": [
          {
            "path": "file:///tmp/als_log",
            "algorithm": "als",
            "outputTableName": "test4",
            "recommendUsersForProductsNum": 1
          }
        ]
      }

path 是模型文件所在的位置。recommendUsersForProductsNum 表示对每个用户推荐多少内容。outputTableName是输出的表, 方便后续继续操作,比如存储到Redis或者数据库中,方便前端程序做调用。

大家讲上面的运行脚本里的配置文件路径调整下,就可以运行起来,看到运行结果,比如我这里的结果是:

+----+----+--------------------+
|user|item|             ratings|
+----+----+--------------------+
|   3|   2|[[2,3,0.900332472...|
|   2|   3|[[2,2,0.900333589...|
|   2|   2|[[2,2,0.900333589...|
+----+----+--------------------+

你可以输入到任何你感兴趣的系统中,StreamingPro目前支持ES,Parquet等Spark已经支持的格式作为输出。

在流式计算中进行数据推荐

参看 als-streaming-predict,将所有的包名前缀从

streaming.core.compositor.spark
转换为
streaming.core.compositor.spark.streaming

即可支持流式。运行脚本如下:

./bin/spark-submit   \
--class streaming.core.StreamingApp \
--name "join"  \
--master local[2] \
/tmp/streamingpro-0.3.2-SNAPSHOT-online-mllib-1.6.1.jar  \
-streaming.name test \
-streaming.job.file.path file://tmp/strategy.v2.json 

总结

在StreamingPro中,一个算法的模型训练,仅仅被看做一个特殊的存储。我们完全可以将对应的AlgothrimOutputCompositor换成 其他的输出源。

而对于数据的预测,我们仅仅是把它看做一个数据Transformer,根据进来的数据,新生成一个prediction字段。

无论是模型训练还是预测,都是基于SQL流来完成的,完美的融入到了数据的流程当中。

附录

感兴趣实现的,可以参考 代码