博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
商品订单频繁项集Bolt实现
阅读量:177 次
发布时间:2019-02-28

本文共 13767 字,大约阅读时间需要 45 分钟。

一 设计方案介绍
SplitBolt:对订单中的商品进行两两组合并发送
PairCountBolt:计算商品对出现的次数
PairTotalCountBolt:计算商品对总数
SupportComputeBolt:计算商品对的支持度
ConfidenceComputeBolt:计算商品对的置信度
FilterBolt:过滤符合条件的商品对并存入redis
二 SplitBolt实现
package com.hust.grid.leesf.ordertest.bolt;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.hust.grid.leesf.ordertest.common.FieldNames;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Combines the items of an order pairwise and emits every pair.
 *
 * <p>Each incoming tuple carries one (order id, item name). The new item is paired with every
 * item previously received for the same order, and one tuple is emitted per pair.
 *
 * <p>NOTE(review): entries are never removed from {@code orderItems} in this class, so the map
 * grows with the number of distinct order ids seen by the task.
 *
 * @author leesf
 */
public class SplitBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;

    private OutputCollector collector;
    /** Order id mapped to the items received so far for that order. */
    private Map<String, List<String>> orderItems;

    /**
     * Stores the collector and creates the empty per-order item map.
     */
    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        orderItems = new HashMap<String, List<String>>();
    }

    /**
     * Reads the order id and item name from the tuple, emits one pair for each item already
     * recorded for that order, then records the new item.
     *
     * @param tuple tuple providing the {@code FieldNames.ID} and {@code FieldNames.NAME} fields
     */
    @Override
    public void execute(Tuple tuple) {
        String id = tuple.getStringByField(FieldNames.ID);
        String newItem = tuple.getStringByField(FieldNames.NAME);

        List<String> items = orderItems.get(id);
        if (items == null) {
            // First item of this order: nothing to pair with yet, just start its list.
            items = new ArrayList<String>();
            orderItems.put(id, items);
        }

        // Pair the new item with every item already known for this order.
        for (String existItem : items) {
            collector.emit(createPair(newItem, existItem));
        }
        items.add(newItem);
    }

    /**
     * Builds the output values for a pair in a fixed order so that (a, b) and (b, a) produce the
     * same tuple: the item that compares greater by {@link String#compareTo(String)} comes first.
     */
    private Values createPair(String item1, String item2) {
        if (item1.compareTo(item2) > 0) {
            return new Values(item1, item2);
        }
        return new Values(item2, item1);
    }

    /**
     * Declares the two output fields {@code FieldNames.ITEM1} and {@code FieldNames.ITEM2}.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(FieldNames.ITEM1, FieldNames.ITEM2));
    }
}
三 PairCountBolt实现
package com.hust.grid.leesf.ordertest.bolt;

import java.util.HashMap;
import java.util.Map;

import com.hust.grid.leesf.ordertest.common.FieldNames;
import com.hust.grid.leesf.ordertest.common.ItemPair;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Counts how many times each item pair has been received.
 *
 * <p>NOTE(review): {@code ItemPair} is used as a {@link HashMap} key, so it is assumed to
 * implement {@code equals}/{@code hashCode} by value; confirm against its definition.
 *
 * @author leesf
 */
public class PairCountBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;

    private OutputCollector collector;
    /** Item pair mapped to the number of times it has been received by this task. */
    private Map<ItemPair, Integer> pairCounts;

    /**
     * Stores the collector and creates the empty counter map.
     */
    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.pairCounts = new HashMap<ItemPair, Integer>();
    }

    /**
     * Increments the counter of the pair carried by the tuple and emits the pair together with
     * its updated count.
     *
     * @param tuple tuple providing the {@code FieldNames.ITEM1} and {@code FieldNames.ITEM2} fields
     */
    @Override
    public void execute(Tuple tuple) {
        String item1 = tuple.getStringByField(FieldNames.ITEM1);
        String item2 = tuple.getStringByField(FieldNames.ITEM2);
        ItemPair itemPair = new ItemPair(item1, item2);

        // A pair seen for the first time starts at 1.
        Integer previous = pairCounts.get(itemPair);
        int pairCount = (previous == null) ? 1 : previous.intValue() + 1;
        pairCounts.put(itemPair, pairCount);

        collector.emit(new Values(item1, item2, pairCount));
    }

    /**
     * Declares the output fields {@code FieldNames.ITEM1}, {@code FieldNames.ITEM2} and
     * {@code FieldNames.PAIR_COUNT}.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(FieldNames.ITEM1, FieldNames.ITEM2, FieldNames.PAIR_COUNT));
    }
}
四 PairTotalCountBolt实现
package com.hust.grid.leesf.ordertest.bolt;

import java.util.Map;

import com.hust.grid.leesf.ordertest.common.FieldNames;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Keeps a running total of the item-pair tuples received and emits the total after each one.
 *
 * @author leesf
 */
