Bolt Implementations for Mining Frequent Itemsets from Product Orders
Published: 2019-02-28

Design and Implementation of a Bolt Topology on the Storm Framework

This article describes the design of a Storm topology built from Bolts and walks through the implementation of each Bolt.

1. Design Overview

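This project builds a data-processing pipeline on Storm that processes the products in orders. A chain of Bolts pairs up the products, counts the pairs, computes support and confidence, and finally filters and stores the results.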

1.1 SplitBolt

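SplitBolt pairs up the products in an order and emits each pair. When a new product arrives for an order, it is paired with every product already seen in that order, and each resulting pair is emitted to the downstream stream.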
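
The incremental pairing can be tried out without a Storm cluster. The following is a minimal sketch, assuming plain Java collections in place of tuples; the class name PairingSketch and the method accept are hypothetical and do not appear in the project.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PairingSketch {
    // Items seen so far, keyed by order id (mirrors SplitBolt's orderItems map).
    private final Map<String, List<String>> orderItems = new HashMap<>();

    // Returns the pairs produced when newItem arrives for the given order.
    public List<String[]> accept(String orderId, String newItem) {
        List<String> items = orderItems.computeIfAbsent(orderId, k -> new ArrayList<>());
        List<String[]> pairs = new ArrayList<>();
        for (String existing : items) {
            // Put the lexicographically larger name first, as createPair does,
            // so that (a, b) and (b, a) become the same pair.
            if (newItem.compareTo(existing) > 0) {
                pairs.add(new String[] {newItem, existing});
            } else {
                pairs.add(new String[] {existing, newItem});
            }
        }
        items.add(newItem);
        return pairs;
    }

    public static void main(String[] args) {
        PairingSketch sketch = new PairingSketch();
        for (String item : new String[] {"milk", "bread", "eggs"}) {
            for (String[] pair : sketch.accept("order-1", item)) {
                System.out.println(pair[0] + ", " + pair[1]);
            }
        }
    }
}
```

The first item of an order produces no pair. Each later item is paired with all earlier ones, so an order with n items yields n(n-1)/2 pairs in total.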

1.2 PairCountBolt

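PairCountBolt counts how often each product pair occurs. It receives the pairs emitted by SplitBolt, keeps a running count per pair, and emits the pair together with its updated count.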
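
Counting per pair relies on the pair object working as a HashMap key, which requires value-based equals and hashCode. The source of the project's ItemPair class is not shown in this article, so the sketch below uses a hypothetical stand-in named Pair; the class PairCountSketch is likewise only an illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class PairCountSketch {
    // Hypothetical stand-in for ItemPair: two pairs are equal when both items match.
    static final class Pair {
        final String item1;
        final String item2;

        Pair(String item1, String item2) {
            this.item1 = item1;
            this.item2 = item2;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Pair)) return false;
            Pair other = (Pair) o;
            return item1.equals(other.item1) && item2.equals(other.item2);
        }

        @Override
        public int hashCode() {
            return Objects.hash(item1, item2);
        }
    }

    private final Map<Pair, Integer> pairCounts = new HashMap<>();

    // Increments the running count for the pair and returns the new value.
    public int count(String item1, String item2) {
        return pairCounts.merge(new Pair(item1, item2), 1, Integer::sum);
    }

    public static void main(String[] args) {
        PairCountSketch counter = new PairCountSketch();
        counter.count("milk", "bread");
        System.out.println(counter.count("milk", "bread")); // prints 2
    }
}
```

Map.merge replaces the containsKey/get/put sequence with a single call; the resulting counts are the same.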

1.3 PairTotalCountBolt

PairTotalCountBolt counts the total number of product pairs. It increments a counter for every pair tuple it receives and emits the new total.

1.4 SupportComputeBolt

SupportComputeBolt computes the support of each product pair: the number of times the pair occurred divided by the total number of pairs.
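
As a worked example of this ratio, here is a minimal sketch. The class SupportSketch and its guard against a non-positive total are assumptions added for illustration and are not part of the project code.

```java
public class SupportSketch {
    // support = occurrences of this pair / total number of pairs seen so far
    public static double support(int pairCount, int pairTotalCount) {
        if (pairTotalCount <= 0) {
            // Guard added for this sketch only.
            throw new IllegalArgumentException("pairTotalCount must be positive");
        }
        // Cast before dividing so the result is not truncated by integer division.
        return (double) pairCount / pairTotalCount;
    }

    public static void main(String[] args) {
        // A pair seen 3 times out of 12 pairs in total.
        System.out.println(support(3, 12)); // prints 0.25
    }
}
```

Note that the denominator here is the total number of pairs emitted, not the number of orders, so the value differs from the textbook per-transaction definition of support.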

1.5 ConfidenceComputeBolt

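ConfidenceComputeBolt computes the confidence of each product pair. It keeps the pair counts it receives from upstream, reads the occurrence count of each individual product from the Redis hash itemCounts, and divides the pair count by the smaller of the two product counts.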
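
The calculation itself needs no Redis connection to illustrate. The sketch below takes the three counts as plain arguments; the class ConfidenceSketch and its guard against non-positive counts are assumptions made for this example.

```java
public class ConfidenceSketch {
    // confidence = pair count / smaller of the two single-item counts
    public static double confidence(int pairCount, int item1Count, int item2Count) {
        int smaller = Math.min(item1Count, item2Count);
        if (smaller <= 0) {
            // Guard added for this sketch only.
            throw new IllegalArgumentException("item counts must be positive");
        }
        return (double) pairCount / smaller;
    }

    public static void main(String[] args) {
        // The pair occurred 3 times; the items occurred 4 and 10 times.
        System.out.println(confidence(3, 4, 10)); // prints 0.75
    }
}
```

Dividing by the smaller count gives the larger of the two directional confidences (item1 given item2, and item2 given item1), so each pair gets a single, symmetric value.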

1.6 FilterBolt

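FilterBolt keeps the product pairs that meet the support and confidence thresholds and stores them in Redis. It records each pair's support and confidence in Redis as they arrive; once both are known, it writes the pair to the recommendedPairs hash if both values reach their thresholds (0.01 each in the code), and otherwise removes the pair from that hash.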
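
Support and confidence reach the filter as separate tuples, so the decision has to wait until both values are known for a pair. The following is a minimal sketch of that logic, assuming in-memory maps in place of the Redis hashes; the class FilterSketch and its method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class FilterSketch {
    static final double SUPPORT_THRESHOLD = 0.01;
    static final double CONFIDENCE_THRESHOLD = 0.01;

    // In-memory stand-ins for the Redis hashes supports, confidences and recommendedPairs.
    private final Map<String, Double> supports = new HashMap<>();
    private final Map<String, Double> confidences = new HashMap<>();
    private final Map<String, double[]> recommendedPairs = new HashMap<>();

    public void onSupport(String pair, double support) {
        supports.put(pair, support);
        evaluate(pair);
    }

    public void onConfidence(String pair, double confidence) {
        confidences.put(pair, confidence);
        evaluate(pair);
    }

    // Re-checks the pair once both values are available.
    private void evaluate(String pair) {
        Double support = supports.get(pair);
        Double confidence = confidences.get(pair);
        if (support == null || confidence == null) {
            return; // still waiting for the other value
        }
        if (support >= SUPPORT_THRESHOLD && confidence >= CONFIDENCE_THRESHOLD) {
            recommendedPairs.put(pair, new double[] {support, confidence});
        } else {
            // A pair that no longer qualifies is dropped again.
            recommendedPairs.remove(pair);
        }
    }

    public boolean isRecommended(String pair) {
        return recommendedPairs.containsKey(pair);
    }

    public static void main(String[] args) {
        FilterSketch filter = new FilterSketch();
        filter.onSupport("milk,bread", 0.05);
        filter.onConfidence("milk,bread", 0.5);
        System.out.println(filter.isRecommended("milk,bread")); // prints true
    }
}
```

Because every update re-evaluates the pair, a pair that was recommended earlier is removed as soon as a later support or confidence value falls below its threshold.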

2. SplitBolt Implementation

package com.hust.grid.leesf.ordertest.bolt;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.hust.grid.leesf.ordertest.common.FieldNames;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Pairs up the products in an order and emits each pair.
 *
 * @author leesf
 */
public class SplitBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private OutputCollector collector;
    // Products seen so far, keyed by order id.
    private Map<String, List<String>> orderItems;

    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        orderItems = new HashMap<String, List<String>>();
    }

    @Override
    public void execute(Tuple tuple) {
        String id = tuple.getStringByField(FieldNames.ID);
        String newItem = tuple.getStringByField(FieldNames.NAME);
        // First product of this order: remember it, there is nothing to pair with yet.
        if (!orderItems.containsKey(id)) {
            ArrayList<String> items = new ArrayList<String>();
            items.add(newItem);
            orderItems.put(id, items);
            return;
        }
        // Pair the new product with every product already seen in the order.
        List<String> items = orderItems.get(id);
        for (String existItem : items) {
            collector.emit(createPair(newItem, existItem));
        }
        items.add(newItem);
    }

    // Orders the two names so that (a, b) and (b, a) produce the same pair.
    private Values createPair(String item1, String item2) {
        if (item1.compareTo(item2) > 0) {
            return new Values(item1, item2);
        }
        return new Values(item2, item1);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(FieldNames.ITEM1, FieldNames.ITEM2));
    }
}

3. PairCountBolt Implementation

package com.hust.grid.leesf.ordertest.bolt;

import java.util.HashMap;
import java.util.Map;

import com.hust.grid.leesf.ordertest.common.FieldNames;
import com.hust.grid.leesf.ordertest.common.ItemPair;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Counts how often each product pair occurs.
 *
 * @author leesf
 */
public class PairCountBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private OutputCollector collector;
    // Running count per product pair.
    private Map<ItemPair, Integer> pairCounts;

    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        pairCounts = new HashMap<ItemPair, Integer>();
    }

    @Override
    public void execute(Tuple tuple) {
        String item1 = tuple.getStringByField(FieldNames.ITEM1);
        String item2 = tuple.getStringByField(FieldNames.ITEM2);
        ItemPair itemPair = new ItemPair(item1, item2);
        int pairCount = 0;
        if (pairCounts.containsKey(itemPair)) {
            pairCount = pairCounts.get(itemPair);
        }
        pairCount++;
        pairCounts.put(itemPair, pairCount);
        // Emit the pair together with its updated count.
        collector.emit(new Values(item1, item2, pairCount));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(FieldNames.ITEM1, FieldNames.ITEM2, FieldNames.PAIR_COUNT));
    }
}

4. PairTotalCountBolt Implementation

package com.hust.grid.leesf.ordertest.bolt;

import java.util.Map;

import com.hust.grid.leesf.ordertest.common.FieldNames;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Counts the total number of product pairs.
 *
 * @author leesf
 */
public class PairTotalCountBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private OutputCollector collector;
    private int totalCount;

    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        totalCount = 0;
    }

    @Override
    public void execute(Tuple tuple) {
        // Every incoming tuple is one product pair.
        totalCount++;
        collector.emit(new Values(totalCount));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(FieldNames.TOTAL_COUNT));
    }
}

5. SupportComputeBolt Implementation

package com.hust.grid.leesf.ordertest.bolt;

import java.util.HashMap;
import java.util.Map;

import com.hust.grid.leesf.ordertest.common.FieldNames;
import com.hust.grid.leesf.ordertest.common.ItemPair;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Computes the support of each product pair.
 *
 * @author leesf
 */
public class SupportComputeBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private OutputCollector collector;
    // Latest count per product pair, as reported by PairCountBolt.
    private Map<ItemPair, Integer> pairCounts;
    // Latest total number of pairs, as reported by PairTotalCountBolt.
    private int pairTotalCount;

    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        pairCounts = new HashMap<ItemPair, Integer>();
        pairTotalCount = 0;
    }

    @Override
    public void execute(Tuple tuple) {
        if (tuple.getFields().get(0).equals(FieldNames.TOTAL_COUNT)) {
            // Total-count tuple: update the denominator.
            pairTotalCount = tuple.getIntegerByField(FieldNames.TOTAL_COUNT);
        } else if (tuple.getFields().size() == 3) {
            // Pair-count tuple (item1, item2, pairCount): update the numerator for this pair.
            String item1 = tuple.getStringByField(FieldNames.ITEM1);
            String item2 = tuple.getStringByField(FieldNames.ITEM2);
            int pairCount = tuple.getIntegerByField(FieldNames.PAIR_COUNT);
            pairCounts.put(new ItemPair(item1, item2), pairCount);
        } else if (tuple.getFields().get(0).equals(FieldNames.COMMAND)) {
            // Command tuple: emit the support of every pair seen so far.
            for (ItemPair itemPair : pairCounts.keySet()) {
                double itemSupport = (double) (pairCounts.get(itemPair).intValue()) / pairTotalCount;
                collector.emit(new Values(itemPair.getItem1(), itemPair.getItem2(), itemSupport));
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(FieldNames.ITEM1, FieldNames.ITEM2, FieldNames.SUPPORT));
    }
}

6. ConfidenceComputeBolt Implementation

package com.hust.grid.leesf.ordertest.bolt;

import java.util.HashMap;
import java.util.Map;

import com.hust.grid.leesf.ordertest.common.FieldNames;
import com.hust.grid.leesf.ordertest.common.ItemPair;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import redis.clients.jedis.Jedis;

/**
 * Computes the confidence of each product pair.
 *
 * @author leesf
 */
public class ConfidenceComputeBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private OutputCollector collector;
    // Latest count per product pair, as reported by PairCountBolt.
    private Map<ItemPair, Integer> pairCounts;
    private String host;
    private int port;
    private Jedis jedis;

    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.host = conf.get("REDIS_HOST").toString();
        this.port = Integer.parseInt(conf.get("REDIS_PORT").toString());
        pairCounts = new HashMap<ItemPair, Integer>();
        connectToRedis();
    }

    private void connectToRedis() {
        jedis = new Jedis(host, port);
        jedis.connect();
    }

    @Override
    public void execute(Tuple tuple) {
        if (tuple.getFields().size() == 3) {
            // Pair-count tuple (item1, item2, pairCount): remember the latest count.
            String item1 = tuple.getStringByField(FieldNames.ITEM1);
            String item2 = tuple.getStringByField(FieldNames.ITEM2);
            int pairCount = tuple.getIntegerByField(FieldNames.PAIR_COUNT);
            pairCounts.put(new ItemPair(item1, item2), pairCount);
        } else if (tuple.getFields().get(0).equals(FieldNames.COMMAND)) {
            // Command tuple: emit the confidence of every pair seen so far.
            for (ItemPair itemPair : pairCounts.keySet()) {
                // Single-item counts come from the Redis hash "itemCounts".
                // hget returns null for a missing field, which would make parseInt throw.
                double item1Count = Integer.parseInt(jedis.hget("itemCounts", itemPair.getItem1()));
                double item2Count = Integer.parseInt(jedis.hget("itemCounts", itemPair.getItem2()));
                double itemConfidence = pairCounts.get(itemPair).intValue();
                // Divide by the smaller of the two single-item counts.
                if (item1Count < item2Count) {
                    itemConfidence /= item1Count;
                } else {
                    itemConfidence /= item2Count;
                }
                collector.emit(new Values(itemPair.getItem1(), itemPair.getItem2(), itemConfidence));
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(FieldNames.ITEM1, FieldNames.ITEM2, FieldNames.CONFIDENCE));
    }
}

7. FilterBolt Implementation

package com.hust.grid.leesf.ordertest.bolt;

import java.util.Map;

import org.json.simple.JSONObject;

import com.hust.grid.leesf.ordertest.common.ConfKeys;
import com.hust.grid.leesf.ordertest.common.FieldNames;
import com.hust.grid.leesf.ordertest.common.ItemPair;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import redis.clients.jedis.Jedis;

/**
 * Filters the product pairs that meet the thresholds and stores them in Redis.
 *
 * @author leesf
 */
public class FilterBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private static final double SUPPORT_THRESHOLD = 0.01;
    private static final double CONFIDENCE_THRESHOLD = 0.01;
    private OutputCollector collector;
    private Jedis jedis;
    private String host;
    private int port;

    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.host = conf.get(ConfKeys.REDIS_HOST).toString();
        this.port = Integer.parseInt(conf.get(ConfKeys.REDIS_PORT).toString());
        connectToRedis();
    }

    private void connectToRedis() {
        jedis = new Jedis(host, port);
        jedis.connect();
    }

    @Override
    @SuppressWarnings("unchecked")
    public void execute(Tuple tuple) {
        String item1 = tuple.getStringByField(FieldNames.ITEM1);
        String item2 = tuple.getStringByField(FieldNames.ITEM2);
        ItemPair itemPair = new ItemPair(item1, item2);
        String pairString = itemPair.toString();
        double support = 0;
        double confidence = 0;
        // The third field tells whether this tuple carries a support or a confidence value.
        if (tuple.getFields().get(2).equals(FieldNames.SUPPORT)) {
            support = tuple.getDoubleByField(FieldNames.SUPPORT);
            jedis.hset("supports", pairString, String.valueOf(support));
        } else if (tuple.getFields().get(2).equals(FieldNames.CONFIDENCE)) {
            confidence = tuple.getDoubleByField(FieldNames.CONFIDENCE);
            jedis.hset("confidences", pairString, String.valueOf(confidence));
        }
        // Wait until both values are known for this pair.
        if (!jedis.hexists("supports", pairString) || !jedis.hexists("confidences", pairString)) {
            return;
        }
        support = Double.parseDouble(jedis.hget("supports", pairString));
        confidence = Double.parseDouble(jedis.hget("confidences", pairString));
        if (support >= SUPPORT_THRESHOLD && confidence >= CONFIDENCE_THRESHOLD) {
            // Store the qualifying pair as JSON and emit it downstream.
            JSONObject pairValue = new JSONObject();
            pairValue.put(FieldNames.SUPPORT, support);
            pairValue.put(FieldNames.CONFIDENCE, confidence);
            jedis.hset("recommendedPairs", pairString, pairValue.toJSONString());
            collector.emit(new Values(item1, item2, support, confidence));
        } else {
            // The pair no longer qualifies: remove it from the recommendations.
            jedis.hdel("recommendedPairs", pairString);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(FieldNames.ITEM1, FieldNames.ITEM2, FieldNames.SUPPORT, FieldNames.CONFIDENCE));
    }
}

8. Summary

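The sections above covered the design and implementation of the Bolt-based Storm topology: pairing the products in each order, counting the pairs, computing support and confidence, and finally filtering the pairs and storing them in Redis.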

Reposted from: http://fiej.baihongyu.com/