Celer信息服务

简介

Celer消息服务(MS)是一个信息平台,为网络应用提供了可靠的、高性能的通讯服务。通过MS可以将重要的数据放到后端服务器中通过JavaScript进行处理。

MS可以视为骨干web应用程序,因为它可以将大量的应用程序整合在一起,并将消息传递给接收端。

MS 概念

消息

发送的消息要小于64KB。任何消息都可以被跟踪因为它们都有序号。未编序的消息也可以传递,但不能保证他们会到达目的地。

消息需要遵循特定的规则,以至于它的接收端可以接收并理解它。

规则如下:

  1. 数据库日志(dblog):基于数据库日志快速的同步协议,允许两个或多个的Celer DB完全同步。
  2. 二进制(二进制):键-值对的编码格式利于缓存的使用,可被视为二进制版的JSON
  3. JSON(JSON):UTF8编码的JSON数据。
  4. Celer 2 XML(Celer20):使用XML编码的键-值对
  5. 熟知的转换(hepconvert):HEP资源数据转换格式

传输

传输通过相应的URI标识。支持以下传输方式

  1. HTTP 1.1 (http): 使用二进制文件、JSON和Celer 2.0 XML。示例: http://example.com
  2. Web Socket(ws): 用二进制和 DB 日志。示例: ws://example.com/ws
  3. Local(no scheme): 传递消息内相同的MS。示例: localService

MS消息组

1个MS消息组包含一个发布端和0个或多个接收端

因为一个subscriber需要运行在不同的空间,所以一个MS组需要在不同的接收端中提供不同的消息。

一个Topic能够存储所有由发布端发布的消息,并能为发布端和接收端提供检索服务,同时支持在系统崩溃的时候恢复消息。

发布端

发布端将消息从src发布到Topic,Topic连接着Celer DB,所以消息可以实现永久存储。

发布端的主要功能就是用src传输消息。

接收端

接收端能够识别出Topic上出现的新消息,然后将消息转换成能够被destination所识别的格式或者是直接执行这个消息任务。

执行模型

MS属于事件驱动模式,当发布端发现一个事件的时候,它就将此事件发送给它属于的Topic进行必要的转换。

配置细节

配置介绍

每一个MS应用程序都由一个MS XML文档进行配置,文档的总体结构如下所示:

  • <ms  domain=" " >
    • <topic >
      • <publisher/>
      • <subscriber/>
      • *
      </topic>
    • *
    • <mapReudce/>
    • *
    • <template/>
    • *
    </ms>

每个MS应用程序必须有唯一的一个ms元素,它必须定义domain属性,domain值能唯一标识这个应用程序,一般来说文件名和domain值相同。

XML文件之后被拷贝到Celer MS服务器的models字典中,此时应用程序就能在服务器端自动加载。

Topic 配置

一个Topic连接着一个DB,Topic持续地共享它的属性来实现对数据库的配置。

属性 是否必填 描述
name Y Topic的名称
dict Y DB的字段表,包含字段名称及类型
fields N 索引字段
protocol Y 接收端到目的端的数据传输协议

一旦定义一个Topic,就能够在domain对象下用JS执行检索。例如,定义了一个name="users"的topic,MS将为Topic生成一个检索模型domain.users。这个模型中包含search和get方法:domain.users.search("param1=value&param2=value&...")将检索一组符合参数条件的数据。domain.users.get(primaryKey)将检索到符合条件的一条数据。

发布端配置

所有发布端都共有以下属性:

属性 描述
src 数据来源
protocol 发布端和数据来源之间传递消息的协议

允许通过以下方式将数据传输和协议进行结合:

传输类型 协议 描述 附加属性
ws dblog 用于Celer3.0和MS之间的数据传输,dblog(日志)协议能够确保Celer3.0和MS之间的数据一致性。
local json 用来连接Map Reducer和数据库,将Map Reducer的输出结果存储到数据库中。
local binary 用来连接生成二进制消息的另一个接收端或者处理端。

接收端配置

所有接收端都共有以下属性:

属性 描述
dest 接收端消息发送的目的地
protocol 当发送消息到目的地的时候,接收端必须遵循的消息协议

允许通过以下方式将数据传输和协议进行结合:

数据传输 协议 描述 附加属性
local json 将标准格式的JSON数据发送到目的地,这个接收端同时支持查询属性,通常用于进一步的数据分析。
local json/uploader 用来上传资源到Celer,发送的JSON数据应该包括oid,path和mimeType.
  1. oid: 初始ID,在特定业务中,服务将只传递给对应的接收方。
  2. path: 资源路径。
  3. mimeType: 资源的mime类型。

