打怪升级之FPGA组成原理(LE部分)

Posted 考琪

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了打怪升级之FPGA组成原理(LE部分)相关的知识,希望对你有一定的参考价值。

FPGA芯片逻辑单元的原理

不论你使用哪一款FPGA芯片,其核心可编程逻辑单元都是从一段内存种按顺序读取执行并执行的过程。具体来说,FOGA芯片内部包括可编程逻辑块(LAB)、可配置输入输出单元(IOE)、时钟管理模块、嵌入式RAM(BRAN,在Cyclone IV中是M9K)、丰富的布线资源、内嵌和底层功能单元和嵌入式专用硬核等。

FPGA通过查找表来等效实现所有可能的门电路。一个查找表可以实现与门、或门、非门、加减法等各种组合逻辑。查找表可以直接由Quartus综合出来,可以实现Verilog里的各种语法。比如下面这段代码:

module top(
    input   A,
    input   B,
    input   C,
    input   D,
    output  dout
    );
    assign dout = A | B | C | D;
 endmodule

就可以实现一个与门结构。我们可以综合出这样的电路,在Quartus中看它对应的RTL视图与Chip planner中的对应:


打开Chip planner找到上述资源位置:

双击进入资源分配界面:

这样的查找表就是最基本的结构了。一个查找表LUT的内部会有一个SRAM存储器。针对地址的不同,SRAM的值也会不同,这样就等效的实现了门电路结构。

对FPGA而言,由基本的查找表组成LE,由LE组成LAB,由LAB、MK9(内存)、DSP、I/O共同构成了整个芯片的资源分布:

上图就是Cyclone IV E芯片的资源分布,其中蓝色代表LAB资源,浅绿代表M9K内存资源,白色代表DSP资源,浅棕色代表I/O资源。

对XILIMX、ALTERA等厂家的FPGA进行编程,实际上就是把需要进行运算的结果计算出来并存储在SRAM中,输入发生相应变化时,将SRAM中的值输出。

对于一个可编程逻辑块(LAB)而言,包含了以下要素:

1.一个逻辑块包含了16个逻辑单元(LE);
2.一个LAB控制信号;
3.一个LE进位链;
4.一个寄存器链;
5.本地互联;


本地互联在同一个LAB和LE之间传输信号,寄存器链连接把一个LE寄存器的输出传输到LAB中相邻的LE寄存器上。

Cyclone IV 基本组成

Cyclone IV器件的M9K存储器模块都具有9Kbit的嵌入式SRAM存储器。可以把M9K模块配置成单端口、简单双端口RAM以及FIFO缓冲器或者ROM。

Cyclone IV器件的嵌入式乘法器可以在单一模块中实现一个18 X 18 或9 X 9 乘法器。Altera针对乘法器模块提供了一套DSP IP核。包括有限脉冲响应(FIR),快速傅里叶变换(FFT)和数字控制震荡器(NCO)功能。

Cyclone IV器件的I/O支持可编程总线保持、可编程上拉电阻、可编程延迟、可编程驱动能力以及可编程slew-rate控制,从而实现了信号完整性以及热插拔的优化。

Cyclone IV 器件支持符合单端 I/O 标准的校准后片上串行匹配 (Rs OCT) 或者驱动阻抗匹配 (Rs)。 在 Cyclone IV GX 器件中,高速收发器 I/O 位于器件的左侧。器件的顶部,底部及右侧可以实现通用用户 I/O。

Cyclone IV 器件包含了高达 30 个全局时钟 (GCLK) 网络以及高达 8 个 PLL (每个 PLL上均有五个输出端 ),以提供可靠的时钟管理与综合。您可以在用户模式中对Cyclone IV 器件 PLL 进行动态重配置来改变时钟频率或者相位。

Cyclone IV 器件支持位于器件顶部,底部和右侧的 SDR、 DDR, DDR2 SDRAM 和 QDRII SRAM 接口。Cyclone IV E 器件也支持这些接口位于器件左侧。接口可能位于器件的两个或多个侧面,以实现更灵活的电路板设计。Altera® DDR SDRAM 存储器接口解决方案由一个 PHY 接口和一个存储控制器组成。Altera 提供了 PHY IP,您可以将它与您自己定制的存储控制器或 Altera 提供的存储控制器一起使用。Cyclone IV 器件支持在 DDR和 DDR2 SDRAM 接口上使用纠错编码(ECC) 位。

