PostgreSQL FDW for Parquet S3: The ParquetReader Class
The ParquetReader class is declared in src/reader.hpp and implemented in src/reader.cpp. Its main responsibilities are: building the mapping between postgres column types and parquet column types, fetching arrow array element values and wrapping them as Datums, and converting arrow nested list and map data into datums. As the create_parquet_reader function shows, two kinds of ParquetReader are provided: DefaultParquetReader and CachingParquetReader. Both are subclasses of ParquetReader, so we start with the ParquetReader base class.
ParquetReader *create_parquet_reader(const char *filename, MemoryContext cxt,
                                     int reader_id, bool caching)
{
    if (!caching)
        return new DefaultParquetReader(filename, cxt, reader_id);
    else
        return new CachingParquetReader(filename, cxt, reader_id);
}
ParquetReader
Constructor / destructor
The ParquetReader class is declared in src/reader.hpp. Its constructor, ParquetReader::ParquetReader(MemoryContext cxt) : allocator(new FastAllocatorS3(cxt)), takes a memory context cxt and uses it to construct the FastAllocatorS3 that initializes the std::unique_ptr<FastAllocatorS3> allocator member. Since FastAllocatorS3 is small, it is worth introducing first. The class consists of a constructor, a destructor, and the fast_alloc and recycle functions. The constructor stores the given memory context in the segments_cxt member and sets the remaining members (segment_start_ptr and friends) to nullptr; the destructor simply calls recycle. fast_alloc preallocates a big memory segment and hands out blocks from it; when a segment is exhausted it is added to the garbage segment list and freed on the next executor iteration, and if the requested size is larger than SEGMENT_SIZE the block is allocated with palloc directly and likewise added to the garbage segment list. recycle pfrees everything on the garbage segment list.
class FastAllocatorS3
{
private:
    /* Special memory segment to speed up bytea/text allocations. */
    MemoryContext       segments_cxt;
    char               *segment_start_ptr;   /* start of the current segment */
    char               *segment_cur_ptr;     /* next free position within the segment */
    char               *segment_last_ptr;    /* end of the current segment */
    std::list<char *>   garbage_segments;

public:
    FastAllocatorS3(MemoryContext cxt)
        : segments_cxt(cxt), segment_start_ptr(nullptr), segment_cur_ptr(nullptr),
          segment_last_ptr(nullptr), garbage_segments()
    {}

    ~FastAllocatorS3()
    {
        this->recycle();
    }

    /*
     * fast_alloc
     *      Preallocate a big memory segment and distribute blocks from it.
     *      When the segment is exhausted it is added to the garbage_segments
     *      list and freed on the next executor iteration. If the requested
     *      size is bigger than SEGMENT_SIZE then plain palloc is used.
     */
    inline void *fast_alloc(long size)
    {
        void   *ret;

        /* If the allocation is bigger than a segment, just use palloc */
        if (size > SEGMENT_SIZE)    /* #define SEGMENT_SIZE (1024 * 1024) */
        {
            MemoryContext oldcxt = MemoryContextSwitchTo(this->segments_cxt);
            /*
             * exc_palloc (src/common.cpp) allocates size bytes through the
             * memory context's context->methods->alloc function.
             */
            void *block = exc_palloc(size);
            this->garbage_segments.push_back((char *) block);
            MemoryContextSwitchTo(oldcxt);
            return block;
        }

        size = MAXALIGN(size);

        /* If there is not enough space in the current segment create a new one */
        if (this->segment_last_ptr - this->segment_cur_ptr < size)
        {
            /* Recycle the last segment at the next iteration (if there was one) */
            if (this->segment_start_ptr)
                this->garbage_segments.push_back(this->segment_start_ptr);

            /* Allocate a fresh SEGMENT_SIZE segment in segments_cxt */
            MemoryContext oldcxt = MemoryContextSwitchTo(this->segments_cxt);
            this->segment_start_ptr = (char *) exc_palloc(SEGMENT_SIZE);
            this->segment_cur_ptr = this->segment_start_ptr;
            this->segment_last_ptr = this->segment_start_ptr + SEGMENT_SIZE - 1;
            MemoryContextSwitchTo(oldcxt);
        }

        ret = (void *) this->segment_cur_ptr;
        this->segment_cur_ptr += size;      /* bump the allocation pointer */

        return ret;
    }

    /* recycle old segments if any */
    void recycle(void)
    {
        if (!this->garbage_segments.empty())
        {
            bool error = false;

            PG_TRY();
            {
                for (auto it : this->garbage_segments)
                    pfree(it);
            }
            PG_CATCH();
            {
                error = true;
            }
            PG_END_TRY();
            if (error)
                throw std::runtime_error("garbage segments recycle failed");

            this->garbage_segments.clear();
            elog(DEBUG1, "parquet_s3_fdw: garbage segments recycled");
        }
    }

    MemoryContext context()
    {
        return segments_cxt;
    }
};
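To see the allocation scheme in isolation, here is a minimal standalone sketch of the same bump-allocation pattern (all names are illustrative and not part of the FDW; malloc/free stand in for the memory-context calls, the segment size is shrunk, and MAXALIGN rounding is omitted):

#include <cstdlib>
#include <cstring>
#include <iostream>
#include <list>

/* Illustrative only: a stripped-down bump allocator in the spirit of
 * FastAllocatorS3, without PostgreSQL memory contexts. */
class BumpAllocator
{
    static constexpr long SEGMENT_SIZE = 64;    /* tiny, to force segment turnover */
    char *start = nullptr;                      /* current segment */
    char *cur = nullptr;                        /* next free byte */
    char *last = nullptr;                       /* end of current segment */
    std::list<char *> garbage;                  /* retired segments */

public:
    void *alloc(long size)
    {
        if (size > SEGMENT_SIZE)                /* oversized request: dedicated block */
        {
            char *block = (char *) std::malloc(size);
            garbage.push_back(block);
            return block;
        }
        if (last - cur < size)                  /* current segment exhausted */
        {
            if (start)
                garbage.push_back(start);       /* recycle it on the next pass */
            start = (char *) std::malloc(SEGMENT_SIZE);
            cur = start;
            last = start + SEGMENT_SIZE - 1;
        }
        void *ret = cur;
        cur += size;                            /* bump the allocation pointer */
        return ret;
    }

    void recycle()                              /* bulk-free retired segments */
    {
        for (char *seg : garbage)
            std::free(seg);
        garbage.clear();
    }

    ~BumpAllocator()
    {
        if (start)
            garbage.push_back(start);           /* unlike the FDW, no memory context owns it */
        recycle();
    }
};

int main()
{
    BumpAllocator a;
    char *s = (char *) a.alloc(6);
    std::memcpy(s, "hello", 6);
    std::cout << s << std::endl;                /* prints "hello" */
    for (int i = 0; i < 100; i++)
        a.alloc(16);                            /* forces several segment turnovers */
    a.recycle();                                /* safe: only retired segments are freed */
    return 0;
}

The point of the design is that per-value allocations (e.g. the bytea built in read_primitive_type below) cost only a pointer bump, while freeing happens in bulk at well-defined points via recycle.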
reader_id is a member of the ParquetReader class that identifies a ParquetReader object during parallel foreign scans. It is initialized and used in DefaultParquetReader and CachingParquetReader; the ParquetReader base class only provides the getter int32_t ParquetReader::id() { return reader_id; }.
column_mapping
The column-mapping functions build the mapping between postgres column types and parquet column types: create_column_mapping maps the column types in a postgres tuple descriptor to parquet columns, while schemaless_create_column_mapping maps the postgres jsonb column to parquet columns. ParquetReader has a TypeInfo struct member in which the arrow struct describes the parquet data type and the pg struct the postgres data type; need_cast flags whether a type conversion is required, castfunc, outfunc and infunc are the conversion functions, and children holds the TypeInfo of the member types of list and map columns.
struct TypeInfo
{
    /* Column index in parquet schema; -1 for complex types and children.
     * Currently only used for checking column statistics. */
    int index;

    struct
    {
        arrow::Type::type   type_id;
        std::string         type_name;
    } arrow;

    struct
    {
        Oid     oid;
        int16   len;    /* Only for array elements */
        bool    byval;  /* Only for array elements */
        char    align;  /* Only for array elements */
    } pg;

    /* Cast functions from the default postgres type defined in
     * `to_postgres_type` to the actual table column type. */
    bool        need_cast;
    FmgrInfo   *castfunc;
    FmgrInfo   *outfunc;    /* For cast via IO and for maps */
    FmgrInfo   *infunc;     /* For cast via IO */

    /* Underlying types for complex types like list and map */
    std::vector<TypeInfo> children;

    TypeInfo()
        : arrow{}, pg{}, need_cast(false), castfunc(nullptr),
          outfunc(nullptr), infunc(nullptr), index(-1)
    {}

    TypeInfo(TypeInfo &&ti)
        : arrow(ti.arrow), pg(ti.pg), need_cast(ti.need_cast),
          castfunc(ti.castfunc), outfunc(ti.outfunc), infunc(ti.infunc),
          children(std::move(ti.children)), index(-1)
    {}

    TypeInfo(std::shared_ptr<arrow::DataType> arrow_type, Oid typid = InvalidOid)
        : TypeInfo()
    {
        arrow.type_id = arrow_type->id();
        arrow.type_name = arrow_type->name();
        pg.oid = typid; pg.len = 0; pg.byval = false; pg.align = 0;
    }
};
The preSortedColumnData struct stores the ordering information of a sorted column. The this->sorted_cols_data member holds the preSortedColumnData of these columns, and this->sorted_col_map records each column's position within sorted_cols.
struct preSortedColumnData
{
    bool            is_available;   /* true if the column exists */
    char           *col_name;       /* sorted column name */
    Datum           val;            /* sorted column actual data */
    bool            is_null;        /* true if the sorted column is NULL */
    SortSupportData sortkey;        /* sortkey built from the presorted column */

    preSortedColumnData() : is_available(false), is_null(true) {}
};
set_schemaless_info sets the members needed for schemaless mode: the schemaless bool flag, the list of actual columns, and the list of sorted columns.
void ParquetReader::set_schemaless_info(bool schemaless, std::set<std::string> slcols,
                                        std::set<std::string> sorted_cols)
{
    this->schemaless = schemaless;
    this->slcols = slcols;
    this->sorted_cols = sorted_cols;
}
As mentioned in the previous section, schemaless mode means there is no specific foreign table schema (column definition) for each parquet file. The schemaless foreign table has only one jsonb column that represents the data from the parquet file by the following rule: the jsonb key is the parquet column name, and the jsonb value is the parquet column data. Looking at schemaless_create_column_mapping: it first determines whether the columns are projected, which fixes the size of the result-set type-mapping vectors; it then walks the columns of the parquet file schema, determines whether each column is a needed column (target_col) or a sorted_col, converts the arrow type to a postgres type, represents the mapping result with a TypeInfo struct, and stores all results in the types member; for each sorted_col it creates a preSortedColumnData that is stored in this->sorted_cols_data, and sets this->sorted_col_map (the column's position within sorted_cols). Note the use of the arrow-provided class parquet::arrow::SchemaManifest and its schema_fields member here.
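For example, given the rule above, a parquet row with columns id = 1 and name = 'foo' would surface in the schemaless foreign table as the single jsonb value {"id": 1, "name": "foo"}.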
/*
 * schemaless_create_column_mapping
 *      - Create mapping between the jsonb column and actual parquet columns.
 *      - Create sortkeys for sorted columns if they exist.
 */
void ParquetReader::schemaless_create_column_mapping(parquet::arrow::SchemaManifest manifest)
{
    std::set<std::string>   slcols = this->slcols;           /* actual column list for schemaless mode */
    std::set<std::string>   sorted_cols = this->sorted_cols; /* sorted column list for schemaless mode */
    bool                    is_select_all;

    if (slcols.size() > 0)
    {
        /* If slcols != NIL, build the mapping only for the columns in that list */
        is_select_all = false;
        /*
         * sorted_col_map is the mapping between sorted columns and arrow
         * result set columns; it corresponds to the sorted_cols vector and
         * is the main output of this function.
         */
        this->sorted_col_map.resize(slcols.size());
    }
    else
    {
        /* Otherwise, build the mapping for every column in the parquet file */
        is_select_all = true;
        this->sorted_col_map.resize(manifest.schema_fields.size());
    }

    /* Create the sortkey list for columns in the sorted column list */
    if (sorted_cols.size() > 0)
        this->sorted_cols_data.resize(sorted_cols.size());

    /* Walk the schema of the parquet file */
    for (auto &schema_field : manifest.schema_fields)
    {
        auto    field_name = schema_field.field->name();    /* column name */
        auto    arrow_type = schema_field.field->type();    /* column type */
        char    arrow_colname[255];

        if (field_name.length() > NAMEDATALEN)
            throw Error("parquet column name %s is too long (max: %d)",
                        field_name.c_str(), NAMEDATALEN - 1);
        tolowercase(field_name.c_str(), arrow_colname);

        /* Find the position of this column in the sorted column list */
        size_t sorted_col_idx = std::distance(sorted_cols.begin(),
                                              sorted_cols.find(arrow_colname));

        /* The column is fetched if it is in the slcols list or all columns are selected */
        bool is_target_col = is_select_all || (slcols.find(arrow_colname) != slcols.end());

        /* Create the mapping for target columns; collect information for sorted columns. */
        if (is_target_col || sorted_col_idx < sorted_cols.size())
        {
            TypeInfo    typinfo(arrow_type);
            bool        error(false);
            std::string col_name = std::move(arrow_colname);

            /* Found mapping! */
            this->column_names.push_back(col_name);

            switch (arrow_type->id())
            {
                case arrow::Type::LIST:     /* LIST is mapped to the postgres JSONB type */
                {
                    Assert(schema_field.children.size() == 1);
                    Oid     elem_type;
                    int16   elem_len;
                    bool    elem_byval;
                    char    elem_align;

                    PG_TRY();
                    {
                        /* Convert the LIST element type to a postgres type oid */
                        auto child_arrow_type = schema_field.children[0].field->type();
                        elem_type = to_postgres_type(TypeInfo(child_arrow_type).arrow.type_id);
                        if (OidIsValid(elem_type))
                            get_typlenbyvalalign(elem_type, &elem_len, &elem_byval, &elem_align);
                    }
                    PG_CATCH();
                    {
                        error = true;
                    }
                    PG_END_TRY();
                    if (error)
                        throw Error("parquet_s3_fdw: failed to get type length (column %s)",
                                    col_name.c_str());
                    if (!OidIsValid(elem_type))
                        throw Error("parquet_s3_fdw: cannot convert parquet column of type "
                                    "LIST to scalar type of postgres column %s",
                                    col_name.c_str());

                    auto &child = schema_field.children[0];
                    typinfo.children.emplace_back(child.field->type(), elem_type);
                    typinfo.pg.oid = JSONBOID;
                    /* Arrow column indices that are used in the query */
                    this->indices.push_back(child.column_index);
                    break;
                }
                case arrow::Type::MAP:      /* MAP is mapped to the postgres JSONB type */
                {
                    /*
                     * Map has the following structure:
                     *
                     * Type::MAP
                     * └─ Type::STRUCT
                     *    ├─ key type
                     *    └─ item type
                     */
                    Assert(schema_field.children.size() == 1);
                    auto &strct = schema_field.children[0];
                    Assert(strct.children.size() == 2);     /* key type and item type */
                    auto &key = strct.children[0];
                    auto &item = strct.children[1];

                    /* Convert the key and item types to postgres types */
                    Oid pg_key_type = to_postgres_type(key.field->type()->id());
                    Oid pg_item_type = to_postgres_type(item.field->type()->id());

                    typinfo.pg.oid = JSONBOID;
                    typinfo.children.emplace_back(key.field->type(), pg_key_type);
                    typinfo.children.emplace_back(item.field->type(), pg_item_type);

                    PG_TRY();
                    {
                        typinfo.children[0].outfunc = find_outfunc(pg_key_type);
                        typinfo.children[1].outfunc = find_outfunc(pg_item_type);
                    }
                    PG_CATCH();
                    {
                        error = true;
                    }
                    PG_END_TRY();
                    if (error)
                        throw Error("failed to initialize output function for Map column %s",
                                    col_name.c_str());

                    /* Arrow column indices that are used in the query */
                    this->indices.push_back(key.column_index);
                    this->indices.push_back(item.column_index);
                    break;
                }
                default:    /* other arrow types: INT8/16/32/64, FLOAT, DOUBLE, TIMESTAMP, DATE32, STRING, BINARY */
                    typinfo.pg.oid = to_postgres_type(typinfo.arrow.type_id);
                    typinfo.index = schema_field.column_index;
                    this->indices.push_back(schema_field.column_index);
            }

            /* In schemaless mode parquet data is read as the mapped type, so no cast is needed */
            typinfo.need_cast = false;
            this->types.push_back(std::move(typinfo));

            /* Create a sortkey for the column if it is present in the sorted column list */
            if (sorted_col_idx < sorted_cols.size())
            {
                SortSupportData     sort_key;
                Oid                 sort_op;
                preSortedColumnData sorted_col_data;

                /* Init sorted column data */
                sorted_col_data.is_available = true;
                sorted_col_data.col_name = pstrdup(col_name.c_str());
                sorted_col_data.is_null = true;

                /* Init sortkey data */
                memset(&sort_key, 0, sizeof(SortSupportData));
                sort_key.ssup_cxt = allocator->context();
                sort_key.ssup_collation = InvalidOid;
                sort_key.ssup_nulls_first = true;
                sort_key.ssup_attno = sorted_col_idx;
                sort_key.abbreviate = false;

                get_sort_group_operators(typinfo.pg.oid, true, false, false,
                                         &sort_op, NULL, NULL, NULL);
                PrepareSortSupportFromOrderingOp(sort_op, &sort_key);
                sorted_col_data.sortkey = sort_key;

                this->sorted_cols_data[sorted_col_idx] = sorted_col_data;
                this->sorted_col_map[this->column_names.size() - 1] = sorted_col_idx;
            }
            else
                this->sorted_col_map[this->column_names.size() - 1] = -1;
        }
    }
}
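To make the bookkeeping concrete, consider a hypothetical file with columns id, ts and payload, and a query with slcols = {id, ts} and sorted_cols = {ts}. Walking the schema: id is a target column but not sorted, so column_names = ["id"] and sorted_col_map[0] = -1; ts is both a target and a sorted column, so column_names becomes ["id", "ts"], sorted_col_map[1] = 0 and sorted_cols_data[0] receives its sortkey; payload is neither, so it is skipped entirely and its arrow column index never enters indices.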
create_column_mapping creates the mapping between the tuple descriptor and parquet columns. It first obtains the SchemaManifest and, in schemaless mode, delegates to the function above. Otherwise it walks the columns of tupleDesc and, for each one, walks the manifest.schema_fields of the parquet file, compares the postgres attribute name to the column name in the arrow schema, and on a match fills in the TypeInfo mapping.
/*
 * create_column_mapping
 *      Create mapping between tuple descriptor and parquet columns.
 */
void ParquetReader::create_column_mapping(TupleDesc tupleDesc, const std::set<int> &attrs_used)
{
    parquet::ArrowReaderProperties  props;
    parquet::arrow::SchemaManifest  manifest;

    /*
     * Build the SchemaManifest from the file metadata. this->reader is a
     * std::unique_ptr<parquet::arrow::FileReader>.
     */
    auto p_schema = this->reader->parquet_reader()->metadata()->schema();
    if (!parquet::arrow::SchemaManifest::Make(p_schema, nullptr, props, &manifest).ok())
        throw std::runtime_error("parquet_s3_fdw: error creating arrow schema");

    /* In schemaless mode, get the jsonb column mapping instead */
    if (this->schemaless)
    {
        schemaless_create_column_mapping(manifest);
        return;
    }

    /*
     * `map` is the mapping between slot attributes and arrow result set
     * columns; it corresponds to the `indices` vector.
     */
    this->map.resize(tupleDesc->natts);
    for (int i = 0; i < tupleDesc->natts; i++)
    {
        AttrNumber  attnum = i + 1 - FirstLowInvalidHeapAttributeNumber;
        const char *attname = NameStr(TupleDescAttr(tupleDesc, i)->attname);

        this->map[i] = -1;

        /* Skip columns we don't intend to use in the query */
        if (attrs_used.find(attnum) == attrs_used.end())
            continue;

        char pg_colname[255];
        tolowercase(NameStr(TupleDescAttr(tupleDesc, i)->attname), pg_colname);

        for (auto &schema_field : manifest.schema_fields)
        {
            auto field_name = schema_field.field->name();
            auto arrow_type = schema_field.field->type();

            if (field_name.length() > NAMEDATALEN)
                throw Error("parquet column name %s is too long (max: %d)",
                            field_name.c_str(), NAMEDATALEN - 1);

            char arrow_colname[255];
            tolowercase(schema_field.field->name().c_str(), arrow_colname);

            /* Compare postgres attribute name to the column name in arrow schema. */
            if (strcmp(pg_colname, arrow_colname) == 0)
            {
                TypeInfo    typinfo(arrow_type);
                bool        error(false);

                /* Found mapping! */
                this->column_names.push_back(std::move(arrow_colname));
                this->map[i] = this->column_names.size() - 1;   /* index of the last element */
                typinfo.pg.oid = TupleDescAttr(tupleDesc, i)->atttypid;

                switch (arrow_type->id())
                {
                    case arrow::Type::LIST:     /* LIST is mapped to a postgres ARRAY type */
                    {
                        Assert(schema_field.children.size() == 1);
                        Oid     elem_type;
                        int16   elem_len;
                        bool    elem_byval;
                        char    elem_align;

                        PG_TRY();
                        {
                            elem_type = get_element_type(typinfo.pg.oid);
                            if (OidIsValid(elem_type))
                                get_typlenbyvalalign(elem_type, &elem_len,
                                                     &elem_byval, &elem_align);
                        }
                        PG_CATCH();
                        {
                            error = true;
                        }
                        PG_END_TRY();
                        if (error)
                            throw Error("failed to get type length (column %s)", pg_colname);
                        if (!OidIsValid(elem_type))
                            throw Error("cannot convert parquet column of type LIST to "
                                        "scalar type of postgres column %s", pg_colname);

                        auto &child = schema_field.children[0];
                        typinfo.children.emplace_back(child.field->type(), elem_type);

                        TypeInfo &elem = typinfo.children[0];
                        elem.pg.len = elem_len;
                        elem.pg.byval = elem_byval;
                        elem.pg.align = elem_align;

                        /* Check whether an implicit cast is required and prepare the cast call */
                        initialize_cast(elem, attname);
                        this->indices.push_back(child.column_index);
                        break;
                    }
                    case arrow::Type::MAP:
                    {
                        /*
                         * Map has the following structure:
                         *
                         * Type::MAP
                         * └─ Type::STRUCT
                         *    ├─ key type
                         *    └─ item type
                         */
                        Assert(schema_field.children.size() == 1);
                        auto &strct = schema_field.children[0];
                        Assert(strct.children.size() == 2);
                        auto &key = strct.children[0];
                        auto &item = strct.children[1];
                        Oid pg_key_type = to_postgres_type(key.field->type()->id());
                        Oid pg_item_type = to_postgres_type(item.field->type()->id());

                        typinfo.children.emplace_back(key.field->type(), pg_key_type);
                        typinfo.children.emplace_back(item.field->type(), pg_item_type);

                        PG_TRY();
                        {
                            typinfo.children[0].outfunc = find_outfunc(pg_key_type);
                            typinfo.children[1].outfunc = find_outfunc(pg_item_type);
                        }
                        PG_CATCH();
                        {
                            error = true;
                        }
                        PG_END_TRY();
                        if (error)
                            throw Error("failed to initialize output function for Map column %s",
                                        attname);

                        this->indices.push_back(key.column_index);
                        this->indices.push_back(item.column_index);

                        /* JSONB might need a cast (e.g. to TEXT) */
                        initialize_cast(typinfo, attname);
                        break;
                    }
                    default:
                        initialize_cast(typinfo, attname);
                        typinfo.index = schema_field.column_index;
                        this->indices.push_back(schema_field.column_index);
                }

                this->types.push_back(std::move(typinfo));
                break;      /* go to the next tuple descriptor attribute */
            }
        }
    }
}
do_cast converts a Datum according to the arrow-to-postgres conversion information carried by TypeInfo.
Datum ParquetReader::do_cast(Datum val, const TypeInfo &typinfo)
{
    MemoryContext   ccxt = CurrentMemoryContext;
    bool            error = false;
    char            errstr[ERROR_STR_LEN];

    PG_TRY();
    {
        /* du, du cast, du cast mich... */
        if (typinfo.castfunc != NULL)
        {
            /* Use the cast function when one is available */
            val = FunctionCall1(typinfo.castfunc, val);
        }
        else if (typinfo.outfunc && typinfo.infunc)
        {
            /* Otherwise cast via the output and input functions */
            char *str = OutputFunctionCall(typinfo.outfunc, val);

            /* TODO: specify typioparam and typmod */
            val = InputFunctionCall(typinfo.infunc, str, 0, 0);
        }
    }
    PG_CATCH();
    {
        ErrorData *errdata;

        MemoryContextSwitchTo(ccxt);
        error = true;
        errdata = CopyErrorData();
        FlushErrorState();

        strncpy(errstr, errdata->message, ERROR_STR_LEN - 1);
        errstr[ERROR_STR_LEN - 1] = '\0';   /* strncpy does not guarantee termination */
        FreeErrorData(errdata);
    }
    PG_END_TRY();
    if (error)
        throw std::runtime_error(errstr);

    return val;
}
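For example, when a parquet INT64 column feeds a numeric attribute, initialize_cast can resolve a direct cast function (int8 to numeric) and do_cast applies it with FunctionCall1; when no cast function exists for a type pair, the value goes through the cast-via-IO fallback, printed with the source type's output function and re-parsed with the target type's input function.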
read_primitive_type
read_primitive_type returns the primitive-type value of the i-th element of an arrow array; in other words, it fetches the array element and wraps it as a Datum.
/*
 * read_primitive_type
 *      Returns a primitive type value from the arrow array.
 */
Datum ParquetReader::read_primitive_type(arrow::Array *array, const TypeInfo &typinfo, int64_t i)
{
    Datum res;

    /* Get datum depending on the column type */
    switch (typinfo.arrow.type_id)
    {
        case arrow::Type::BOOL:
        {
            arrow::BooleanArray *boolarray = (arrow::BooleanArray *) array;
            res = BoolGetDatum(boolarray->Value(i));
            break;
        }
        case arrow::Type::INT8:
        {
            arrow::Int8Array *intarray = (arrow::Int8Array *) array;
            res = Int8GetDatum(intarray->Value(i));
            break;
        }
        case arrow::Type::INT16:
        {
            arrow::Int16Array *intarray = (arrow::Int16Array *) array;
            res = Int16GetDatum(intarray->Value(i));
            break;
        }
        case arrow::Type::INT32:
        {
            arrow::Int32Array *intarray = (arrow::Int32Array *) array;
            res = Int32GetDatum(intarray->Value(i));
            break;
        }
        case arrow::Type::INT64:
        {
            arrow::Int64Array *intarray = (arrow::Int64Array *) array;
            res = Int64GetDatum(intarray->Value(i));
            break;
        }
        case arrow::Type::FLOAT:
        {
            arrow::FloatArray *farray = (arrow::FloatArray *) array;
            res = Float4GetDatum(farray->Value(i));
            break;
        }
        case arrow::Type::DOUBLE:
        {
            arrow::DoubleArray *darray = (arrow::DoubleArray *) array;
            res = Float8GetDatum(darray->Value(i));
            break;
        }
        case arrow::Type::STRING:
        case arrow::Type::BINARY:
        {
            arrow::BinaryArray *binarray = (arrow::BinaryArray *) array;
            int32_t     vallen = 0;
            const char *value = reinterpret_cast<const char *>(binarray->GetValue(i, &vallen));

            /* Build bytea */
            int64 bytea_len = vallen + VARHDRSZ;
            bytea *b = (bytea *) this->allocator->fast_alloc(bytea_len);
            SET_VARSIZE(b, bytea_len);
            memcpy(VARDATA(b), value, vallen);
            res = PointerGetDatum(b);
            break;
        }
        case arrow::Type::TIMESTAMP:
        {
            /* TODO: deal with timezones */
            TimestampTz ts;
            arrow::TimestampArray *tsarray = (arrow::TimestampArray *) array;
            auto tstype = (arrow::TimestampType *) array->type().get();

            to_postgres_timestamp(tstype, tsarray->Value(i), ts);
            res = TimestampGetDatum(ts);
            break;
        }
        case arrow::Type::DATE32:
        {
            arrow::Date32Array *tsarray = (arrow::Date32Array *) array;
            int32 d = tsarray->Value(i);

            /*
             * The postgres epoch is 2000-01-01 while the unix epoch (which
             * Parquet uses) is 1970-01-01, so a simple shift is needed here.
             */
            res = DateADTGetDatum(d + (UNIX_EPOCH_JDATE - POSTGRES_EPOCH_JDATE));
            break;
        }
        /* TODO: add other types */
        default:
            throw Error("parquet_s3_fdw: unsupported column type: %s",
                        typinfo.arrow.type_name.c_str());
    }

    /* Call cast function if needed */
    if (typinfo.need_cast)
        res = do_cast(res, typinfo);

    return res;
}
GetPrimitiveValues obtains the plain C array of type T backing an arrow::Array. copy_to_c_array memcpys array->length() elements of elem_size bytes each from an arrow::Array into a C array of type T.
/* copy_to_c_array: memcpy plain values from an Arrow array to a C array. */
template<typename T> inline void
ParquetReader::copy_to_c_array(T *values, const arrow::Array *array, int elem_size)
{
    const T *in = GetPrimitiveValues<T>(*array);
    memcpy(values, in, elem_size * array->length());
}

/* GetPrimitiveValues: get the plain C value array. Copy-pasted from Arrow. */
template <typename T> inline const T *
ParquetReader::GetPrimitiveValues(const arrow::Array &arr)
{
    if (arr.length() == 0)
        return nullptr;

    const auto &prim_arr = arrow::internal::checked_cast<const arrow::PrimitiveArray &>(arr);
    const T *raw_values = reinterpret_cast<const T *>(prim_arr.values()->data());
    return raw_values + arr.offset();   /* account for a possible slice offset */
}
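The `+ arr.offset()` matters because the reader routinely works on slices: nested_list_to_datum and map_to_datum below call Slice() on the underlying values array, which shares the parent's buffer and only records a shift. A minimal standalone sketch (assuming Apache Arrow C++ is installed; illustrative, not part of the FDW) shows this behavior:

#include <arrow/api.h>
#include <iostream>

int main()
{
    arrow::Int64Builder builder;
    (void) builder.AppendValues({10, 20, 30, 40});
    std::shared_ptr<arrow::Array> array;
    (void) builder.Finish(&array);

    /* Slice(1, 2) does not copy: it points at the same buffer with offset() == 1,
     * which is exactly what larray->values()->Slice(...) produces in the reader. */
    auto slice = std::static_pointer_cast<arrow::Int64Array>(array->Slice(1, 2));
    std::cout << "offset: " << slice->offset() << std::endl;    /* prints 1 */

    /* raw_values() already adds the offset, mirroring `raw_values + arr.offset()` */
    const int64_t *raw = slice->raw_values();
    for (int64_t i = 0; i < slice->length(); ++i)
        std::cout << raw[i] << std::endl;                       /* prints 20, 30 */
    return 0;
}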
Converting nested list and map data to datums
nested_list_to_datum assembles arrow array elements into a postgres array; only one-dimensional arrays are supported. It uses the ListArray and pos to take the slice at the given position, reads its elements, and assembles them into a postgres array.
/*
 * nested_list_to_datum
 *      Returns a postgres array built from elements of the array.
 *      Only one dimensional arrays are supported.
 */
Datum ParquetReader::nested_list_to_datum(arrow::ListArray *larray, int pos, const TypeInfo &typinfo)
{
    MemoryContext oldcxt;
    ArrayType  *res = NULL;
    bool       *nulls = NULL;
    bool        error = false;
    int         dims[1];
    int         lbs[1];

    std::shared_ptr<arrow::Array> array =
        larray->values()->Slice(larray->value_offset(pos), larray->value_length(pos));
    const TypeInfo &elemtypinfo = typinfo.children[0];

    Datum *values = (Datum *) this->allocator->fast_alloc(sizeof(Datum) * array->length());

#if SIZEOF_DATUM == 8
    /* Fill values and nulls arrays */
    if (array->null_count() == 0 && typinfo.arrow.type_id == arrow::Type::INT64)
    {
        /*
         * Ok, there are no nulls, so probably we could just memcpy the entire
         * array. Warning: the code below assumes that Datum is 8 bytes long,
         * which is true for most contemporary systems but will not work on
         * some exotic or really old systems.
         */
        copy_to_c_array<int64_t>((int64_t *) values, array.get(), elemtypinfo.pg.len);
        goto construct_array;
    }
#endif
    for (int64_t i = 0; i < array->length(); ++i)
    {
        if (!array->IsNull(i))
            values[i] = this->read_primitive_type(array.get(), elemtypinfo, i);
        else
        {
            /* Allocate and zero the nulls array lazily on the first NULL */
            if (!nulls)
            {
                Size size = sizeof(bool) * array->length();
                nulls = (bool *) this->allocator->fast_alloc(size);
                memset(nulls, 0, size);
            }
            nulls[i] = true;    /* mark this position as a NULL value */
        }
    }

construct_array:
    /*
     * Construct a one dimensional array. We have to use PG_TRY / PG_CATCH to
     * prevent any kind of leak of resources allocated by C++ in case of errors.
     */
    dims[0] = array->length();
    lbs[0] = 1;
    PG_TRY();
    {
        oldcxt = MemoryContextSwitchTo(allocator->context());
        res = construct_md_array(values, nulls, 1, dims, lbs,
                                 elemtypinfo.pg.oid, elemtypinfo.pg.len,
                                 elemtypinfo.pg.byval, elemtypinfo.pg.align);
        MemoryContextSwitchTo(oldcxt);
    }
    PG_CATCH();
    {
        error = true;
    }
    PG_END_TRY();
    if (error)
        throw std::runtime_error("failed to construct an array");

    return PointerGetDatum(res);
}
nested_list_to_jsonb_datum assembles arrow array elements into a postgres jsonb array; only one-dimensional arrays are supported. It likewise uses the ListArray and pos to take the slice at the given position, reads its elements, and assembles them into jsonb.
/*
 * nested_list_to_jsonb_datum
 *      Returns a postgres JSONB array built from elements of the array.
 *      Only one dimensional arrays are supported.
 */
Datum ParquetReader::nested_list_to_jsonb_datum(arrow::ListArray *larray, int pos, const TypeInfo &typinfo)
{
    JsonbParseState *parseState = NULL;
    JsonbValue *res = NULL;
    Datum      *values;
    bool       *nulls = NULL;
    bool        error = false;

    std::shared_ptr<arrow::Array> array =
        larray->values()->Slice(larray->value_offset(pos), larray->value_length(pos));
    const TypeInfo &elemtypinfo = typinfo.children[0];

    values = (Datum *) this->allocator->fast_alloc(sizeof(Datum) * array->length());
    nulls = (bool *) this->allocator->fast_alloc(sizeof(bool) * array->length());
    memset(nulls, 0, sizeof(bool) * array->length());

#if SIZEOF_DATUM == 8
    /* Fill values and nulls arrays */
    if (array->null_count() == 0 && typinfo.arrow.type_id == arrow::Type::INT64)
    {
        /*
         * Ok, there are no nulls, so probably we could just memcpy the entire
         * array. Warning: the code below assumes that Datum is 8 bytes long,
         * which is true for most contemporary systems but will not work on
         * some exotic or really old systems.
         */
        copy_to_c_array<int64_t>((int64_t *) values, array.get(), elemtypinfo.pg.len);
        goto construct_jsonb;
    }
#endif
    for (int64_t i = 0; i < array->length(); ++i)
    {
        if (!array->IsNull(i))
            values[i] = this->read_primitive_type(array.get(), elemtypinfo, i);
        else
            nulls[i] = true;
    }

construct_jsonb:
    /*
     * Construct a one dimensional jsonb array. We have to use PG_TRY /
     * PG_CATCH to prevent any kind of leak of resources allocated by C++
     * in case of errors.
     */
    PG_TRY();
    {
        res = pushJsonbValue(&parseState, WJB_BEGIN_ARRAY, NULL);
        for (int64_t i = 0; i < array->length(); ++i)
            datum_to_jsonb(values[i], elemtypinfo.pg.oid, nulls[i],
                           elemtypinfo.outfunc, parseState, WJB_ELEM);
        res = pushJsonbValue(&parseState, WJB_END_ARRAY, NULL);
    }
    PG_CATCH();
    {
        error = true;
    }
    PG_END_TRY();
    if (error)
        throw std::runtime_error("failed to construct a jsonb");

    return JsonbPGetDatum(JsonbValueToJsonb(res));
}
map_to_datum converts the map keys and items at position pos of an arrow MapArray into a postgres jsonb object.
Datum ParquetReader::map_to_datum(arrow::MapArray *maparray, int pos, const TypeInfo &typinfo)
{
    JsonbParseState *parseState = NULL;
    JsonbValue *jb;

    auto keys = maparray->keys()->Slice(maparray->value_offset(pos), maparray->value_length(pos));
    auto values = maparray->items()->Slice(maparray->value_offset(pos), maparray->value_length(pos));

    Assert(keys->length() == values->length());
    Assert(typinfo.children.size() == 2);

    jb = pushJsonbValue(&parseState, WJB_BEGIN_OBJECT, NULL);
    for (int i = 0; i < keys->length(); ++i)
    {
        Datum   key = (Datum) 0,
                value = (Datum) 0;
        bool    isnull = false;
        const TypeInfo &key_typinfo = typinfo.children[0];
        const TypeInfo &val_typinfo = typinfo.children[1];

        if (keys->IsNull(i))
            throw std::runtime_error("key is null");

        if (!values->IsNull(i))
        {
            key = this->read_primitive_type(keys.get(), key_typinfo, i);
            value = this->read_primitive_type(values.get(), val_typinfo, i);
        }
        else
            isnull = true;

        /* TODO: adding cstring would be cheaper than adding text */
        datum_to_jsonb(key, key_typinfo.pg.oid, false, key_typinfo.outfunc,
                       parseState, WJB_KEY);
        datum_to_jsonb(value, val_typinfo.pg.oid, isnull, val_typinfo.outfunc,
                       parseState, WJB_VALUE);
    }
    jb = pushJsonbValue(&parseState, WJB_END_OBJECT, NULL);

    Datum res = JsonbPGetDatum(JsonbValueToJsonb(jb));
    if (typinfo.need_cast)
        res = do_cast(res, typinfo);

    return res;
}
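For example, a parquet MAP value mapping 'a' to 1 and 'b' to 2 at position pos would be pushed pairwise through pushJsonbValue and come out as the jsonb object {"a": 1, "b": 2}; if typinfo.need_cast is set (e.g. the table column is text), do_cast then converts the finished jsonb.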
DefaultParquetReader
CachingParquetReader
Reference: https://github.com/adjust/parquet_fdw (the upstream parquet_fdw project that parquet_s3_fdw is based on)