C++通过http协议操作hdfs
Posted by xdaz
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C++通过http协议操作hdfs相关的知识,希望对你有一定的参考价值。
一、通过http协议操作hdfs的组件
通过http协议操作hdfs有两个组件,httpfs和webhdfs,我一开始还以为这两个是同一个东西,其实不是。webhdfs是namenode、datanode自带的,httpfs是完全独立的一个组件。webhdfs上传文件等操作需要通过某个datanode进行,而不是直接通过namenode上传,客户端有可能访问多个机器。而httpfs,所有的操作都通过httpfs进行。webhdfs和httpfs的使用方法基本是一样的,只有很小很小的差别。
WebHDFS提供HDFS的RESTful接口,可通过此接口进行HDFS文件操作。WebHDFS服务内置在HDFS中,不需额外安装、启动。
HttpFS是hadoop2.x中hdfs项目的内置应用,基于Tomcat和Jersey,对外提供完备HDFS操作的RESTful接口,无需安装客户端,可方便实现数据交互。
WebHDFS与HttpFS的使用参考这篇文章:https://www.jianshu.com/p/9b46aebcb963
二、WebHDFS REST API
http://hadoop.apache.org/docs/r1.0.4/webhdfs.html
三、使用libcurl封装类操作hdfs
//HttpfsClient.h
#pragma once
#include <string>
#include <vector>
// Status of one HDFS file or directory. The members correspond to the keys of
// the WebHDFS "FileStatus" JSON object (note the JSON key is "blockSize" while
// the member is spelled blocksize).
typedef struct FileStatus
{
    long long accessTime;        // NOTE(review): presumably epoch milliseconds - confirm against the server
    long long blocksize;
    std::string group;
    long long length;            // 64-bit so sizes above 2 GiB are representable
    long long modificationTime;  // NOTE(review): presumably epoch milliseconds - confirm against the server
    std::string owner;
    std::string pathSuffix;
    std::string permission;
    int replication;
    std::string type;

    // Zero the numeric members so a FileStatus that was never filled in does
    // not expose indeterminate values.
    FileStatus()
        : accessTime(0), blocksize(0), length(0), modificationTime(0), replication(0)
    {
    }
} FileStatus;
// Progress callback type used for uploads and downloads.
//   ullTotal  - total number of bytes of the transfer
//   ullNow    - number of bytes transferred so far
//   pUserData - opaque pointer that the caller supplied together with the callback
typedef void (*FileUploadOrDownloadProgressCallback)(unsigned long long ullTotal,unsigned long long ullNow,void *pUserData);
// Client for the HDFS REST interface (WebHDFS / HttpFS).
// Every operation returns true on success and false on failure.
class CHttpFSClient
{
private:
    std::string m_hostaddr; // base URL: http://<HOST>:<PORT>/webhdfs/v1/
    std::string m_username; // sent as the user.name query parameter, i.e. hadoop
    long m_operatetimeout;  // timeout for a whole operation
    long m_connecttimeout;  // timeout for establishing the connection
public:
    CHttpFSClient(std::string& hostaddr,std::string& username,int connectTimeout=120,int operateTimeout=5184000);
    ~CHttpFSClient(void);
    // Create a remote file and write the contents of a local file into it.
    bool create(std::string& local_file,std::string& rem_file,bool overwrite = false,FileUploadOrDownloadProgressCallback funcFileUpload = NULL,void * pUserData = NULL);
    // Append the contents of a local file to a remote file.
    bool append(std::string& local_file,std::string& rem_file,FileUploadOrDownloadProgressCallback funcFileUpload = NULL,void * pUserData = NULL);
    // Create a directory.
    bool mkdirs(std::string& path);
    // Rename a file or directory.
    bool rename(std::string& src,std::string& dst);
    // Delete a file or directory.
    bool del(std::string& path, bool recursive=false);
    // Download a remote file into a local file; offset/length optionally
    // select a byte range.
    bool read(std::string& rem_file,std::string& local_file, long offset=0, long length=0,FileUploadOrDownloadProgressCallback funcFileDownload = NULL,void * pUserData = NULL);
    // List the status of the files/directories inside a directory.
    bool ls(std::string& rem_path,std::vector<FileStatus>& results);
    // Get the status of a single file or directory.
    bool getStatus(std::string& rem_file,FileStatus & fsFileStatus);
protected:
    // Callbacks handed to libcurl.
    static size_t fileread_callback(void *ptr, size_t size, size_t nmemb, void *stream);
    static size_t filewrite_data(const char *ptr, size_t size, size_t nmemb, void *stream);
    static size_t memwrite_data(const char *contents, size_t size, size_t nmemb, std::string *stream);
    static size_t header_callback(const char *ptr, size_t size, size_t nmemb, std::string *stream);
    static size_t progress_callback(void *p,long long dltotal,long long dlnow,long long ultotal,long long ulnow);
    // Print a FileStatus to stdout.
    void showFileStatus(FileStatus results);
protected:
    // Progress callback (and its user data) of the most recent upload.
    FileUploadOrDownloadProgressCallback m_funcUploadProgressCallback;
    void * m_funcUploadProgressCallbackUserData;
    // Progress callback (and its user data) of the most recent download.
    FileUploadOrDownloadProgressCallback m_funcDownloadProgressCallback;
    void * m_funcDownloadProgressCallbackUserData;
};
//HttpfsClient.cpp
// HttpfsClient.cpp : 定义控制台应用程序的入口点。
//
#include "stdafx.h"
#include "HttpfsClient.h"
#include <assert.h>
#include <fcntl.h>
#include <stdio.h>
#include <sys/stat.h>
#include <fstream>
#include <iostream>
#include <sstream>
#include <curl/curl.h>
#include <json/json.h>
//自定义进度数据结构体
typedef struct tagProgressStruct
CURL *pCurlObject;
FileUploadOrDownloadProgressCallback funcCallback;
void *pUserData;
MyProgressStruct;
// Stores the connection settings; no network activity happens here.
//   hostaddr       - base URL, i.e. http://<HOST>:<PORT>/webhdfs/v1
//   username       - value sent as the user.name query parameter
//   connectTimeout - value later passed to CURLOPT_CONNECTTIMEOUT
//   operateTimeout - value later passed to CURLOPT_TIMEOUT
CHttpFSClient::CHttpFSClient(std::string& hostaddr,std::string& username,int connectTimeout,int operateTimeout)
    : m_hostaddr(hostaddr)
    , m_username(username)
    , m_operatetimeout(operateTimeout)
    , m_connecttimeout(connectTimeout)
    , m_funcUploadProgressCallback(NULL)
    , m_funcUploadProgressCallbackUserData(NULL)
    , m_funcDownloadProgressCallback(NULL)
    , m_funcDownloadProgressCallbackUserData(NULL)
{
}
// Nothing to release: every libcurl handle is created and cleaned up inside
// the operation that uses it.
CHttpFSClient::~CHttpFSClient(void)
{
}
/*
Create and Write to a File
@param local_file string
@param rem_file string
@param overwrite: true,false
@return true/false
Step 1: Submit a HTTP PUT request without automatically following redirects and without sending the file data.
curl -i -X PUT "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=CREATE [&overwrite=<true|false>][&blocksize=<LONG>][&replication=<SHORT>]
[&permission=<OCTAL>][&buffersize=<INT>]"
The request is redirected to a datanode where the file data is to be written:
HTTP/1.1 307 TEMPORARY_REDIRECT
Location: http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=CREATE...
Content-Length: 0
Step 2: Submit another HTTP PUT request using the URL in the Location header with the file data to be written.
curl -i -X PUT -T <LOCAL_FILE> "http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=CREATE..."
The client receives a 201 Created response with zero content length and the WebHDFS URI of the file in the Location header:
HTTP/1.1 201 Created
Location: webhdfs://<HOST>:<PORT>/<PATH>
Content-Length: 0
*/
bool CHttpFSClient::create(std::string& local_file,std::string& rem_file,bool overwrite,FileUploadOrDownloadProgressCallback funcFileUpload,void * pUserData)
m_funcUploadProgressCallback = funcFileUpload;
m_funcUploadProgressCallbackUserData = pUserData;
std::string url = m_hostaddr + rem_file + "?op=CREATE&user.name="+m_username;
if(overwrite) url += "&overwrite=true";
std::string szheader_buffer;
char* redir_url;
std::string strredir_url;
long response_code=0;
bool curlerr = false;
CURL *curl;
CURLcode res;
// get a curl handle
curl = curl_easy_init();
if(curl)
curl_easy_setopt(curl, CURLOPT_PUT, 1L);
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L);
curl_easy_setopt(curl, CURLOPT_TIMEOUT, m_operatetimeout);
curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, m_connecttimeout);
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 0L);
curl_easy_setopt(curl, CURLOPT_INFILESIZE, 0); //上传的字节数
res = curl_easy_perform(curl);
// Check for errors
if(res != CURLE_OK)
fprintf(stderr, "hdfs create first request failed: %s\\n",
curl_easy_strerror(res));
curlerr = true;
else
res = curl_easy_getinfo(curl,CURLINFO_REDIRECT_URL,&redir_url);
if(res != CURLE_OK)
fprintf(stderr, "curl_easy_getinfo CURLINFO::CURLINFO_REDIRECT_URL failed: %s\\n",
curl_easy_strerror(res));
curlerr = true;
strredir_url = redir_url;
// always cleanup!!!!
curl_easy_cleanup(curl);
if(curlerr)
return false;
//upload file to hdfs
struct stat file_info;
// get the file size of the local file
stat(local_file.c_str(), &file_info);
MyProgressStruct *pMyProgressStruct = NULL;
FILE * hd_src = NULL;
hd_src = fopen(local_file.c_str(), "rb");
if(GetLastError() != 0)
return false;
struct curl_slist *headers = NULL;
headers = curl_slist_append(headers, "Content-Type:application/octet-stream");
curl = curl_easy_init();
if(curl)
// we want to use our own read function
curl_easy_setopt(curl, CURLOPT_READFUNCTION, CHttpFSClient::fileread_callback);
// enable uploading
curl_easy_setopt(curl, CURLOPT_UPLOAD, 1L);
// HTTP PUT please
curl_easy_setopt(curl, CURLOPT_PUT, 1L);
// specify target URL, and note that this URL should include a file name, not only a directory
curl_easy_setopt(curl, CURLOPT_URL, strredir_url.c_str());
// specify content type
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
// now specify which file to upload
curl_easy_setopt(curl, CURLOPT_READDATA, hd_src);
// provide the size of the upload, we specicially typecast the value to curl_off_t
// since we must be sure to use the correct data size
curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE,
(curl_off_t)file_info.st_size);
//设置进度回调
#if LIBCURL_VERSION_NUM >= 0x072000
/* xferinfo was introduced in 7.32.0, no earlier libcurl versions will
compile as they won't have the symbols around.
If built with a newer libcurl, but running with an older libcurl:
curl_easy_setopt() will fail in run-time trying to set the new
callback, making the older callback get used.
New libcurls will prefer the new callback and instead use that one even
if both callbacks are set. */
pMyProgressStruct = new MyProgressStruct();
pMyProgressStruct->pCurlObject = curl;
pMyProgressStruct->funcCallback = m_funcUploadProgressCallback;
pMyProgressStruct->pUserData = m_funcUploadProgressCallbackUserData;
curl_easy_setopt(curl, CURLOPT_XFERINFOFUNCTION, progress_callback);
/* pass the struct pointer into the xferinfo function, note that this is
an alias to CURLOPT_PROGRESSDATA */
curl_easy_setopt(curl, CURLOPT_XFERINFODATA, pMyProgressStruct);
curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 0L);
#endif
// Now run off and do what you've been told!
res = curl_easy_perform(curl);
// Check for errors
if(res != CURLE_OK)
fprintf(stderr, "upload file to hdfs failed: %s\\n",
curl_easy_strerror(res));
curlerr = true;
fclose(hd_src); // close the local file
if(pMyProgressStruct != NULL)
delete pMyProgressStruct;
pMyProgressStruct = NULL;
// always cleanup!!!!
curl_slist_free_all(headers);
curl_easy_cleanup(curl);
if(curlerr)
return false;
return true;
/*
Append to a File
@param local_file string
@param rem_file string
@return true/false
Step 1: Submit a HTTP POST request without automatically following redirects and without sending the file data.
curl -i -X POST "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=APPEND[&buffersize=<INT>]"
The request is redirected to a datanode where the file data is to be appended:
HTTP/1.1 307 TEMPORARY_REDIRECT
Location: http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=APPEND...
Content-Length: 0
Step 2: Submit another HTTP POST request using the URL in the Location header with the file data to be appended.
curl -i -X POST -T <LOCAL_FILE> "http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=APPEND..."
The client receives a response with zero content length:
HTTP/1.1 200 OK
Content-Length: 0
*/
bool CHttpFSClient::append(std::string& local_file,std::string& rem_file,FileUploadOrDownloadProgressCallback funcFileUpload,void * pUserData)
m_funcUploadProgressCallback = funcFileUpload;
m_funcUploadProgressCallbackUserData = pUserData;
std::string url = m_hostaddr + rem_file + "?op=APPEND&user.name="+m_username;
char* redir_url;
std::string strredir_url = "";
long response_code=0;
bool curlerr = false;
CURL *curl;
CURLcode res;
// get a curl handle
curl = curl_easy_init();
if(curl)
curl_easy_setopt(curl, CURLOPT_POST, 1L);
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_TIMEOUT, m_operatetimeout);
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 0L);
curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, m_connecttimeout);
curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, 0);
res = curl_easy_perform(curl);
// Check for errors
if(res != CURLE_OK)
fprintf(stderr, "hdfs append first request failed: %s\\n",
curl_easy_strerror(res));
curlerr = true;
else
res = curl_easy_getinfo(curl,CURLINFO_REDIRECT_URL,&redir_url);
if(res != CURLE_OK)
fprintf(stderr, "curl_easy_getinfo CURLINFO::CURLINFO_REDIRECT_URL failed: %s\\n",
curl_easy_strerror(res));
curlerr = true;
strredir_url = redir_url;
// always cleanup!!!!
curl_easy_cleanup(curl);
if(curlerr)
return false;
// append file to hdfs
MyProgressStruct * pMyProgressStruct = NULL;
struct curl_slist *headers = NULL;
headers = curl_slist_append(headers, "Content-Type: application/octet-stream");
curl = curl_easy_init();
if(curl)
curl_easy_setopt(curl, CURLOPT_POST, 1L);
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
curl_easy_setopt(curl, CURLOPT_URL, strredir_url.c_str());
//curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
/*//multipart/formdata请求
struct curl_httppost *formpost = NULL;
struct curl_httppost *lastptr = NULL;
curl_formadd(&formpost, &lastptr, CURLFORM_COPYNAME, "file", CURLFORM_FILE, local_file.c_str(), CURLFORM_CONTENTTYPE, "application/octet-stream", CURLFORM_END);
curl_easy_setopt(curl, CURLOPT_HTTPPOST, formpost);*/
//C++代码一次读取文本文件全部内容到string对象
std::ifstream fin(local_file.c_str(),std::ios::in);
std::istreambuf_iterator<char> beg(fin), end;
std::string strdata(beg, end);
fin.close();
curl_easy_setopt(curl,CURLOPT_POSTFIELDS,strdata.c_str());
//设置进度回调
#if LIBCURL_VERSION_NUM >= 0x072000
/* xferinfo was introduced in 7.32.0, no earlier libcurl versions will
compile as they won't have the symbols around.
If built with a newer libcurl, but running with an older libcurl:
curl_easy_setopt() will fail in run-time trying to set the new
callback, making the older callback get used.
New libcurls will prefer the new callback and instead use that one even
if both callbacks are set. */
pMyProgressStruct = new MyProgressStruct();
pMyProgressStruct->pCurlObject = curl;
pMyProgressStruct->funcCallback = m_funcUploadProgressCallback;
pMyProgressStruct->pUserData = m_funcUploadProgressCallbackUserData;
curl_easy_setopt(curl, CURLOPT_XFERINFOFUNCTION, progress_callback);
/* pass the struct pointer into the xferinfo function, note that this is
an alias to CURLOPT_PROGRESSDATA */
curl_easy_setopt(curl, CURLOPT_XFERINFODATA, pMyProgressStruct);
curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 0L);
#endif
res = curl_easy_perform(curl);
//curl_formfree(formpost);
// Check for errors
if(res != CURLE_OK)
fprintf(stderr, "append file to hdfs failed: %s\\n",
curl_easy_strerror(res));
curlerr = true;
else
res = curl_easy_getinfo(curl,CURLINFO_RESPONSE_CODE,&response_code);
if(res != CURLE_OK)
fprintf(stderr, "curl_easy_getinfo CURLINFO::CURLINFO_RESPONSE_CODE failed: %s\\n",
curl_easy_strerror(res));
curlerr = true;
if(pMyProgressStruct != NULL)
delete pMyProgressStruct;
pMyProgressStruct = NULL;
// always cleanup!!!!
curl_slist_free_all(headers);
curl_easy_cleanup(curl);
if(curlerr)
return false;
if(response_code == 200)
return true;
else
return false;
/*
Make a Directory
Submit a HTTP PUT request.
curl -i -X PUT "http://<HOST>:<PORT>/<PATH>?op=MKDIRS[&permission=<OCTAL>]"
The client receives a response with a boolean JSON object:
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
{"boolean": true}
*/
bool CHttpFSClient::mkdirs(std::string& path)
std::string url = m_hostaddr + path + "?op=MKDIRS&user.name="+m_username;
long response_code=0;
long headerlen = 0;
bool curlerr = false;
std::string response_contents = "";
CURL *curl;
CURLcode res;
// get a curl handle
curl = curl_easy_init();
if(curl)
// http put
curl_easy_setopt(curl, CURLOPT_PUT, 1L);
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_HEADER, 1L);
curl_easy_setopt(curl, CURLOPT_TIMEOUT, m_operatetimeout);
curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, m_connecttimeout);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, CHttpFSClient::memwrite_data);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response_contents);
curl_easy_setopt(curl, CURLOPT_INFILESIZE, 0);
res = curl_easy_perform(curl);
// Check for errors
if(res != CURLE_OK)
fprintf(stderr, "hdfs mkdirs failed: %s\\n",
curl_easy_strerror(res));
curlerr = true;
else
res = curl_easy_getinfo(curl,CURLINFO_RESPONSE_CODE,&response_code);
if(res != CURLE_OK)
fprintf(stderr, "curl_easy_getinfo CURLINFO::CURLINFO_RESPONSE_CODE failed: %s\\n",
curl_easy_strerror(res));
curlerr = true;
res = curl_easy_getinfo(curl,CURLINFO_HEADER_SIZE,&headerlen);
if(res != CURLE_OK)
fprintf(stderr, "curl_easy_getinfo CURLINFO::CURLINFO_HEADER_SIZE failed: %s\\n",
curl_easy_strerror(res));
curlerr = true;
// always cleanup!!!!
curl_easy_cleanup(curl);
if(curlerr)
return false;
if(response_code == 200)
Json::Reader reader;
Json::Value root;
const char *content = response_contents.c_str();
if(!reader.parse(content+headerlen,content+response_contents.length(),root,false))
return false;
return root["boolean"].asBool();
else
return false;
/*
Rename a File/Directory
Submit a HTTP PUT request.
curl -i -X PUT "<HOST>:<PORT>/webhdfs/v1/<PATH>?op=RENAME&destination=<PATH>"
The client receives a response with a boolean JSON object:
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
{"boolean": true}
*/
bool CHttpFSClient::rename(std::string& src,std::string& dst)
std::string url = m_hostaddr + src + "?op=RENAME&user.name="+m_username+"&destination="+dst;
long response_code=0;
long headerlen = 0;
bool curlerr = false;
std::string response_contents = "";
CURL *curl;
CURLcode res;
// get a curl handle
curl = curl_easy_init();
if(curl)
// http put
curl_easy_setopt(curl, CURLOPT_PUT, 1L);
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_HEADER, 1L);
curl_easy_setopt(curl, CURLOPT_TIMEOUT, m_operatetimeout);
curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, m_connecttimeout);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, CHttpFSClient::memwrite_data);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response_contents);
curl_easy_setopt(curl, CURLOPT_INFILESIZE, 0);
res = curl_easy_perform(curl);
// Check for errors
if(res != CURLE_OK)
fprintf(stderr, "hdfs rename failed: %s\\n",
curl_easy_strerror(res));
curlerr = true;
else
res = curl_easy_getinfo(curl,CURLINFO_RESPONSE_CODE,&response_code);
if(res != CURLE_OK)
fprintf(stderr, "curl_easy_getinfo CURLINFO::CURLINFO_RESPONSE_CODE failed: %s\\n",
curl_easy_strerror(res));
curlerr = true;
res = curl_easy_getinfo(curl,CURLINFO_HEADER_SIZE,&headerlen);
if(res != CURLE_OK)
fprintf(stderr, "curl_easy_getinfo CURLINFO::CURLINFO_HEADER_SIZE failed: %s\\n",
curl_easy_strerror(res));
curlerr = true;
// always cleanup!!!!
curl_easy_cleanup(curl);
if(curlerr)
return false;
if(response_code == 200)
Json::Reader reader;
Json::Value root;
const char *content = response_contents.c_str();
if(!reader.parse(content+headerlen,content+response_contents.length(),root,false))
return false;
return root["boolean"].asBool();
else
return false;
/*
Delete a File/Directory
@param file string, the file or directory to be deleted
@return true/false
Submit a HTTP DELETE request
curl -i -X DELETE "http://<host>:<port>/webhdfs/v1/<path>?op=DELETE
[&recursive=<true|false>]"
The client receives a response with a boolean JSON object:
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
{"boolean": true}
*/
bool CHttpFSClient::del(std::string& path, bool recursive)
std::string url = m_hostaddr + path + "?op=DELETE&user.name="+m_username;
if(recursive) url+="&recursive=true";
std::string response_contents = "";
char redir_url[100];
long response_code=0;
long headerlen = 0;
bool curlerr = false;
CURL *curl;
CURLcode res;
// get a curl handle
curl = curl_easy_init();
if(curl)
// Set the DELETE command
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE");
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_HEADER, 1L);
curl_easy_setopt(curl, CURLOPT_TIMEOUT, m_operatetimeout);
curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, m_connecttimeout);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, CHttpFSClient::memwrite_data);
curl_easy_setopt(curl,CURLOPT_WRITEDATA,&response_contents);
res = curl_easy_perform(curl);
// Check for errors
if(res != CURLE_OK)
fprintf(stderr, "hdfs del failed: %s\\n",
curl_easy_strerror(res));
curlerr = true;
else
res = curl_easy_getinfo(curl,CURLINFO_RESPONSE_CODE,&response_code);
if(res != CURLE_OK)
fprintf(stderr, "curl_easy_getinfo CURLINFO::CURLINFO_RESPONSE_CODE failed: %s\\n",
curl_easy_strerror(res));
curlerr = true;
res = curl_easy_getinfo(curl,CURLINFO_HEADER_SIZE,&headerlen);
if(res != CURLE_OK)
fprintf(stderr, "curl_easy_getinfo CURLINFO::CURLINFO_HEADER_SIZE failed: %s\\n",
curl_easy_strerror(res));
curlerr = true;
// always cleanup!!!!
curl_easy_cleanup(curl);
if(curlerr)
return false;
if(response_code == 200)
Json::Reader reader;
Json::Value root;
const char *content = response_contents.c_str();
if(!reader.parse(content+headerlen,content+response_contents.length(),root,false))
return false;
return root["boolean"].asBool();
else
return false;
/*
Open and read a remote file and write its contents to local_file
@param @remote_file
@param @local_file
Submit a HTTP GET request with automatically following redirects.
curl -i -L "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=OPEN
[&offset=<LONG>][&length=<LONG>][&buffersize=<INT>]"
The request is redirected to a datanode where the file data can be read:
HTTP/1.1 307 TEMPORARY_REDIRECT
Location: http://<DATANODE>:<PORT>/webhdfs/v1/<PATH>?op=OPEN...
Content-Length: 0
The client follows the redirect to the datanode and receives the file data:
HTTP/1.1 200 OK
Content-Type: application/octet-stream
Content-Length: 22
Hello, webhdfs user!
*/
bool CHttpFSClient::read(std::string& rem_file,std::string& local_file, long offset, long length,FileUploadOrDownloadProgressCallback funcFileDownload,void * pUserData)
m_funcDownloadProgressCallback = funcFileDownload;
m_funcDownloadProgressCallbackUserData = pUserData;
char url[1024] = 0;
if(offset != 0 && length != 0)
sprintf_s(url,200,"%s%s?op=OPEN&user.name=%s&offset=%ld&length=%ld",m_hostaddr.c_str(),rem_file.c_str(),m_username.c_str(),offset,length);
else
sprintf_s(url,200,"%s%s?op=OPEN&user.name=%s",m_hostaddr.c_str(),rem_file.c_str(),m_username.c_str());
long response_code=0;
bool curlerr = false;
CURL *curl = NULL;
CURLcode res;
MyProgressStruct *pMyProgressStruct = NULL;
// get a curl handle
curl = curl_easy_init();
if(curl)
// HTTP GET please
curl_easy_setopt(curl, CURLOPT_HTTPGET, 1L);
// specify target URL, and note that this URL should include a file name, not only a directory
curl_easy_setopt(curl, CURLOPT_URL, url);
curl_easy_setopt(curl, CURLOPT_TIMEOUT, m_operatetimeout);
curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, m_connecttimeout);
/* send all data to this function */
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, CHttpFSClient::filewrite_data);
FILE * pagefile;
pagefile = fopen(local_file.c_str(), "wb");
if(GetLastError() != 0)
return false;
// write the page body to this file handle
curl_easy_setopt(curl, CURLOPT_WRITEDATA, pagefile);
//设置进度回调
#if LIBCURL_VERSION_NUM >= 0x072000
/* xferinfo was introduced in 7.32.0, no earlier libcurl versions will
compile as they won't have the symbols around.
If built with a newer libcurl, but running with an older libcurl:
curl_easy_setopt() will fail in run-time trying to set the new
callback, making the older callback get used.
New libcurls will prefer the new callback and instead use that one even
if both callbacks are set. */
pMyProgressStruct = new MyProgressStruct();
pMyProgressStruct->pCurlObject = curl;
pMyProgressStruct->funcCallback = m_funcDownloadProgressCallback;
pMyProgressStruct->pUserData = m_funcDownloadProgressCallbackUserData;
curl_easy_setopt(curl, CURLOPT_XFERINFOFUNCTION, progress_callback);
/* pass the struct pointer into the xferinfo function, note that this is
an alias to CURLOPT_PROGRESSDATA */
curl_easy_setopt(curl, CURLOPT_XFERINFODATA, pMyProgressStruct);
curl_easy_setopt(curl, CURLOPT_NOPROGRESS, 0L);
#endif
// Now run off and do what you've been told!
res = curl_easy_perform(curl);
// Check for errors
if(res != CURLE_OK)
fprintf(stderr, "get file from hdfs failed: %s\\n",
curl_easy_strerror(res));
curlerr = true;
fclose(pagefile); // close the local file
if(pMyProgressStruct != NULL)
delete pMyProgressStruct;
pMyProgressStruct = NULL;
// always cleanup!!!!
curl_easy_cleanup(curl);
if(curlerr)
return false;
return true;
/*
list a directory
@param $dir string, the dir to list
@return json object
Submit a HTTP GET request.
curl -i "http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=LISTSTATUS"
The client receives a response with a FileStatuses JSON object:
HTTP/1.1 200 OK
Content-Type: application/json
Content-Length: 427
"FileStatuses":
"FileStatus":
[
"accessTime" : 1320171722771,
"blockSize" : 33554432,
"group" : "supergroup",
"length" : 24930,
"modificationTime": 1320171722771,
"owner" : "webuser",
"pathSuffix" : "a.patch",
"permission" : "644",
"replication" : 1,
"type" : "FILE"
,
"accessTime" : 0,
"blockSize" : 0,
"group" : "supergroup",
"length" : 0,
"modificationTime": 1320895981256,
"owner" : "szetszwo",
"pathSuffix" : "bar",
"permission" : "711",
"replication" : 0,
"type" : "DIRECTORY"
,
...
]
*/
bool CHttpFSClient::ls(std::string& rem_path,std::vector<FileStatus>& results)
std::string url = m_hostaddr + rem_path + "?op=LISTSTATUS&user.name="+m_username;
long response_code=0;
long headerlen = 0;
bool curlerr = false;
std::string response_contents = "";
CURL *curl;
CURLcode res;
// get a curl handle
curl = curl_easy_init();
if(curl)
curl_easy_setopt(curl, CURLOPT_HTTPGET, 1L);
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_HEADER, 1L);
curl_easy_setopt(curl, CURLOPT_TIMEOUT, m_operatetimeout);
curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, m_connecttimeout);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, CHttpFSClient::memwrite_data);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response_contents);
res = curl_easy_perform(curl);
// Check for errors
if(res != CURLE_OK)
fprintf(stderr, "hdfs GETFILESTATUS failed: %s\\n",
curl_easy_strerror(res));
curlerr = true;
else
res = curl_easy_getinfo(curl,CURLINFO_RESPONSE_CODE,&response_code);
if(res != CURLE_OK)
fprintf(stderr, "curl_easy_getinfo CURLINFO::CURLINFO_RESPONSE_CODE failed: %s\\n",
curl_easy_strerror(res));
curlerr = true;
res = curl_easy_getinfo(curl,CURLINFO_HEADER_SIZE,&headerlen);
if(res != CURLE_OK)
fprintf(stderr, "curl_easy_getinfo CURLINFO::CURLINFO_HEADER_SIZE failed: %s\\n",
curl_easy_strerror(res));
curlerr = true;
// always cleanup!!!!
curl_easy_cleanup(curl);
if(curlerr)
return false;
if(response_code == 200)
Json::Reader reader;
Json::Value root;
const char *content = response_contents.c_str();
if(!reader.parse(content+headerlen,content+response_contents.length(),root,false))
return false;
if(root.empty()) return false;
Json::Value FileStatuses = root.get("FileStatuses",Json::nullValue);
//if(FileStatuses == Json::nullValue) return false;
if(FileStatuses.isNull()) return false;
Json::Value FileStatusVec = FileStatuses.get("FileStatus",Json::nullValue);
//if(FileStatusVec == Json::nullValue) return false;
if(FileStatusVec.isNull()) return false;
results.clear();
int size = FileStatusVec.size();
for (int i=0; i<size; ++i)
FileStatus fst;
fst.accessTime = FileStatusVec[i]["accessTime"].asInt64();
fst.blocksize = FileStatusVec[i]["blockSize"].asInt64();
fst.group = FileStatusVec[i]["group"].asString();
fst.length = FileStatusVec[i]["length"].asInt64();
fst.modificationTime = FileStatusVec[i]["modificationTime"].asInt64();
fst.owner = FileStatusVec[i]["owner"].asString();
fst.pathSuffix = FileStatusVec[i]["pathSuffix"].asString();
fst.permission = FileStatusVec[i]["permission"].asString();
fst.replication = FileStatusVec[i]["replication"].asInt();
fst.type = FileStatusVec[i]["type"].asString();
//print FileStatus
showFileStatus(fst);
//insert
results.push_back(fst);
return true;
else
return false;
/*
request:
Submit a HTTP GET request.
http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILESTATUS
response:
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
"FileStatus":
"accessTime" : 0,
"blockSize" : 0,
"group" : "supergroup",
"length" : 0, //in bytes, zero for directories
"modificationTime": 1320173277227,
"owner" : "webuser",
"pathSuffix" : "",
"permission" : "777",
"replication" : 0,
"type" : "DIRECTORY" //enum FILE, DIRECTORY
*/
bool CHttpFSClient::getStatus(std::string& rem_file,FileStatus & fsFileStatus)
std::string url = m_hostaddr + rem_file + "?op=GETFILESTATUS&user.name="+m_username;
long response_code=0;
long headerlen = 0;
bool curlerr = false;
std::string response_contents = "";
CURL *curl;
CURLcode res;
// get a curl handle
curl = curl_easy_init();
if(curl)
curl_easy_setopt(curl, CURLOPT_HTTPGET, 1L);
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_HEADER, 1L);
curl_easy_setopt(curl, CURLOPT_TIMEOUT, m_operatetimeout);
curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, m_connecttimeout);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, CHttpFSClient::memwrite_data);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &response_contents);
res = curl_easy_perform(curl);
// Check for errors
if(res != CURLE_OK)
fprintf(stderr, "hdfs GETFILESTATUS failed: %s\\n",
curl_easy_strerror(res));
curlerr = true;
else
res = curl_easy_getinfo(curl,CURLINFO_RESPONSE_CODE,&response_code);
if(res != CURLE_OK)
fprintf(stderr, "curl_easy_getinfo CURLINFO::CURLINFO_RESPONSE_CODE failed: %s\\n",
curl_easy_strerror(res));
curlerr = true;
res = curl_easy_getinfo(curl,CURLINFO_HEADER_SIZE,&headerlen);
if(res != CURLE_OK)
fprintf(stderr, "curl_easy_getinfo CURLINFO::CURLINFO_HEADER_SIZE failed: %s\\n",
curl_easy_strerror(res));
curlerr = true;
// always cleanup!!!!
curl_easy_cleanup(curl);
if(curlerr)
return false;
if(response_code == 200)
Json::Reader reader;
Json::Value root;
const char *content = response_contents.c_str();
if(!reader.parse(content+headerlen,content+response_contents.length(),root,false))
return false;
if(root.empty()) return false;
Json::Value itemFileStatus = root.get("FileStatus",Json::nullValue);
if(itemFileStatus.isNull()) return false;
fsFileStatus.accessTime = itemFileStatus["accessTime"].asInt64();
fsFileStatus.blocksize = itemFileStatus["blockSize"].asInt64();
fsFileStatus.group = itemFileStatus["group"].asString();
fsFileStatus.length = itemFileStatus["length"].asInt64();
fsFileStatus.modificationTime = itemFileStatus["modificationTime"].asInt64();
fsFileStatus.owner = itemFileStatus["owner"].asString();
fsFileStatus.pathSuffix = itemFileStatus["pathSuffix"].asString();
fsFileStatus.permission = itemFileStatus["permission"].asString();
fsFileStatus.replication = itemFileStatus["replication"].asInt();
fsFileStatus.type = itemFileStatus["type"].asString();
//print FileStatus
showFileStatus(fsFileStatus);
return true;
else
return false;
// Prints path suffix, owner, length and replication of `results` to stdout,
// separated by tabs.
void CHttpFSClient::showFileStatus(FileStatus results)
{
    // length is 64 bits wide: print it with %lld. A narrower specifier would
    // also misalign the argument that follows it.
    printf("filestatus:%s\t%s\t%lld\t%d\n",
           results.pathSuffix.c_str(),
           results.owner.c_str(),
           static_cast<long long>(results.length),
           results.replication);
}
size_t CHttpFSClient::fileread_callback(void *ptr, size_t size, size_t nmemb, void *stream)
size_t retcode;
curl_off_t nread;
/* in real-world cases, this would probably get this data differently
as this fread() stuff is exactly what the library already would do
by default internally */
retcode = fread(ptr, size, nmemb, (FILE *)stream);
nread = (curl_off_t)retcode;
fprintf(stderr, "*** We read %" CURL_FORMAT_CURL_OFF_T
" bytes from file\\n", nread);
return retcode;
// libcurl write callback: writes the received size*nmemb bytes to the FILE*
// passed in `stream` and returns the number of bytes written.
size_t CHttpFSClient::filewrite_data(const char *ptr, size_t size, size_t nmemb, void *stream)
{
    // fwrite counts items of `size` bytes; libcurl expects a byte count and
    // aborts the transfer when it differs from size*nmemb.
    const size_t items = fwrite(ptr, size, nmemb, (FILE *)stream);
    return items * size;
}
// libcurl write callback: appends the received size*nmemb bytes to the
// std::string passed in `stream` and returns that byte count.
size_t CHttpFSClient::memwrite_data(const char *contents, size_t size, size_t nmemb, std::string *stream)
{
    assert(stream != NULL);
    const size_t len = size * nmemb;
    stream->append(contents, len);
    return len;
}
// Callback that appends size*nmemb bytes starting at `ptr` to the
// std::string passed in `stream` and returns that byte count.
size_t CHttpFSClient::header_callback(const char *ptr, size_t size, size_t nmemb, std::string *stream)
{
    assert(stream != NULL);
    const size_t len = size * nmemb;
    stream->append(ptr, len);
    return len;
}
size_t CHttpFSClient::progress_callback(void *p,long long dltotal,long long dlnow,long long ultotal,long long ulnow)
// do something to display the progress
MyProgressStruct * pMyProgressStruct = (MyProgressStruct *)p;
if(pMyProgressStruct != NULL)
CURL *curl = pMyProgressStruct->pCurlObject;
double curtime = 0;
curl_easy_getinfo(curl, CURLINFO_TOTAL_TIME, &curtime);
printf("TOTAL TIME: %f \\r\\n", curtime);
printf("UP: %ld bytes of %ld bytes, DOWN: %ld bytes of %ld bytes \\n", ulnow, ultotal, dlnow, dltotal);
if (ultotal)
printf("UP progress: %0.2f\\n", float(ulnow / ultotal));
if(pMyProgressStruct != NULL && pMyProgressStruct->funcCallback != NULL)
pMyProgressStruct->funcCallback(ultotal,ulnow,pMyProgressStruct->pUserData);
if (dltotal)
printf("DOWN progress: %0.2f\\n", float(dlnow / dltotal));
if(pMyProgressStruct != NULL && pMyProgressStruct->funcCallback != NULL)
pMyProgressStruct->funcCallback(dltotal,dlnow,pMyProgressStruct->pUserData);
return 0;
//测试实例
#include "stdafx.h"
#include "HttpfsClient.h"
#include "curl/curl.h"
int main(int argc, _TCHAR* argv[])
curl_global_init(CURL_GLOBAL_ALL);
std::string hostaddr = "http://192.168.1.100:14000/webhdfs/v1";
std::string username = "hadoop";
CHttpFSClient httpfs(hostaddr,username);
std::vector<FileStatus> results;
std::string local_file = ".\\\\test.dat";
std::string rem_path = "/test.dat";
//httpfs.create(local_file,rem_path);
//httpfs.append(local_file,rem_path);
httpfs.read(rem_path,local_file);
getchar();
curl_global_cleanup();
return 0;
四、常见问题
1、JsonCpp读取较大数字出错问题
JsonCpp是c++中解析Json常用的解析库。在低版本JsonCpp库中访问大数字时,asUInt方法不存在。解决方法有两种:使用高版本jsoncpp和用asDouble替换asUInt
2、如果要获取上传文件或下载文件的进度信息,curl版本要大于7.32,新版的函数才生效。
3、上传下载的操作建议另起一个线程,其实这种用法在libcurl里也可以应用在http协议上传下载。
以上是关于C++通过http协议操作hdfs的主要内容,如果未能解决你的问题,请参考以下文章