Cyclone IV 器件使用 SRAM 单元存储配置数据。每次器件上电后,配置数据会被下载到Cyclone IV 器件中。低成本配置选项包括 Altera EPCS 系列串行闪存器件以及商用并行闪存配置选项。这些选项实现了通用应用程序的灵活性,并提供了满足特定配置以及应用程序唤醒时间要求的能力。

逻辑单元(LE)基本组成

一个LE单元由以下内容组成:

1.一个四输入查找表(LUT),以实现四种变量的任意功能;
2.一个可编程寄存器;
3.一个进位链连接;
4.一个寄存器连接;

实际上不止有这一种LE结构,但我们可以以此为典型进行分析:

一个LE单元包括LUT、控制信号逻辑、同步加载清零逻辑、可编程寄存器等。其中最核心的时LUT和可编程寄存器。可编程寄存器可以配置成D、JK、T或者SR触发器。

如果没有时序逻辑功能,则LE适用于一般的逻辑应用和组合功能;如果有计算需求,会利用到外置的DSP模块做计算。

打怪升级之小白的大数据之旅(五十)<MapReduce框架原理二:shuffle>

打怪升级之小白的大数据之旅(五十)

MapReduce框架原理二:shuffle

上次回顾

上一章。我们学习了Hadoop中MapReduce框架原理中的工作流程以及InputFormat模块,本章我为大家带来shuffle

shuffle机制

shuffle工作机制

  • 在上一章开篇MapReduce框架整体认知中,我们了解了MapReduce的模块构成,当时我只是提了一下shuffle,它的工作内容是map阶段的后半段以及reduce的前半段
  • Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle
    在这里插入图片描述

shuffle的工作流程如下:

  • 上面这张图,就是shuffle的工作内容,当我们的数据通过map方法之后,数据会进入到环形缓冲区中
  • 数据中包含了key,value和分区号,分区号=hashcode(key)%reduceTask的数量
  • 环形缓冲区底层就是数组,它里面会存储数据和索引,并预留20%的位置用于写入数据,其他80%位置它会将数据进行分区,然后排序
  • 分区就是在map进入环形缓冲区之前进行,会对数据分配一个分区号,环形缓冲区通过这个分区号进行分区,排序就是对该分区中的数据进行排序,排序是按照快速排序进行的
  • 当数据写满了80%就会对数据进行排序,排序后会将数据以文件形式写入到磁盘上
  • 接着会对磁盘上的文件进行合并,然后进行归并排序,因为前面排过序,所以速度会很快
  • 归并排序后的数据会交给reduce
  • resuce会将数据进行拷贝到内存中,如果内存不够就溢出写入到磁盘上,然后对每个map来的数据进行归并排序,按照相同的key进行分组
  • 分组之后就会调用reduce方法做下一步处理

partition分区

  • 在前面我们知道,数据通过map后,进入环形缓冲区时,会为数据分配分区号,进入环形缓冲区后,对数据进行分区
  • 默认分区是根据key的hashcode值然后对reduceTask数量进行取模得到的,用户没有办法控制key存储到哪个分区
  • 所以我们需要进行自定义分区

自定义分区

