Skip to content

Commit 96b2606

Browse files
IGNITE-27431 SQL Calcite: Optimize scans with filter
1 parent 3534e59 commit 96b2606

File tree

14 files changed

+404
-114
lines changed

14 files changed

+404
-114
lines changed

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheColumnsScan.java

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,16 @@
1717

1818
package org.apache.ignite.internal.processors.query.calcite.exec;
1919

20+
import java.util.Iterator;
2021
import org.apache.calcite.rel.type.RelDataType;
2122
import org.apache.calcite.util.ImmutableBitSet;
2223
import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
2324
import org.apache.ignite.internal.processors.query.calcite.schema.CacheTableDescriptor;
2425
import org.jetbrains.annotations.Nullable;
2526

2627
/** */
27-
public abstract class AbstractCacheColumnsScan<Row> extends AbstractCacheScan<Row> {
28+
public abstract class AbstractCacheColumnsScan<TableRow, Row> extends AbstractCacheScan<Row>
29+
implements TableRowIterable<TableRow, Row> {
2830
/** */
2931
protected final CacheTableDescriptor desc;
3032

@@ -34,8 +36,8 @@ public abstract class AbstractCacheColumnsScan<Row> extends AbstractCacheScan<Ro
3436
/** */
3537
protected final RelDataType rowType;
3638

37-
/** Participating columns. */
38-
protected final ImmutableBitSet requiredColumns;
39+
/** Row field to column mapping. */
40+
protected final int[] fieldColMapping;
3941

4042
/** */
4143
AbstractCacheColumnsScan(
@@ -47,9 +49,30 @@ public abstract class AbstractCacheColumnsScan<Row> extends AbstractCacheScan<Ro
4749
super(ectx, desc.cacheContext(), parts);
4850

4951
this.desc = desc;
50-
this.requiredColumns = requiredColumns;
5152

5253
rowType = desc.rowType(ectx.getTypeFactory(), requiredColumns);
5354
factory = ectx.rowHandler().factory(ectx.getTypeFactory(), rowType);
55+
56+
ImmutableBitSet reqCols = requiredColumns == null ? ImmutableBitSet.range(0, rowType.getFieldCount())
57+
: requiredColumns;
58+
59+
fieldColMapping = reqCols.toArray();
5460
}
61+
62+
/** {@inheritDoc} */
63+
@Override public final Iterator<TableRow> tableRowIterator() {
64+
reserve();
65+
66+
try {
67+
return createTableRowIterator();
68+
}
69+
catch (Exception e) {
70+
release();
71+
72+
throw e;
73+
}
74+
}
75+
76+
/** Table row iterator.*/
77+
protected abstract Iterator<TableRow> createTableRowIterator();
5578
}

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractCacheScan.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public abstract class AbstractCacheScan<Row> implements Iterable<Row>, AutoClose
100100
}
101101