public class PairTotalCountBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;

    private OutputCollector collector;
    /** Number of tuples this task has received since {@code prepare}. */
    private int totalCount;

    /**
     * Stores the collector and resets the running total to zero.
     */
    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) {
        totalCount = 0;
        this.collector = collector;
    }

    /**
     * Counts the received tuple and emits the new total; the tuple's content is not read.
     */
    @Override
    public void execute(Tuple tuple) {
        totalCount += 1;
        Values total = new Values(totalCount);
        collector.emit(total);
    }

    /**
     * Declares the single output field {@code FieldNames.TOTAL_COUNT}.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields outputFields = new Fields(FieldNames.TOTAL_COUNT);
        declarer.declare(outputFields);
    }
}
五 SupportComputeBolt实现
package com.hust.grid.leesf.ordertest.bolt;

import java.util.HashMap;
import java.util.Map;

import com.hust.grid.leesf.ordertest.common.FieldNames;
import com.hust.grid.leesf.ordertest.common.ItemPair;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Computes the support of each item pair: pair count divided by the total number of pairs.
 *
 * <p>The bolt receives three kinds of tuples and tells them apart by their fields: a total-count
 * tuple, a three-field pair-count tuple, and a command tuple that triggers the computation.
 *
 * @author leesf
 */
public class SupportComputeBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;

    private OutputCollector collector;
    /** Item pair mapped to the latest count received for it. */
    private Map<ItemPair, Integer> pairCounts;
    /** Latest total number of pairs received; 0 until the first total-count tuple arrives. */
    private int pairTotalCount;

    /**
     * Stores the collector and resets the cached counts.
     */
    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        pairCounts = new HashMap<ItemPair, Integer>();
        pairTotalCount = 0;
    }

    /**
     * Dispatches on the tuple's fields:
     * <ul>
     * <li>first field is {@code FieldNames.TOTAL_COUNT}: remember the total pair count;</li>
     * <li>three fields: remember the count of the carried pair;</li>
     * <li>first field is {@code FieldNames.COMMAND}: emit the support of every known pair.</li>
     * </ul>
     * Tuples matching none of these are ignored.
     */
    @Override
    public void execute(Tuple tuple) {
        Fields fields = tuple.getFields();
        if (fields.get(0).equals(FieldNames.TOTAL_COUNT)) {
            pairTotalCount = tuple.getIntegerByField(FieldNames.TOTAL_COUNT);
        } else if (fields.size() == 3) {
            String item1 = tuple.getStringByField(FieldNames.ITEM1);
            String item2 = tuple.getStringByField(FieldNames.ITEM2);
            int pairCount = tuple.getIntegerByField(FieldNames.PAIR_COUNT);
            pairCounts.put(new ItemPair(item1, item2), pairCount);
        } else if (fields.get(0).equals(FieldNames.COMMAND)) {
            emitSupports();
        }
    }

    /**
     * Emits (item1, item2, support) for every cached pair.
     *
     * <p>Nothing is emitted while the total is not positive: dividing by zero in floating point
     * would yield Infinity/NaN, which is not a meaningful support value.
     */
    private void emitSupports() {
        if (pairTotalCount <= 0) {
            return;
        }
        for (Map.Entry<ItemPair, Integer> entry : pairCounts.entrySet()) {
            ItemPair itemPair = entry.getKey();
            double itemSupport = (double) entry.getValue().intValue() / pairTotalCount;
            collector.emit(new Values(itemPair.getItem1(), itemPair.getItem2(), itemSupport));
        }
    }

    /**
     * Declares the output fields {@code FieldNames.ITEM1}, {@code FieldNames.ITEM2} and
     * {@code FieldNames.SUPPORT}.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(FieldNames.ITEM1, FieldNames.ITEM2, FieldNames.SUPPORT));
    }
}
六 ConfidenceComputeBolt实现
package com.hust.grid.leesf.ordertest.bolt;

import java.util.HashMap;
import java.util.Map;

import com.hust.grid.leesf.ordertest.common.ConfKeys;
import com.hust.grid.leesf.ordertest.common.FieldNames;
import com.hust.grid.leesf.ordertest.common.ItemPair;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import redis.clients.jedis.Jedis;

/**
 * Computes the confidence of each item pair: the pair count divided by the smaller of the two
 * individual item counts, which are read from the Redis hash {@code itemCounts}.
 *
 * <p>The bolt receives two kinds of tuples and tells them apart by their fields: a three-field
 * pair-count tuple, and a command tuple that triggers the computation.
 *
 * @author leesf
 */
