本文共 13351 字,大约阅读时间需要 44 分钟。
本文将详细介绍Storm框架中基于Bolt拓扑的设计方案,并对各个Bolt组件的实现进行分析。
在本项目中,我们设计了一个基于Storm框架的数据处理pipeline,主要用于处理订单中的商品数据。系统通过多个Bolt拓扑组件,完成商品对的组合、计数、支持度计算以及最终的过滤和存储操作。
SplitBolt 组件负责对同一订单中的商品进行两两组合并发送。其核心逻辑是:每当订单中新加入一个商品,就将它与该订单中已有的每个商品分别配对,并把这些商品对发射到下游。
PairCountBolt 组件用于统计每个商品对的出现次数:每收到一个由SplitBolt发射的商品对,就将该商品对的计数加一,并把商品对及其最新计数发射到下游。
PairTotalCountBolt 组件用于统计商品对的总数:每接收到一个商品对元组,就将计数器加一,并把最新的总数发射到下游。
SupportComputeBolt 组件负责计算商品对的支持度:它缓存每个商品对的最新出现次数以及商品对总数,在收到命令(COMMAND)元组时,按"商品对出现次数 ÷ 商品对总数"计算每个商品对的支持度并发射到下游。
ConfidenceComputeBolt 组件用于计算商品对的置信度:商品对的出现次数来自上游元组,两个单品各自的出现次数从Redis的itemCounts哈希表中读取,置信度 = 商品对出现次数 ÷ 两个单品出现次数中的较小值。
FilterBolt 组件负责按支持度和置信度阈值(代码中均为0.01)筛选商品对:支持度和置信度都达到阈值的商品对写入Redis的recommendedPairs哈希表并发射到下游,否则将其从该哈希表中删除。
package com.hust.grid.leesf.ordertest.bolt;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.hust.grid.leesf.ordertest.common.FieldNames;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Pairs up the items of an order two by two and emits each pair.
 *
 * <p>Every input tuple carries an order id ({@code FieldNames.ID}) and an item
 * name ({@code FieldNames.NAME}). The first item seen for an order is only
 * remembered; every later item is paired with each item already recorded for
 * that order and one tuple per pair is emitted.
 *
 * <p>NOTE(review): entries are never removed from {@code orderItems} in this
 * class, and input tuples are neither anchored nor acked here - confirm that
 * this matches how the topology is configured.
 *
 * @author leesf
 */
public class SplitBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;

    private OutputCollector collector;

    /** Items received so far, keyed by order id. */
    private Map<String, List<String>> orderItems;

    /**
     * Stores the collector and creates the empty per-order item table.
     *
     * @param conf      topology configuration (not read by this bolt)
     * @param context   topology context (not read by this bolt)
     * @param collector collector used to emit item pairs
     */
    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        orderItems = new HashMap<String, List<String>>();
    }

    /**
     * Records the incoming item under its order and emits one pair for every
     * item previously recorded for the same order.
     *
     * @param tuple input tuple with fields {@code FieldNames.ID} and {@code FieldNames.NAME}
     */
    @Override
    public void execute(Tuple tuple) {
        String id = tuple.getStringByField(FieldNames.ID);
        String newItem = tuple.getStringByField(FieldNames.NAME);

        List<String> items = orderItems.get(id);
        if (items == null) {
            // First item of this order: there is nothing to pair it with yet.
            items = new ArrayList<String>();
            items.add(newItem);
            orderItems.put(id, items);
            return;
        }

        for (String existItem : items) {
            collector.emit(createPair(newItem, existItem));
        }
        // Added after the loop so the new item is not paired with itself.
        items.add(newItem);
    }

    /**
     * Builds the output values for a pair in a fixed order, so that (a, b) and
     * (b, a) produce the same tuple: the item that compares greater under
     * {@link String#compareTo(String)} comes first.
     *
     * @param item1 one item of the pair
     * @param item2 the other item of the pair
     * @return the two items, greater one first
     */
    private Values createPair(String item1, String item2) {
        if (item1.compareTo(item2) > 0) {
            return new Values(item1, item2);
        }
        return new Values(item2, item1);
    }

    /**
     * Declares the output fields {@code FieldNames.ITEM1} and {@code FieldNames.ITEM2}.
     *
     * @param declarer declarer that receives the output schema
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(FieldNames.ITEM1, FieldNames.ITEM2));
    }
}

package com.hust.grid.leesf.ordertest.bolt;

import java.util.HashMap;
import java.util.Map;

import com.hust.grid.leesf.ordertest.common.FieldNames;
import com.hust.grid.leesf.ordertest.common.ItemPair;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import
backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Counts how many times each item pair has occurred.
 *
 * <p>For every incoming pair the stored count is incremented and the pair is
 * re-emitted together with its updated count.
 *
 * <p>NOTE(review): {@code ItemPair} is used as a {@link HashMap} key, so it is
 * assumed to define {@code equals}/{@code hashCode} by value - its source is
 * not visible here, confirm.
 *
 * @author leesf
 */