102102
/** */
103-
private synchronized void reserve() {
103+
protected synchronized void reserve() {
104104
if (reservation != null)
105105
return;
106106

@@ -151,7 +151,7 @@ private synchronized void reserve() {
151151
}
152152

153153
/** */
154-
private synchronized void release() {
154+
protected synchronized void release() {
155155
if (reservation != null)
156156
reservation.release();
157157

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/IndexScan.java

Lines changed: 36 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656
/**
5757
* Scan on index.
5858
*/
59-
public class IndexScan<Row> extends AbstractCacheColumnsScan<Row> {
59+
public class IndexScan<Row> extends AbstractCacheColumnsScan<IndexRow, Row> {
6060
/** */
6161
private final GridKernalContext kctx;
6262

@@ -109,7 +109,7 @@ public IndexScan(
109109
for (int i = 0; i < srcRowType.getFieldCount(); i++)
110110
fieldsStoreTypes[i] = typeFactory.getResultClass(srcRowType.getFieldList().get(i).getType());
111111

112-
fieldIdxMapping = fieldToInlinedKeysMapping(srcRowType.getFieldCount());
112+
fieldIdxMapping = fieldToInlinedKeysMapping();
113113

114114
if (!F.isEmpty(ectx.getQryTxEntries())) {
115115
InlineIndexRowHandler rowHnd = idx.segment(0).rowHandler();
@@ -131,7 +131,7 @@ public IndexScan(
131131
* @return Mapping from target row fields to inlined index keys, or {@code null} if inlined index keys
132132
* should not be used.
133133
*/
134-
private int[] fieldToInlinedKeysMapping(int srcFieldsCnt) {
134+
private int[] fieldToInlinedKeysMapping() {
135135
List<InlineIndexKeyType> inlinedKeys = idx.segment(0).rowHandler().inlineIndexKeyTypes();
136136

137137
// Since inline scan doesn't check expire time, allow it only if expired entries are eagerly removed.
@@ -141,7 +141,7 @@ private int[] fieldToInlinedKeysMapping(int srcFieldsCnt) {
141141
// Even if we need some subset of inlined keys we are required to the read full inlined row, since this row
142142
// is also participated in comparison with other rows when cursor processing the next index page.
143143
if (inlinedKeys.size() < idx.segment(0).rowHandler().indexKeyDefinitions().size() ||
144-
inlinedKeys.size() < (requiredColumns == null ? srcFieldsCnt : requiredColumns.cardinality()))
144+
inlinedKeys.size() < fieldColMapping.length)
145145
return null;
146146

147147
for (InlineIndexKeyType keyType : inlinedKeys) {
@@ -153,14 +153,11 @@ private int[] fieldToInlinedKeysMapping(int srcFieldsCnt) {
153153
return null;
154154
}
155155

156-
ImmutableBitSet reqCols = requiredColumns == null ? ImmutableBitSet.range(0, srcFieldsCnt) :
157-
requiredColumns;
158-
159156
int[] fieldIdxMapping = new int[rowType.getFieldCount()];
160157

161-
for (int i = 0, j = reqCols.nextSetBit(0); j != -1; j = reqCols.nextSetBit(j + 1), i++) {
158+
for (int i = 0; i < fieldColMapping.length; i++) {
162159
// j = source field index, i = target field index.
163-
int keyIdx = idxFieldMapping.indexOf(j);
160+
int keyIdx = idxFieldMapping.indexOf(fieldColMapping[i]);
164161

165162
if (keyIdx >= 0 && keyIdx < inlinedKeys.size())
166163
fieldIdxMapping[i] = keyIdx;
@@ -173,14 +170,32 @@ private int[] fieldToInlinedKeysMapping(int srcFieldsCnt) {
173170

174171
/** {@inheritDoc} */
175172
@Override protected Iterator<Row> createIterator() {
173+
return F.iterator(createTableRowIterator(), this::indexRow2Row, true);
174+
}
175+
176+
/** {@inheritDoc} */
177+
@Override protected Iterator<IndexRow> createTableRowIterator() {
176178
RangeIterable<IndexRow> ranges0 = ranges == null ? null : new TransformRangeIterable<>(ranges, this::row2indexRow);
177179

178180
TreeIndex<IndexRow> treeIdx = treeIndex();
179181

180182
if (!txChanges.changedKeysEmpty())
181183
treeIdx = new TxAwareTreeIndexWrapper(treeIdx);
182184

183-
return F.iterator(new TreeIndexIterable<>(treeIdx, ranges0), this::indexRow2Row, true);
185+
return new TreeIndexIterable<>(treeIdx, ranges0).iterator();
186+
}
187+
188+
/** {@inheritDoc} */
189+
@Override public Row enrichRow(IndexRow idxRow, Row row, int[] fieldColMapping) {
190+
try {
191+
if (idxRow.indexPlainRow())
192+
return inlineIndexRow2Row(idxRow, row, fieldColMapping);
193+
else
194+
return desc.toRow(ectx, idxRow.cacheDataRow(), row, fieldColMapping);
195+
}
196+
catch (IgniteCheckedException e) {
197+
throw new IgniteException(e);
198+
}
184199
}
185200

186201
/** */
@@ -221,28 +236,23 @@ protected IndexRow row2indexRow(Row bound) {
221236
}
222237

223238
/** From IndexRow to Row convertor. */
224-
protected Row indexRow2Row(IndexRow row) {
225-
try {
226-
if (row.indexPlainRow())
227-
return inlineIndexRow2Row(row);
228-
else
229-
return desc.toRow(ectx, row.cacheDataRow(), factory, requiredColumns);
230-
}
231-
catch (IgniteCheckedException e) {
232-
throw new IgniteException(e);
233-
}
239+
protected Row indexRow2Row(IndexRow idxRow) {
240+
Row row = factory.create();
241+
242+
return enrichRow(idxRow, row, fieldColMapping);
234243
}
235244

236245
/** */
237-
private Row inlineIndexRow2Row(IndexRow row) {
246+
private Row inlineIndexRow2Row(IndexRow idxRow, Row row, int[] fieldColMapping) {
238247
RowHandler<Row> hnd = ectx.rowHandler();
239248

240-
Row res = factory.create();
241-
242-
for (int i = 0; i < fieldIdxMapping.length; i++)
243-
hnd.set(i, res, TypeUtils.toInternal(ectx, row.key(fieldIdxMapping[i]).key()));
249+
for (int i = 0; i < fieldColMapping.length; i++) {
250+
// Skip not required fields.
251+
if (fieldColMapping[i] >= 0)
252+
hnd.set(i, row, TypeUtils.toInternal(ectx, idxRow.key(fieldIdxMapping[i]).key()));
253+
}
244254

245-
return res;
255+
return row;
246256
}
247257

248258
/** Query context. */

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java

Lines changed: 60 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import org.apache.ignite.internal.processors.query.calcite.exec.rel.ProjectNode;
6565
import org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanNode;
6666
import org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanStorageNode;
67+
import org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanTableRowNode;
6768
import org.apache.ignite.internal.processors.query.calcite.exec.rel.SortAggregateNode;
6869
import org.apache.ignite.internal.processors.query.calcite.exec.rel.SortNode;
6970
import org.apache.ignite.internal.processors.query.calcite.exec.rel.TableSpoolNode;
@@ -116,6 +117,7 @@
116117
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
117118
import org.apache.ignite.internal.processors.query.calcite.util.RexUtils;
118119
import org.apache.ignite.internal.util.typedef.F;
120+
import org.jetbrains.annotations.Nullable;
119121

120122
import static org.apache.calcite.rel.RelDistribution.Type.HASH_DISTRIBUTED;
121123
import static org.apache.ignite.internal.processors.query.calcite.util.TypeUtils.combinedRowType;
@@ -368,10 +370,10 @@ private boolean hasExchange(RelNode rel) {
368370
ImmutableBitSet requiredColumns = rel.requiredColumns();
369371
List<SearchBounds> searchBounds = rel.searchBounds();
370372

371-
RelDataType rowType = tbl.getRowType(typeFactory, requiredColumns);
373+
RelDataType inputRowType = tbl.getRowType(typeFactory, requiredColumns);
372374

373-
Predicate<Row> filters = condition == null ? null : expressionFactory.predicate(condition, rowType);
374-
Function<Row, Row> prj = projects == null ? null : expressionFactory.project(projects, rowType);
375+
Predicate<Row> filters = condition == null ? null : expressionFactory.predicate(condition, inputRowType);
376+
Function<Row, Row> prj = projects == null ? null : expressionFactory.project(projects, inputRowType);
375377
RangeIterable<Row> ranges = searchBounds == null ? null :
376378
expressionFactory.ranges(searchBounds, rel.collation(), tbl.getRowType(typeFactory));
377379

@@ -382,7 +384,8 @@ private boolean hasExchange(RelNode rel) {
382384
if (idx != null && !tbl.isIndexRebuildInProgress()) {
383385
Iterable<Row> rowsIter = idx.scan(ctx, grp, ranges, requiredColumns);
384386

385-
return new ScanStorageNode<>(tbl.name() + '.' + idx.name(), ctx, rowType, rowsIter, filters, prj);
387+
return createStorageScan(tbl.name() + '.' + idx.name(), rel.getRowType(), inputRowType,
388+
rowsIter, filters, prj, requiredColumns, rel.conditionColumns());
386389
}
387390
else {
388391
// Index was invalidated after planning, workaround through table-scan -> sort -> index spool.
@@ -402,12 +405,10 @@ private boolean hasExchange(RelNode rel) {
402405
requiredColumns
403406
);
404407

405-
// If there are projects in the scan node - after the scan we already have target row type.
406-
if (!spoolNodeRequired && projects != null)
407-
rowType = rel.getRowType();
408+
RelDataType rowType = projNodeRequired ? rel.getRowType() : inputRowType;
408409

409-
Node<Row> node = new ScanStorageNode<>(tbl.name(), ctx, rowType, rowsIter, filterHasCorrelation ? null : filters,
410-
projNodeRequired ? null : prj);
410+
Node<Row> node = createStorageScan(tbl.name(), rowType, inputRowType, rowsIter,
411+
filterHasCorrelation ? null : filters, projNodeRequired ? null : prj, requiredColumns, rel.conditionColumns());
411412

412413
RelCollation collation = rel.collation();
413414

@@ -438,7 +439,7 @@ private boolean hasExchange(RelNode rel) {
438439
remappedSearchBounds.add(searchBounds.get(i));
439440

440441
// Collation and row type are already remapped taking into account requiredColumns.
441-
ranges = expressionFactory.ranges(remappedSearchBounds, collation, rowType);
442+
ranges = expressionFactory.ranges(remappedSearchBounds, collation, inputRowType);
442443
}
443444

444445
IndexSpoolNode<Row> spoolNode = IndexSpoolNode.createTreeSpool(
@@ -548,10 +549,10 @@ private boolean hasExchange(RelNode rel) {
548549
IgniteTable tbl = rel.getTable().unwrap(IgniteTable.class);
549550
IgniteTypeFactory typeFactory = ctx.getTypeFactory();
550551

551-
RelDataType rowType = tbl.getRowType(typeFactory, requiredColumns);
552+
RelDataType inputRowType = tbl.getRowType(typeFactory, requiredColumns);
552553

553-
Predicate<Row> filters = condition == null ? null : expressionFactory.predicate(condition, rowType);
554-
Function<Row, Row> prj = projects == null ? null : expressionFactory.project(projects, rowType);
554+
Predicate<Row> filters = condition == null ? null : expressionFactory.predicate(condition, inputRowType);
555+
Function<Row, Row> prj = projects == null ? null : expressionFactory.project(projects, inputRowType);
555556

556557
ColocationGroup grp = ctx.group(rel.sourceId());
557558

@@ -560,12 +561,14 @@ private boolean hasExchange(RelNode rel) {
560561
if (idx != null && !tbl.isIndexRebuildInProgress()) {
561562
Iterable<Row> rowsIter = idx.scan(ctx, grp, null, requiredColumns);
562563

563-
return new ScanStorageNode<>(tbl.name() + '.' + idx.name(), ctx, rowType, rowsIter, filters, prj);
564+
return createStorageScan(tbl.name() + '.' + idx.name(), rel.getRowType(), inputRowType,
565+
rowsIter, filters, prj, requiredColumns, rel.conditionColumns());
564566
}
565567
else {
566568
Iterable<Row> rowsIter = tbl.scan(ctx, grp, requiredColumns);
567569

568-
return new ScanStorageNode<>(tbl.name(), ctx, rowType, rowsIter, filters, prj);
570+
return createStorageScan(tbl.name(), rel.getRowType(), inputRowType, rowsIter, filters, prj,
571+
requiredColumns, rel.conditionColumns());
569572
}
570573
}
571574

@@ -943,4 +946,46 @@ private Node<Row> visit(RelNode rel) {
943946
public <T extends Node<Row>> T go(IgniteRel rel) {
944947
return (T)visit(rel);
945948
}
949+
950+
/** */
951+
private ScanStorageNode<Row> createStorageScan(
952+
String storageName,
953+
RelDataType outputRowType,
954+
RelDataType inputRowType,
955+
Iterable<Row> rowsIter,
956+
@Nullable Predicate<Row> filter,
957+
@Nullable Function<Row, Row> rowTransformer,
958+
@Nullable ImmutableBitSet requiredColumns,
959+
@Nullable ImmutableBitSet filterColumns
960+
) {
961+
int fieldsCnt = outputRowType.getFieldCount();
962+
963+
if (filter == null || filterColumns == null || filterColumns.cardinality() == fieldsCnt
964+
|| !(rowsIter instanceof TableRowIterable))
965+
return new ScanStorageNode<>(storageName, ctx, outputRowType, rowsIter, filter, rowTransformer);
966+
967+
ImmutableBitSet reqCols = requiredColumns == null ? ImmutableBitSet.range(0, fieldsCnt) : requiredColumns;
968+
969+
int[] filterColMapping = reqCols.toArray();
970+
int[] otherColMapping = filterColMapping.clone();
971+
972+
for (int i = 0; i < filterColMapping.length; i++) {
973+
if (filterColumns.get(i))
974+
otherColMapping[i] = -1;
975+
else
976+
filterColMapping[i] = -1;
977+
}
978+
979+
return new ScanTableRowNode<>(
980+
storageName,
981+
ctx,
982+
outputRowType,
983+
inputRowType,
984+
(TableRowIterable<Object, Row>)rowsIter,
985+
filter,
986+
rowTransformer,
987+
filterColMapping,
988+
otherColMapping
989+
);
990+
}
946991
}

modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/SystemViewScan.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ public class SystemViewScan<Row, ViewRow> implements Iterable<Row> {
4747
/** */
4848
private final RangeIterable<Row> ranges;
4949

50-
/** Participating colunms. */
51-
private final ImmutableBitSet requiredColumns;
50+
/** Row field to view column mapping. */
51+
protected final int[] fieldColMapping;
5252

5353
/** System view field names (for filtering). */
5454
private final String[] filterableFieldNames;
@@ -66,7 +66,6 @@ public SystemViewScan(
6666
this.ectx = ectx;
6767
this.desc = desc;
6868
this.ranges = ranges;
69-
this.requiredColumns = requiredColumns;
7069

7170
RelDataType rowType = desc.rowType(ectx.getTypeFactory(), requiredColumns);
7271

@@ -83,6 +82,11 @@ public SystemViewScan(
8382
}
8483
}
8584
}
85+
86+
ImmutableBitSet reqCols = requiredColumns == null ? ImmutableBitSet.range(0, rowType.getFieldCount())
87+
: requiredColumns;
88+
89+
fieldColMapping = reqCols.toArray();
8690
}
8791

8892
/** {@inheritDoc} */
@@ -123,6 +127,6 @@ public SystemViewScan(
123127
else
124128
viewIter = view.iterator();
125129

126-
return F.iterator(viewIter, row -> desc.toRow(ectx, row, factory, requiredColumns), true);
130+
return F.iterator(viewIter, row -> desc.toRow(ectx, row, factory.create(), fieldColMapping), true);
127131
}
128132
}

0 commit comments

Comments
 (0)