springboot 集成storm的redis

Posted flyyu1

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springboot 集成storm的redis相关的知识,希望对你有一定的参考价值。

springboot 集成storm,计算日志中的展示信息,将实时的计算数据存储到redis中,并判断redis中的数量信息进行下一步的操作,存储到mysql中等

1.配置redis参数,redis采用集群模式,需要配置redis集群

spring:
redis:
database: 0
password:
cluster:
nodes:
- 127.0.0.1:6380
maxRedirects: 3
pool:
max-idle: 8
min-idle: 0
max-active: 8
max-wait: -1

2.redis配置类实现
@Configuration
@EnableConfigurationProperties(RedisProperties.class)
public class RedisConfig

@Autowired
private JedisConnectionFactory jedisConnectionFactory;

@Bean("redisTemplate")
public RedisTemplate<?, ?> getRedisTemplate()
RedisTemplate<?,?> template = new StringRedisTemplate(jedisConnectionFactory);
GenericJackson2JsonRedisSerializer genericJackson2JsonRedisSerializer = new GenericJackson2JsonRedisSerializer();
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
template.setKeySerializer(stringRedisSerializer);
template.setValueSerializer(genericJackson2JsonRedisSerializer);
template.setHashKeySerializer(stringRedisSerializer);
template.setHashValueSerializer(genericJackson2JsonRedisSerializer);
return template;





public class RedisConfUtils 

/**
* @link org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration#
* @param redisProperties
* @return
*/
public static RedisClusterConfiguration getClusterConfiguration(RedisProperties redisProperties)
if (redisProperties.getCluster() == null)
return null;

RedisProperties.Cluster clusterProperties = redisProperties.getCluster();
RedisClusterConfiguration config = new RedisClusterConfiguration(
clusterProperties.getNodes());

if (clusterProperties.getMaxRedirects() != null)
config.setMaxRedirects(clusterProperties.getMaxRedirects());

return config;


public static RedisTemplate buildRedisTemplate(byte[] redisProperties)
JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(
RedisConfUtils.getClusterConfiguration(
(RedisProperties) Serializer.INSTANCE.deserialize(redisProperties)));
RedisTemplate<String, Long> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(jedisConnectionFactory);
jedisConnectionFactory.afterPropertiesSet();

GenericJackson2JsonRedisSerializer genericJackson2JsonRedisSerializer = new GenericJackson2JsonRedisSerializer();
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
redisTemplate.setKeySerializer(stringRedisSerializer);
redisTemplate.setValueSerializer(genericJackson2JsonRedisSerializer);
redisTemplate.setHashKeySerializer(stringRedisSerializer);
redisTemplate.setHashValueSerializer(genericJackson2JsonRedisSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;

3.storm 日志控制Builder配置redis信息,将redis信息传递到控制类中
@Getter
@Setter
@Configuration
@DependsOn("redisTemplate")
@ConfigurationProperties(prefix = "storm.bolt.logConsoleBolt")
public class LogConsoleBoltBuilder extends BoltBuilder
@Autowired
private RedisProperties redisProperties;
private int emitFrequencyInSeconds = 60;//每60s发射一次数据

@Bean("logConsoleBolt")
public LogConsoleBolt buildBolt()
super.setId("logConsoleBolt");
LogConsoleBolt logConsoleBolt = new LogConsoleBolt();
logConsoleBolt.setRedisProperties(Serializer.INSTANCE.serialize(redisProperties));
logConsoleBolt.setEmitFrequencyInSeconds(emitFrequencyInSeconds);
return logConsoleBolt;



4.storm 日志控制类获取实例化redis信息,将计算得到的信息存储到redis中
@Slf4j
public class LogConsoleBolt extends BaseRichBolt
private final static String AD_LIST_SHOW_COUNT = "AD_LIST_SHOW_COUNT";
private OutputCollector collector;
private HashOperations<String, String, Long> hashOperations;
@Setter
private byte[] redisProperties;
@Setter
private int emitFrequencyInSeconds;

public void prepare(Map stormConf, TopologyContext context, OutputCollector collector)
this.collector=collector;
this.hashOperations = RedisConfUtils.buildRedisTemplate(redisProperties).opsForHash();

@Override
public Map<String, Object> getComponentConfiguration()
Map<String, Object> conf = new HashMap<String, Object>();
/**
* 这里配置TickTuple的发送频率
*/
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
return conf;


@Override
public void execute(Tuple input)
try
log.info(input.toString());
//判断日志类型,不是需要的日志则不做处理
if (input.size()<5)
collector.ack(input);
else
String value = input.getStringByField("value").toString();

AdShowLogEntity adShowLogEntity = AdShowLogEntity.logToEntity(value);
if (adShowLogEntity != null)
AdShowLogEntity.Message msg = adShowLogEntity.getMessage().get(0);
// 输出
// collector.emit(new Values(LogAdIdKeyEnum.AD_LIST_TYPE.getPrefix()+msg.getCreativeId(), String.valueOf(1)));
            //存储信息到redis
Long cont = hashOperations.increment(AD_LIST_SHOW_COUNT, LogAdIdKeyEnum.AD_LIST_TYPE.getPrefix()+msg.getCreativeId(), 1l);
collector.emit(new Values(Integer.parseInt(msg.getCreativeId()),System.currentTimeMillis(),0.01f));
else
// collector.ack(input);

collector.ack(input);
// System.out.println("received from kafka : "+ value);
// 必须ack,否则会重复消费kafka中的消息



catch (Exception e)
e.printStackTrace();
collector.fail(input);





@Override
public void declareOutputFields(OutputFieldsDeclarer declarer)
declarer.declare(new Fields("adId","updateTime","price")); //分词定义的field为word



以上是关于springboot 集成storm的redis的主要内容,如果未能解决你的问题,请参考以下文章

Storm框架:Storm整合springboot

Storm 系列—— Storm 集成 Redis 详解

SpringBoot整合Kafka和Storm

SpringBoot整合Kafka和Storm

Storm集成Siddhi

5Storm集成Kafka