语法步骤:

  1. 自定义类继承自Partitioner,重写Partition方法

    public class CustomPartitioner extends Partitioner<key,value>{
    	@Override
    	public int getPartition(Text text, FlowBean flowBean, int i) {
            String phone = text.toString();
    }
    
  2. 在Job驱动中,设置自定义的Partitioner

    job.setPartitionerClass(MyFlowPartition.class);
    
  3. 自定义Partition后,还需要根据自定的分区逻辑设置相应数量的ReduceTask

     job.setNumReduceTasks(5);
    

Partition分区案例实操

  • 需求

    • 还是前面序列化的需求,统计下面数据中的上行、下行总流量
    • 然后将下面测试数据中的手机号按照136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中
  • 测试数据

    1	13736230513	192.196.100.1	www.company.com	2481	24681	200
    2	13846544121	192.196.100.2			264	0	200
    3 	13956435636	192.196.100.3			132	1512	200
    4 	13966251146	192.168.100.1			240	0	404
    5 	18271575951	192.168.100.2	www.company.com	1527	2106	200
    6 	84188413	192.168.100.3	www.atguigu.com	4116	1432	200
    7 	13590439668	192.168.100.4			1116	954	200
    8 	15910133277	192.168.100.5	www.hao123.com	3156	2936	200
    9 	13729199489	192.168.100.6			240	0	200
    10 	13630577991	192.168.100.7	www.shouhu.com	6960	690	200
    11 	15043685818	192.168.100.8	www.baidu.com	3659	3538	200
    12 	15959002129	192.168.100.9	www.company.com	1938	180	500
    13 	13560439638	192.168.100.10			918	4938	200
    14 	13470253144	192.168.100.11			180	180	200
    15 	13682846555	192.168.100.12	www.qq.com	1938	2910	200
    16 	13992314666	192.168.100.13	www.gaga.com	3008	3720	200
    17 	13509468723	192.168.100.14	www.qinghua.com	7335	110349	404
    18 	18390173782	192.168.100.15	www.sogou.com	9531	2412	200
    19 	13975057813	192.168.100.16	www.baidu.com	11058	48243	200
    20 	13768778790	192.168.100.17			120	120	200
    21 	13568436656	192.168.100.18	www.alibaba.com	2481	24681	200
    22 	13568436656	192.168.100.19			1116	954	200
    
  • 编写思路

    • 还是上一章中序列化的案例实操,统计流量,这个流程就不再赘述了,这个案例多了一个按手机号的分区,我们直接定义一个Paritioner类,然后在Driver中注册一下,并定义好ReduceTask数量就好了

编写代码如下:

  • FlowBean.java

    package com.company.partition1;
    
    import org.apache.hadoop.io.Writable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    /*
        自定义一个类并实现Hadoop的序列化框架
    
        步骤:
            1.自定义类并实现Writable接口
            2.重写wrtie和readFileds方法
            3.注意:读取数据时必须和写的顺序保持一致
     */
    public class FlowBean implements Writable {
        private long upFlow;
        private long downFlow;
        private long sumFlow;
    
        public FlowBean() {
        }
    
        public FlowBean(long upFlow, long downFlow) {
            this.upFlow = upFlow;
            this.downFlow = downFlow;
            this.sumFlow = upFlow + downFlow;
        }
    
        /*
                序列化时调用的方法
             */
        public void write(DataOutput out) throws IOException {
            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(sumFlow);
        }
    
        /*
            反序列化时调用的方法
         */
        public void readFields(DataInput in) throws IOException {
            //注意:读取数据时必须和写的顺序保持一致
            upFlow = in.readLong();
            downFlow = in.readLong();
            sumFlow = in.readLong();
        }
    
        @Override
        public String toString() {
            return upFlow + " " + downFlow + " " + sumFlow;
        }
    
        public long getUpFlow() {
            return upFlow;
        }
    
        public void setUpFlow(long upFlow) {
            this.upFlow = upFlow;
        }
    
        public long getDownFlow() {
            return downFlow;
        }
    
        public void setDownFlow(long downFlow) {
            this.downFlow = downFlow;
        }
    
        public long getSumFlow() {
            return sumFlow;
        }
    
        public void setSumFlow(long sumFlow) {
            this.sumFlow = sumFlow;
        }
    }
    
    
  • FlowDriver.java

    package com.company.partition;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class FlowDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Job job = Job.getInstance(new Configuration());
    
            /*
                默认情况下 :  分区数量 = ReduceTask的数量 (正常情况下)
                            分区数量 < ReduceTask的数量 : 会浪费资源--多出来的ReduceTask没有数据可以处理
                            分区数量 > ReduceTask的数量 :会报错
             */
            //设置ReduceTask的数量
            job.setNumReduceTasks(5);
            //设置使用自定义的分区类
            job.setPartitionerClass(MyPartitioner.class);
    
            job.setMapperClass(FlowMapper.class);
            job.setReducerClass(FlowReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);
            FileInputFormat.setInputPaths(job,new Path("D:\\\\io\\\\input2"));
            FileOutputFormat.setOutputPath(job,new Path("D:\\\\io\\\\output2"));
            job.waitForCompletion(true);
        }
    }
    
    
  • FlowMapper.java

    package com.company.partition;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /*
        作用 :用来实现需要在MapTask中实现的功能
    
        Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>:
            两组:
                第一组:
                    KEYIN :读取数据的偏移量的类型
                    VALUEIN : 读取的一行一行的内容的类型
                第二组:
                     KEYOUT : 写出的Key的类型(在这指的是手机号)
                     VALUEOUT :写出的Value的类型(在这指的是存储流量数据的对象)
     */
    public class FlowMapper extends Mapper<LongWritable,Text, Text, FlowBean> {
        /**
         * 在map方法中去实现需要在MapTask中实现的功能
         * 注意:map方法在被循环调用(MR框架-MapTask程序)
         *      每调用一次就会传入一行内容
         * @param key :读取数据的偏移量
         * @param value :读取的一行一行的内容
         * @param context :上下文(在这用来通过上下文将K,V写出)
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //1.将内容进行切割
            String[] line = value.toString().split("\\t");
            //2.封装K,V
            Text outKey = new Text(line[1]);
            FlowBean outValue = new FlowBean(Long.parseLong(line[line.length - 3]),
                    Long.parseLong(line[line.length - 2]));
            //3.写出K,V
            context.write(outKey,outValue);
        }
    }
    
    
  • FlowReducer.java

    package com.company.partition;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    /*
        作用 :用来实现需要在RedcueTask中实现的功能
    
        Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
            两组:
                第一组:
                    KEYIN :读取的Key的类型(Mapper输出的Key的类型)
                    VALUEIN :读取的Value的类型(Mapper输出的Value的类型)
                第二组:
                    KEYOUT :输出的Key的类型(在这指的是单词)
                    VALUEOUT :输出的Value的类型(在这指的是单词的数量)
    
     */
    public class FlowReducer extends Reducer<Text, FlowBean,Text, FlowBean> {
    
        /**
         * 在reduce方法中去实现需要在ReduceTask中实现的功能
         * 注意::reduce方法在被循环调用(MR框架-ReduceTask程序)
         *      每调用一次就会传入一组数据(key值相同为一组)
         * @param key :读取的key值
         * @param values :一组数据中所有的value
         * @param context : 上下文(在这用来将K,V写出去)
         * @throws IOException
         * @throws InterruptedException
         *
         * key                 value
         * 153211111111     100 200 300
         * 153211111111     80  20  100
         * 153211111111     80  20  100
         */
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
            long sumUpFlow = 0;//总上行流量
            long sumDowFlow = 0;//总下行流量
            //遍历所有的value
            for (FlowBean value : values) {
                sumUpFlow += value.getUpFlow();
                sumDowFlow += value.getDownFlow();
            }
            //2.封装K,V
            FlowBean outValue = new FlowBean(sumUpFlow, sumDowFlow);
            //3.写出K,V
            context.write(key,outValue);
        }
    }
    
  • MyPartitioner.java

    package com.company.partition;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    /*
    
        Partitioner<KEY, VALUE>
            KEY : map输出的key的类型
            VALUE : map输出的value的类型
     */
    public class MyPartitioner extends Partitioner<Text,FlowBean> {
        /**
         * 返回分区号
         * @param text  map输出的key
         * @param flowBean map输出的value
         * @param numPartitions ReduceTask的数量
         * @return
         */
        public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
    
            String phoneNumber = text.toString();
            //判断手机号
            if (phoneNumber.startsWith("136")){
                return 0;
            }else if (phoneNumber.startsWith("137")){
                return 1;
            }else if (phoneNumber.startsWith("138")){
                return 2;
            }else if (phoneNumber.startsWith("139")){
                return 3;
            }else{
                return 4;
            }
    
    
        }
    }
    
    

