opencl(十六)----MapReduce

Posted feihu-h

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了opencl(十六)----MapReduce相关的知识,希望对你有一定的参考价值。

MapReduce 分为两个部分:

  映射(Map):产生键值对

  归约(Reduce):处理这些键值对

技术图片
// kernel

/*
 * string_search - count occurrences of four 4-character keywords in a text.
 *
 * pattern         16 characters: four keywords stored back to back in lanes
 *                 0-3, 4-7, 8-11 and 12-15.
 * text            characters to search.  For every scanned position i the
 *                 kernel loads text[i .. i+15], so the buffer must stay
 *                 readable 15 characters past the last scanned position.
 * chars_per_item  number of consecutive positions each work-item scans,
 *                 starting at get_global_id(0) * chars_per_item.
 * local_result    work-group scratch space holding four counters.
 * global_result   four totals; each work-group adds its counters to them,
 *                 so they are expected to hold the starting totals on entry.
 *
 * At position i, keyword k (0..3) is counted when lanes 4k..4k+3 of the
 * loaded vector equal the same lanes of pattern, i.e. when the keyword
 * appears at text[i + 4k].
 */
__kernel void string_search(char16 pattern, __global char* text,
     int chars_per_item, __local int* local_result, 
     __global int* global_result) {

   char16 text_vector, check_vector;

   /* Clear the work-group counters (every work-item stores the same zeros) */
   local_result[0] = 0;
   local_result[1] = 0;
   local_result[2] = 0;
   local_result[3] = 0;

   /* Wait until the whole work-group sees the cleared counters */
   barrier(CLK_LOCAL_MEM_FENCE);

   /* First position handled by this work-item */
   int item_offset = get_global_id(0) * chars_per_item;

   /* Scan every position assigned to this work-item */
   for(int i=item_offset; i<item_offset + chars_per_item; i++) {

      /* Load 16 characters starting at position i into private memory */
      text_vector = vload16(0, text + i);

      /* Lane-wise comparison: a matching lane has all bits set */
      check_vector = text_vector == pattern;

      /* First keyword ('that' in the host program) */
      if(all(check_vector.s0123))
         atomic_inc(local_result);

      /* Second keyword ('with') */
      if(all(check_vector.s4567))
         atomic_inc(local_result + 1);

      /* Third keyword ('have') */
      if(all(check_vector.s89AB))
         atomic_inc(local_result + 2);

      /* Fourth keyword ('from') */
      if(all(check_vector.sCDEF))
         atomic_inc(local_result + 3);
   }

   /* The counters live in __local memory, so a local fence is what makes
      every work-item's increments visible to work-item 0 below.  Note that
      a barrier only synchronizes the work-items of one work-group. */
   barrier(CLK_LOCAL_MEM_FENCE);

   /* Reduction: one work-item per group adds the group's counters to the
      global totals; atomics are needed because groups update them
      concurrently. */
   if(get_local_id(0) == 0) {
      atomic_add(global_result, local_result[0]);
      atomic_add(global_result + 1, local_result[1]);
      atomic_add(global_result + 2, local_result[2]);
      atomic_add(global_result + 3, local_result[3]);
   }
}
View Code

 

/* Silences MSVC deprecation warnings for fopen() and friends; it has to be
   defined before the standard headers are included. */
#define _CRT_SECURE_NO_WARNINGS
/* File that contains the OpenCL kernel source. */
#define PROGRAM_FILE "string_search.cl"
/* Name of the kernel function to create from the built program. */
#define KERNEL_FUNC "string_search"
/* Text file whose contents are searched. */
#define TEXT_FILE "kafka.txt"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#ifdef MAC
#include <OpenCL/cl.h>
#else
#include <CL/cl.h>
#endif