public class PairCountBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;

    private OutputCollector collector;

    /** Occurrence count per item pair. */
    private Map<ItemPair, Integer> pairCounts;

    /**
     * Stores the collector and creates the empty count table.
     *
     * @param conf      topology configuration (not read by this bolt)
     * @param context   topology context (not read by this bolt)
     * @param collector collector used to emit updated counts
     */
    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        pairCounts = new HashMap<ItemPair, Integer>();
    }

    /**
     * Increments the count of the received pair and emits the pair with its new count.
     *
     * @param tuple input tuple with fields {@code FieldNames.ITEM1} and {@code FieldNames.ITEM2}
     */
    @Override
    public void execute(Tuple tuple) {
        String item1 = tuple.getStringByField(FieldNames.ITEM1);
        String item2 = tuple.getStringByField(FieldNames.ITEM2);
        ItemPair itemPair = new ItemPair(item1, item2);

        // A pair that has not been seen before starts at 1.
        Integer previousCount = pairCounts.get(itemPair);
        int pairCount = (previousCount == null) ? 1 : previousCount + 1;
        pairCounts.put(itemPair, pairCount);

        collector.emit(new Values(item1, item2, pairCount));
    }

    /**
     * Declares the output fields {@code FieldNames.ITEM1}, {@code FieldNames.ITEM2}
     * and {@code FieldNames.PAIR_COUNT}.
     *
     * @param declarer declarer that receives the output schema
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(FieldNames.ITEM1, FieldNames.ITEM2, FieldNames.PAIR_COUNT));
    }
}

package com.hust.grid.leesf.ordertest.bolt;

import java.util.Map;

import com.hust.grid.leesf.ordertest.common.FieldNames;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Counts the total number of item-pair tuples received.
 *
 * <p>Each input tuple increments a single counter held by this bolt instance,
 * and the new total is emitted. The content of the input tuple is not read.
 *
 * @author leesf
 */
public class PairTotalCountBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;

    private OutputCollector collector;

    /** Number of tuples received by this instance since {@code prepare}. */
    private int totalCount;

    /**
     * Stores the collector and resets the counter to zero.
     *
     * @param conf      topology configuration (not read by this bolt)
     * @param context   topology context (not read by this bolt)
     * @param collector collector used to emit the running total
     */
    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        totalCount = 0;
    }

    /**
     * Increments the running total and emits it.
     *
     * @param tuple the received tuple; only its arrival matters
     */
    @Override
    public void execute(Tuple tuple) {
        totalCount++;
        collector.emit(new Values(totalCount));
    }

    /**
     * Declares the single output field {@code FieldNames.TOTAL_COUNT}.
     *
     * @param declarer declarer that receives the output schema
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(FieldNames.TOTAL_COUNT));
    }
}

package
com.hust.grid.leesf.ordertest.bolt;

import java.util.HashMap;
import java.util.Map;

import com.hust.grid.leesf.ordertest.common.FieldNames;
import com.hust.grid.leesf.ordertest.common.ItemPair;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Computes the support of each item pair.
 *
 * <p>This bolt handles three kinds of input tuples, told apart by their fields:
 * <ul>
 *   <li>first field is {@code FieldNames.TOTAL_COUNT}: the latest total is stored;</li>
 *   <li>exactly three fields: the latest count of one pair is stored;</li>
 *   <li>first field is {@code FieldNames.COMMAND}: the support of every stored
 *       pair (pair count divided by total count) is emitted.</li>
 * </ul>
 *
 * @author leesf
 */