WritableComparable排序

  • 排序是MapReduce中最重要的操作之一
  • 具体的排序流程我在本章的开篇讲过了,MapTask和ReduceTask都会对数据按照key进行排序,默认的排序是按照字典的顺序进行排序,且实现排序的方法是快排
  • 排序的思想就是在Java学习map集合时,我们学习的comparable和comparator,忘了的小伙伴可以回看前面集合的内容哈
  • 在我们的实际需求中,同样需要自定义排序,自定义排序需要实现WritableComparable接口,下面我们再次根据手机流量的案例进行排序的使用

WritableComparable案例实操

在前面Partition分区的基础上,对数据按照手机的总流量进行排序,我们只需要修改一下我们的FlowBean即可

  • FlowBean.java
    package com.company.comparable;
    
    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    /*
        自定义类的对象可以作为key进行排序:
    
        要求:
            1.自定义的类实现WritableComparable
            2.WritableComparable<T> 继承了 Writable接口和Comparable接口
            3.重写compareTo方法,readFileds方法,write方法
    
     */
    public class FlowBean implements WritableComparable<FlowBean> {
        private long upFlow;
        private long downFlow;
        private long sumFlow;
    
        public FlowBean() {
        }
    
        public FlowBean(long upFlow, long downFlow,long sumFlow) {
            this.upFlow = upFlow;
            this.downFlow = downFlow;
            this.sumFlow = sumFlow;
        }
    
        /*
            指定按照哪个字段进行排序
         */
        public int compareTo(FlowBean o) {
            //按照sumFlow排序
            return Long.compare(this.sumFlow,o.sumFlow);
        }
    
        /*
                序列化时调用的方法
             */
        public void write(DataOutput out) throws IOException {
            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(sumFlow);
        }
    
        /*
            反序列化时调用的方法
         */
        public void readFields(DataInput in) throws IOException {
            //注意:读取数据时必须和写的顺序保持一致
            upFlow = in.readLong();
            downFlow = in.readLong();
            sumFlow = in.readLong();
        }
    
        @Override
        public String toString() {
            return upFlow + " " + downFlow + " " + sumFlow;
        }
    
        public long getUpFlow() {
            return upFlow;
        }
    
        public void setUpFlow(long upFlow) {
            this.upFlow = upFlow;
        }
    
        public long getDownFlow() {
            return downFlow;
        }
    
        public void setDownFlow(long downFlow) {
            this.downFlow = downFlow;
        }
    
        public long getSumFlow() {
            return sumFlow;
        }
    
        public void setSumFlow(long sumFlow) {
            this.sumFlow = sumFlow;
        }
    
    
    }
    
    

