Scala化规则引擎
1. 引言
什么是规则引擎
一个业务规则包含一组条件和在此条件下执行的操作,它们表示业务规则应用程序的一段业务逻辑。业务规则通常应该由业务分析人员和策略管理者开发和修改,但有些复杂的业务规则也可以由技术人员使用面向对象的技术语言或脚本来定制。业务规则的理论基础是:设置一个或多个条件,当满足这些条件时会触发一个或多个操作。
规则引擎(rule engine)是指将复杂的业务逻辑抽象成规则,然后使用特定的算法(比如Rete)对规则进行求值等操作。简单点说,规则引擎就是实现复杂业务逻辑的框架。
为什么要用规则引擎
在维护和更新项目的业务逻辑代码时,大家深有体会:
- 因编码风格的问题,不同人有不同的代码实现,而造成代码理解的困难;
- 每一次业务逻辑的更改会导致项目的重编译;
- 为了能实时响应更改,而不得不做服务重启的无缝衔接
从上面的需求出发,规则引擎应满足如下特点:
- 脚本化,允许用类Python的脚本语言或DSL来描述规则;
- 动态化,实时动态地加载规则脚本,规则的修改能实时地反馈于服务系统;
- 快速的执行速度。
2. 实现
已有的开源方案
Drools应该是Java界名头最响、功能最丰富、社区最活跃的开源规则引擎了,目前的版本号为6.5.0.Final。淘宝也开源了自己的一套规则引擎QLExpress,但是似乎在2011年后项目没有再维护了。相较于QLExpress,Drools设计一套类DSL的脚本语言,用起来非常爽。但是在我看来,对于特定场景(比如用户画像),Drools显得过于重量级了,而QLExpress设计的API过于Java化了。
Scala方案
为了做到脚本化与动态化,Scala化的规则引擎将会把规则作为Scala Script独立出来,采用动态编译来加载。第三方Scala库hammurabi与scalascriptengine完美地契合了需求。其中,hammurabi是规则引擎的具体实现,scalascriptengine用于对scala文件做动态编译。虽然hammurabi没有wiki,但是DZone上有一篇介绍使用的文章。scalascriptengine的版本号"com.googlecode.scalascriptengine" % "scalascriptengine_2.11" % "1.3.10"。
ScalaScriptEngine的API为java.io.File;为了与Spark做集成,我们不得不做接口封装——从HDFS路径得到local File对象。基本思路便是把HDFS目录拷贝到本地的目标路径,然后基于本地路径new File对象:
<code>import org.apache.hadoop.conf.Configuration<br/>
import org.apache.hadoop.fs.{FileSystem, Path}<br/>
import java.io.File
implicit def path2File(hdfsPath: String): File = {<br/>
val fs = FileSystem.get(new Configuration)<br/>
val dstDir = System.getProperty("java.io.tmpdir") + "/scala-tmp"<br/>
val dstPath = new Path(dstDir)<br/>
fs.deleteOnExit(dstPath)<br/>
fs.copyToLocalFile(new Path(hdfsPath), dstPath)<br/>
new File(dstDir)<br/>
}<br/>
</code>
由HDFS路径(作为Scala Script的package根目录)导入规则脚本生成规则集合对象:
<code>import com.googlecode.scalascriptengine.ScalaScriptEngine
def loadRules[T](sourceDir: String, className: String): T = {<br/>
val sse = ScalaScriptEngine.onChangeRefresh(sourceDir)<br/>
// delete all compiled classes (i.e. from previous runs)<br/>
sse.deleteAllClassesInOutputDirectory()<br/>
sse.refresh<br/>
sse.newInstance[T](className)<br/>
}<br/>
</code>
为了使得规则集合具有类型,而特地定义了RulesTrait:
<code>// RulesTrait.scala<br/>
package rule.util
import hammurabi.Rule
trait RulesTrait {<br/>
val rules: Set[Rule]<br/>
}
// User.scala<br/>
case class User(uid: String, apps: Array[String]) {<br/>
val tags = mutable.Set.empty[String]
override def toString: String = "uid: %s, apps: %s, tags: %s".format(uid, apps.mkString, tags)<br/>
}<br/>
</code>
Scala Script所描述的业务规则集合(用于给用户打标签)如下:
<code>import hammurabi.Rule<br/>
import _root_.rule.util.{RulesTrait, User}<br/>
import hammurabi.Rule._
class TagRules extends RulesTrait {
override val rules: Set[Rule] = Set(<br/>
rule("add 母婴 Tag") let {<br/>
val u = any(kindOf[User])<br/>
when {<br/>
u.apps.mkString.matches(".*(孕|宝宝|育儿).*")<br/>
} then {<br/>
u.tags += "母婴"<br/>
}<br/>
},<br/>
rule("add 大学生 Tag") let {<br/>
val u = any(kindOf[User])<br/>
when {<br/>
u.apps.mkString.matches(".*(四级|六级|大学).*")<br/>
} then {<br/>
u.tags += "大学生"<br/>
}<br/>
}<br/>
)<br/>
}
object TagRules {<br/>
}<br/>
</code>
在Spark程序中集成规则引擎:
<code>import rule.util.{RulesTrait, User}
val rulesObj = loadRules[RulesTrait](params.scriptPath, "TagRules")<br/>
val ruleEngine = RuleEngine(rulesObj.rules)<br/>
val engineBC = sc.broadcast(ruleEngine)
val log: RDD[User]<br/>
val tagRdd = log.mapPartitions { iter =><br/>
val seq = iter.toSeq<br/>
val workingMemory = WorkingMemory(seq)<br/>
engineBC.value execOn workingMemory<br/>
seq.filter(_.tags.nonEmpty).toIterator<br/>
}<br/>
</code>
3. 参考资料
[1] 李国乐, Java规则引擎与其API(JSR-94).
转发申明:
本文转自互联网,由小站整理并发布,在于分享相关技术和知识。版权归原作者所有,如有侵权,请联系本站 top8488@163.com,将在24小时内删除。谢谢