例如:

                                
    {"oid":"1","path":"IMG_1092.JPG","mimeType":"image/jpeg"}
    
ws dblog 以dblog(日志)形式同步两个DB,以保证数据的一致性。
http json 将标准格式的JSON数据发送到目的地。
http json/uploader 通过发送JSON数据,以chunk形式上传文件到远程服务器。

Map Reducer

Map Reducer连接发布端和接收端,它接收接收端的消息,然后处理这些消息并将结果发送给一个发布端。

Map Reducer运行中有3个阶段:

  1. Filtering: 确定接收到的消息可以进一步的处理。
  2. Mapping: 将接收到的消息映射到一个中间格式进行进一步处理。例如,分组。
  3. Reducing: 执行最后的计算并生成输出

  • <mapReduce  name=" userEmail" >
    • <processor > (function() {
      • var o = null;
      • function map(data) {
        • // this method is called whenever a new message arrives
        • // this method should check if the data is an item that is of interest, if so, map its attributes to a tempory object
        • o = data;
      • }
      • function process() {
        • //this get calld after calling map, one should return an empty array if nothing to return, or an array of objects for downstream.
        • if (o) {
          • return [o];
        • }
        • return [];
      • }
      • return {
        • map: map,
        • process: process
      • };
      • }());
      </processor>
    </mapReduce>

举例

创建一个前后端数据交互的MS

问题: 我们需要实现应用于表单提交的业务逻辑

解决方法:

1. 在你的sitemap.xml,添加下列代码:

  • <service  path=" /forms"   store=" forms.db" >
    • <post  xpipe=" http://www.xmlpipe.org/xpe/ms/request" />
    • <get  xpipe=" http://www.xmlpipe.org/xpe/ms/request" />
    • <webSocket  path=" forms/req/ws"   protocol=" dblog"   xpipe=" http://www.xmlpipe.org/xpe/ms/record" />
    </service>
  • <webSocket  path=" forms/resp/ws"   protocol=" dblog"   xpipe=" http://www.xmlpipe.org/xpe/ms/record/response"   store=" formsResponses.db" />

在本例中,我们允许用户发送get和post请求,我们将在forms.db里存储所有的请求。我们还定义了两个websockets: forms/req/ws用于发送请求到消息处理器,forms/resp/ws用于响应返回。

2. 我们通过在src.ms文件夹下面创建一个msdemo.mml来定义下面的模型:

  • <ms  domain=" msdemo" >
    • <topic  dict=" id:i,jobId:i,path:s,params:s,body:s,created:t"   name=" requests"   fields=" id" >
      • <pub  src=" /forms/req/ws"   protocol=" dblog"   transport=" ws" />
      • <sub  dest=" handleRequest"   protocol=" json"   mask=" id,jobId,path,params,body,created" />
      </topic>
    • <topic  name=" responses"   primaryKey=" id"   seqKey=" true"   dict=" id:i,jobId:i,response:s,created:t" >
      • <pub  src=" handleRequest"   protocol=" json" />
      • <sub  dest=" /forms/resp/ws"   protocol=" dblog"   transport=" ws" />
      </topic>
    • <topic  name=" test"   dict=" id:i,body,created:t" >
      • <pub  protocol=" json" />
      </topic>
    • <mapReduce  name=" handleRequest" >
      • <processor > (function() {
        • var o = null;
        • function map(data) {
          • o = data;
        • }
        • function process() {
          • var r = [];
          • var data = {};
          • data.body = 'text body';
          • //now store data to the test db
          • //you access each topic via the domain object
          • //update is only available on a topicwith local publisher
          • var res = domain.test.update(data);
          • //print(JSON.stringify(res));
          • //simillary, you can access the searcher
          • //searcher is available on all topics
          • res = domain.test.search("all=all");
          • //print(JSON.stringify(res));
          • if (o) {
            • var resp = {};
            • resp.response = o;
            • resp.id = o.id;
            • resp.jobId = o.jobId;
            • r.push(resp);
          • }
          • return r;
        • }
        • return {
          • map: map,
          • process: process
        • };
        • }());
        </processor>
      </mapReduce>
    </ms>

下面代码实现MS服务器接收Web端请求并返回响应值:

  • <topic  dict=" id:i,jobId:i,path:s,params:s,body:s,created:t"   name=" requests"   fields=" id" >
    • <pub  src=" /forms/req/ws"   protocol=" dblog"   transport=" ws" />
    • <sub  dest=" handleRequest"   protocol=" json"   mask=" id,jobId,path,params,body,created" />
    </topic>
  • <topic  name=" responses"   primaryKey=" id"   seqKey=" true"   dict=" id:i,jobId:i,response:s,created:t" >
    • <pub  src=" handleRequest"   protocol=" json" />
    • <sub  dest=" /forms/resp/ws"   protocol=" dblog"   transport=" ws" />
    </topic>

