WIP: try to make group_concat w/ order by work

This commit is contained in:
Aleksei Antipovskii
2025-02-13 00:44:08 +01:00
parent 45f74da660
commit 23821636fd
7 changed files with 399 additions and 159 deletions

View File

@ -114,6 +114,7 @@ void GroupConcatInfo::prepGroupConcat(JobInfo& jobInfo)
groupConcat->fOrderCols.push_back(make_pair(key, k->get()->asc()));
}
groupConcat->id = fGroupConcat.size();
fGroupConcat.push_back(groupConcat);
i++;
@ -353,10 +354,12 @@ void GroupConcatAgUM::processRow(const rowgroup::Row& inRow)
fConcator->processRow(fRow);
}
void GroupConcatAgUM::merge(const rowgroup::Row& inRow, int64_t i)
void GroupConcatAgUM::merge(const rowgroup::Row& inRow, uint64_t i)
{
uint8_t* data = inRow.getData();
joblist::GroupConcatAgUM* gccAg = *((joblist::GroupConcatAgUM**)(data + inRow.getOffset(i)));
auto* gcc = inRow.getAggregateData(i);
auto* gccAg = dynamic_cast<GroupConcatAgUM*>(gcc);
idbassert(gccAg != nullptr);
fConcator->merge(gccAg->concator().get());
}
@ -366,6 +369,19 @@ uint8_t* GroupConcatAgUM::getResult()
return fConcator->getResult(fGroupConcat->fSeparator);
}
// Writes this aggregate's state to `bs`. All of the state that is serialized
// lives in the owned concator, so the call is forwarded to it unchanged.
void GroupConcatAgUM::serialize(messageqcpp::ByteStream& bs) const
{
  const GroupConcator& concator = *fConcator;
  concator.serialize(bs);
}
// Restores this aggregate's state from `bs`.
//
// initialize() runs first and only then is the concator asked to read its
// state; presumably initialize() is what (re)creates fConcator -- confirm
// against its definition. `groupConcats` is passed through to the concator.
void GroupConcatAgUM::deserialize(messageqcpp::ByteStream& bs, span<SP_GroupConcat> groupConcats)
{
  initialize();

  // Bind the reference only after initialize(), which may replace fConcator.
  GroupConcator& concator = *fConcator;
  concator.deserialize(bs, groupConcats);
}
// Returns the size of this object plus whatever the owned concator reports
// through its own getDataSize().
size_t GroupConcatAgUM::getDataSize() const
{
  const auto concatorSize = fConcator->getDataSize();
  return sizeof(*this) + concatorSize;
}
void GroupConcatAgUM::applyMapping(const std::shared_ptr<int[]>& mapping, const Row& row)
{
// For some reason the rowgroup mapping fcns don't work right in this class.
@ -418,7 +434,7 @@ void GroupConcator::initialize(const rowgroup::SP_GroupConcat& gcc)
// too high(1MB or 3MB by default) to allocate it for every instance.
fGroupConcatLen = gcc->fSize;
size_t sepSize = gcc->fSeparator.size();
fCurrentLength -= sepSize; // XXX Yet I have to find out why spearator has c_str() as nullptr here.
fCurrentLength -= sepSize; // XXX Yet I have to find out why separator has c_str() as nullptr here.
fTimeZone = gcc->fTimeZone;
fConstCols = gcc->fConstCols;
@ -431,7 +447,7 @@ void GroupConcator::initialize(const rowgroup::SP_GroupConcat& gcc)
void GroupConcator::outputRow(std::ostringstream& oss, const rowgroup::Row& row)
{
const CalpontSystemCatalog::ColDataType* types = row.getColTypes();
vector<uint32_t>::iterator i = fConcatColumns.begin();
auto i = fConcatColumns.begin();
auto j = fConstCols.begin();
uint64_t groupColCount = fConcatColumns.size() + fConstCols.size();
@ -441,7 +457,7 @@ void GroupConcator::outputRow(std::ostringstream& oss, const rowgroup::Row& row)
if (j != fConstCols.end() && k == j->second)
{
oss << j->first.safeString();
j++;
++j;
continue;
}
@ -548,7 +564,7 @@ void GroupConcator::outputRow(std::ostringstream& oss, const rowgroup::Row& row)
}
}
i++;
++i;
}
}
@ -556,9 +572,9 @@ bool GroupConcator::concatColIsNull(const rowgroup::Row& row)
{
bool ret = false;
for (vector<uint32_t>::iterator i = fConcatColumns.begin(); i != fConcatColumns.end(); i++)
for (const unsigned int& concatColumn : fConcatColumns)
{
if (row.isNullValue(*i))
if (row.isNullValue(concatColumn))
{
ret = true;
break;
@ -574,14 +590,14 @@ int64_t GroupConcator::lengthEstimate(const rowgroup::Row& row)
const CalpontSystemCatalog::ColDataType* types = row.getColTypes();
// null values are already skipped.
for (vector<uint32_t>::iterator i = fConcatColumns.begin(); i != fConcatColumns.end(); i++)
for (const unsigned int& concatColumn : fConcatColumns)
{
if (row.isNullValue(*i))
if (row.isNullValue(concatColumn))
continue;
int64_t fieldLen = 0;
switch (types[*i])
switch (types[concatColumn])
{
case CalpontSystemCatalog::TINYINT:
case CalpontSystemCatalog::SMALLINT:
@ -589,7 +605,7 @@ int64_t GroupConcator::lengthEstimate(const rowgroup::Row& row)
case CalpontSystemCatalog::INT:
case CalpontSystemCatalog::BIGINT:
{
int64_t v = row.getIntField(*i);
int64_t v = row.getIntField(concatColumn);
if (v < 0)
fieldLen++;
@ -607,7 +623,7 @@ int64_t GroupConcator::lengthEstimate(const rowgroup::Row& row)
case CalpontSystemCatalog::UINT:
case CalpontSystemCatalog::UBIGINT:
{
uint64_t v = row.getUintField(*i);
uint64_t v = row.getUintField(concatColumn);
while ((v /= 10) != 0)
fieldLen++;
@ -628,7 +644,7 @@ int64_t GroupConcator::lengthEstimate(const rowgroup::Row& row)
case CalpontSystemCatalog::VARCHAR:
case CalpontSystemCatalog::TEXT:
{
fieldLen += row.getConstString(*i).length();
fieldLen += row.getConstString(concatColumn).length();
break;
}
@ -653,7 +669,7 @@ int64_t GroupConcator::lengthEstimate(const rowgroup::Row& row)
{
fieldLen = 19; // YYYY-MM-DD HH24:MI:SS
// Decimal point and milliseconds
uint64_t colPrecision = row.getPrecision(*i);
uint64_t colPrecision = row.getPrecision(concatColumn);
if (colPrecision > 0 && colPrecision < 7)
{
@ -667,7 +683,7 @@ int64_t GroupConcator::lengthEstimate(const rowgroup::Row& row)
{
fieldLen = 10; // -HHH:MI:SS
// Decimal point and milliseconds
uint64_t colPrecision = row.getPrecision(*i);
uint64_t colPrecision = row.getPrecision(concatColumn);
if (colPrecision > 0 && colPrecision < 7)
{
@ -733,8 +749,8 @@ void GroupConcatOrderBy::initialize(const rowgroup::SP_GroupConcat& gcc)
fOrderByCond.resize(0);
for (uint64_t i = 0; i < gcc->fOrderCond.size(); i++)
fOrderByCond.push_back(IdbSortSpec(gcc->fOrderCond[i].first, gcc->fOrderCond[i].second));
for (auto [idx, asc] : gcc->fOrderCond)
fOrderByCond.emplace_back(idx, asc);
fDistinct = gcc->fDistinct;
fRowsPerRG = 128;
@ -742,17 +758,14 @@ void GroupConcatOrderBy::initialize(const rowgroup::SP_GroupConcat& gcc)
fRm = gcc->fRm;
fSessionMemLimit = gcc->fSessionMemLimit;
vector<std::pair<uint32_t, uint32_t> >::iterator i = gcc->fGroupCols.begin();
while (i != gcc->fGroupCols.end())
{
auto x = (*i).second;
fConcatColumns.push_back(x);
i++;
for (auto [_, pos] : gcc->fGroupCols) {
fConcatColumns.emplace_back(pos);
}
IdbOrderBy::initialize(gcc->fRowGroup);
}
uint64_t GroupConcatOrderBy::getKeyLength() const
{
// only distinct the concatenated columns
@ -776,32 +789,33 @@ void GroupConcatOrderBy::processRow(const rowgroup::Row& row)
// the RID has no meaning here, so use it to store the estimated length.
int16_t estLen = lengthEstimate(fRow0);
fRow0.setRid(estLen);
OrderByRow newRow(fRow0, fRule);
OrderByRow newRow(fRow0, getCurrentRowPos(), fRule);
fOrderByQueue.push(newRow);
fCurrentLength += estLen;
// add to the distinct map
if (fDistinct)
fDistinctMap->insert(fRow0.getPointer());
fDistinctMap->emplace(fRow0.getPointer(), getCurrentRowPos());
fRowGroup.incRowCount();
fRow0.nextRow();
if (fRowGroup.getRowCount() >= fRowsPerRG)
{
fDataQueue.push(fData);
// A "postfix" but accurate RAM accounting that sums up sizes of RGDatas.
uint64_t newSize = fRowGroup.getSizeWithStrings();
RGData newRGData(fRowGroup, fRowsPerRG);
fDataQueue.push(newRGData);
if (!fRm->getMemory(newSize, fSessionMemLimit))
if (fRm && !fRm->getMemory(newSize, fSessionMemLimit))
{
cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__;
throw IDBExcept(fErrorCode);
}
fMemSize += newSize;
fData.reinit(fRowGroup, fRowsPerRG);
fRowGroup.setData(&fData);
fRowGroup.setData(&fDataQueue.back()); // it is safe to use a pointer here because we always work with
// the last element of the queue
fRowGroup.resetRowGroup(0);
fRowGroup.getRow(0, &fRow0);
}
@ -824,13 +838,14 @@ void GroupConcatOrderBy::processRow(const rowgroup::Row& row)
// only the copyRow does useful work here
fDistinctMap->erase(swapRow.fData);
copyRow(row, &fRow2);
fDistinctMap->insert(swapRow.fData);
fDistinctMap->emplace(swapRow.fData, swapRow.fPos);
}
int16_t estLen = lengthEstimate(fRow2);
fRow2.setRid(estLen);
fCurrentLength += estLen;
swapRow.fPos = getCurrentRowPos();
fOrderByQueue.push(swapRow);
}
}
@ -839,16 +854,27 @@ void GroupConcatOrderBy::merge(GroupConcator* gc)
{
GroupConcatOrderBy* go = dynamic_cast<GroupConcatOrderBy*>(gc);
while (go->fOrderByQueue.empty() == false)
size_t mySz = fDataQueue.size();
while (!go->fDataQueue.empty()) {
fDataQueue.emplace(std::move(go->fDataQueue.front()));
go->fDataQueue.pop();
}
while (!go->fOrderByQueue.empty())
{
const OrderByRow& row = go->fOrderByQueue.top();
OrderByRow row = go->fOrderByQueue.top();
uint64_t new_rgid = (row.fPos >> 16) + mySz;
row.fPos = (new_rgid << 16) + (row.fPos & 0xffff);
fRowGroup.setData(&fDataQueue[new_rgid]);
Row tmp_row;
fRowGroup.getRow(row.fPos & 0xffff, &tmp_row);
row.fData = tmp_row.getPointer();
// check if the distinct row already exists
if (fDistinct && fDistinctMap->find(row.fData) != fDistinctMap->end())
{
; // no op;
}
// if the row count is less than the limit
else if (fCurrentLength < fGroupConcatLen)
{
@ -858,9 +884,8 @@ void GroupConcatOrderBy::merge(GroupConcator* gc)
// add to the distinct map
if (fDistinct)
fDistinctMap->insert(row.fData);
fDistinctMap->emplace(row.fData, row.fPos);
}
else if (fOrderByCond.size() > 0 && fRule.less(row.fData, fOrderByQueue.top().fData))
{
OrderByRow swapRow = fOrderByQueue.top();
@ -871,7 +896,7 @@ void GroupConcatOrderBy::merge(GroupConcator* gc)
if (fDistinct)
{
fDistinctMap->erase(swapRow.fData);
fDistinctMap->insert(row.fData);
fDistinctMap->emplace(row.fData, row.fPos);
}
row1.setData(row.fData);
@ -916,7 +941,7 @@ uint8_t* GroupConcatOrderBy::getResultImpl(const string& sep)
{
size_t sizeDiff = oss.str().size() - prevResultSize;
prevResultSize = oss.str().size();
if (!fRm->getMemory(sizeDiff, fSessionMemLimit))
if (fRm && !fRm->getMemory(sizeDiff, fSessionMemLimit))
{
cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__;
throw IDBExcept(fErrorCode);
@ -951,6 +976,54 @@ uint8_t* GroupConcator::swapStreamWithStringAndReturnBuf(ostringstream& oss, boo
return reinterpret_cast<uint8_t*>(outputBuf_->data());
}
// Serializes the state shared by all concatenators into `bs`.
//
// Fields are written in this order, which GroupConcator::deserialize() reads back:
//   1. fConcatColumns;
//   2. the number of constant columns, then each (string, position) pair;
//   3. fCurrentLength, fGroupConcatLen, fConstantLen;
//   4. a one-byte flag telling whether outputBuf_ is set, followed by the
//      buffer contents only when it is;
//   5. fTimeZone.
void GroupConcator::serialize(messageqcpp::ByteStream& bs) const
{
  messageqcpp::serializeInlineVector(bs, fConcatColumns);

  bs << fConstCols.size();
  for (const auto& constCol : fConstCols)
  {
    bs << constCol.first;
    bs << constCol.second;
  }

  bs << fCurrentLength;
  bs << fGroupConcatLen;
  bs << fConstantLen;

  // The flag lets the reader know whether a buffer follows.
  const uint8_t hasOutputBuf = outputBuf_ ? 1 : 0;
  bs << hasOutputBuf;
  if (hasOutputBuf != 0)
  {
    bs << *outputBuf_;
  }

  bs << fTimeZone;
}
// Restores the state shared by all concatenators from `bs`, reading the fields
// in the order GroupConcator::serialize() writes them. `groupConcats` is not
// used at this level.
void GroupConcator::deserialize(messageqcpp::ByteStream& bs, span<SP_GroupConcat> groupConcats)
{
  messageqcpp::deserializeInlineVector(bs, fConcatColumns);

  // Constant columns: a count followed by (string, position) pairs.
  size_t constColCount;
  bs >> constColCount;
  fConstCols.clear();
  fConstCols.reserve(constColCount);
  for (size_t idx = 0; idx != constColCount; ++idx)
  {
    utils::NullString value;
    bs >> value;
    uint32_t position;
    bs >> position;
    fConstCols.emplace_back(std::move(value), position);
  }

  bs >> fCurrentLength;
  bs >> fGroupConcatLen;
  bs >> fConstantLen;

  // Any previous buffer is dropped; a new one is created only when the
  // one-byte flag in the stream says a buffer was written.
  outputBuf_.reset();
  uint8_t hasOutputBuf;
  bs >> hasOutputBuf;
  if (hasOutputBuf != 0)
  {
    outputBuf_.reset(new std::string());
    bs >> *outputBuf_;
  }

  bs >> fTimeZone;
}
uint8_t* GroupConcator::getResult(const string& sep)
{
return getResultImpl(sep);
@ -976,9 +1049,66 @@ const string GroupConcatOrderBy::toString() const
return (baseStr + oss.str());
}
// Serializes the ORDER BY / DISTINCT concatenator state into `bs`.
//
// Layout, read back in the same order by GroupConcatOrderBy::deserialize():
//   1. the common GroupConcator state;
//   2. the number of RGData buffers in fDataQueue, then each buffer;
//   3. the number of DISTINCT-map entries, then the position stored for each;
//   4. the number of rows in the ordering queue, then each row's fPos.
// Row pointers are never written; only positions are.
void GroupConcatOrderBy::serialize(messageqcpp::ByteStream& bs) const
{
  GroupConcator::serialize(bs);

  size_t sz = fDataQueue.size();
  bs << sz;
  for (const auto& rgdata : fDataQueue)
  {
    rgdata.serialize(bs, fRowGroup.getDataSize(fRowsPerRG));
  }

  // Every other use of fDistinctMap in this class is guarded by fDistinct, so
  // the map may be unallocated here. Write an empty section in that case
  // instead of dereferencing a null pointer; the stream layout is unchanged.
  sz = fDistinctMap ? fDistinctMap->size() : 0;
  bs << sz;
  if (fDistinctMap)
  {
    for (const auto& [_, pos] : *fDistinctMap)
    {
      bs << pos;
    }
  }

  sz = fOrderByQueue.size();
  bs << sz;
  for (const auto& row : fOrderByQueue)
  {
    bs << row.fPos;
  }
}
// Restores the ORDER BY / DISTINCT concatenator state from `bs`, reading the
// sections in the order GroupConcatOrderBy::serialize() writes them.
//
// Only positions are stored in the stream. Each position packs the index of
// the RGData buffer in the upper bits (pos >> 16) and the row index inside
// that buffer in the lower 16 bits; row pointers are rebuilt from them here.
void GroupConcatOrderBy::deserialize(messageqcpp::ByteStream& bs, span<rowgroup::SP_GroupConcat> groupConcats)
{
  GroupConcator::deserialize(bs, groupConcats);

  size_t sz = 0;
  bs >> sz;
  // IdbOrderBy::initialize() already queues one empty RGData. The positions in
  // the stream index the writer's queue from 0, so that buffer has to go before
  // loading, otherwise every buffer index would be off by one. When the stream
  // carries no buffers the existing one is kept so fRowGroup stays valid.
  if (sz > 0)
  {
    fDataQueue = {};
  }
  for (size_t i = 0; i < sz; ++i)
  {
    RGData rgdata;
    rgdata.deserialize(bs, fRowGroup.getDataSize(fRowsPerRG));
    fDataQueue.emplace(std::move(rgdata));
  }

  bs >> sz;
  // Other code in this class only touches fDistinctMap when fDistinct is set,
  // so it may be unallocated. The entries are still consumed to keep the
  // stream aligned for the section that follows.
  if (fDistinctMap)
  {
    fDistinctMap->clear();
    fDistinctMap->reserve(sz);
  }
  for (size_t i = 0; i < sz; ++i)
  {
    uint64_t pos = 0;
    bs >> pos;
    if (!fDistinctMap)
    {
      continue;
    }
    Row row;
    fRowGroup.setData(&fDataQueue[pos >> 16]);
    fRowGroup.getRow(pos & 0xffff, &row);
    fDistinctMap->emplace(row.getPointer(), pos);
  }

  bs >> sz;
  fOrderByQueue.reserve(sz);
  for (size_t i = 0; i < sz; ++i)
  {
    uint64_t pos = 0;
    bs >> pos;
    Row row;
    fRowGroup.setData(&fDataQueue[pos >> 16]);
    fRowGroup.getRow(pos & 0xffff, &row);
    fOrderByQueue.emplace(row, pos, fRule);
  }

  // Leave fRowGroup on the last buffer, as processRow() expects, and move the
  // write cursor off the buffer that was discarded above.
  // NOTE(review): assumes the row count restored with the buffer is the index
  // of the next free row -- confirm against RGData::deserialize().
  if (!fDataQueue.empty())
  {
    fRowGroup.setData(&fDataQueue.back());
    fRowGroup.getRow(fRowGroup.getRowCount(), &fRow0);
  }
}
// Returns the packed position of the row currently being filled: the index of
// the last RGData buffer in fDataQueue in the upper bits and the row index
// inside that buffer in the lower 16 bits.
//
// processRow() calls this before fRowGroup.incRowCount(), so at that moment
// getRowCount() is already the zero-based index of the row being added.
// Subtracting one pointed at the previous row, and for the first row of a
// buffer it borrowed from the buffer-index bits.
// Precondition: fDataQueue is not empty.
uint64_t GroupConcatOrderBy::getCurrentRowPos() const
{
  const uint64_t rgIndex = fDataQueue.size() - 1;
  const uint64_t rowIndex = fRowGroup.getRowCount();
  return (rgIndex << 16) | (rowIndex & 0xffff);
}
// GroupConcatNoOrder class implementation
GroupConcatNoOrder::GroupConcatNoOrder()
: fRowsPerRG(128), fErrorCode(ERR_AGGREGATION_TOO_BIG), fMemSize(0), fRm(NULL)
: fRowsPerRG(128), fErrorCode(ERR_AGGREGATION_TOO_BIG), fMemSize(0), fRm(nullptr)
{
}
@ -995,10 +1125,10 @@ void GroupConcatNoOrder::initialize(const rowgroup::SP_GroupConcat& gcc)
fRowGroup = gcc->fRowGroup;
fRowsPerRG = 128;
fErrorCode = ERR_AGGREGATION_TOO_BIG;
fRm = gcc->fRm;
// FIXME kemm: fRm = gcc->fRm;
fSessionMemLimit = gcc->fSessionMemLimit;
vector<std::pair<uint32_t, uint32_t> >::iterator i = gcc->fGroupCols.begin();
auto i = gcc->fGroupCols.begin();
while (i != gcc->fGroupCols.end())
fConcatColumns.push_back((*(i++)).second);
@ -1013,9 +1143,10 @@ void GroupConcatNoOrder::initialize(const rowgroup::SP_GroupConcat& gcc)
fMemSize += newSize;
fData.reinit(fRowGroup, fRowsPerRG);
fRowGroup.setData(&fData);
fRowGroup.setUseOnlyLongString(true);
RGData rgdata(fRowGroup, fRowsPerRG);
fDataQueue.emplace(std::move(rgdata));
fRowGroup.setData(&fDataQueue.back());
fRowGroup.resetRowGroup(0);
fRowGroup.initRow(&fRow);
fRowGroup.getRow(0, &fRow);
@ -1040,16 +1171,16 @@ void GroupConcatNoOrder::processRow(const rowgroup::Row& row)
// A "postfix" but accurate RAM accounting that sums up sizes of RGDatas.
uint64_t newSize = fRowGroup.getSizeWithStrings();
if (!fRm->getMemory(newSize, fSessionMemLimit))
if (fRm && !fRm->getMemory(newSize, fSessionMemLimit))
{
cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__;
throw IDBExcept(fErrorCode);
}
fMemSize += newSize;
fDataQueue.push(fData);
fData.reinit(fRowGroup, fRowsPerRG);
fRowGroup.setData(&fData);
RGData rgdata(fRowGroup, fRowsPerRG);
fDataQueue.emplace(std::move(rgdata));
fRowGroup.setData(&fDataQueue.back());
fRowGroup.resetRowGroup(0);
fRowGroup.getRow(0, &fRow);
}
@ -1058,15 +1189,15 @@ void GroupConcatNoOrder::processRow(const rowgroup::Row& row)
void GroupConcatNoOrder::merge(GroupConcator* gc)
{
GroupConcatNoOrder* in = dynamic_cast<GroupConcatNoOrder*>(gc);
auto* in = dynamic_cast<GroupConcatNoOrder*>(gc);
while (in->fDataQueue.size() > 0)
while (!in->fDataQueue.empty())
{
fDataQueue.push(in->fDataQueue.front());
fDataQueue.push(std::move(in->fDataQueue.front()));
in->fDataQueue.pop();
}
fDataQueue.push(in->fData);
fRowGroup.setData(&fDataQueue.back());
fMemSize += in->fMemSize;
in->fMemSize = 0;
}
@ -1076,11 +1207,10 @@ uint8_t* GroupConcatNoOrder::getResultImpl(const string& sep)
ostringstream oss;
bool addSep = false;
fDataQueue.push(fData);
size_t prevResultSize = 0;
bool isNull = true;
while (fDataQueue.size() > 0)
while (!fDataQueue.empty())
{
fRowGroup.setData(&fDataQueue.front());
fRowGroup.getRow(0, &fRow);
@ -1098,18 +1228,42 @@ uint8_t* GroupConcatNoOrder::getResultImpl(const string& sep)
}
size_t sizeDiff = oss.str().size() - prevResultSize;
prevResultSize = oss.str().size();
if (!fRm->getMemory(sizeDiff, fSessionMemLimit))
if (fRm && !fRm->getMemory(sizeDiff, fSessionMemLimit))
{
cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__;
throw IDBExcept(fErrorCode);
}
fMemSize += sizeDiff;
fDataQueue.pop();
}
return swapStreamWithStringAndReturnBuf(oss, isNull);
}
// Serializes the unordered concatenator into `bs`: the common GroupConcator
// state first, then the number of RGData buffers held in fDataQueue, then the
// buffers themselves in queue order.
void GroupConcatNoOrder::serialize(messageqcpp::ByteStream& bs) const
{
  GroupConcator::serialize(bs);

  const size_t bufferCount = fDataQueue.size();
  bs << bufferCount;
  for (const auto& buffer : fDataQueue)
  {
    buffer.serialize(bs, fRowGroup.getDataSize(fRowsPerRG));
  }
}
// Restores the unordered concatenator from `bs`: the common GroupConcator
// state first, then the buffer count and the RGData buffers, as written by
// GroupConcatNoOrder::serialize().
void GroupConcatNoOrder::deserialize(messageqcpp::ByteStream& bs, span<SP_GroupConcat> groupConcats)
{
  GroupConcator::deserialize(bs, groupConcats);

  size_t sz = 0;
  bs >> sz;
  // Replace the current buffers only when the stream actually carries some.
  // An empty queue would make back() below undefined and would leave
  // fRowGroup pointing at a buffer that no longer exists.
  if (sz > 0)
  {
    fDataQueue = {};
  }
  for (size_t i = 0; i < sz; i++)
  {
    RGData rgdata;
    rgdata.deserialize(bs, fRowGroup.getDataSize(fRowsPerRG), groupConcats);
    fDataQueue.push(std::move(rgdata));
  }

  // reinit
  fRowGroup.setUseOnlyLongString(true);
  if (!fDataQueue.empty())
  {
    fRowGroup.setData(&fDataQueue.back());
    // fRow was positioned inside the buffer created by initialize(); move it
    // into the buffer that is now current.
    // NOTE(review): assumes the restored row count is the index of the next
    // free row -- confirm against RGData::deserialize() and processRow().
    fRowGroup.getRow(fRowGroup.getRowCount(), &fRow);
  }
}
const string GroupConcatNoOrder::toString() const
{
return GroupConcator::toString();

View File

@ -48,7 +48,8 @@ class GroupConcatInfo
virtual ~GroupConcatInfo();
void prepGroupConcat(JobInfo&);
void mapColumns(const rowgroup::RowGroup&);
virtual void mapColumns(const rowgroup::RowGroup&);
std::set<uint32_t>& columns()
{
@ -59,11 +60,12 @@ class GroupConcatInfo
return fGroupConcat;
}
const std::string toString() const;
virtual const std::string toString() const;
protected:
uint32_t getColumnKey(const execplan::SRCP& srcp, JobInfo& jobInfo);
std::shared_ptr<int[]> makeMapping(const rowgroup::RowGroup&, const rowgroup::RowGroup&);
virtual uint32_t getColumnKey(const execplan::SRCP& srcp, JobInfo& jobInfo);
virtual std::shared_ptr<int[]> makeMapping(const rowgroup::RowGroup&, const rowgroup::RowGroup&);
std::set<uint32_t> fColumns;
std::vector<rowgroup::SP_GroupConcat> fGroupConcat;
@ -72,22 +74,27 @@ class GroupConcatInfo
class GroupConcatAgUM : public rowgroup::GroupConcatAg
{
public:
EXPORT GroupConcatAgUM(rowgroup::SP_GroupConcat&);
EXPORT ~GroupConcatAgUM();
EXPORT explicit GroupConcatAgUM(rowgroup::SP_GroupConcat&);
EXPORT ~GroupConcatAgUM() override;
using rowgroup::GroupConcatAg::merge;
void initialize();
void processRow(const rowgroup::Row&);
EXPORT void merge(const rowgroup::Row&, int64_t);
void initialize() override;
void processRow(const rowgroup::Row&) override;
EXPORT void merge(const rowgroup::Row&, uint64_t) override;
boost::scoped_ptr<GroupConcator>& concator()
{
return fConcator;
}
EXPORT uint8_t* getResult();
EXPORT uint8_t* getResult() override;
void serialize(messageqcpp::ByteStream &) const override;
void deserialize(messageqcpp::ByteStream &, std::span<rowgroup::SP_GroupConcat>) override;
size_t getDataSize() const override;
protected:
void applyMapping(const std::shared_ptr<int[]>&, const rowgroup::Row&);
virtual void applyMapping(const std::shared_ptr<int[]>&, const rowgroup::Row&);
boost::scoped_ptr<GroupConcator> fConcator;
boost::scoped_array<uint8_t> fData;
@ -112,7 +119,13 @@ class GroupConcator
virtual uint8_t* getResult(const std::string& sep);
uint8_t* swapStreamWithStringAndReturnBuf(ostringstream& oss, bool isNull);
virtual void serialize(messageqcpp::ByteStream &) const;
virtual void deserialize(messageqcpp::ByteStream &, std::span<rowgroup::SP_GroupConcat> groupConcats);
virtual const std::string toString() const;
virtual rowgroup::RGDataSizeType getDataSize() const {
return sizeof(*this);
}
protected:
virtual bool concatColIsNull(const rowgroup::Row&);
@ -128,28 +141,35 @@ class GroupConcator
long fTimeZone;
};
// For GROUP_CONCAT withour distinct or orderby
// For GROUP_CONCAT without distinct or orderby
class GroupConcatNoOrder : public GroupConcator
{
public:
GroupConcatNoOrder();
virtual ~GroupConcatNoOrder();
void initialize(const rowgroup::SP_GroupConcat&);
void processRow(const rowgroup::Row&);
~GroupConcatNoOrder() override;
void merge(GroupConcator*);
void initialize(const rowgroup::SP_GroupConcat&) override;
void processRow(const rowgroup::Row&) override;
void merge(GroupConcator*) override;
using GroupConcator::getResult;
uint8_t* getResultImpl(const std::string& sep);
uint8_t* getResultImpl(const std::string& sep) override;
//uint8_t* getResult(const std::string& sep);
void serialize(messageqcpp::ByteStream &) const override;
void deserialize(messageqcpp::ByteStream &, span<rowgroup::SP_GroupConcat> groupConcats) override;
const std::string toString() const;
const std::string toString() const override;
rowgroup::RGDataSizeType getDataSize() const override
{
return GroupConcator::getDataSize() + fRowGroup.getSizeWithStrings() + fMemSize;
}
protected:
rowgroup::RowGroup fRowGroup;
rowgroup::Row fRow;
rowgroup::RGData fData;
std::queue<rowgroup::RGData> fDataQueue;
//rowgroup::RGData fData;
ordering::iterableQueue<rowgroup::RGData> fDataQueue;
uint64_t fRowsPerRG;
uint64_t fErrorCode;
uint64_t fMemSize;
@ -163,21 +183,28 @@ class GroupConcatOrderBy : public GroupConcator, public ordering::IdbOrderBy
{
public:
GroupConcatOrderBy();
virtual ~GroupConcatOrderBy();
~GroupConcatOrderBy() override;
using ordering::IdbOrderBy::initialize;
void initialize(const rowgroup::SP_GroupConcat&);
void processRow(const rowgroup::Row&);
uint64_t getKeyLength() const;
void initialize(const rowgroup::SP_GroupConcat&) override;
void processRow(const rowgroup::Row&) override;
uint64_t getKeyLength() const override;
void merge(GroupConcator*);
void merge(GroupConcator*) override;
using GroupConcator::getResult;
uint8_t* getResultImpl(const std::string& sep);
uint8_t* getResultImpl(const std::string& sep) override;
//uint8_t* getResult(const std::string& sep);
const std::string toString() const;
const std::string toString() const override;
void serialize(messageqcpp::ByteStream &) const override;
void deserialize(messageqcpp::ByteStream &, span<rowgroup::SP_GroupConcat> groupConcats) override;
rowgroup::RGDataSizeType getDataSize() const override
{
return GroupConcator::getDataSize() + fRowGroup.getSizeWithStrings() + fMemSize;
}
protected:
uint64_t getCurrentRowPos() const;
};
} // namespace joblist

View File

@ -769,32 +769,31 @@ void JsonArrayAggOrderBy::processRow(const rowgroup::Row& row)
// the RID has no meaning here, so use it to store the estimated length.
int16_t estLen = lengthEstimate(fRow0);
fRow0.setRid(estLen);
OrderByRow newRow(fRow0, fRule);
OrderByRow newRow(fRow0, getCurrentRowPos(), fRule);
fOrderByQueue.push(newRow);
fCurrentLength += estLen;
// add to the distinct map
if (fDistinct)
fDistinctMap->insert(fRow0.getPointer());
fDistinctMap->emplace(fRow0.getPointer(), getCurrentRowPos());
fRowGroup.incRowCount();
fRow0.nextRow();
if (fRowGroup.getRowCount() >= fRowsPerRG)
{
fDataQueue.push(fData);
uint64_t newSize = fRowsPerRG * fRowGroup.getRowSize();
if (!fRm->getMemory(newSize, fSessionMemLimit))
if (fRm && !fRm->getMemory(newSize, fSessionMemLimit))
{
cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__;
throw IDBExcept(fErrorCode);
}
fMemSize += newSize;
fData.reinit(fRowGroup, fRowsPerRG);
fRowGroup.setData(&fData);
RGData rgdata(fRowGroup, fRowsPerRG);
fDataQueue.emplace(std::move(rgdata));
fRowGroup.setData(&fDataQueue.back());
fRowGroup.resetRowGroup(0);
fRowGroup.getRow(0, &fRow0);
}
@ -817,7 +816,7 @@ void JsonArrayAggOrderBy::processRow(const rowgroup::Row& row)
// only the copyRow does useful work here
fDistinctMap->erase(swapRow.fData);
copyRow(row, &fRow2);
fDistinctMap->insert(swapRow.fData);
fDistinctMap->emplace(swapRow.fData, swapRow.fPos);
}
int16_t estLen = lengthEstimate(fRow2);
@ -832,9 +831,21 @@ void JsonArrayAggOrderBy::merge(GroupConcator* gc)
{
JsonArrayAggOrderBy* go = dynamic_cast<JsonArrayAggOrderBy*>(gc);
size_t mySz = fDataQueue.size();
while (!go->fDataQueue.empty()) {
fDataQueue.emplace(std::move(go->fDataQueue.front()));
go->fDataQueue.pop();
}
while (go->fOrderByQueue.empty() == false)
{
const OrderByRow& row = go->fOrderByQueue.top();
OrderByRow row = go->fOrderByQueue.top();
uint64_t new_rgid = (row.fPos >> 16) + mySz;
row.fPos = (new_rgid << 16) + (row.fPos & 0xffff);
fRowGroup.setData(&fDataQueue[new_rgid]);
Row tmp_row;
fRowGroup.getRow(row.fPos & 0xffff, &tmp_row);
row.fData = tmp_row.getPointer();
// check if the distinct row already exists
if (fDistinct && fDistinctMap->find(row.fData) != fDistinctMap->end())
@ -851,7 +862,7 @@ void JsonArrayAggOrderBy::merge(GroupConcator* gc)
// add to the distinct map
if (fDistinct)
fDistinctMap->insert(row.fData);
fDistinctMap->emplace(row.fData, row.fPos);
}
else if (fOrderByCond.size() > 0 && fRule.less(row.fData, fOrderByQueue.top().fData))
@ -864,7 +875,7 @@ void JsonArrayAggOrderBy::merge(GroupConcator* gc)
if (fDistinct)
{
fDistinctMap->erase(swapRow.fData);
fDistinctMap->insert(row.fData);
fDistinctMap->emplace(row.fData, row.fPos);
}
row1.setData(row.fData);
@ -931,6 +942,10 @@ const string JsonArrayAggOrderBy::toString() const
return (baseStr + oss.str());
}
// Returns the packed position of the row currently being filled: the index of
// the last RGData buffer in fDataQueue in the upper bits and the row index
// inside that buffer in the lower 16 bits.
//
// processRow() calls this before fRowGroup.incRowCount(), so at that moment
// getRowCount() is already the zero-based index of the row being added.
// Subtracting one pointed at the previous row, and for the first row of a
// buffer it borrowed from the buffer-index bits.
// Precondition: fDataQueue is not empty.
uint64_t JsonArrayAggOrderBy::getCurrentRowPos() const
{
  const uint64_t rgIndex = fDataQueue.size() - 1;
  const uint64_t rowIndex = fRowGroup.getRowCount();
  return (rgIndex << 16) | (rowIndex & 0xffff);
}
JsonArrayAggNoOrder::JsonArrayAggNoOrder()
: fRowsPerRG(128), fErrorCode(ERR_AGGREGATION_TOO_BIG), fMemSize(0), fRm(NULL)
{
@ -959,15 +974,16 @@ void JsonArrayAggNoOrder::initialize(const rowgroup::SP_GroupConcat& gcc)
uint64_t newSize = fRowsPerRG * fRowGroup.getRowSize();
if (!fRm->getMemory(newSize, fSessionMemLimit))
if (fRm && !fRm->getMemory(newSize, fSessionMemLimit))
{
cerr << IDBErrorInfo::instance()->errorMsg(fErrorCode) << " @" << __FILE__ << ":" << __LINE__;
throw IDBExcept(fErrorCode);
}
fMemSize += newSize;
fData.reinit(fRowGroup, fRowsPerRG);
fRowGroup.setData(&fData);
RGData rgdata(fRowGroup, fRowsPerRG);
fDataQueue.emplace(std::move(rgdata));
fRowGroup.setData(&fDataQueue.back());
fRowGroup.resetRowGroup(0);
fRowGroup.initRow(&fRow);
fRowGroup.getRow(0, &fRow);
@ -998,9 +1014,9 @@ void JsonArrayAggNoOrder::processRow(const rowgroup::Row& row)
}
fMemSize += newSize;
fDataQueue.push(fData);
fData.reinit(fRowGroup, fRowsPerRG);
fRowGroup.setData(&fData);
RGData rgdata(fRowGroup, fRowsPerRG);
fDataQueue.emplace(std::move(rgdata));
fRowGroup.setData(&fDataQueue.back());
fRowGroup.resetRowGroup(0);
fRowGroup.getRow(0, &fRow);
}
@ -1017,7 +1033,7 @@ void JsonArrayAggNoOrder::merge(GroupConcator* gc)
in->fDataQueue.pop();
}
fDataQueue.push(in->fData);
fRowGroup.setData(&fDataQueue.back());
fMemSize += in->fMemSize;
in->fMemSize = 0;
}
@ -1029,7 +1045,6 @@ uint8_t* JsonArrayAggNoOrder::getResultImpl(const string&)
if (fRowGroup.getRowCount() > 0)
{
oss << '[';
fDataQueue.push(fData);
while (fDataQueue.size() > 0)
{
@ -1046,8 +1061,6 @@ uint8_t* JsonArrayAggNoOrder::getResultImpl(const string&)
outputRow(oss, fRow);
fRow.nextRow();
}
fDataQueue.pop();
}
oss << ']';
}

View File

@ -39,31 +39,30 @@ class JsonArrayInfo : public GroupConcatInfo
{
public:
void prepJsonArray(JobInfo&);
void mapColumns(const rowgroup::RowGroup&);
void mapColumns(const rowgroup::RowGroup&) override;
const std::string toString() const;
const std::string toString() const override;
protected:
uint32_t getColumnKey(const execplan::SRCP& srcp, JobInfo& jobInfo);
std::shared_ptr<int[]> makeMapping(const rowgroup::RowGroup&, const rowgroup::RowGroup&);
uint32_t getColumnKey(const execplan::SRCP& srcp, JobInfo& jobInfo) override;
std::shared_ptr<int[]> makeMapping(const rowgroup::RowGroup&, const rowgroup::RowGroup&) override;
};
class JsonArrayAggregatAgUM : public GroupConcatAgUM
{
public:
EXPORT JsonArrayAggregatAgUM(rowgroup::SP_GroupConcat&);
EXPORT ~JsonArrayAggregatAgUM();
EXPORT explicit JsonArrayAggregatAgUM(rowgroup::SP_GroupConcat&);
EXPORT ~JsonArrayAggregatAgUM() override;
using rowgroup::GroupConcatAg::merge;
void initialize();
void processRow(const rowgroup::Row&);
void initialize() override;
void processRow(const rowgroup::Row&) override;
EXPORT void merge(const rowgroup::Row&, int64_t);
EXPORT void getResult(uint8_t*);
EXPORT uint8_t* getResult();
EXPORT uint8_t* getResult() override;
protected:
void applyMapping(const std::shared_ptr<int[]>&, const rowgroup::Row&);
void applyMapping(const std::shared_ptr<int[]>&, const rowgroup::Row&) override;
};
// JSON_ARRAYAGG base
@ -71,17 +70,18 @@ class JsonArrayAggregator : public GroupConcator
{
public:
JsonArrayAggregator();
virtual ~JsonArrayAggregator();
virtual void initialize(const rowgroup::SP_GroupConcat&);
~JsonArrayAggregator() override;
void initialize(const rowgroup::SP_GroupConcat&) override;
virtual void processRow(const rowgroup::Row&) = 0;
virtual const std::string toString() const;
const std::string toString() const override;
protected:
virtual bool concatColIsNull(const rowgroup::Row&);
virtual void outputRow(std::ostringstream&, const rowgroup::Row&);
virtual int64_t lengthEstimate(const rowgroup::Row&);
bool concatColIsNull(const rowgroup::Row&) override;
void outputRow(std::ostringstream&, const rowgroup::Row&) override;
int64_t lengthEstimate(const rowgroup::Row&) override;
};
// For JSON_ARRAYAGG without distinct or orderby
@ -89,22 +89,21 @@ class JsonArrayAggNoOrder : public JsonArrayAggregator
{
public:
JsonArrayAggNoOrder();
virtual ~JsonArrayAggNoOrder();
~JsonArrayAggNoOrder() override;
void initialize(const rowgroup::SP_GroupConcat&);
void processRow(const rowgroup::Row&);
void initialize(const rowgroup::SP_GroupConcat&) override;
void processRow(const rowgroup::Row&) override;
using GroupConcator::merge;
void merge(GroupConcator*);
void merge(GroupConcator*) override;
using GroupConcator::getResult;
uint8_t* getResultImpl(const std::string& sep);
uint8_t* getResultImpl(const std::string& sep) override;
const std::string toString() const;
const std::string toString() const override;
protected:
rowgroup::RowGroup fRowGroup;
rowgroup::Row fRow;
rowgroup::RGData fData;
std::queue<rowgroup::RGData> fDataQueue;
uint64_t fRowsPerRG;
uint64_t fErrorCode;
@ -118,21 +117,21 @@ class JsonArrayAggOrderBy : public JsonArrayAggregator, public ordering::IdbOrde
{
public:
JsonArrayAggOrderBy();
virtual ~JsonArrayAggOrderBy();
~JsonArrayAggOrderBy() override;
using ordering::IdbOrderBy::initialize;
void initialize(const rowgroup::SP_GroupConcat&);
void processRow(const rowgroup::Row&);
uint64_t getKeyLength() const;
void initialize(const rowgroup::SP_GroupConcat&) override;
void processRow(const rowgroup::Row&) override;
uint64_t getKeyLength() const override;
using GroupConcator::merge;
void merge(GroupConcator*);
void merge(GroupConcator*) override;
using GroupConcator::getResult;
uint8_t* getResultImpl(const std::string& sep);
const std::string toString() const;
uint8_t* getResultImpl(const std::string& sep) override;
const std::string toString() const override;
protected:
uint64_t getCurrentRowPos() const;
};
} // namespace joblist

View File

@ -113,7 +113,7 @@ void LimitedOrderBy::processRow(const rowgroup::Row& row)
if (fOrderByQueue.size() < fStart + fCount)
{
copyRow(row, &fRow0);
OrderByRow newRow(fRow0, fRule);
OrderByRow newRow(fRow0, 0, fRule);
fOrderByQueue.push(newRow);
uint64_t memSizeInc = sizeof(newRow);
@ -131,14 +131,13 @@ void LimitedOrderBy::processRow(const rowgroup::Row& row)
// add to the distinct map
if (fDistinct)
fDistinctMap->insert(fRow0.getPointer());
fDistinctMap->emplace(fRow0.getPointer(), 0);
fRowGroup.incRowCount();
fRow0.nextRow();
if (fRowGroup.getRowCount() >= fRowsPerRG)
{
fDataQueue.push(fData);
uint64_t newSize = fRowGroup.getSizeWithStrings() - fRowGroup.getHeaderSize();
if (!fRm->getMemory(newSize, fSessionMemLimit))
@ -148,8 +147,9 @@ void LimitedOrderBy::processRow(const rowgroup::Row& row)
}
fMemSize += newSize;
fData.reinit(fRowGroup, fRowsPerRG);
fRowGroup.setData(&fData);
RGData rgdata(fRowGroup, fRowsPerRG);
fDataQueue.emplace(std::move(rgdata));
fRowGroup.setData(&fDataQueue.back());
fRowGroup.resetRowGroup(0);
fRowGroup.getRow(0, &fRow0);
}
@ -164,7 +164,7 @@ void LimitedOrderBy::processRow(const rowgroup::Row& row)
if (fDistinct)
{
fDistinctMap->erase(fOrderByQueue.top().fData);
fDistinctMap->insert(row1.getPointer());
fDistinctMap->emplace(row1.getPointer(), 0);
}
fOrderByQueue.pop();
@ -190,9 +190,11 @@ void LimitedOrderBy::finalize()
fUncommitedMemory = 0;
}
queue<RGData> tempQueue;
iterableQueue<RGData> tempQueue;
#if 0 // FIXME kemm
if (fRowGroup.getRowCount() > 0)
fDataQueue.push(fData);
#endif
if (fOrderByQueue.size() > 0)
{
@ -231,8 +233,9 @@ void LimitedOrderBy::finalize()
i = 0;
uint32_t rSize = fRow0.getSize();
uint64_t preLastRowNumb = fRowsPerRG - 1;
fData.reinit(fRowGroup, fRowsPerRG);
fRowGroup.setData(&fData);
RGData rgdata(fRowGroup, fRowsPerRG);
fDataQueue.emplace(std::move(rgdata));
fRowGroup.setData(&fDataQueue.back());
fRowGroup.resetRowGroup(0);
// *DRRTUY This approach won't work with
// OFFSET > fRowsPerRG
@ -252,7 +255,7 @@ void LimitedOrderBy::finalize()
// if RG has fRowsPerRG rows
if (offset == (uint64_t)-1)
{
tempRGDataList.push_front(fData);
tempRGDataList.push_front(fDataQueue.back());
if (!fRm->getMemory(memSizeInc, fSessionMemLimit))
{
@ -261,8 +264,9 @@ void LimitedOrderBy::finalize()
}
fMemSize += memSizeInc;
fData.reinit(fRowGroup, fRowsPerRG);
fRowGroup.setData(&fData);
RGData rgdata(fRowGroup, fRowsPerRG);
fDataQueue.emplace(std::move(rgdata));
fRowGroup.setData(&fDataQueue.back());
fRowGroup.resetRowGroup(0); // ?
fRowGroup.getRow(preLastRowNumb, &fRow0);
offset = preLastRowNumb;
@ -270,7 +274,7 @@ void LimitedOrderBy::finalize()
}
// Push the last/only group into the queue.
if (fRowGroup.getRowCount() > 0)
tempRGDataList.push_front(fData);
tempRGDataList.push_front(fDataQueue.back());
for (tempListIter = tempRGDataList.begin(); tempListIter != tempRGDataList.end(); tempListIter++)
tempQueue.push(*tempListIter);

View File

@ -760,8 +760,9 @@ void IdbOrderBy::initialize(const RowGroup& rg)
throw IDBExcept(fErrorCode);
}
fMemSize += newSize;
fData.reinit(fRowGroup, fRowsPerRG);
fRowGroup.setData(&fData);
RGData rgdata(fRowGroup, fRowsPerRG);
fDataQueue.emplace(std::move(rgdata));
fRowGroup.setData(&fDataQueue.back());
fRowGroup.resetRowGroup(0);
fRowGroup.initRow(&fRow0);
fRowGroup.getRow(0, &fRow0);

View File

@ -29,7 +29,7 @@
#include <boost/scoped_ptr.hpp>
#include <tr1/unordered_set>
#include <unordered_map>
#include "rowgroup.h"
#include "hasher.h"
@ -48,7 +48,9 @@ template <typename _Tp, typename _Sequence = std::vector<_Tp>,
class reservablePQ : private std::priority_queue<_Tp, _Sequence, _Compare>
{
public:
typedef typename std::priority_queue<_Tp, _Sequence, _Compare>::size_type size_type;
using base_type = std::priority_queue<_Tp, _Sequence, _Compare>;
using size_type = typename std::priority_queue<_Tp, _Sequence, _Compare>::size_type;
using value_type = typename std::priority_queue<_Tp, _Sequence, _Compare>::value_type;
reservablePQ(size_type capacity = 0)
{
reserve(capacity);
@ -61,13 +63,53 @@ class reservablePQ : private std::priority_queue<_Tp, _Sequence, _Compare>
{
return this->c.capacity();
}
typename base_type::container_type::const_iterator begin() const {
return this->c.begin();
}
typename base_type::container_type::const_iterator end() const {
return this->c.end();
}
const value_type& operator[](size_type i) const {
return this->c.at(i);
}
using std::priority_queue<_Tp, _Sequence, _Compare>::size;
using std::priority_queue<_Tp, _Sequence, _Compare>::top;
using std::priority_queue<_Tp, _Sequence, _Compare>::pop;
using std::priority_queue<_Tp, _Sequence, _Compare>::push;
using std::priority_queue<_Tp, _Sequence, _Compare>::emplace;
using std::priority_queue<_Tp, _Sequence, _Compare>::empty;
};
// A FIFO queue that additionally allows read-only iteration and indexed access
// to the stored elements.
//
// std::queue is inherited privately, so only the operations re-exported below
// are visible to users. Elements are addressed in queue order: index 0 and
// begin() refer to the same element as front(), the last index refers to the
// same element as back().
template<typename _Tp, typename _Container = std::deque<_Tp>>
class iterableQueue: private std::queue<_Tp, _Container> {
public:
  using base_type = std::queue<_Tp, _Container>;
  using size_type = typename std::queue<_Tp, _Container>::size_type;
  using value_type = typename std::queue<_Tp, _Container>::value_type;

  // Read-only iterator to the front (oldest) element of the queue.
  typename base_type::container_type::const_iterator begin() const {
    return this->c.begin();
  }

  // Read-only past-the-end iterator.
  typename base_type::container_type::const_iterator end() const {
    return this->c.end();
  }

  // Indexed access, delegated to the underlying container's at(); with the
  // standard sequence containers an out-of-range index throws
  // std::out_of_range instead of invoking undefined behaviour.
  const value_type& operator[](size_type i) const {
    return this->c.at(i);
  }

  value_type& operator[](size_type i) {
    return this->c.at(i);
  }

  // Exchanges the contents of two iterableQueue objects.
  //
  // The inherited std::queue::swap expects a std::queue&. Because the base is
  // private, converting an iterableQueue to it is not accessible outside this
  // class, so the re-exported overload alone cannot be called with another
  // iterableQueue. The conversion is performed here, where it is accessible.
  void swap(iterableQueue& other) {
    base_type::swap(other);
  }

  using std::queue<_Tp, _Container>::size;
  using std::queue<_Tp, _Container>::empty;
  using std::queue<_Tp, _Container>::front;
  using std::queue<_Tp, _Container>::back;
  using std::queue<_Tp, _Container>::pop;
  using std::queue<_Tp, _Container>::push;
  using std::queue<_Tp, _Container>::emplace;
  // Kept so that swapping with a plain std::queue keeps working.
  using std::queue<_Tp, _Container>::swap;
};
// forward reference
class IdbCompare;
class OrderByRow;
@ -346,7 +388,7 @@ class IdbCompare
class OrderByRow
{
public:
OrderByRow(const rowgroup::Row& r, CompareRule& c) : fData(r.getPointer()), fRule(&c)
OrderByRow(const rowgroup::Row& r, uint64_t pos, CompareRule& c) : fData(r.getPointer()), fPos(pos), fRule(&c)
{
}
@ -356,6 +398,7 @@ class OrderByRow
}
rowgroup::Row::Pointer fData;
uint64_t fPos;
CompareRule* fRule;
};
@ -434,8 +477,7 @@ class IdbOrderBy : public IdbCompare
rowgroup::Row fRow0;
CompareRule fRule;
rowgroup::RGData fData;
std::queue<rowgroup::RGData> fDataQueue;
iterableQueue<rowgroup::RGData> fDataQueue;
struct Hasher
{
@ -457,8 +499,8 @@ class IdbOrderBy : public IdbCompare
bool operator()(const rowgroup::Row::Pointer&, const rowgroup::Row::Pointer&) const;
};
typedef std::tr1::unordered_set<rowgroup::Row::Pointer, Hasher, Eq,
utils::STLPoolAllocator<rowgroup::Row::Pointer> >
typedef std::unordered_map<rowgroup::Row::Pointer, uint64_t /* FIXME kemm */, Hasher, Eq,
utils::STLPoolAllocator<std::pair<const rowgroup::Row::Pointer, uint64_t>>>
DistinctMap_t;
boost::scoped_ptr<DistinctMap_t> fDistinctMap;
rowgroup::Row row1, row2; // scratch space for Hasher & Eq