是否可以在运行时定义要在 STXXL 中排序的类型的长度?
Posted
技术标签:
【中文标题】是否可以在运行时定义要在 STXXL 中排序的类型的长度?【英文标题】:Is it possible to define the length of the type to sort in STXXL at run time? 【发布时间】:2014-01-15 11:34:36 【问题描述】:我有一个需要内置排序的应用程序,我希望用 STXXL 提供的排序替换现有的排序机制。我已经使用 STXXL 成功地对其进行了测试,但我的问题是,虽然特定的排序运行需要对固定长度的字符串进行操作,但长度是在运行时确定的,并且可以在 10 字节到 4000 字节之间的任何位置。如果实际长度很小,总是允许 4000 字节显然效率很低。 对于那些不熟悉 STXXL 的人,我相信这个问题大致等同于在编译时不知道对象大小的情况下定义一个 std::vector。但是,我不是 C++ 专家 - 该应用程序是用 C 编写的。 在我的测试中,这是我正在排序的类型:
struct string80
char x[80];
;
这是 STXXL 分类器的类型定义:
typedef stxxl::sorter<string80, sort_comparator80> stxxl_sorter80;
问题是我不想将数组大小硬编码为“80”。 我能想出的唯一解决方案是定义许多不同长度的结构并在运行时选择最接近的结构。我错过了一个技巧吗?我在思考 C 而不是 C++?
【问题讨论】:
您能添加一个您想要排序的最小 stxxl 容器吗? 我很困惑,你使用的是 C 还是 C++? @Zeta 添加了代码片段。这是你的意思吗? @Jeffrey 嗯,我正在编写的新代码是 C++,所以我可以使用 STXXL。现有的应用程序是 C。我将添加 STXXL 排序器的类型定义 - 这可能会更清楚。 【参考方案1】:如果我们将大小为 n 的对象(记录)存储在一个扁平的 stxxl::vector 字符中会怎样。然后,定义一个基于 stxxl::vector::iterator 的自定义迭代器,它在每次增量时仅跳过 n 个字节。当使用 std::vector 而不是 STXXL 时,这将适用于 std::sort 甚至 tbb::sort。我看到 STXXL 的 ExtIterator 有很多额外的特征。是否可以为这样的迭代器正确定义它们?
#include <vector>
#include <cassert>
#include <cstdlib>
#include <stxxl.h>
#include <iostream>
#include <algorithm>
typedef std::vector<char>::iterator It;
class ObjectValue;
//This class defines a reference object that handles assignment operations
//during a sorting
class ObjectReference
public:
ObjectReference() : recordSize_(0)
ObjectReference(It ptr, size_t recordSize) : ptr_(ptr), recordSize_(recordSize)
void operator = (ObjectReference source) const
std::copy(source.ptr_, source.ptr_ + recordSize_, ptr_);
void operator = (const ObjectValue & source) const;
It GetIterator() const
return ptr_;
size_t GetRecordSize() const
return recordSize_;
private:
It ptr_;
size_t recordSize_;
;
//This class defines a value object that is used when a temporary value of a
//record is required somewhere
class ObjectValue
public:
ObjectValue()
ObjectValue(ObjectReference prx) : object_(prx.GetIterator(), prx.GetIterator() + prx.GetRecordSize())
ObjectValue(It ptr, size_t recordSize) : object_(ptr, ptr + recordSize)
std::vector<char>::const_iterator GetIterator() const
return object_.begin();
private:
std::vector<char> object_;
;
//We need to support copying from a reference to an object
void ObjectReference::operator = (const ObjectValue & source) const
std::copy(source.GetIterator(), source.GetIterator() + recordSize_, ptr_);
//The comparator passed to a sorting algorithm. It recieves iterators, converts
//them to char pointers, that are passed to the actual comparator tha handles
//object comparison
template<class Cmp>
class Comparator
public:
Comparator()
Comparator(Cmp cmp) : cmp_(cmp)
bool operator () (const ObjectReference & a, const ObjectReference & b) const
return cmp_(&*a.GetIterator(), &*b.GetIterator());
bool operator () (const ObjectValue & a, const ObjectReference & b) const
return cmp_(&*a.GetIterator(), &*b.GetIterator());
bool operator () (const ObjectReference & a, const ObjectValue & b) const
return cmp_(&*a.GetIterator(), &*b.GetIterator());
bool operator () (const ObjectValue & a, const ObjectValue & b) const
return cmp_(&*a.GetIterator(), &*b.GetIterator());
private:
Cmp cmp_;
;
//The iterator that operates on flat byte area. If the record size is $n$, it
//just skips $n$ bytes on each increment operation to jump to the next record
class RecordIterator : public std::iterator<std::random_access_iterator_tag, ObjectValue, size_t, RecordIterator, ObjectReference>
public:
RecordIterator() : recordSize_(0)
RecordIterator(It ptr, size_t recordSize) : ptr_(ptr), recordSize_(recordSize)
ObjectReference operator * () const
return ObjectReference(ptr_, recordSize_);
ObjectReference operator [] (size_t diff) const
return *(*this + diff);
It GetIterator() const
return ptr_;
size_t GetRecordSize() const
return recordSize_;
RecordIterator& operator ++()
ptr_ += recordSize_;
return *this;
RecordIterator& operator --()
ptr_ -= recordSize_;
return *this;
RecordIterator operator ++(int)
RecordIterator ret = *this;
ptr_ += recordSize_;
return ret;
RecordIterator operator --(int)
RecordIterator ret = *this;
ptr_ -= recordSize_;
return ret;
friend bool operator < (RecordIterator it1, RecordIterator it2);
friend bool operator > (RecordIterator it1, RecordIterator it2);
friend bool operator == (RecordIterator it1, RecordIterator it2);
friend bool operator != (RecordIterator it1, RecordIterator it2);
friend size_t operator - (RecordIterator it1, RecordIterator it2);
friend RecordIterator operator - (RecordIterator it1, size_t shift);
friend RecordIterator operator + (RecordIterator it1, size_t shift);
private:
It ptr_;
size_t recordSize_;
;
bool operator < (RecordIterator it1, RecordIterator it2)
return it1.ptr_ < it2.ptr_;
bool operator > (RecordIterator it1, RecordIterator it2)
return it1.ptr_ > it2.ptr_;
bool operator == (RecordIterator it1, RecordIterator it2)
return it1.ptr_ == it2.ptr_;
bool operator != (RecordIterator it1, RecordIterator it2)
return !(it1 == it2);
RecordIterator operator - (RecordIterator it1, size_t shift)
return RecordIterator(it1.ptr_ - shift * it1.recordSize_, it1.recordSize_);
RecordIterator operator + (RecordIterator it1, size_t shift)
return RecordIterator(it1.ptr_ + shift * it1.recordSize_, it1.recordSize_);
size_t operator - (RecordIterator it1, RecordIterator it2)
return (it1.ptr_ - it2.ptr_) / it1.recordSize_;
namespace std
//We need to specialize the swap for the sorting to work correctly
template<>
void swap(ObjectReference & it1, ObjectReference & it2)
ObjectValue buf(it1.GetIterator(), it1.GetRecordSize());
std::copy(it2.GetIterator(), it2.GetIterator() + it2.GetRecordSize(), it1.GetIterator());
std::copy(buf.GetIterator(), buf.GetIterator() + it1.GetRecordSize(), it2.GetIterator());
//Finally, here is the "user"-defined code. In the example, "records" are
//4-byte integers, although actual size of a record can be changed at runtime
class RecordComparer
public:
bool operator ()(const char * aRawPtr, const char * bRawPtr) const
const int * aPtr = reinterpret_cast<const int*>(aRawPtr);
const int * bPtr = reinterpret_cast<const int*>(bRawPtr);
return *aPtr < *bPtr;
;
int main(int, char*[])
size_t size = 100500;
//Although it is a constant, it is easy to change to in runtime
size_t recordSize = sizeof(int);
std::vector<int> intVector(size);
std::generate(intVector.begin(), intVector.end(), rand);
const char * source = reinterpret_cast<const char*>(&intVector[0]);
std::vector<char> recordVector;
std::copy(source, source + recordVector.size(), &recordVector[0]);
RecordIterator begin(recordVector.begin(), recordSize);
RecordIterator end(recordVector.end(), recordSize);
//Sort "records" as blocks of bytes
std::sort(begin, end, Comparator<RecordComparer>());
//Sort "records" as usual
std::sort(intVector.begin(), intVector.end());
//Checking that arrays are the same:
for (; begin != end; ++begin)
size_t i = begin - RecordIterator(recordVector.begin(), recordSize);
It it = (*(begin)).GetIterator();
int* value = reinterpret_cast<int*>(&(*it));
assert(*value == intVector[i]);
return 0;
【讨论】:
【参考方案2】:这里没有好的解决方案,至少 STXXL 没有。
STXXL 排序器经过高度优化,代码需要在编译时通过模板参数提供数据类型的大小。我认为这不会,甚至应该改变。
为许多不同的参数实例化类的方法并不好,但很常见。想想简单 C++ 程序中使用的所有不同的 std::vector 实例,它们都可以通过 C 中的 void* 函数来处理。
根据您要推出多少代码,尝试实例化 2 的幂,然后为您的常用参数设置更精细的粒度。
【讨论】:
谢谢,Timo,这对我来说是一个令人放心的答案。我计划从 2 的幂开始,但随着时间的推移,我可以收集信息以找出实际使用的尺寸,并可能在以后对其进行优化。供您参考,我正在使用 STXXL 排序器来替换 Syncsort(现为 DMExpress)。这是一个可以做各种与排序相关的奇妙事情的包,但考虑到我们的需求只是一个可调用的外部排序,它非常昂贵。到目前为止,在我的时序测试中,STXXL 比 Syncsort 慢了大约 5% 到 10%,对此我很满意。以上是关于是否可以在运行时定义要在 STXXL 中排序的类型的长度?的主要内容,如果未能解决你的问题,请参考以下文章
GraphQL 和 Prisma:当它们已经是 Prisma 数据库模式的一部分时,为啥要在应用程序模式中重新定义类型?