注意,当我们接收到请求,我们将其发布到用JS定义的一个Map Reducer处理器handleRequest中。

同步一个远程Celer 2.0服务器的数据

问题: 现有的网站是用Celer 2.0技术开发的,我们需要导入数据到Celer 3.0网站。

整个应用程序使用以下配置进行定义:

  • <ms  domain=" xpe20to30" >
    • <dbsync  name=" news"   thumbWidth=" 400"   restartInterval=" 20000"   map=" news.map"   store=" news.db"   primaryKey=" id"   fields=" " >
      • <pub  src=" http://example.xpe20.com/data" />
      • <sub  dest=" ws://example.xpe30.com/news/ws"   protocol=" dblog" />
      </dbsync>
    </ms>

在上面的例子中,dbsync元素专为Celer 2.0定义了一个Topic,映射规则在news.map文件里被重新定义。restartInterval定义了在Celer 2.0网站中取数据前的等待时间。

模块

附加的模块也是支持的,但是需要部署在modules目录下。当部署一个新模块的时候目前要求进行重启操作。

附加模块也支持数据处理和协议。

数据推荐模块

在接下来的例子中,一个推荐模块被配置为支持binary/userAction协议的模块。

这个模块要求Topic的dict属性中必须包含以下字段标识itemId:i,rating:b,created:t。也允许包含其他的字段,但是上面的字段是要求必须添加。

当事件数量超过所定义的事件数量时,推荐系统被触发,它将为每个用户生成一定数量的推荐数据。

  • <topic  dict=" id,userId,itemId:i,rating:b,created:t"   name=" testData"   store=" uc.db"   primaryKey=" id"   storeType=" binary"   seqKey=" true"   fields=" " >
    • <pub  src=" ws://example"   protocol=" dblog" />
    • <sub  dest=" recommendations"   protocol=" binary/userAction"   numberOfEvents=" 10000" />
    </topic>
  • <topic  dict=" id,userId:l,itemId:i,rating:i,created:t"   name=" recommendations"   store=" rec.db"   primaryKey=" id" >
    • <pub  src=" recommendations"   protocol=" binary" />
    </topic>

发送邮件

问题: 新用户注册时实现系统向用户自动发送欢迎邮件

整个应用程序配置如下:

  • <ms  domain=" example" >
    • <topic  dict=" id:l,username,password,email,profile:j,permMask:s,group:l,created:t,lastUpdated:t"   name=" users"   store=" users.db"   primaryKey=" id"   storeType=" binary"   seqKey=" false"   fields=" id,username" >
      • <pub  src=" ws://example.com/ws/users"   protocol=" dblog" />
      • <sub  dest=" userEmail"   protocol=" json"   mask=" username,profile,email,created,lastUpdated" />
      </topic>
    • <topic  name=" sendMail"   primaryKey=" id"   seqKey=" true"   dict=" id,from,to,cc,bcc,subject,body"   store=" mails.db" >
      • <pub  src=" userEmail"   protocol=" json" />
      • <sub  dest=" mail://example@celer.cc"   smtp.host=" mail.tpg.com.au"   smtp.debug=" true"   username=" username"   password=" password"   from=" celer@celer.cc" />
      </topic>
    • <template  name=" welcome"   src=" welcome.html" />
    • <mapReduce  name=" userEmail" >
      • <processor > (function() {
        • var o = null;
        • function map(data) {
          • o = null;
          • if (data.created && data.created == data.lastUpdated) {
            • o = {};
            • if (data.profile && data.profile.name) {
              • o.name = data.profile.name;
            • } else {
              • o.name = "Customer";
            • }
            • o.to = data.email;
          • }
        • }
        • function process() {
          • if (o) {
            • o.subject = "Welcome to example.com";
            • o.body = welcome.gen(o);
            • var r = [];
            • r.push(o);
            • return r;
          • }
          • return [];
        • }
        • return {
          • map: map,
          • process: process
        • };
        • }());
        </processor>
      </mapReduce>
    </ms>

我们先监听user db,因此修改用户时我们将得到一个帐户。如果创建时间和更新时间相同,那么我们知道了刚注册的用户。

我们使用template元素定义一个模板。模板一旦定义,该模板可作为JavaCcript中的对象使用。该对象中有一个gen方法,当执行该方法时,会发送一个JSON数据到模板中,模板中定义的相应值会被替换为JSON的数据。

调用外部的REST服务