int main() {

   /* Host/device data structures */
   cl_platform_id platform;
   cl_device_id device;
   cl_context context;
   cl_command_queue queue;
   cl_int err;

   /* Program/kernel data structures */
   cl_program program;
   FILE *program_handle;
   char *program_buffer, *program_log;
   size_t program_size, log_size;
   cl_kernel kernel;
   size_t offset = 0;
   size_t global_size, local_size; // 全局大小 和局部大小

   /* Data and buffers */
   char pattern[16] = "thatwithhavefrom";
   FILE *text_handle;
   char *text;
   size_t text_size;
   int chars_per_item;//每个item 的大小
   int result[4] = {0, 0, 0, 0};
   cl_mem text_buffer, result_buffer;
   
   /* Identify a platform */   // 获取平台
   err = clGetPlatformIDs(1, &platform, NULL);
   if(err < 0) {
      perror("Couldn‘t identify a platform");
      exit(1);
   } 

   /* Access a device */ // 获取设备
   err = clGetDeviceIDs(platform, CL_DEVICE_TYPE_GPU, 1, &device, NULL);
   if(err < 0) {
      perror("Couldn‘t access any devices");
      exit(1);   
   }

   /* Determine global size and local size */  //确定,全局大小 和 局部大小
   // CL_DEVICE_MAX_COMPUTE_UNITS  得到OpcnCL设备的计算单元(CU)数目
   clGetDeviceInfo(device, CL_DEVICE_MAX_COMPUTE_UNITS,         
      sizeof(global_size), &global_size, NULL);
   // CL_DEVICE_MAX_WORK_GROUP_SIZE  工作组中工作项德数量限制
   clGetDeviceInfo(device, CL_DEVICE_MAX_WORK_GROUP_SIZE,         
      sizeof(local_size), &local_size, NULL);
   // 全局大小   CU数目 × 每个工作组中工作的大小
   global_size *= local_size;

   /* Create a context */
   context = clCreateContext(NULL, 1, &device, NULL, NULL, &err);
   if(err < 0) {
      perror("Couldn‘t create a context");
      exit(1);   
   }

   /* Read program file and place content into buffer 读取program 文件*/

   program_handle = fopen(PROGRAM_FILE, "r");
   if(program_handle == NULL) {
      perror("Couldn‘t find the program file");
      exit(1);
   }
   fseek(program_handle, 0, SEEK_END);
   program_size = ftell(program_handle);
   rewind(program_handle);
   program_buffer = (char*)calloc(program_size+1, sizeof(char));
   fread(program_buffer, sizeof(char), program_size, program_handle);
   fclose(program_handle);

   /* Read text file and place content into buffer 读取文本文件*/
   text_handle = fopen(TEXT_FILE, "r");
   if(text_handle == NULL) {
      perror("Couldn‘t find the text file");
      exit(1);
   }
   fseek(text_handle, 0, SEEK_END);
   text_size = ftell(text_handle)-1;
   rewind(text_handle);
   text = (char*)calloc(text_size, sizeof(char));
   fread(text, sizeof(char), text_size, text_handle);
   fclose(text_handle);
   chars_per_item = text_size / global_size + 1;// 文本代销哦啊/全局大小,计算每个项的处理数据大小

   /* Create program from file */
   program = clCreateProgramWithSource(context, 1, 
      (const char**)&program_buffer, &program_size, &err);
   if(err < 0) {
      perror("Couldn‘t create the program");
      exit(1);
   }
   free(program_buffer);

   /* Build program */
   err = clBuildProgram(program, 0, NULL, NULL, NULL, NULL);
   if(err < 0) {
            
      /* Find size of log and print to std output */
      clGetProgramBuildInfo(program, device, CL_PROGRAM_BUILD_LOG, 
            0, NULL, &log_size);
      program_log = (char*) calloc(log_size+1, sizeof(char));
      clGetProgramBuildInfo(program, device, CL_PROGRAM_BUILD_LOG, 
            log_size+1, program_log, NULL);
      printf("%s
", program_log);
      free(program_log);
      exit(1);
   }

   /* Create a kernel 创建核*/
   kernel = clCreateKernel(program, KERNEL_FUNC, &err);
   if(err < 0) {
      perror("Couldn‘t create a kernel");
      exit(1);
   };

   /* Create buffers to hold the text characters and count */
   // 文本
   text_buffer = clCreateBuffer(context, CL_MEM_READ_ONLY |
         CL_MEM_COPY_HOST_PTR, text_size, text, &err);
   if(err < 0) {
      perror("Couldn‘t create a buffer");
      exit(1);   
   };
   // 全局结果
   result_buffer = clCreateBuffer(context, CL_MEM_READ_WRITE |
         CL_MEM_COPY_HOST_PTR, sizeof(result), result, NULL);

   /* Create kernel argument */
   err = clSetKernelArg(kernel, 0, sizeof(pattern), pattern);// 模式项
   err |= clSetKernelArg(kernel, 1, sizeof(cl_mem), &text_buffer);// 文本
   err |= clSetKernelArg(kernel, 2, sizeof(chars_per_item), &chars_per_item);//每个项处理数据的大小
   err |= clSetKernelArg(kernel, 3, 4 * sizeof(int), NULL);// 局部结果的大小
   err |= clSetKernelArg(kernel, 4, sizeof(cl_mem), &result_buffer); //全局结果
   if(err < 0) {
      printf("Couldn‘t set a kernel argument");
      exit(1);   
   };

   /* Create a command queue */
   queue = clCreateCommandQueue(context, device, 0, &err);
   if(err < 0) {
      perror("Couldn‘t create a command queue");
      exit(1);   
   };

   /* Enqueue kernel */
   /**
    * cl_int clEnqueueNDRangeKernel (
        cl_command_queue command_queue,     //命令队列
     cl_kernel kernel,                                   //核
     cl_uint work_dim,                                //数据的维度
     const size_t *global_work_offset,         // 各维度上的全局ID偏移量
     const size_t *global_work_size,     //各维度上的工作项数量
     const size_t *local_work_size,      // 各维度上一个工作组中工作项的数量
     cl_uint num_events_in_wait_list,
     const cl_event *event_wait_list,
     cl_event *event
)
    * **/
   err = clEnqueueNDRangeKernel(queue, kernel, 1, &offset, &global_size, 
         &local_size, 0, NULL, NULL); 
   if(err < 0) {
      perror("Couldn‘t enqueue the kernel");
      printf("Error code: %d
", err);
      exit(1);   
   }

   /* Read and print the result */
   // 读取数据命令
   err = clEnqueueReadBuffer(queue, result_buffer, CL_TRUE, 0, 
      sizeof(result), &result, 0, NULL, NULL);
   if(err < 0) {
      perror("Couldn‘t read the buffer");
      exit(1);   
   }

   printf("
Results: 
");
   printf("Number of occurrences of ‘that‘: %d
", result[0]);
   printf("Number of occurrences of ‘with‘: %d
", result[1]);
   printf("Number of occurrences of ‘have‘: %d
", result[2]);
   printf("Number of occurrences of ‘from‘: %d
", result[3]);

   /* Deallocate resources */
   clReleaseMemObject(result_buffer);
   clReleaseMemObject(text_buffer);  //释放缓存对象
   clReleaseKernel(kernel);  // 释放核
   clReleaseCommandQueue(queue); // 释放命令队列
   clReleaseProgram(program); //释放程序
   clReleaseContext(context); // 释放上下文
   return 0;
}

以上是关于opencl(十六)----MapReduce的主要内容,如果未能解决你的问题,请参考以下文章

从 GPU 获取 OpenCL 程序代码

使用 Visual Studio 2010 在 Nvidia GEFORCE 上的 OpenCL 代码

为啥这个 opencl 代码是不确定的?

如何为邻居访问优化 OpenCL 代码?

使用 OpenCL 内核的最近邻插值代码

我的 OpenCL 代码根据看似 noop 更改输出