学习笔记Flink—— 基于Flink 在线交易反欺诈检测

Posted 别呀

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了学习笔记Flink—— 基于Flink 在线交易反欺诈检测相关的知识,希望对你有一定的参考价值。

一、背景介绍

信用卡欺诈
信用卡欺诈是指故意使用伪造、作废的信用卡,冒用他人的信用卡骗取财物,或用本人信用卡进行恶意透支的行为。在当今数字时代,信用卡欺诈行为越来越被重视。
罪犯可以通过诈骗或者入侵安全级别较低系统来盗窃信用卡卡号。 用盗得的信用卡进行很小额度的消费进行测试。 如果测试消费成功,那么他们就会用这个信用卡进行大笔消费。

信用卡欺诈行为

交易3和交易4应该被标记为欺诈行为,因为交易3是一个100¥的小额交易,而紧随着的交易4是一个10000¥的大额交易。

另外,交易5、6和交易7就不属于欺诈交易了,因为在交易5这个500¥的小额交易之后,并没有跟随一个大额交易,而是一个金额适中的交易,这使得交易5到交易7不属于欺诈行为。


二、架构设计

架构设计

数据流设计

数据流落地实现


三、Kafka信用卡消费数据

3.1、Kafka Producer

模拟Kafka Producer定时生成消费数据

TransactionData.java:

package fraud_detection;

public class TransactionData 
    private String user;
    private double money;

    public TransactionData()
    public TransactionData(String user,double money)
        this.user=user;
        this.money=money;
    

    @Override
    public String toString()
        return this.user + "," + this.money;
    

TransactionDataGenerator.java:

package fraud_detection;

import java.util.Random;

public class TransactionDataGenerator 
    public static final  int USER_SIZE = 10;
    public static final float BIG_MONEY_PERCENT = 0.02f;
    static Random random = new Random();
    public static TransactionData getData()
        return new TransactionData(generateUser() ,  generateMoney()) ;
    
    private static String generateUser()
        return "user_"+random.nextInt(USER_SIZE);
    
    private static float generateMoney()
        float i = random.nextFloat();
        if( i > BIG_MONEY_PERCENT)
            return random.nextFloat() * 1000;
        else
            return i * 10000000;
        
    

    public static void main(String[] args)
        TransactionData data = null;
        for(int i = 10000 ;i >0 ; i--)
            data = getData();
            System.out.println(data);
        
    

TransactionDataProducer.java:

package fraud_detection;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.HashMap;
import java.util.Map;
public class TransactionDataProducer 
    public static void main(String[] args) throws InterruptedException 
        String topic = "fraud00";
        Map<String,Object> kafkaProperties = new HashMap<>();
        kafkaProperties.put("bootstrap.servers","node100:9092,node101:9092,node102:9092");
        kafkaProperties.put("acks", "all");
        kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(kafkaProperties);
        int size = 30*1000/10;
        long interval = 10L;
        String data = "";
        for (int i = 0; i < size; i++) 
            Thread.sleep(interval);
            data= TransactionDataGenerator.getData().toString();
            producer.send(new ProducerRecord<>(topic, data));
        
        producer.close();
        System.out.println("消息发送完成!");
    


3.2、整合Kafka Transaction数据

FraudDetection.scala:

package flink_kafka
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
object FraudDetection 
  def main(args: Array[String]): Unit = 
    val topic = "fraud00"
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "node100:9092")
    properties.setProperty("group.id", "test")
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env
      .addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties))
    stream.map(x => val data = x.split(","); (data(0).trim, data(1).trim.toDouble))
      .keyBy(0)
      .mapWithState[(String, Double), Double]((in: (String, Double), state: Option[Double]) =>
        state match 
          case None => (("", 0.0), Some(in._2))
          case Some(previous) => if (in._2 > 10000.0 && previous < 1000.0)
            ((in._1 + "->" + previous, in._2), Some(in._2)) else (("", 0.0), Some(in._2))
        )
      .filter(x => x._2 > 0.0)
      .print()
    env.execute("Fraud Detection")
  

测试:

① 先创建fraud00 话题

将产生的数据存到/tmp目录下(了解)

② 运行FraudDetection:

③ 运行TransactionDataProducer

结果:


以上是关于学习笔记Flink—— 基于Flink 在线交易反欺诈检测的主要内容,如果未能解决你的问题,请参考以下文章

学习笔记Flink—— 基于Flink 在线交易反欺诈检测

学习笔记Flink—— 基于Flink 在线交易反欺诈检测

Flink学习笔记

Flink零基础学习笔记:基础概念

17.Flink--练习--双十一实时交易大屏需求数据实现步骤代码实现

17.Flink--练习--双十一实时交易大屏需求数据实现步骤代码实现