问题: 处理一条外部服务传递的消息,并存储到Celer3.0中。例如,采购订单。

整个应用程序使配置如下:

  • <ms  domain=" hep" >
    • <topic  name=" resources"   primaryKey=" id"   store=" uploads.db"   dict=" id,username,path,filename,mime,created:t,lastUpdated:t,size:l,access:b" >
      • <pub  src=" ws://example.com/uploads/ws"   protocol=" dblog" />
      • <sub  dest=" http://10.3.200.60:8080/rps/resourceProcess.htm"   protocol=" hepconvert"   backURL=" http://{address}/jpk/converts"   uploadedResourceURI=" http://{resource address}/upload" />
      </topic>
    </ms>

ms元素是此应用程序的顶层元素,并且它需要domain属性。此属性必须唯一地标识应用程序。

topic元素定义了一个Topic。它的name属性惟一地标识这一Topic。PrimaryKey, store和dict属性需要定义后台数据库的结构。

pub元素定义了一个发布端,src属性定义了数据来源。 一般情况下,它是一个使用dblog协议的远程WebSocket地址,允许远程数据同步到本地Topic数据中。

sub元素定义了一个接收端。一般情况下,它将调用一个远程REST服务,使用一个特殊的数据映射器将内部数据库数据以XML格式映射到一个特殊的远程服务上。

数据分析

问题:我们想知道一个视频的访问量,但是如果在用户浏览的时候同时更新视频的访问量,会导致性能大量损耗。

为了解决这个问题,我们首先需要一个在Web前端存储日志的DB,DB的dict包含字段id,parent(视频的id)和事件发生的时间,每当用户查看视频时,记录一次访问事件。在MS端,我们只需要定义一个Topic:

  • <ms  domain=" hep" >
    • <topic  name=" actions"   dict=" id,parent:i,created:t"   primaryKey=" id"   store=" actions.db" >
      • <pub  src=" ws://example.com/logs/ws"   protocol=" dblog" />
      • <sub  dest=" logAnalysis"   protocol=" json"   query=" all=all"   timeout=" 100000" />
      </topic>
    • ...
    </ms>

在上面的示例中,我们使用dblog和webSocket同步远程数据到Topic。接下来,我们定义一个发送端,它可以在Topic的db中执行查询,每100秒执行一次查询(超时时间以毫秒为单位)。

接下来,我们定义一个Map Reducer处理器。

  • <mapReducer  name=" logAnalysis" >
    • <script  src=" " />
    • <filter  src=" " />
    • <processor  src=" ..." />
    </mapReducer>

你可以通过在src属性指定外部JS的URI或将外部脚本嵌入元素本身。

过滤器必须定义一个过滤函数:

            
     function pass(data) {
       return data.parent==1;
     }
  

只有通过检查的数据才能被处理器执行处理。

处理器有以下JavaScript。你可以将它嵌入处理器元素中或用src属性调用外部JS链接。

  
	   
 (function() {

    var m1 = {};

	//use parent as the grouping key
    function map(data) {
	    print(data);
        var a = m1[data.parent] || [];
        a.push(1);
        m1[data.parent] = a;
    }

    function reduce(key, values) {
        var c = 0;
        values.forEach(function(v) {
            c += v;
        });
        return {
            'id': key,
            'viewNum': c
        };
    }


    function process() {
        var o = [];
        for (var key in m1) {
            o.push(reduce(key, m1[key]));
        }
        return o;
    }

    return {
        map: map,
        process: process
    };

}());
	   
        
    

值得注意的是:

  1. Map Resucer中的name需和上面的sub中dest属性一样,这样sub的数据才能发送JSON数据到Map Reducer中。
  2. processor元素可直接嵌入一个JS,或者通过使用src属性来定义一个外部JS。
  3. processor的JavaScript中必须定义一个map函数和process函数。map函数在消息被接收的时候响应,处理函数在所有消息执行完map操作后响应。处理函数必须返回一个对象数组,每个对象都会发送到最终的目的地。

最后,我们定义:

  • <topic  name=" resources"   primaryKey=" id"   dict=" id,viewNum:i"   store=" resources.db" >
    • <pub  src=" logAnalysis"   protocol=" binary" />
    • <sub  dest=" http://{remote address}"   protocol=" json" />
    </topic>

在sub中我们定义了src属性和Map Reducer的name属性,所以sub可以接收到Map Reducer的处理过的数据,然后,数据会以JSON的形式发送到sub中定义的dest地址上。

如何调试你的Map Reduce应用?

最容易的方法是使用Celer IDE开发app,创建一个空白页将core.js和Map Reduce js包含进去,然后在页面中模拟数据执行。