public class SupportComputeBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;

    private OutputCollector collector;

    /** Latest known occurrence count per item pair. */
    private Map<ItemPair, Integer> pairCounts;

    /** Latest known total number of pairs. */
    private int pairTotalCount;

    /**
     * Stores the collector and resets the cached counts.
     *
     * @param conf      topology configuration (not read by this bolt)
     * @param context   topology context (not read by this bolt)
     * @param collector collector used to emit support values
     */
    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        pairCounts = new HashMap<ItemPair, Integer>();
        pairTotalCount = 0;
    }

    /**
     * Updates the cached counts or, on a command tuple, emits the support of
     * every cached pair.
     *
     * @param tuple a total-count tuple, a three-field pair-count tuple, or a command tuple
     */
    @Override
    public void execute(Tuple tuple) {
        String firstField = tuple.getFields().get(0);

        if (firstField.equals(FieldNames.TOTAL_COUNT)) {
            pairTotalCount = tuple.getIntegerByField(FieldNames.TOTAL_COUNT);
        } else if (tuple.getFields().size() == 3) {
            String item1 = tuple.getStringByField(FieldNames.ITEM1);
            String item2 = tuple.getStringByField(FieldNames.ITEM2);
            int pairCount = tuple.getIntegerByField(FieldNames.PAIR_COUNT);
            pairCounts.put(new ItemPair(item1, item2), pairCount);
        } else if (firstField.equals(FieldNames.COMMAND)) {
            emitSupports();
        }
    }

    /** Emits pair count / total count for every cached pair. */
    private void emitSupports() {
        if (pairTotalCount == 0) {
            // No total has arrived yet; dividing by zero would emit Infinity,
            // which would pass any downstream threshold.
            return;
        }
        for (Map.Entry<ItemPair, Integer> entry : pairCounts.entrySet()) {
            ItemPair itemPair = entry.getKey();
            double itemSupport = (double) entry.getValue().intValue() / pairTotalCount;
            collector.emit(new Values(itemPair.getItem1(), itemPair.getItem2(), itemSupport));
        }
    }

    /**
     * Declares the output fields {@code FieldNames.ITEM1}, {@code FieldNames.ITEM2}
     * and {@code FieldNames.SUPPORT}.
     *
     * @param declarer declarer that receives the output schema
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(FieldNames.ITEM1, FieldNames.ITEM2, FieldNames.SUPPORT));
    }
}

package com.hust.grid.leesf.ordertest.bolt;

import java.util.HashMap;
import java.util.Map;

import com.hust.grid.leesf.ordertest.common.FieldNames;
import com.hust.grid.leesf.ordertest.common.ItemPair;

import
backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import redis.clients.jedis.Jedis;

/**
 * Computes the confidence of each item pair.
 *
 * <p>Three-field input tuples update the cached count of one pair. A tuple
 * whose first field is {@code FieldNames.COMMAND} triggers the computation:
 * for every cached pair the occurrence counts of its two items are read from
 * the Redis hash {@code "itemCounts"}, and the pair count divided by the
 * smaller of the two item counts is emitted as the confidence.
 *
 * <p>NOTE(review): the Redis settings are read with the literal keys
 * {@code "REDIS_HOST"} / {@code "REDIS_PORT"}, whereas FilterBolt uses
 * {@code ConfKeys.REDIS_HOST} / {@code ConfKeys.REDIS_PORT} - presumably the
 * same values, verify.
 *
 * @author leesf
 */