Combiner合并

  • Combiner是MR程序中Mapper和Reducer之外的一个组件

  • Combiner的父类就是Reduce

  • 它的作用就是在Reduce之前进行数据的合并,它会对每一个MapTask的输出进行局部汇总,这样就可以减小网络传输的量

  • 举个栗子
    在这里插入图片描述

  • 还是以我们前面的分芝麻案例举例,假设我们的芝麻有上千个分区,即

    黑芝麻:
    1
    2
    100
    200
    1000
    50
    60
    .....
    一千个
    
  • 此时我们的Reduce工作量就会比较大,它会将每一个分区进行挨个的合并,如果我们使用Combiner在MapTask输出时就对数据进行合并,那么Reduce的工作量就会少很多

  • 当然了,我们要注意,不是所有的地方都可以使用Combiner,它的应用场景就是用在不会对最终结果产生变化的场景下使用,这个很好理解,因为Combiner是在Reduce之前调用的,如果Combiner对数据进行了局部汇总,那么Reduce再次进行数据汇总时就有可能出现数据的偏差,举个栗子:
    在这里插入图片描述

Combiner合并实例

以前面我们的写的wordcount举例

Combiner的实现有两种方式:

  • 第一种

    • 增加一个自定义的Combiner类,继承自Reducer
    • 在自定义Combiner类中进行数据的汇总,然后输出统计结果
      // 指定需要使用combiner,以及用哪个类作为combiner的逻辑
      job.setCombinerClass(WordcountCombiner.class);
      
  • 第二种将Reducer类作为Combiner在Driver驱动类中指定

    //设置Combiner的类
    job.setCombinerClass(WordcountReducer.class);
    

我们继续通过一个实例来说明Combiner的用法,第二种直接就是将Reducer传入到job驱动中,我就不演示了,我就演示第一种,自定义的方法

编码步骤:
前面wordcount的代码基础上,修改一下Driver类,添加一个WcCombiner自定义合并类