public class ConfidenceComputeBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;

    private OutputCollector collector;
    /** Item pair mapped to the latest count received for it. */
    private Map<ItemPair, Integer> pairCounts;
    private String host;
    private int port;
    private Jedis jedis;

    /**
     * Stores the collector, reads the Redis host and port from the topology configuration
     * ({@code ConfKeys.REDIS_HOST}, {@code ConfKeys.REDIS_PORT}) and opens the Redis connection.
     */
    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.host = conf.get(ConfKeys.REDIS_HOST).toString();
        this.port = Integer.parseInt(conf.get(ConfKeys.REDIS_PORT).toString());
        pairCounts = new HashMap<ItemPair, Integer>();
        connectToRedis();
    }

    /** Opens the connection to the configured Redis server. */
    private void connectToRedis() {
        jedis = new Jedis(host, port);
        jedis.connect();
    }

    /**
     * Dispatches on the tuple's fields:
     * <ul>
     * <li>three fields: remember the count of the carried pair;</li>
     * <li>first field is {@code FieldNames.COMMAND}: emit the confidence of every known pair.</li>
     * </ul>
     * Tuples matching neither are ignored.
     */
    @Override
    public void execute(Tuple tuple) {
        Fields fields = tuple.getFields();
        if (fields.size() == 3) {
            String item1 = tuple.getStringByField(FieldNames.ITEM1);
            String item2 = tuple.getStringByField(FieldNames.ITEM2);
            int pairCount = tuple.getIntegerByField(FieldNames.PAIR_COUNT);
            pairCounts.put(new ItemPair(item1, item2), pairCount);
        } else if (fields.get(0).equals(FieldNames.COMMAND)) {
            emitConfidences();
        }
    }

    /**
     * Emits (item1, item2, confidence) for every cached pair whose two item counts are available.
     *
     * <p>A pair is skipped when either item has no positive count in Redis: a missing hash field
     * would otherwise fail to parse, and a zero count would produce an infinite confidence.
     */
    private void emitConfidences() {
        for (Map.Entry<ItemPair, Integer> entry : pairCounts.entrySet()) {
            ItemPair itemPair = entry.getKey();
            int item1Count = readItemCount(itemPair.getItem1());
            int item2Count = readItemCount(itemPair.getItem2());

            // Divide by the rarer item's count, as the original if/else did.
            int smallerCount = Math.min(item1Count, item2Count);
            if (smallerCount <= 0) {
                continue;
            }
            double itemConfidence = (double) entry.getValue().intValue() / smallerCount;
            collector.emit(new Values(itemPair.getItem1(), itemPair.getItem2(), itemConfidence));
        }
    }

    /**
     * Reads the count of one item from the Redis hash {@code itemCounts}.
     *
     * @return the stored count, or 0 when the hash has no field for the item
     * @throws NumberFormatException if the stored value is not an integer
     */
    private int readItemCount(String item) {
        String stored = jedis.hget("itemCounts", item);
        return (stored == null) ? 0 : Integer.parseInt(stored);
    }

    /**
     * Releases the Redis connection opened in {@code prepare}.
     */
    @Override
    public void cleanup() {
        if (jedis != null) {
            jedis.disconnect();
        }
    }

    /**
     * Declares the output fields {@code FieldNames.ITEM1}, {@code FieldNames.ITEM2} and
     * {@code FieldNames.CONFIDENCE}.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(FieldNames.ITEM1, FieldNames.ITEM2, FieldNames.CONFIDENCE));
    }
}
七 FilterBolt实现
package com.hust.grid.leesf.ordertest.bolt;

import java.util.Map;

import org.json.simple.JSONObject;

import com.hust.grid.leesf.ordertest.common.ConfKeys;
import com.hust.grid.leesf.ordertest.common.FieldNames;
import com.hust.grid.leesf.ordertest.common.ItemPair;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import redis.clients.jedis.Jedis;

/**
 * Keeps the item pairs whose support and confidence both reach their thresholds and stores them
 * in Redis.
 *
 * <p>Support and confidence arrive in separate tuples. Each value is written to its own Redis
 * hash ({@code supports}, {@code confidences}); once both are present for a pair, the pair is
 * either written to the {@code recommendedPairs} hash and emitted, or removed from that hash.
 *
 * @author leesf
 */