public class ConfidenceComputeBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;

    private OutputCollector collector;

    /** Latest known occurrence count per item pair. */
    private Map<ItemPair, Integer> pairCounts;

    private String host;
    private int port;
    private Jedis jedis;

    /**
     * Stores the collector, reads the Redis host and port from the
     * configuration and opens the Redis connection.
     *
     * @param conf      topology configuration; must contain {@code "REDIS_HOST"} and {@code "REDIS_PORT"}
     * @param context   topology context (not read by this bolt)
     * @param collector collector used to emit confidence values
     */
    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.host = conf.get("REDIS_HOST").toString();
        this.port = Integer.parseInt(conf.get("REDIS_PORT").toString());
        pairCounts = new HashMap<ItemPair, Integer>();
        connectToRedis();
    }

    /** Opens the connection to the configured Redis server. */
    private void connectToRedis() {
        jedis = new Jedis(host, port);
        jedis.connect();
    }

    /**
     * Updates the cached pair counts or, on a command tuple, emits the
     * confidence of every cached pair.
     *
     * @param tuple a three-field pair-count tuple or a command tuple
     */
    @Override
    public void execute(Tuple tuple) {
        if (tuple.getFields().size() == 3) {
            String item1 = tuple.getStringByField(FieldNames.ITEM1);
            String item2 = tuple.getStringByField(FieldNames.ITEM2);
            int pairCount = tuple.getIntegerByField(FieldNames.PAIR_COUNT);
            pairCounts.put(new ItemPair(item1, item2), pairCount);
        } else if (tuple.getFields().get(0).equals(FieldNames.COMMAND)) {
            emitConfidences();
        }
    }

    /** Emits pair count / min(item1 count, item2 count) for every cached pair. */
    private void emitConfidences() {
        for (Map.Entry<ItemPair, Integer> entry : pairCounts.entrySet()) {
            ItemPair itemPair = entry.getKey();
            String rawItem1Count = jedis.hget("itemCounts", itemPair.getItem1());
            String rawItem2Count = jedis.hget("itemCounts", itemPair.getItem2());
            if (rawItem1Count == null || rawItem2Count == null) {
                // The item count is not in Redis (yet). Integer.parseInt(null)
                // would throw and abort the whole batch, so skip this pair.
                continue;
            }

            int smallerItemCount = Math.min(Integer.parseInt(rawItem1Count), Integer.parseInt(rawItem2Count));
            if (smallerItemCount == 0) {
                // Avoid emitting Infinity/NaN as a confidence.
                continue;
            }

            double itemConfidence = (double) entry.getValue().intValue() / smallerItemCount;
            collector.emit(new Values(itemPair.getItem1(), itemPair.getItem2(), itemConfidence));
        }
    }

    /** Closes the Redis connection when the bolt is shut down. */
    @Override
    public void cleanup() {
        if (jedis != null) {
            jedis.disconnect();
        }
    }

    /**
     * Declares the output fields {@code FieldNames.ITEM1}, {@code FieldNames.ITEM2}
     * and {@code FieldNames.CONFIDENCE}.
     *
     * @param declarer declarer that receives the output schema
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(FieldNames.ITEM1, FieldNames.ITEM2, FieldNames.CONFIDENCE));
    }
}

package
com.hust.grid.leesf.ordertest.bolt;import java.util.Map;import org.json.simple.JSONObject;import com.hust.grid.leesf.ordertest.common.ConfKeys;import com.hust.grid.leesf.ordertest.common.FieldNames;import com.hust.grid.leesf.ordertest.common.ItemPair;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import redis.clients.jedis.Jedis;/** * 过滤符合条件的商品对并存入redis * * @author leesf */public class FilterBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; private static final double SUPPORT_THRESHOLD = 0.01; private static final double CONFIDENCE_THRESHOLD = 0.01; private OutputCollector collector; private Jedis jedis; private String host; private int port; public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; this.host = conf.get(ConfKeys.REDIS_HOST).toString(); this.port = Integer.parseInt(conf.get(ConfKeys.REDIS_PORT).toString()); connectToRedis(); } private void connectToRedis() { jedis = new Jedis(host, port); jedis.connect(); } @SuppressWarnings("unchecked") public void execute(Tuple tuple) { String item1 = tuple.getStringByField(FieldNames.ITEM1); String item2 = tuple.getStringByField(FieldNames.ITEM2); ItemPair itemPair = new ItemPair(item1, item2); String pairString = itemPair.toString(); double support = 0; double confidence = 0; if (tuple.getFields().get(2).equals(FieldNames.SUPPORT)) { support = tuple.getDoubleByField(FieldNames.SUPPORT); jedis.hset("supports", pairString, String.valueOf(support)); } else if (tuple.getFields().get(2).equals(FieldNames.CONFIDENCE)) { confidence = tuple.getDoubleByField(FieldNames.CONFIDENCE); jedis.hset("confidences", pairString, String.valueOf(confidence)); } if 
(!jedis.hexists("supports", pairString) || !jedis.hexists("confidences", pairString)) { return; } support = Double.parseDouble(jedis.hget("supports", pairString)); confidence = Double.parseDouble(jedis.hget("confidences", pairString)); if (support >= SUPPORT_THRESHOLD && confidence >= CONFIDENCE_THRESHOLD) { JSONObject pairValue = new JSONObject(); pairValue.put(FieldNames.SUPPORT, support); pairValue.put(FieldNames.CONFIDENCE, confidence); jedis.hset("recommendedPairs", pairString, pairValue.toJSONString()); collector.emit(new Values(item1, item2, support, confidence)); } else { jedis.hdel("recommendedPairs", pairString); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields(FieldNames.ITEM1, FieldNames.ITEM2, FieldNames.SUPPORT, FieldNames.CONFIDENCE)); }} 以上内容详细介绍了Storm框架中基于Bolt拓扑的设计方案及其实现。每个Bolt组件都有详细的代码解释和功能描述,涵盖了从商品对的组合到支持度和置信度的计算,以及最终的过滤和存储操作。
转载地址:http://fiej.baihongyu.com/