public class FilterBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;

    /** Minimum support a pair needs to be recommended. */
    private static final double SUPPORT_THRESHOLD = 0.01;
    /** Minimum confidence a pair needs to be recommended. */
    private static final double CONFIDENCE_THRESHOLD = 0.01;

    private OutputCollector collector;
    private Jedis jedis;
    private String host;
    private int port;

    /**
     * Stores the collector, reads the Redis host and port from the topology configuration
     * ({@code ConfKeys.REDIS_HOST}, {@code ConfKeys.REDIS_PORT}) and opens the Redis connection.
     */
    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.host = conf.get(ConfKeys.REDIS_HOST).toString();
        this.port = Integer.parseInt(conf.get(ConfKeys.REDIS_PORT).toString());
        connectToRedis();
    }

    /** Opens the connection to the configured Redis server. */
    private void connectToRedis() {
        jedis = new Jedis(host, port);
        jedis.connect();
    }

    /**
     * Records the support or confidence carried by the tuple, then re-evaluates the pair if both
     * values are known.
     *
     * @param tuple tuple whose first two fields are the items and whose third field is either
     *              {@code FieldNames.SUPPORT} or {@code FieldNames.CONFIDENCE}
     */
    @Override
    @SuppressWarnings("unchecked") // JSONObject.put is a raw Map.put
    public void execute(Tuple tuple) {
        String item1 = tuple.getStringByField(FieldNames.ITEM1);
        String item2 = tuple.getStringByField(FieldNames.ITEM2);
        String pairString = new ItemPair(item1, item2).toString();

        // The third field tells which upstream bolt produced the tuple.
        String metricField = tuple.getFields().get(2);
        if (metricField.equals(FieldNames.SUPPORT)) {
            double newSupport = tuple.getDoubleByField(FieldNames.SUPPORT);
            jedis.hset("supports", pairString, String.valueOf(newSupport));
        } else if (metricField.equals(FieldNames.CONFIDENCE)) {
            double newConfidence = tuple.getDoubleByField(FieldNames.CONFIDENCE);
            jedis.hset("confidences", pairString, String.valueOf(newConfidence));
        }

        // Read each value once and test for null, instead of hexists followed by hget: this
        // halves the round-trips and cannot hit a field that vanished between the two calls.
        String storedSupport = jedis.hget("supports", pairString);
        String storedConfidence = jedis.hget("confidences", pairString);
        if (storedSupport == null || storedConfidence == null) {
            // The other metric has not been computed yet for this pair.
            return;
        }

        double support = Double.parseDouble(storedSupport);
        double confidence = Double.parseDouble(storedConfidence);
        if (support >= SUPPORT_THRESHOLD && confidence >= CONFIDENCE_THRESHOLD) {
            JSONObject pairValue = new JSONObject();
            pairValue.put(FieldNames.SUPPORT, support);
            pairValue.put(FieldNames.CONFIDENCE, confidence);
            jedis.hset("recommendedPairs", pairString, pairValue.toJSONString());
            collector.emit(new Values(item1, item2, support, confidence));
        } else {
            // Below a threshold: make sure a previously recommended pair is withdrawn.
            jedis.hdel("recommendedPairs", pairString);
        }
    }

    /**
     * Releases the Redis connection opened in {@code prepare}.
     */
    @Override
    public void cleanup() {
        if (jedis != null) {
            jedis.disconnect();
        }
    }

    /**
     * Declares the output fields {@code FieldNames.ITEM1}, {@code FieldNames.ITEM2},
     * {@code FieldNames.SUPPORT} and {@code FieldNames.CONFIDENCE}.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(FieldNames.ITEM1, FieldNames.ITEM2, FieldNames.SUPPORT, FieldNames.CONFIDENCE));
    }
}
八 参考

转载地址:http://fiej.baihongyu.com/

你可能感兴趣的文章