API Reference

This section provides detailed documentation for the main classes and methods of the indexed-parquet-dataset library.

IndexedParquetDataset

The main class for indexing and data access. Inherits from torch.utils.data.Dataset (if PyTorch is installed).
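
The example below is a minimal usage sketch, not taken from the library's documentation; the data/ directory and the "id"/"label" column names are placeholders.

# Hypothetical example; "data/", "id" and "label" are placeholder names.
from indexed_parquet_dataset.dataset import IndexedParquetDataset

# Index every *.parquet file under data/ (recursive by default) and fill
# missing columns with per-type defaults (Schema Evolution across files).
ds = IndexedParquetDataset.from_folder("data/", auto_fill=True)

print(len(ds), ds.schema)   # total number of rows and merged column names
row = ds[0]                 # O(1) random access; returns a dict
batch = ds[[0, 5, 42]]      # a list of indices returns a list of dicts

# Lightweight views: these operations only manipulate index arrays and the
# mapper; they never copy or rewrite the underlying Parquet files.
train, test = ds.shuffle(seed=0).train_test_split(test_size=0.1, seed=0)
small = ds.select_columns(["id", "label"]).limit(1_000)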

indexed_parquet_dataset.dataset.IndexedParquetDataset

Bases: Dataset

High-performance Parquet dataset with O(1) random access and Schema Evolution support.

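The sketch below is a hedged illustration of the schema-evolution helpers, building on the ds object from the example above; the column names old_score, score, id, lbl and the path /data/legacy.parquet are placeholders.

# Hypothetical example; column names and file paths are placeholders.
ds2 = (
    ds.rename("old_score", "score")                  # global column rename
      .cast("score", "float")                        # per-row type cast
      .alias("id_str", lambda row: str(row["id"]))   # computed column
)

# Per-file mapping: expose this specific file's "lbl" column as "label".
ds2 = ds2.set_file_mapping("/data/legacy.parquet", {"lbl": "label"})

# Materialize the merged view back to Parquet, sharded into ~100k-row files.
ds2.to_parquet("merged_dataset", shard_size=100_000)
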
Source code in src/indexed_parquet_dataset/dataset.py
class IndexedParquetDataset(Dataset):
    """High-performance Parquet dataset with O(1) random access and Schema Evolution support."""

    def __init__(
        self,
        index: BaseIndex,
        indices: Optional[np.ndarray] = None,
        mapper: Optional[SchemaMapper] = None,
        include_source_column: bool = False,
        source_column_name: str = "__source_file__",
        default_fill_value: Any = None,
        fill_values_by_type: Optional[Dict[str, Any]] = None,
        fill_values_by_column: Optional[Dict[str, Any]] = None,
        auto_fill: bool = False,
        max_open_files: int = 128,
        _type_casts: Optional[Dict[str, type]] = None,
        selected_columns: Optional[List[str]] = None,
        _pending_filter: Optional[Dict[str, Any]] = None,
    ):
        """Initializes the dataset.

        Args:
            index: A BaseIndex object containing dataset metadata.
            indices: Array of global indices to expose (for subsetting/shuffling).
            mapper: SchemaMapper for column renaming.
            include_source_column: If True, adds a virtual column with the source file path.
            source_column_name: Name of the virtual source column.
            default_fill_value: Value to use for missing data if no specific rule matches.
            fill_values_by_type: Dict mapping PyArrow types to default values.
            fill_values_by_column: Dict mapping column names to default values.
            auto_fill: If True, automatically populates fill_values_by_type with defaults.
                Note: auto_fill does NOT overwrite values already present in fill_values_by_type.
            max_open_files: Maximum number of simultaneously open Parquet file handles (LRU cache).
            _type_casts: Internal. Per-column cast functions used by concat upcasting.
            selected_columns: Optional subset of columns to expose (column projection).
            _pending_filter: Internal. Deferred filter parameters, materialized lazily
                on first access to `indices`.
        """
        self.index = index
        self.mapper = mapper or SchemaMapper()
        self.include_source_column = include_source_column
        self.source_column_name = source_column_name

        self.default_fill_value = default_fill_value
        self.fill_values_by_type = fill_values_by_type or {}

        if auto_fill:
            self._apply_auto_fill()

        self.fill_values_by_column = fill_values_by_column or {}
        self._type_casts: Dict[str, type] = _type_casts or {}  # Internal: concat upcasting
        self.selected_columns = selected_columns

        # indices allows for shuffling, filtering, and subsets without modifying the base index
        self._indices: Optional[np.ndarray] = indices
        self._pending_filter: Optional[Dict[str, Any]] = _pending_filter

        if indices is None:
            # _indices is left as None here to support lazy construction; it is
            # materialized on first access via the `indices` property.
            # If indices is None and there is no _pending_filter, it means 'full dataset'.
            pass

        # Cumulative row counts for fast lookups
        self.file_offsets = np.cumsum([0] + [f.num_rows for f in self.index.files])

        # LRU cache for open file handles (Lazy Loading)
        self.max_open_files = max_open_files
        self._file_handles: OrderedDict[int, pq.ParquetFile] = OrderedDict()

    @property
    def indices(self) -> np.ndarray:
        """Returns the array of active indices, materializing them if necessary."""
        if self._indices is None:
            if self._pending_filter:
                self._materialize_filter()
            else:
                self._indices = np.arange(self.index.total_rows)
        return self._indices

    @indices.setter
    def indices(self, value: np.ndarray):
        """Sets the array of active indices, clearing any pending filters."""
        self._indices = value
        self._pending_filter = None

    def _materialize_filter(self) -> None:
        """Executes the pending filter with potential early stopping (Operator Fusion)."""
        if not self._pending_filter:
            return

        params = self._pending_filter
        f_row = params.get('filter_row')
        f_batch = params.get('filter_batch')
        transform_batch = params.get('transform_batch')
        column_conditions = params.get('column_conditions')
        path_pattern = params.get('path_pattern')
        path_filter = params.get('path_filter')
        limit = params.get('limit')
        batch_size = params.get('batch_size', 1024)
        show_progress = params.get('show_progress', False)
        mapper = params.get('mapper_snapshot', self.mapper)

        # Initial indices to filter
        current_indices = params.get('input_indices', np.arange(self.index.total_rows))

        # 1. File Path Filtering (Fast)
        if path_pattern or path_filter:
            valid_file_indices = []
            filters = [path_filter] if isinstance(path_filter, str) else (path_filter or [])
            for i, f in enumerate(self.index.files):
                match = (path_pattern and isinstance(path_pattern, str) and path_pattern in f.path)
                if not match:
                    for pattern in filters:
                        if fnmatch.fnmatch(f.path, pattern): match = True; break
                if match: valid_file_indices.append(i)

            mask = np.zeros(len(current_indices), dtype=bool)
            for f_idx in valid_file_indices:
                start, end = self.file_offsets[f_idx], self.file_offsets[f_idx + 1]
                mask |= (current_indices >= start) & (current_indices < end)
            current_indices = current_indices[mask]

        # 2. Column Conditions (PyArrow-based)
        if len(current_indices) > 0 and column_conditions:
            file_to_indices = {}
            for idx in current_indices:
                f_idx = np.searchsorted(self.file_offsets, idx, side='right') - 1
                if f_idx not in file_to_indices: file_to_indices[f_idx] = []
                file_to_indices[f_idx].append(idx)

            new_indices_list = []
            for f_idx, f_global_indices in sorted(file_to_indices.items()):
                f_info = self.index.files[f_idx]
                pf = self._get_file_handle(f_idx)

                table = pf.read(columns=[mapper.get_source_column(c) for c in column_conditions.keys() if mapper.get_source_column(c) in f_info.columns])
                file_mask = None
                for col, cond in column_conditions.items():
                    src_col = mapper.get_source_column(col, f_info.path)
                    if src_col not in table.column_names:
                        c_mask = pa.scalar(None, type=pa.bool_())
                    else:
                        arr = table.column(src_col)
                        if isinstance(cond, tuple):
                            op, val = cond
                            if op == '==': c_mask = pc.equal(arr, val)
                            elif op == '>': c_mask = pc.greater(arr, val)
                            elif op == '>=': c_mask = pc.greater_equal(arr, val)
                            elif op == '<': c_mask = pc.less(arr, val)
                            elif op == '<=': c_mask = pc.less_equal(arr, val)
                            else: c_mask = None
                        else:
                            c_mask = pc.equal(arr, cond)

                    if c_mask is not None:
                        if file_mask is None: file_mask = c_mask
                        else: file_mask = pc.and_(file_mask, c_mask)

                if file_mask is None:
                    file_mask_np = np.ones(len(table), dtype=bool)
                else:
                    file_mask_np = pc.fill_null(file_mask, False).to_numpy().astype(bool)

                f_local_indices = (np.array(f_global_indices) - self.file_offsets[f_idx]).astype(int)
                new_indices_list.append(np.array(f_global_indices)[file_mask_np[f_local_indices]])
            current_indices = np.concatenate(new_indices_list) if new_indices_list else np.array([], dtype=int)

        # 3. Predicate / Batch Filter (Python-based) with EARLY STOPPING
        if len(current_indices) > 0 and (f_row or f_batch or transform_batch):
            file_to_indices = {}
            for idx in current_indices:
                f_idx = np.searchsorted(self.file_offsets, idx, side='right') - 1
                if f_idx not in file_to_indices: file_to_indices[f_idx] = []
                file_to_indices[f_idx].append(idx)

            new_indices_list = []
            matches_found = 0
            iterable = sorted(file_to_indices.items())

            for i, (f_idx, f_global_indices) in enumerate(iterable):
                if limit is not None and matches_found >= limit:
                    break

                f_info = self.index.files[f_idx]
                f_local_indices = (np.array(f_global_indices) - self.file_offsets[f_idx]).astype(int)

                desc = f"[File {i+1}/{len(iterable)}] {os.path.basename(f_info.path)}"
                pbar = tqdm(total=len(f_local_indices), desc=desc, disable=not show_progress, leave=False)

                file_results_mask = []
                for start_batch in range(0, len(f_local_indices), batch_size):
                    if limit is not None and matches_found >= limit:
                        break

                    end_batch = min(start_batch + batch_size, len(f_local_indices))
                    batch_local_indices = f_local_indices[start_batch:end_batch].tolist()

                    # Note: _read_rows_from_file uses the internal self.mapper rather than
                    # the snapshot mapper captured for this filter. In practice this is
                    # acceptable because the mapper is normally stable (or already merged)
                    # by the time the filter is materialized.
                    rows = self._read_rows_from_file(f_idx, batch_local_indices)

                    if transform_batch:
                        rows = transform_batch(rows)

                    if f_batch:
                        batch_mask = f_batch(rows)
                    elif f_row:
                        batch_mask = [f_row(row) for row in rows]
                    else:
                        batch_mask = [True] * len(rows)

                    # Early stopping within batch
                    valid_in_batch = sum(batch_mask)
                    if limit is not None and matches_found + valid_in_batch > limit:
                        # Slice the mask to stop exactly at limit
                        trimmed_mask = []
                        temp_count = matches_found
                        for val in batch_mask:
                            if val:
                                if temp_count < limit:
                                    trimmed_mask.append(True)
                                    temp_count += 1
                                else:
                                    trimmed_mask.append(False)
                            else:
                                trimmed_mask.append(False)
                        batch_mask = trimmed_mask
                        valid_in_batch = limit - matches_found

                    file_results_mask.extend(batch_mask)
                    matches_found += valid_in_batch

                    if limit is not None:
                        pbar.set_postfix({"found": f"{matches_found}/{limit}"})
                    pbar.update(len(batch_local_indices))

                pbar.close()
                new_indices_list.append(np.array(f_global_indices)[:len(file_results_mask)][np.array(file_results_mask)])

            current_indices = np.concatenate(new_indices_list) if new_indices_list else np.array([], dtype=int)

        self._indices = current_indices
        self._pending_filter = None

    def __getstate__(self):
        """Returns the state for pickling, excluding non-picklable file handles."""
        state = self.__dict__.copy()
        state['_file_handles'] = OrderedDict() # Don't pickle open handles, but keep type
        return state

    def __setstate__(self, state):
        """Restores the state after unpickling."""
        self.__dict__.update(state)
        self._file_handles = OrderedDict() # Re-initialize empty cache as OrderedDict

    @classmethod
    def from_folder(
        cls, 
        directory: str, 
        pattern: str = "*.parquet", 
        recursive: bool = True, 
        strict_schema: bool = False,
        auto_fill: bool = False,
        **kwargs
    ) -> 'IndexedParquetDataset':
        """Creates an IndexedParquetDataset by scanning a directory."""
        index = scan_directory(directory, pattern, recursive, strict_schema)
        return cls(index, auto_fill=auto_fill, **kwargs)

    @property
    def schema(self) -> List[str]:
        """Returns the list of column names available in the dataset (after mapping)."""
        if self.selected_columns is not None:
            return self.selected_columns

        all_cols = set()

        # 1. Base columns from original files
        for col in self.index.all_columns:
            target = self.mapper.mapping.get(col, col)
            if target != col:
                # Global mapping shadows the original name
                all_cols.add(target)
            else:
                # Local mapping check: is it shadowed in ALL files it appears in?
                is_visible_as_original = False
                for f_info in self.index.files:
                    if col in f_info.columns:
                        f_map = self.mapper.file_mappings.get(f_info.path, {})
                        if col not in f_map:
                            is_visible_as_original = True
                            break
                if is_visible_as_original:
                    all_cols.add(col)

        # 2. Add file-specific mapping targets
        for f_map in self.mapper.file_mappings.values():
            for target_col in f_map.values():
                all_cols.add(target_col)

        # 3. Add computed columns (row-level)
        for col in self.mapper.transforms.keys():
            all_cols.add(col)

        # 4. Add computed columns (batch-level)
        if hasattr(self.mapper, 'batch_column_transforms'):
            for col in self.mapper.batch_column_transforms.keys():
                all_cols.add(col)

        if self.include_source_column:
            all_cols.add(self.source_column_name)

        return sorted(list(all_cols))

    def __len__(self) -> int:
        return len(self.indices)

    def __repr__(self) -> str:
        return (
            f"IndexedParquetDataset("
            f"rows={len(self):,}, "
            f"files={len(self.index.files)}, "
            f"columns={len(self.schema)}"
            f")"
        )

    def _get_file_and_local_idx(self, global_idx: int) -> tuple[int, int]:
        actual_idx = self.indices[global_idx]
        file_idx = np.searchsorted(self.file_offsets, actual_idx, side='right') - 1
        local_idx = actual_idx - self.file_offsets[file_idx]
        return int(file_idx), int(local_idx)

    def _get_file_handle(self, file_idx: int) -> pq.ParquetFile:
        if file_idx in self._file_handles:
            self._file_handles.move_to_end(file_idx)  # LRU touch
        else:
            self._file_handles[file_idx] = pq.ParquetFile(self.index.files[file_idx].path)
            if len(self._file_handles) > self.max_open_files:
                self._file_handles.popitem(last=False)  # evict least recently used
        return self._file_handles[file_idx]

    def _get_fill_value(self, column_name: str) -> Any:
        """Determines the fill value for a missing column based on hierarchy."""
        if column_name in self.fill_values_by_column:
            return self.fill_values_by_column[column_name]

        # Find original name (best effort)
        orig_name = None
        for k, v in self.mapper.mapping.items():
            if v == column_name:
                orig_name = k
                break
        if orig_name is None: orig_name = column_name

        col_type = self.index.column_types.get(orig_name)
        if col_type in self.fill_values_by_type:
            return self.fill_values_by_type[col_type]

        return self.default_fill_value

    def _deep_fill_nones(self, value: Any, fill: Any) -> Any:
        """Recursively replaces None values inside nested dicts and lists.

        PyArrow struct columns with null-typed fields (e.g. ``seed: null``)
        yield Python dicts containing ``None`` values at arbitrary depth.
        ``default_collate`` cannot handle ``NoneType`` anywhere in a batch,
        so we must sanitize the entire nested structure.

        Args:
            value: The value returned by PyArrow (may be dict, list, scalar or None).
            fill:  The replacement value to use wherever None is found.

        Returns:
            A sanitized copy of *value* with all Nones replaced by *fill*.
        """
        if value is None:
            return fill
        if isinstance(value, dict):
            return {k: self._deep_fill_nones(v, fill) for k, v in value.items()}
        if isinstance(value, list):
            return [self._deep_fill_nones(v, fill) for v in value]
        return value

    def _apply_auto_fill(self):
        """Populates fill_values_by_type with default values for common types."""
        defaults = {
            # Integers
            'int8': 0, 'int16': 0, 'int32': 0, 'int64': 0,
            'uint8': 0, 'uint16': 0, 'uint32': 0, 'uint64': 0,
            # Floats
            'float16': 0.0, 'float32': 0.0, 'float64': 0.0, 'double': 0.0, 'halffloat': 0.0,
            # Strings
            'string': "", 'large_string': "", 'utf8': "", 'large_utf8': "",
            # Bool
            'bool': False,
            # Dictionary/Categorical (best effort)
            'dictionary': ""
        }
        for t in set(self.index.column_types.values()):
            clean_t = t.lower().split('[')[0] # Handle complex types like list[int64]
            if clean_t in defaults and t not in self.fill_values_by_type:
                self.fill_values_by_type[t] = defaults[clean_t]

    def _read_rows_from_file(self, file_idx: int, local_indices: List[int]) -> List[Dict[str, Any]]:
        """Reads multiple rows from a single file efficiently."""
        pf = self._get_file_handle(file_idx)
        file_info = self.index.files[file_idx]
        file_path = file_info.path

        rg_to_indices = {}
        cumulative_rg_rows = 0
        for i, rg_rows in enumerate(file_info.row_groups):
            mask = (np.array(local_indices) >= cumulative_rg_rows) & (np.array(local_indices) < cumulative_rg_rows + rg_rows)
            rg_indices = np.array(local_indices)[mask]
            if len(rg_indices) > 0:
                rg_to_indices[i] = rg_indices - cumulative_rg_rows
            cumulative_rg_rows += rg_rows

        local_idx_to_result = {}
        target_schema = self.schema

        # Performance optimization: determine which source columns to read from disk.
        # Column projection is applied only when columns were explicitly selected and
        # there are no computed-column transforms (whose functions may depend on any
        # source column); otherwise all columns are read. The projection maps
        # target_schema back to the source column names.
        requested_source_cols = None
        if self.selected_columns is not None and not self.mapper.transforms:
            # We can optimize only if there are no arbitrary row transforms
            requested_source_cols = []
            for col in target_schema:
                if col == self.source_column_name:
                    continue
                src_col = self.mapper.get_source_column(col, file_path)
                if src_col in file_info.columns:
                    requested_source_cols.append(src_col)

            # If the projection resolved to no physical source columns (e.g. only
            # virtual or computed columns remain) while the target schema is non-empty,
            # fall back to reading all columns instead of passing an empty column list.
            if not requested_source_cols and target_schema:
                requested_source_cols = None

        for rg_idx, rg_local_indices in rg_to_indices.items():
            table = pf.read_row_group(rg_idx, columns=requested_source_cols)
            rg_start_offset = sum(file_info.row_groups[:rg_idx])

            # Optimization: Use take() to get only needed rows if indexing is sparse or out of order
            is_sequential = len(rg_local_indices) == len(table) and all(rg_local_indices[i] == i for i in range(len(rg_local_indices)))
            if not is_sequential:
                subset_table = table.take(pa.array(rg_local_indices))
                batch_data = subset_table.to_pylist()
            else:
                batch_data = table.to_pylist()

            # Post-process the batch
            for l_idx_in_rg, item in zip(rg_local_indices, batch_data):
                # 0. Ensure all columns from index are present (as None/Fill)
                for col in self.index.all_columns:
                    if col not in item:
                        item[col] = self._get_fill_value(self.mapper.mapping.get(col, col))

                # 1. Inject virtual source column if needed
                if self.include_source_column:
                    item[self.source_column_name] = file_path

                # 2. Apply global then file-specific mapping logic & Computed Columns (via Mapper)
                item = self.mapper.map_columns(item, file_path)

                # Ensure all columns in final schema are present
                mapped_item = {}
                for col in target_schema:
                    if col not in item:
                        val = self._get_fill_value(col)
                    else:
                        val = item[col]

                    # Handle None if still None and default is set
                    if val is None:
                        val = self._get_fill_value(col)

                    if col in self._type_casts and val is not None:
                        try:
                            val = self._type_casts[col](val)
                        except (ValueError, TypeError):
                            pass

                    # Recursively sanitize nested dicts/lists
                    if isinstance(val, (dict, list)):
                        fill = self._get_fill_value(col)
                        val = self._deep_fill_nones(val, fill)

                    mapped_item[col] = val

                # 3. Apply row-level transformations
                for row_fn in self.mapper.row_transforms:
                    mapped_item = row_fn(mapped_item)

                local_idx_to_result[l_idx_in_rg + rg_start_offset] = mapped_item

        results_ordered = [local_idx_to_result[l_idx] for l_idx in local_indices]

        # Apply batch column transformations (e.g., batch-calculated aliases/transforms)
        if hasattr(self.mapper, 'batch_column_transforms') and self.mapper.batch_column_transforms:
            for col_name, (batch_fn, _) in self.mapper.batch_column_transforms.items():
                col_values = batch_fn(results_ordered)
                for row, val in zip(results_ordered, col_values):
                    row[col_name] = val

        # Apply whole-batch transformations
        if hasattr(self.mapper, 'batch_transforms') and self.mapper.batch_transforms:
            for batch_fn, _ in self.mapper.batch_transforms:
                results_ordered = batch_fn(results_ordered)

        return results_ordered

    def __getitem__(self, idx: Union[int, List[int], slice, np.ndarray]) -> Any:
        if isinstance(idx, (int, np.integer)):
            if idx < 0: idx += len(self)
            if idx < 0 or idx >= len(self): raise IndexError("Index out of range")
            return self.__getitems__([int(idx)])[0]
        elif isinstance(idx, (list, np.ndarray)):
            return self.__getitems__(list(idx))
        elif isinstance(idx, slice):
            return self.select(idx)
        else:
            raise TypeError(f"Invalid index type: {type(idx)}")

    def __getitems__(self, indices: List[int]) -> List[Dict[str, Any]]:
        file_to_local_indices = {}
        for i, global_idx in enumerate(indices):
            f_idx, l_idx = self._get_file_and_local_idx(global_idx)
            if f_idx not in file_to_local_indices: file_to_local_indices[f_idx] = []
            file_to_local_indices[f_idx].append((i, l_idx))

        results = [None] * len(indices)
        for f_idx, indexed_l_indices in file_to_local_indices.items():
            original_positions = [x[0] for x in indexed_l_indices]
            l_indices = [x[1] for x in indexed_l_indices]
            file_results = self._read_rows_from_file(f_idx, l_indices)
            for pos, res in zip(original_positions, file_results):
                results[pos] = res
        return results # type: ignore

    def shuffle(self, seed: Optional[int] = None, rg_buffer: Optional[int] = None) -> 'IndexedParquetDataset':
        """Shuffles the dataset indices.

        Args:
            seed: Random seed for reproducibility.
            rg_buffer: If provided, enables locality-aware shuffling. Instead of a global 
                shuffle, it shuffles row groups and then shuffles rows within a window 
                of `rg_buffer` row groups. This significantly improves I/O performance
                by reducing the number of row groups that need to be read/cached 
                simultaneously.

        Returns:
            A new IndexedParquetDataset instance with shuffled indices.
        """
        rng = np.random.default_rng(seed)

        if rg_buffer is None:
            new_indices = self.indices.copy()
            rng.shuffle(new_indices)
            return self._clone_with_indices(new_indices)

        if rg_buffer < 1:
            raise ValueError("rg_buffer must be at least 1")

        # 1. Map current active indices to their respective row groups
        actual_indices = self.indices

        # Find which file each index belongs to
        file_indices = np.searchsorted(self.file_offsets, actual_indices, side='right') - 1

        # Grouping by (file_idx, rg_idx)
        # Note: dict preserves insertion order of first appearance in Python 3.7+
        from collections import defaultdict
        rg_to_indices = defaultdict(list)

        # Pre-calculate RG cumulative boundaries for each file for speed
        file_rg_bounds = [np.cumsum([0] + f.row_groups) for f in self.index.files]

        # Process indices by file to use vector searchsorted logic
        unique_f_indices = np.unique(file_indices)
        for f_idx in unique_f_indices:
            f_mask = (file_indices == f_idx)
            f_global_indices = actual_indices[f_mask]
            f_local_indices = f_global_indices - self.file_offsets[f_idx]

            # Map local row index to row group index within this file
            f_rg_indices = np.searchsorted(file_rg_bounds[f_idx], f_local_indices, side='right') - 1

            # Group global indices by RG
            for rg_idx, global_idx in zip(f_rg_indices, f_global_indices):
                rg_to_indices[(f_idx, rg_idx)].append(global_idx)

        # 2. Get the list of all unique row group IDs present in current indices
        rg_keys = list(rg_to_indices.keys())
        # Shuffle the sequence of row groups themselves
        rng.shuffle(rg_keys)

        # 3. Flatten and shuffle within windows of rg_buffer
        final_indices = []
        for i in range(0, len(rg_keys), rg_buffer):
            window_slice = rg_keys[i : i + rg_buffer]
            window_pool = []
            for key in window_slice:
                window_pool.extend(rg_to_indices[key])

            # Shuffle rows within the current window of row groups
            window_pool_arr = np.array(window_pool)
            rng.shuffle(window_pool_arr)
            final_indices.append(window_pool_arr)

        if not final_indices:
            return self._clone_with_indices(np.array([], dtype=int))

        return self._clone_with_indices(np.concatenate(final_indices))

    def select(self, range_or_indices: Union[slice, List[int], np.ndarray]) -> 'IndexedParquetDataset':
        new_indices = self.indices[range_or_indices]
        return self._clone_with_indices(new_indices)

    def limit(self, n: int) -> 'IndexedParquetDataset':
        """Limits the dataset to the first n rows.

        Args:
            n: Maximum number of rows to keep.

        Returns:
            A new IndexedParquetDataset instance.
        """
        if self._indices is None and self._pending_filter:
            # Operator Fusion: Push the limit into the pending filter
            # Create a copy of the pending filter to avoid side effects on the original
            new_pending = self._pending_filter.copy()
            new_pending['limit'] = n
            return self._clone_with_indices(None, _pending_filter=new_pending)

        return self.select(slice(0, n))

    def _clone_with_indices(self, new_indices: np.ndarray, _pending_filter: Optional[Dict] = None) -> 'IndexedParquetDataset':
        return IndexedParquetDataset(
            self.index, new_indices, self.mapper,
            self.include_source_column, self.source_column_name,
            self.default_fill_value, self.fill_values_by_type, self.fill_values_by_column,
            max_open_files=self.max_open_files,
            _type_casts=self._type_casts.copy(),
            selected_columns=self.selected_columns,
            _pending_filter=_pending_filter if _pending_filter is not None else (self._pending_filter.copy() if self._pending_filter else None)
        )

    def _clone_with_mapper(self, new_mapper: SchemaMapper) -> 'IndexedParquetDataset':
        return IndexedParquetDataset(
            self.index, self.indices.copy(), new_mapper,
            self.include_source_column, self.source_column_name,
            self.default_fill_value, self.fill_values_by_type, self.fill_values_by_column,
            max_open_files=self.max_open_files,
            _type_casts=self._type_casts.copy(),
            selected_columns=self.selected_columns,
            _pending_filter=self._pending_filter.copy() if self._pending_filter else None
        )

    def map(
        self, 
        fn: Callable, 
        *, 
        remove_columns: Optional[List[str]] = None,
        output_schema: Optional[List[str]] = None,
        is_batch: bool = False,
        batch_size: int = 1024
    ) -> 'IndexedParquetDataset':
        """Applies a transformation to the dataset.

        Args:
            fn: A function that takes a row (dict) and returns a transformed row (dict),
                or if is_batch=True, takes a List[dict] and returns a List[dict].
            remove_columns: Optional list of columns to remove from the result.
            output_schema: Optional explicit list of columns for the new schema.
            is_batch: If True, fn is treated as a batch transformation.
            batch_size: Suggested batch size for processing (if is_batch=True).
        """
        new_mapper = SchemaMapper(
            mapping=self.mapper.mapping.copy(),
            file_mappings=self.mapper.file_mappings.copy(),
            transforms=self.mapper.transforms.copy(),
            row_transforms=self.mapper.row_transforms.copy(),
            batch_transforms=getattr(self.mapper, 'batch_transforms', []).copy(),
            batch_column_transforms=getattr(self.mapper, 'batch_column_transforms', {}).copy()
        )
        effective_fn = fn
        if remove_columns:
            if is_batch:
                def _wrapped_batch_fn(batch, _fn=fn, _cols=remove_columns):
                    results = _fn(batch)
                    for row in results:
                        for c in _cols:
                            row.pop(c, None)
                    return results
                effective_fn = _wrapped_batch_fn
            else:
                def _wrapped_row_fn(row, _fn=fn, _cols=remove_columns):
                    result = _fn(row)
                    for c in _cols:
                        result.pop(c, None)
                    return result
                effective_fn = _wrapped_row_fn

        if is_batch:
            new_mapper.batch_transforms.append((effective_fn, batch_size))
        else:
            new_mapper.row_transforms.append(effective_fn)

        ds = self._clone_with_mapper(new_mapper)
        if output_schema:
            ds.selected_columns = output_schema
        return ds

    def map_batches(
        self, 
        fn: Callable[[List[dict]], List[dict]], 
        *, 
        batch_size: int = 1024,
        remove_columns: Optional[List[str]] = None,
        output_schema: Optional[List[str]] = None
    ) -> 'IndexedParquetDataset':
        """Applies a batch-level transformation to the dataset.

        Args:
            fn: A function that takes a batch (List[dict]) and returns a transformed batch (List[dict]).
            batch_size: Suggested batch size for processing.
            remove_columns: Optional list of columns to remove from the result.
            output_schema: Optional explicit list of columns for the new schema.
        """
        def _wrapper_fn(batch, _fn=fn, _cols=remove_columns):
            results = _fn(batch)
            if _cols:
                for row in results:
                    for c in _cols:
                        row.pop(c, None)
            return results

        # mapper.row_transforms expects a single row, so batch functions are tracked
        # separately in SchemaMapper.batch_transforms. Because IndexedParquetDataset
        # is built for random access, batch transforms are assumed to be stateless
        # within each batch.
        new_mapper = SchemaMapper(
            mapping=self.mapper.mapping.copy(),
            file_mappings=self.mapper.file_mappings.copy(),
            transforms=self.mapper.transforms.copy(),
            row_transforms=self.mapper.row_transforms.copy(),
            batch_transforms=getattr(self.mapper, 'batch_transforms', []).copy()
        )
        new_mapper.batch_transforms.append((effective_fn := _wrapper_fn, batch_size))

        ds = self._clone_with_mapper(new_mapper)
        if output_schema:
            ds.selected_columns = output_schema
        return ds

    def filter_batches(
        self, 
        fn: Optional[Callable[[List[Dict[str, Any]]], List[bool]]] = None, 
        batch_size: int = 1024,
        show_progress: bool = False,
        transform_batch: Optional[Callable[[List[Dict[str, Any]]], List[Dict[str, Any]]]] = None,
        filter_batch: Optional[Callable[[List[Dict[str, Any]]], List[bool]]] = None
    ) -> 'IndexedParquetDataset':
        """Filters the dataset using a batch-level filter function.

        Args:
            fn: Alias for filter_batch.
            batch_size: Batch size for processing.
            show_progress: If True, shows progress for each file.
            transform_batch: Optional batch-level transformation before filtering.
            filter_batch: A function that takes a batch and returns a list of booleans.
        """
        f_batch = filter_batch or fn
        return self.filter(filter_batch=f_batch, transform_batch=transform_batch, batch_size=batch_size, show_progress=show_progress)

    def train_test_split(
        self, 
        test_size: Union[float, int], 
        shuffle: bool = True, 
        seed: Optional[int] = None, 
        stratify_by: Optional[str] = None
    ) -> tuple['IndexedParquetDataset', 'IndexedParquetDataset']:
        """Splits the dataset into train and test sets."""
        n = len(self)
        if isinstance(test_size, float):
            n_test = int(n * test_size)
        else:
            n_test = test_size
        n_train = n - n_test

        if stratify_by:
            # Read labels for all indices (required for stratification)
            labels = []
            for i in range(len(self)):
                labels.append(self[i][stratify_by])
            labels = np.array(labels)

            unique_labels, inverse = np.unique(labels, return_inverse=True)
            train_indices_list = []
            test_indices_list = []

            rng = np.random.default_rng(seed)

            for i in range(len(unique_labels)):
                idx_in_group = np.where(inverse == i)[0]
                if shuffle:
                    rng.shuffle(idx_in_group)

                group_n_test = int(len(idx_in_group) * (n_test / n))
                test_indices_list.extend(idx_in_group[:group_n_test])
                train_indices_list.extend(idx_in_group[group_n_test:])

            train_indices = self.indices[np.array(train_indices_list)]
            test_indices = self.indices[np.array(test_indices_list)]
        else:
            indices = self.indices.copy()
            if shuffle:
                rng = np.random.default_rng(seed)
                rng.shuffle(indices)

            train_indices = indices[:n_train]
            test_indices = indices[n_train:]

        return self._clone_with_indices(train_indices), self._clone_with_indices(test_indices)

    def copy(self) -> 'IndexedParquetDataset':
        """Returns a copy of the dataset."""
        return self._clone_with_indices(self.indices.copy())

    def select_columns(self, columns: List[str]) -> 'IndexedParquetDataset':
        """Selects a subset of columns to be returned.

        Args:
            columns: List of column names to keep.

        Returns:
            A new IndexedParquetDataset instance with updated schema.
        """
        # Validate columns exist in current schema
        current_schema = self.schema
        for col in columns:
            if col not in current_schema:
                warnings.warn(f"Column '{col}' not found in current schema.")

        ds = self.copy()
        ds.selected_columns = columns
        return ds

    def sample(self, n: int, seed: Optional[int] = None) -> 'IndexedParquetDataset':
        """Returns a random sample of n rows from the dataset.

        Args:
            n: Number of rows to sample.
            seed: Random seed for reproducibility.

        Returns:
            A new IndexedParquetDataset instance.
        """
        if n > len(self):
            n = len(self)

        rng = np.random.default_rng(seed)
        indices = rng.choice(len(self), size=n, replace=False)
        return self.select(indices)

    def iter_batches(self, batch_size: int, shuffle: bool = False, seed: Optional[int] = None):
        """Yields batches of data from the dataset.

        Args:
            batch_size: Number of rows per batch.
            shuffle: Whether to shuffle before iterating.
            seed: Random seed for shuffling.
        """
        ds = self.shuffle(seed) if shuffle else self
        n = len(ds)
        for i in range(0, n, batch_size):
            end = min(i + batch_size, n)
            yield ds[i:end]

    def alias(self, name: str, source: Union[str, Callable], is_batch: bool = False, batch_size: int = 1024) -> 'IndexedParquetDataset':
        """Creates a new alias for a column or a new computed column.

        Args:
            name: The target name of the column.
            source: Either an original column name (string),
                   a function of the form source(row) -> value (if is_batch=False),
                   or a function of the form source(batch) -> List[values] (if is_batch=True).
            is_batch: If True, source is treated as a batch transformation.
            batch_size: Suggested batch size for processing (if is_batch=True).

        Returns:
            A new IndexedParquetDataset instance.
        """
        new_mapper = SchemaMapper(
            mapping=self.mapper.mapping.copy(),
            file_mappings=self.mapper.file_mappings.copy(),
            transforms=self.mapper.transforms.copy(),
            row_transforms=self.mapper.row_transforms.copy(),
            batch_transforms=getattr(self.mapper, 'batch_transforms', []).copy(),
            batch_column_transforms=getattr(self.mapper, 'batch_column_transforms', {}).copy()
        )
        if isinstance(source, str):
            new_mapper.mapping[source] = name
            # Remove transform if we are re-aliasing to a source column
            if name in new_mapper.transforms:
                del new_mapper.transforms[name]
            if name in new_mapper.batch_column_transforms:
                del new_mapper.batch_column_transforms[name]
        elif callable(source):
            if is_batch:
                new_mapper.batch_column_transforms[name] = (source, batch_size)
                if name in new_mapper.transforms:
                    del new_mapper.transforms[name]
            else:
                new_mapper.transforms[name] = source
                if name in new_mapper.batch_column_transforms:
                    del new_mapper.batch_column_transforms[name]
        else:
            raise TypeError("Alias source must be a string or a callable.")

        return self._clone_with_mapper(new_mapper)

    def set_file_mapping(self, file_path: str, mapping: Dict[str, str]) -> 'IndexedParquetDataset':
        """Sets a file-specific column mapping.

        Args:
            file_path: Absolute path to the file.
            mapping: Dict mapping source column names to target names.
        """
        new_mapper = SchemaMapper(
            mapping=self.mapper.mapping.copy(),
            file_mappings=self.mapper.file_mappings.copy(),
            transforms=self.mapper.transforms.copy()
        )
        # Ensure absolute path
        abs_path = os.path.abspath(file_path)
        new_mapper.file_mappings[abs_path] = mapping
        return self._clone_with_mapper(new_mapper)

    def rename(self, old_name: str, new_name: str) -> 'IndexedParquetDataset':
        """Renames a column."""
        return self.alias(new_name, old_name)


    def cast(self, column: str, target_type: Union[type, str, Callable]) -> 'IndexedParquetDataset':
        """Changes the type of a column using an alias transformation.

        Args:
            column: The name of the column to cast.
            target_type: The target type (int, float, str, etc.) or a callable.
        """
        if isinstance(target_type, str):
            if target_type == 'int': cast_fn = int
            elif target_type in ('float', 'double'): cast_fn = float
            elif target_type in ('str', 'string'): cast_fn = str
            else: raise ValueError(f"Unsupported type string: {target_type}")
        elif callable(target_type):
            cast_fn = target_type # type: ignore
        else:
            raise TypeError("target_type must be a string (int, float, str) or a callable.")

        def transform(row):
            val = row.get(column)
            if val is None:
                return None
            try:
                return cast_fn(val)
            except (ValueError, TypeError):
                return val

        return self.alias(column, transform)

    def get_arrow_schema(self) -> pa.Schema:
        """Derives the PyArrow schema for the dataset from source files and metadata.

        This is used during materialize operations (to_parquet) to ensure consistent 
        types even if some batches contain only None values (avoiding null-type inference).
        """
        fields = []
        file_schemas = {}

        # Mapping from common type strings to Arrow types
        # Note: mapping 'float' and 'double' to pa.float64()
        basic_types = {
            'int64': pa.int64(),
            'int32': pa.int32(),
            'float64': pa.float64(),
            'double': pa.float64(),
            'float': pa.float64(),
            'float32': pa.float32(),
            'string': pa.string(),
            'bool': pa.bool_(),
            'binary': pa.binary(),
            'timestamp[ns]': pa.timestamp('ns'),
            'timestamp[us]': pa.timestamp('us'),
            'timestamp[ms]': pa.timestamp('ms'),
            'timestamp[s]': pa.timestamp('s'),
        }

        for col in self.schema:
            dataType = None

            # 1. Check current index column types (already accounts for merge upcasts)
            # Find the original column name if mapped
            src_col = None
            for k, v in self.mapper.mapping.items():
                if v == col:
                    src_col = k
                    break
            if src_col is None: src_col = col

            type_str = self.index.column_types.get(src_col)
            if type_str and type_str.lower() in basic_types:
                dataType = basic_types[type_str.lower()]

            # 2. If not a basic type or not found in index, look in source files
            if dataType is None:
                for f_info in self.index.files:
                    if src_col in f_info.columns:
                        if f_info.path not in file_schemas:
                            try:
                                file_schemas[f_info.path] = pq.read_schema(f_info.path)
                            except: continue
                        f_schema = file_schemas.get(f_info.path)
                        if f_schema and src_col in f_schema.names:
                            found_type = f_schema.field(src_col).type
                            # If it's a null type, keep looking for a better one
                            if not pa.types.is_null(found_type):
                                dataType = found_type
                                break
                            else:
                                dataType = found_type # Fallback if no better found

            # 3. Virtual, computed or null column?
            # If we only found a 'null' type, or found nothing, try to infer from sample
            if dataType is None or pa.types.is_null(dataType):
                # Sample rows to infer type.
                sample_size = min(100, len(self))
                if sample_size > 0:
                    try:
                        # Slicing returns a sub-dataset; iterating it yields row dicts
                        sample_batch = self[:sample_size]
                        sample_vals = [row.get(col) for row in sample_batch]
                        non_nones = [v for v in sample_vals if v is not None]
                        if non_nones:
                            # Use ALL non-nones in sample to infer the BROADEST type
                            inferred_type = pa.array(non_nones).type
                            if not pa.types.is_null(inferred_type):
                                dataType = inferred_type
                    except:
                        pass

            # 4. Final safety fallbacks
            if dataType is None or pa.types.is_null(dataType):
                if col == self.source_column_name:
                    dataType = pa.string()
                else:
                    dataType = pa.string() # pa.null() is dangerous for ParquetWriter

            fields.append(pa.field(col, dataType))

        return pa.schema(fields)

    def to_parquet(
        self, 
        path: str, 
        chunk_size: int = 1024, 
        shard_size: Optional[int] = None,
        optimize_by_reorder: bool = True,
        progress: bool = True
    ):
        """Materializes the dataset to one or more Parquet files.

        Args:
            path: Output file path or directory (if sharding).
            chunk_size: Cache size for intermediate batch collection.
            shard_size: If set, splits the dataset into multiple files of this many rows.
            optimize_by_reorder: If True, reads data in source-linear order (fastest),
                but potentially changes the row order in the output file.
            progress: Whether to show a progress bar.
        """
        # Ensure .parquet extension for single file output
        effective_path = str(path)
        if not shard_size and not effective_path.lower().endswith('.parquet'):
            effective_path += '.parquet'

        if shard_size:
            os.makedirs(effective_path, exist_ok=True)

        writer = None
        rows_in_current_shard = 0
        shard_idx = 0

        # Derive target schema upfront to avoid type inference issues
        target_schema = self.get_arrow_schema()

        def get_shard_path():
            if shard_size:
                return os.path.join(effective_path, f"part_{shard_idx:04d}.parquet")
            return effective_path

        def write_table_with_shards(table):
            nonlocal writer, rows_in_current_shard, shard_idx

            if not shard_size:
                if writer is None:
                    writer = pq.ParquetWriter(get_shard_path(), target_schema)
                writer.write_table(table)
                rows_in_current_shard += len(table)
                return

            offset = 0
            while offset < len(table):
                # If current shard is full, move to next
                if rows_in_current_shard >= shard_size:
                    if writer: writer.close()
                    shard_idx += 1
                    writer = None
                    rows_in_current_shard = 0

                remaining_in_shard = shard_size - rows_in_current_shard
                to_write = min(len(table) - offset, remaining_in_shard)

                chunk = table.slice(offset, to_write)
                if writer is None:
                    writer = pq.ParquetWriter(get_shard_path(), target_schema)

                writer.write_table(chunk)
                rows_in_current_shard += to_write
                offset += to_write

        total_rows = len(self)
        pbar = tqdm(total=total_rows, desc="Writing Parquet", disable=not progress)

        try:
            if not optimize_by_reorder:
                # Original slow path (preserves order)
                effective_chunk_size = chunk_size 
                for i in range(0, total_rows, effective_chunk_size):
                    batch_indices = list(range(i, min(i + effective_chunk_size, total_rows)))
                    batch_data = self[batch_indices]
                    if not batch_data: continue

                    table = pa.Table.from_pylist(batch_data, schema=target_schema)
                    write_table_with_shards(table)
                    pbar.update(len(batch_data))
            else:
                # Optimized path: Group by (file, RG) to minimize reads
                # ...
                from collections import defaultdict
                file_to_rg_to_rows = defaultdict(lambda: defaultdict(list))

                # Pre-calculate RG boundaries for speed
                file_rg_bounds = [np.cumsum([0] + f.row_groups) for f in self.index.files]

                # Process active indices
                for idx in self.indices:
                    f_idx = np.searchsorted(self.file_offsets, idx, side='right') - 1
                    l_idx = idx - self.file_offsets[f_idx]
                    rg_idx = np.searchsorted(file_rg_bounds[f_idx], l_idx, side='right') - 1
                    l_idx_in_rg = l_idx - file_rg_bounds[f_idx][rg_idx]
                    file_to_rg_to_rows[f_idx][rg_idx].append(l_idx_in_rg)

                # 2. Iterate and write
                buffer = []

                # Sort files and RGs for linear access
                for f_idx in sorted(file_to_rg_to_rows.keys()):
                    pf = self._get_file_handle(f_idx)
                    file_info = self.index.files[f_idx]

                    # Columns optimization
                    requested_columns = None
                    if self.selected_columns is not None and not self.mapper.transforms:
                        requested_columns = []
                        for col in self.schema:
                            if col == self.source_column_name: continue
                            src_col = self.mapper.get_source_column(col, file_info.path)
                            if src_col in file_info.columns: requested_columns.append(src_col)
                        if not requested_columns and self.schema: requested_columns = None

                    rgs = file_to_rg_to_rows[f_idx]
                    for rg_idx in sorted(rgs.keys()):
                        rows_in_rg = rgs[rg_idx]
                        table = pf.read_row_group(rg_idx, columns=requested_columns)

                        # Batch conversion for speed
                        is_sequential = len(rows_in_rg) == len(table) and all(rows_in_rg[i] == i for i in range(len(rows_in_rg)))
                        if not is_sequential:
                            batch_data = table.take(pa.array(rows_in_rg)).to_pylist()
                        else:
                            batch_data = table.to_pylist()

                        processed_batch = []
                        for l_idx_in_rg, item in zip(rows_in_rg, batch_data):
                            for col in self.index.all_columns:
                                if col not in item: item[col] = self._get_fill_value(self.mapper.mapping.get(col, col))
                            if self.include_source_column: item[self.source_column_name] = file_info.path
                            item = self.mapper.map_columns(item, file_info.path)

                            mapped_item = {}
                            for col in self.schema:
                                val = item.get(col, self._get_fill_value(col))
                                if val is None: val = self._get_fill_value(col)
                                if col in self._type_casts and val is not None:
                                    try: val = self._type_casts[col](val)
                                    except: pass
                                if isinstance(val, (dict, list)):
                                    val = self._deep_fill_nones(val, self._get_fill_value(col))
                                mapped_item[col] = val
                            for row_fn in self.mapper.row_transforms:
                                mapped_item = row_fn(mapped_item)

                            processed_batch.append(mapped_item)

                        # Apply batch column transforms
                        if hasattr(self.mapper, 'batch_column_transforms') and self.mapper.batch_column_transforms:
                            for col_name, (batch_fn, _) in self.mapper.batch_column_transforms.items():
                                col_values = batch_fn(processed_batch)
                                for row, val in zip(processed_batch, col_values):
                                    row[col_name] = val

                        # Apply whole-batch transforms
                        if hasattr(self.mapper, 'batch_transforms') and self.mapper.batch_transforms:
                            for batch_fn, _ in self.mapper.batch_transforms:
                                processed_batch = batch_fn(processed_batch)

                        buffer.extend(processed_batch)

                        if len(buffer) >= chunk_size:
                            out_table = pa.Table.from_pylist(buffer, schema=target_schema)
                            write_table_with_shards(out_table)
                            pbar.update(len(buffer))
                            buffer = []

                if buffer:
                    out_table = pa.Table.from_pylist(buffer, schema=target_schema)
                    write_table_with_shards(out_table)
                    pbar.update(len(buffer))
        finally:
            if writer: writer.close()
            pbar.close()

    def clone(
        self, 
        path: str, 
        optimize_by_reorder: bool = True, 
        progress: bool = True
    ) -> 'IndexedParquetDataset':
        """Materializes all computations and returns a new dataset instance."""
        # to_parquet will append .parquet if needed, but we need to know the effective path
        effective_path = str(path)
        if not effective_path.lower().endswith('.parquet'):
            effective_path += '.parquet'

        self.to_parquet(effective_path, optimize_by_reorder=optimize_by_reorder, progress=progress)
        return IndexedParquetDataset.from_folder(effective_path)

    def filter(
        self, 
        path_pattern: Optional[Union[str, Callable]] = None,
        path_filter: Optional[Union[str, List[str]]] = None,
        column_conditions: Optional[Dict[str, Any]] = None,
        filter_row: Optional[Callable[[Dict[str, Any]], bool]] = None,
        filter_batch: Optional[Callable[[List[Dict[str, Any]]], List[bool]]] = None,
        transform_batch: Optional[Callable[[List[Dict[str, Any]]], List[Dict[str, Any]]]] = None,
        batch_size: int = 1024,
        show_progress: bool = False,
        limit: Optional[int] = None,
        # Legacy names
        predicate: Optional[Callable[[Dict[str, Any]], bool]] = None,
        predicate_batch: Optional[Callable[[List[Dict[str, Any]]], List[bool]]] = None
    ) -> 'IndexedParquetDataset':
        """Filters the dataset based on file paths, column conditions, or a custom filter.

        This method is lazy and returns a new IndexedParquetDataset instance with updated indices.
        It supports three levels of filtering:
        1. File-level (fastest): via `path_pattern` or `path_filter`.
        2. Column-level (fast, PyArrow-based): via `column_conditions`.
        3. Row-level/Batch-level (flexible, Python-based): via `filter_row` or `filter_batch`.

        Args:
            path_pattern: A string to search for as a substring in the absolute file path.
                If a callable is provided, it is automatically treated as the `filter_row` argument.
            path_filter: A glob pattern (e.g., `"**/2023/*.parquet"`) or a list of glob patterns.
                Only files matching at least one pattern will be kept.
            column_conditions: A dictionary mapping column names to filter conditions.
            filter_row: A callable that takes a row (dict) and returns a boolean.
                Example: `lambda row: len(row["text"]) > 100`
            show_progress: If True, displays progress bars (using tqdm) during file processing 
                for column conditions and predicates.

        Returns:
            A new IndexedParquetDataset instance containing only the matching rows.

        Note:
            If multiple filter types are provided, they are applied in the following order:
            File filters -> Column conditions -> Predicate.
        """
        if callable(path_pattern): filter_row = path_pattern; path_pattern = None

        # Handle legacy names
        f_row = filter_row or predicate
        f_batch = filter_batch or predicate_batch

        if predicate or predicate_batch:
            warnings.warn("Arguments 'predicate' and 'predicate_batch' are deprecated. "
                          "Use 'filter_row' and 'filter_batch' instead.", DeprecationWarning, stacklevel=2)

        # Lazy implementation: 
        # We don't filter immediately. We return a clone that will filter on first access.
        # This allows operator fusion with .limit()
        pending = {
            'filter_row': f_row,
            'filter_batch': f_batch,
            'transform_batch': transform_batch,
            'column_conditions': column_conditions,
            'path_pattern': path_pattern,
            'path_filter': path_filter,
            'limit': limit,
            'batch_size': batch_size,
            'show_progress': show_progress,
            'input_indices': self.indices.copy(), # Snapshot of current active indices
            'mapper_snapshot': self.mapper # Logic isolation
        }

        return self._clone_with_indices(None, _pending_filter=pending)

    def merge(self, other: 'IndexedParquetDataset') -> 'IndexedParquetDataset':
        """Merges this dataset with another one, deduplicating identical rows.

        A row is considered identical if it comes from the same file and has
        the same local row index.
        """
        # 1. Unified unique files
        self_paths = {f.path: i for i, f in enumerate(self.index.files)}
        other_paths = {f.path: i for i, f in enumerate(other.index.files)}

        all_unique_paths = sorted(list(set(self_paths.keys()) | set(other_paths.keys())))
        path_to_new_idx = {path: i for i, path in enumerate(all_unique_paths)}

        new_files_info = []
        for path in all_unique_paths:
            if path in self_paths:
                new_files_info.append(self.index.files[self_paths[path]])
            else:
                new_files_info.append(other.index.files[other_paths[path]])

        # 2. Map indices to row identity (new_file_idx, local_idx)
        def get_row_identities(ds, path_map):
            ids = []
            for i in range(len(ds)):
                f_idx, l_idx = ds._get_file_and_local_idx(i)
                f_path = ds.index.files[f_idx].path
                new_f_idx = path_map[f_path]
                ids.append((new_f_idx, l_idx))
            return ids

        self_ids = get_row_identities(self, path_to_new_idx)
        other_ids = get_row_identities(other, path_to_new_idx)

        # 3. Deduplicate preserving order (self first, then new from other)
        # We use a dictionary as an ordered set
        seen = {}
        unified_ids = []
        for row_id in (self_ids + other_ids):
            if row_id not in seen:
                seen[row_id] = True
                unified_ids.append(row_id)

        # 4. Create new BaseIndex metadata
        new_total_rows_meta = sum(f.num_rows for f in new_files_info)
        new_all_columns = sorted(list(set(self.index.all_columns) | set(other.index.all_columns)))

        # 5. Type merging logic (upcasting conflicts)
        new_column_types = self.index.column_types.copy()
        type_casts = self._type_casts.copy()
        for col, o_type in other.index.column_types.items():
            if col in new_column_types:
                s_type = new_column_types[col]
                if s_type != o_type:
                    common_type = 'string'
                    s_is_float = 'float' in s_type or 'double' in s_type
                    o_is_float = 'float' in o_type or 'double' in o_type
                    s_is_int = 'int' in s_type
                    o_is_int = 'int' in o_type

                    if (s_is_int and o_is_float) or (s_is_float and o_is_int) or (s_is_float and o_is_float):
                        common_type = 'double'

                    warnings.warn(f"Type mismatch for column '{col}': {s_type} vs {o_type}. Upcasting to {common_type}.")
                    new_column_types[col] = common_type
                    cast_fn = str if common_type == 'string' else (float if common_type == 'double' else None)
                    if cast_fn: type_casts[col] = cast_fn
            else:
                new_column_types[col] = o_type

        new_index = BaseIndex(new_files_info, new_total_rows_meta, new_all_columns, new_column_types)

        # 6. Re-calculate global indices based on new file sequence
        new_file_offsets = np.zeros(len(new_files_info) + 1, dtype=int)
        current_offset = 0
        for i, f in enumerate(new_files_info):
            new_file_offsets[i] = current_offset
            current_offset += f.num_rows
        new_file_offsets[-1] = current_offset

        new_indices = np.array([new_file_offsets[f_idx] + l_idx for f_idx, l_idx in unified_ids], dtype=int)

        # 7. Merge mappers
        new_mapper = self.mapper.merge(
            other.mapper,
            list(self_paths.keys()),
            list(other_paths.keys())
        )

        return IndexedParquetDataset(
            new_index, new_indices, new_mapper,
            self.include_source_column, self.source_column_name,
            self.default_fill_value, self.fill_values_by_type, self.fill_values_by_column,
            max_open_files=self.max_open_files,
            _type_casts=type_casts,
            selected_columns=self.selected_columns
        )

    def get_supported_types(self) -> Dict[str, Any]:
        """Returns types and their current defaults."""
        res = {}
        for col, t in self.index.column_types.items():
            default = self.fill_values_by_column.get(col) or self.fill_values_by_type.get(t) or self.default_fill_value
            res[t] = {"example_column": col, "default": default}
        return res

    def generate_collate_fn(self, on_none: str = 'raise') -> Callable:
        """Returns a DataLoader-compatible collate function with robust None handling.

        Args:
            on_none: Strategy for handling None values.
                'raise' (default): Raises a descriptive TypeError.
                'drop': Drops samples containing None from the batch.
                'fill': Replaces None with 0/"" based on fill_values configuration.

        Raises:
            ImportError: If PyTorch is not installed.
        """
        if not _TORCH_AVAILABLE:
            raise ImportError(
                "PyTorch is required for generate_collate_fn. "
                "Install it with: pip install torch"
            )
        # Pre-compute fill map: {column_name -> fill_value} for all known columns.
        # This avoids storing a reference to the full dataset inside CollateHandler.
        fill_map = {col: self._get_fill_value(col) for col in self.schema}
        return CollateHandler(fill_map, on_none)

    def info(self) -> None:
        """Prints summary statistics and metadata for the dataset."""
        total_indexed_rows = self.index.total_rows
        visible_rows = len(self)
        num_indexed_files = len(self.index.files)

        # Calculate visible count per file
        if visible_rows > 0:
            file_indices = np.searchsorted(self.file_offsets, self.indices, side='right') - 1
            unique_f_idx, counts = np.unique(file_indices, return_counts=True)
            f_idx_to_visible_count = dict(zip(unique_f_idx, counts))
        else:
            f_idx_to_visible_count = {}

        active_files = len(f_idx_to_visible_count)

        # Calculate storage size
        total_bytes = 0
        for f in self.index.files:
            if os.path.exists(f.path):
                total_bytes += os.path.getsize(f.path)

        if total_bytes < 1024**2:
            storage_str = f"{total_bytes / 1024:.2f} KB"
        elif total_bytes < 1024**3:
            storage_str = f"{total_bytes / (1024**2):.2f} MB"
        else:
            storage_str = f"{total_bytes / (1024**3):.2f} GB"

        print(f"\n{'='*95}")
        print(f" IndexedParquetDataset Summary")
        print(f"{'='*95}")
        print(f" Files (Active/Indexed):  {active_files}/{num_indexed_files:<6}  |  Storage Size:  {storage_str}")
        print(f" Rows (Visible/Total):    {visible_rows:,}/{total_indexed_rows:,} ({visible_rows/total_indexed_rows:.1%})")
        print(f"{'-'*95}")

        # Files Table
        print("\nFiles in Index:")
        file_header = f"{'#':<3} | {'Visible':>12} | {'Total':>12} | {'%':>6} | {'Groups':>6} | {'Path':<}"
        print(file_header)
        print("-" * 95)
        for i, f in enumerate(self.index.files):
            vis_count = f_idx_to_visible_count.get(i, 0)
            vis_ratio = (vis_count / f.num_rows) if f.num_rows > 0 else 0
            display_path = (f"...{f.path[-45:]}" if len(f.path) > 45 else f.path)
            print(f"{i:<3} | {vis_count:>12,} | {f.num_rows:>12,} | {vis_ratio:>6.1%} | {len(f.row_groups):>6} | {display_path}")

        # Columns Table
        print("\nColumn Statistics (Active State):")
        col_header = f"{'Column':<25} | {'Type':<12} | {'Files':>6} | {'Presence':>8} | {'Visible Rows':>12} | {'Coverage'}"
        print(col_header)
        print("-" * 95)

        visible_schema = self.schema
        for col in visible_schema:
            files_present = 0
            visible_rows_with_col = 0
            orig_name = next((k for k, v in self.mapper.mapping.items() if v == col), col)
            ctype = self.index.column_types.get(orig_name, "n/a")

            for f_idx, f in enumerate(self.index.files):
                src_col = self.mapper.get_source_column(col, f.path)
                if src_col in f.columns:
                    files_present += 1
                    visible_rows_with_col += f_idx_to_visible_count.get(f_idx, 0)

            presence_pct = files_present / num_indexed_files
            coverage_pct = visible_rows_with_col / visible_rows if visible_rows > 0 else 0

            # Truncate column name if too long
            col_display = (col[:22] + "...") if len(col) > 25 else col
            print(f"{col_display:<25} | {ctype:<12} | {files_present:>6} | {presence_pct:>8.1%} | {visible_rows_with_col:>12,} | {coverage_pct:>8.1%}")

        # Mappings
        if self.mapper.mapping or self.mapper.file_mappings or self.mapper.transforms:
            print("\nActive Mappings & Transforms:")
            if self.mapper.mapping:
                print(f"  Aliases: {self.mapper.mapping}")
            if self.mapper.transforms:
                print(f"  Computed Columns: {list(self.mapper.transforms.keys())}")
            if self.mapper.file_mappings:
                print(f"  File-specific overrides active for {len(self.mapper.file_mappings)} files")

        print(f"{'='*95}\n")

    def save_index(self, path: str):
        import pickle
        with open(path, "wb") as f:
            pickle.dump({
                "index": self.index, 
                "indices": self.indices, 
                "mapper": self.mapper.to_dict(), 
                "source": (self.include_source_column, self.source_column_name),
                "fill": (self.default_fill_value, self.fill_values_by_type, self.fill_values_by_column)
            }, f)

    @classmethod
    def load_index(cls, path: str):
        import pickle
        with open(path, "rb") as f:
            d = pickle.load(f)

        inc_source, source_name = d.get("source", (False, "__source_file__"))

        return cls(
            d['index'], 
            d['indices'], 
            SchemaMapper.from_dict(d['mapper']), 
            include_source_column=inc_source,
            source_column_name=source_name,
            default_fill_value=d['fill'][0], 
            fill_values_by_type=d['fill'][1], 
            fill_values_by_column=d['fill'][2]
        )

Attributes

schema property

Returns the list of column names available in the dataset (after mapping).

Functions

from_folder(directory, pattern='*.parquet', recursive=True, strict_schema=False, auto_fill=False, **kwargs) classmethod

Creates an IndexedParquetDataset by scanning a directory.

Source code in src/indexed_parquet_dataset/dataset.py
@classmethod
def from_folder(
    cls, 
    directory: str, 
    pattern: str = "*.parquet", 
    recursive: bool = True, 
    strict_schema: bool = False,
    auto_fill: bool = False,
    **kwargs
) -> 'IndexedParquetDataset':
    """Creates an IndexedParquetDataset by scanning a directory."""
    index = scan_directory(directory, pattern, recursive, strict_schema)
    return cls(index, auto_fill=auto_fill, **kwargs)
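A minimal usage sketch for `from_folder`, assuming the package is installed as a standard src-layout distribution and that `"data/"` and the printed columns are hypothetical:

```python
from indexed_parquet_dataset.dataset import IndexedParquetDataset

# Scan a directory recursively for *.parquet files and build the index.
# "data/" is a hypothetical path; strict_schema=False lets files with
# differing schemas (Schema Evolution) be indexed together.
ds = IndexedParquetDataset.from_folder("data/", recursive=True, strict_schema=False)

print(len(ds))     # number of visible rows
print(ds.schema)   # column names after mapping
print(ds[0])       # O(1) random access; a single row is returned as a dict
```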

load_index(path) classmethod

Source code in src/indexed_parquet_dataset/dataset.py
@classmethod
def load_index(cls, path: str):
    import pickle
    with open(path, "rb") as f:
        d = pickle.load(f)

    inc_source, source_name = d.get("source", (False, "__source_file__"))

    return cls(
        d['index'], 
        d['indices'], 
        SchemaMapper.from_dict(d['mapper']), 
        include_source_column=inc_source,
        source_column_name=source_name,
        default_fill_value=d['fill'][0], 
        fill_values_by_type=d['fill'][1], 
        fill_values_by_column=d['fill'][2]
    )

save_index(path)

Source code in src/indexed_parquet_dataset/dataset.py
def save_index(self, path: str):
    import pickle
    with open(path, "wb") as f:
        pickle.dump({
            "index": self.index, 
            "indices": self.indices, 
            "mapper": self.mapper.to_dict(), 
            "source": (self.include_source_column, self.source_column_name),
            "fill": (self.default_fill_value, self.fill_values_by_type, self.fill_values_by_column)
        }, f)
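The pickled index (file metadata, active indices, mapper and fill configuration) can be persisted and restored without rescanning the source files. A minimal sketch with hypothetical paths:

```python
from indexed_parquet_dataset.dataset import IndexedParquetDataset

ds = IndexedParquetDataset.from_folder("data/")   # hypothetical path

# Persist the index to disk ...
ds.save_index("dataset.idx")

# ... and restore it later without re-scanning the Parquet files.
restored = IndexedParquetDataset.load_index("dataset.idx")
assert len(restored) == len(ds)
```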

info()

Prints summary statistics and metadata for the dataset.

Source code in src/indexed_parquet_dataset/dataset.py
def info(self) -> None:
    """Prints summary statistics and metadata for the dataset."""
    total_indexed_rows = self.index.total_rows
    visible_rows = len(self)
    num_indexed_files = len(self.index.files)

    # Calculate visible count per file
    if visible_rows > 0:
        file_indices = np.searchsorted(self.file_offsets, self.indices, side='right') - 1
        unique_f_idx, counts = np.unique(file_indices, return_counts=True)
        f_idx_to_visible_count = dict(zip(unique_f_idx, counts))
    else:
        f_idx_to_visible_count = {}

    active_files = len(f_idx_to_visible_count)

    # Calculate storage size
    total_bytes = 0
    for f in self.index.files:
        if os.path.exists(f.path):
            total_bytes += os.path.getsize(f.path)

    if total_bytes < 1024**2:
        storage_str = f"{total_bytes / 1024:.2f} KB"
    elif total_bytes < 1024**3:
        storage_str = f"{total_bytes / (1024**2):.2f} MB"
    else:
        storage_str = f"{total_bytes / (1024**3):.2f} GB"

    print(f"\n{'='*95}")
    print(f" IndexedParquetDataset Summary")
    print(f"{'='*95}")
    print(f" Files (Active/Indexed):  {active_files}/{num_indexed_files:<6}  |  Storage Size:  {storage_str}")
    print(f" Rows (Visible/Total):    {visible_rows:,}/{total_indexed_rows:,} ({visible_rows/total_indexed_rows:.1%})")
    print(f"{'-'*95}")

    # Files Table
    print("\nFiles in Index:")
    file_header = f"{'#':<3} | {'Visible':>12} | {'Total':>12} | {'%':>6} | {'Groups':>6} | {'Path':<}"
    print(file_header)
    print("-" * 95)
    for i, f in enumerate(self.index.files):
        vis_count = f_idx_to_visible_count.get(i, 0)
        vis_ratio = (vis_count / f.num_rows) if f.num_rows > 0 else 0
        display_path = (f"...{f.path[-45:]}" if len(f.path) > 45 else f.path)
        print(f"{i:<3} | {vis_count:>12,} | {f.num_rows:>12,} | {vis_ratio:>6.1%} | {len(f.row_groups):>6} | {display_path}")

    # Columns Table
    print("\nColumn Statistics (Active State):")
    col_header = f"{'Column':<25} | {'Type':<12} | {'Files':>6} | {'Presence':>8} | {'Visible Rows':>12} | {'Coverage'}"
    print(col_header)
    print("-" * 95)

    visible_schema = self.schema
    for col in visible_schema:
        files_present = 0
        visible_rows_with_col = 0
        orig_name = next((k for k, v in self.mapper.mapping.items() if v == col), col)
        ctype = self.index.column_types.get(orig_name, "n/a")

        for f_idx, f in enumerate(self.index.files):
            src_col = self.mapper.get_source_column(col, f.path)
            if src_col in f.columns:
                files_present += 1
                visible_rows_with_col += f_idx_to_visible_count.get(f_idx, 0)

        presence_pct = files_present / num_indexed_files
        coverage_pct = visible_rows_with_col / visible_rows if visible_rows > 0 else 0

        # Truncate column name if too long
        col_display = (col[:22] + "...") if len(col) > 25 else col
        print(f"{col_display:<25} | {ctype:<12} | {files_present:>6} | {presence_pct:>8.1%} | {visible_rows_with_col:>12,} | {coverage_pct:>8.1%}")

    # Mappings
    if self.mapper.mapping or self.mapper.file_mappings or self.mapper.transforms:
        print("\nActive Mappings & Transforms:")
        if self.mapper.mapping:
            print(f"  Aliases: {self.mapper.mapping}")
        if self.mapper.transforms:
            print(f"  Computed Columns: {list(self.mapper.transforms.keys())}")
        if self.mapper.file_mappings:
            print(f"  File-specific overrides active for {len(self.mapper.file_mappings)} files")

    print(f"{'='*95}\n")

shuffle(seed=None, rg_buffer=None)

Shuffles the dataset indices.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `seed` | `Optional[int]` | Random seed for reproducibility. | `None` |
| `rg_buffer` | `Optional[int]` | If provided, enables locality-aware shuffling. Instead of a global shuffle, it shuffles row groups and then shuffles rows within a window of `rg_buffer` row groups. This significantly improves I/O performance by reducing the number of row groups that need to be read/cached simultaneously. | `None` |

Returns:

| Type | Description |
|------|-------------|
| `'IndexedParquetDataset'` | A new IndexedParquetDataset instance with shuffled indices. |

Source code in src/indexed_parquet_dataset/dataset.py
def shuffle(self, seed: Optional[int] = None, rg_buffer: Optional[int] = None) -> 'IndexedParquetDataset':
    """Shuffles the dataset indices.

    Args:
        seed: Random seed for reproducibility.
        rg_buffer: If provided, enables locality-aware shuffling. Instead of a global 
            shuffle, it shuffles row groups and then shuffles rows within a window 
            of `rg_buffer` row groups. This significantly improves I/O performance 
            by reducing the number of row groups that need to be read/cached 
            simultaneously.

    Returns:
        A new IndexedParquetDataset instance with shuffled indices.
    """
    rng = np.random.default_rng(seed)

    if rg_buffer is None:
        new_indices = self.indices.copy()
        rng.shuffle(new_indices)
        return self._clone_with_indices(new_indices)

    if rg_buffer < 1:
        raise ValueError("rg_buffer must be at least 1")

    # 1. Map current active indices to their respective row groups
    actual_indices = self.indices

    # Find which file each index belongs to
    file_indices = np.searchsorted(self.file_offsets, actual_indices, side='right') - 1

    # Grouping by (file_idx, rg_idx)
    # Note: dict preserves insertion order of first appearance in Python 3.7+
    from collections import defaultdict
    rg_to_indices = defaultdict(list)

    # Pre-calculate RG cumulative boundaries for each file for speed
    file_rg_bounds = [np.cumsum([0] + f.row_groups) for f in self.index.files]

    # Process indices by file to use vector searchsorted logic
    unique_f_indices = np.unique(file_indices)
    for f_idx in unique_f_indices:
        f_mask = (file_indices == f_idx)
        f_global_indices = actual_indices[f_mask]
        f_local_indices = f_global_indices - self.file_offsets[f_idx]

        # Map local row index to row group index within this file
        f_rg_indices = np.searchsorted(file_rg_bounds[f_idx], f_local_indices, side='right') - 1

        # Group global indices by RG
        for rg_idx, global_idx in zip(f_rg_indices, f_global_indices):
            rg_to_indices[(f_idx, rg_idx)].append(global_idx)

    # 2. Get the list of all unique row group IDs present in current indices
    rg_keys = list(rg_to_indices.keys())
    # Shuffle the sequence of row groups themselves
    rng.shuffle(rg_keys)

    # 3. Flatten and shuffle within windows of rg_buffer
    final_indices = []
    for i in range(0, len(rg_keys), rg_buffer):
        window_slice = rg_keys[i : i + rg_buffer]
        window_pool = []
        for key in window_slice:
            window_pool.extend(rg_to_indices[key])

        # Shuffle rows within the current window of row groups
        window_pool_arr = np.array(window_pool)
        rng.shuffle(window_pool_arr)
        final_indices.append(window_pool_arr)

    if not final_indices:
        return self._clone_with_indices(np.array([], dtype=int))

    return self._clone_with_indices(np.concatenate(final_indices))
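A sketch contrasting a global shuffle with locality-aware shuffling via `rg_buffer` (the path is hypothetical):

```python
from indexed_parquet_dataset.dataset import IndexedParquetDataset

ds = IndexedParquetDataset.from_folder("data/")   # hypothetical path

# Global shuffle: any row can land anywhere, which may touch many row groups per batch.
shuffled = ds.shuffle(seed=42)

# Locality-aware shuffle: row groups are shuffled first, then rows are shuffled
# only within windows of 4 row groups, so fewer row groups are open at once.
local_shuffled = ds.shuffle(seed=42, rg_buffer=4)
```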

train_test_split(test_size, shuffle=True, seed=None, stratify_by=None)

Splits the dataset into train and test sets.

Source code in src/indexed_parquet_dataset/dataset.py
def train_test_split(
    self, 
    test_size: Union[float, int], 
    shuffle: bool = True, 
    seed: Optional[int] = None, 
    stratify_by: Optional[str] = None
) -> tuple['IndexedParquetDataset', 'IndexedParquetDataset']:
    """Splits the dataset into train and test sets."""
    n = len(self)
    if isinstance(test_size, float):
        n_test = int(n * test_size)
    else:
        n_test = test_size
    n_train = n - n_test

    if stratify_by:
        # Read labels for all indices (required for stratification)
        labels = []
        for i in range(len(self)):
            labels.append(self[i][stratify_by])
        labels = np.array(labels)

        unique_labels, inverse = np.unique(labels, return_inverse=True)
        train_indices_list = []
        test_indices_list = []

        rng = np.random.default_rng(seed)

        for i in range(len(unique_labels)):
            idx_in_group = np.where(inverse == i)[0]
            if shuffle:
                rng.shuffle(idx_in_group)

            group_n_test = int(len(idx_in_group) * (n_test / n))
            test_indices_list.extend(idx_in_group[:group_n_test])
            train_indices_list.extend(idx_in_group[group_n_test:])

        train_indices = self.indices[np.array(train_indices_list)]
        test_indices = self.indices[np.array(test_indices_list)]
    else:
        indices = self.indices.copy()
        if shuffle:
            rng = np.random.default_rng(seed)
            rng.shuffle(indices)

        train_indices = indices[:n_train]
        test_indices = indices[n_train:]

    return self._clone_with_indices(train_indices), self._clone_with_indices(test_indices)
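A sketch of a plain split and a stratified split (the `"label"` column is hypothetical). Note that stratification reads the stratify column for every visible row:

```python
from indexed_parquet_dataset.dataset import IndexedParquetDataset

ds = IndexedParquetDataset.from_folder("data/")   # hypothetical path

# Hold out 10% of rows, shuffled with a fixed seed.
train, test = ds.train_test_split(test_size=0.1, seed=42)

# Keep class proportions roughly equal between splits
# ("label" is a hypothetical column name).
train_s, test_s = ds.train_test_split(test_size=0.1, seed=42, stratify_by="label")
```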

select(range_or_indices)

Source code in src/indexed_parquet_dataset/dataset.py
def select(self, range_or_indices: Union[slice, List[int], np.ndarray]) -> 'IndexedParquetDataset':
    new_indices = self.indices[range_or_indices]
    return self._clone_with_indices(new_indices)

limit(n)

Limits the dataset to the first n rows.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `n` | `int` | Maximum number of rows to keep. | required |

Returns:

| Type | Description |
|------|-------------|
| `'IndexedParquetDataset'` | A new IndexedParquetDataset instance. |

Source code in src/indexed_parquet_dataset/dataset.py
def limit(self, n: int) -> 'IndexedParquetDataset':
    """Limits the dataset to the first n rows.

    Args:
        n: Maximum number of rows to keep.

    Returns:
        A new IndexedParquetDataset instance.
    """
    if self._indices is None and self._pending_filter:
        # Operator Fusion: Push the limit into the pending filter
        # Create a copy of the pending filter to avoid side effects on the original
        new_pending = self._pending_filter.copy()
        new_pending['limit'] = n
        return self._clone_with_indices(None, _pending_filter=new_pending)

    return self.select(slice(0, n))
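Because `filter` is lazy, chaining `.limit()` after it pushes the limit into the pending filter, so scanning can stop once enough matching rows are found. A minimal sketch (path and column name hypothetical):

```python
from indexed_parquet_dataset.dataset import IndexedParquetDataset

ds = IndexedParquetDataset.from_folder("data/")   # hypothetical path

# The limit is fused into the pending filter, so the row-level predicate
# does not need to scan the whole dataset ("score" is a hypothetical column).
first_hits = ds.filter(filter_row=lambda row: row["score"] > 0.5).limit(1000)
print(len(first_hits))   # at most 1000 rows
```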

sample(n, seed=None)

Returns a random sample of n rows from the dataset.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `n` | `int` | Number of rows to sample. | required |
| `seed` | `Optional[int]` | Random seed for reproducibility. | `None` |

Returns:

| Type | Description |
|------|-------------|
| `'IndexedParquetDataset'` | A new IndexedParquetDataset instance. |

Source code in src/indexed_parquet_dataset/dataset.py
def sample(self, n: int, seed: Optional[int] = None) -> 'IndexedParquetDataset':
    """Returns a random sample of n rows from the dataset.

    Args:
        n: Number of rows to sample.
        seed: Random seed for reproducibility.

    Returns:
        A new IndexedParquetDataset instance.
    """
    if n > len(self):
        n = len(self)

    rng = np.random.default_rng(seed)
    indices = rng.choice(len(self), size=n, replace=False)
    return self.select(indices)
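A short sketch of drawing a reproducible random subset (path hypothetical):

```python
from indexed_parquet_dataset.dataset import IndexedParquetDataset

ds = IndexedParquetDataset.from_folder("data/")   # hypothetical path

# Draw 500 rows without replacement; the same seed yields the same sample.
# If n exceeds len(ds), it is clamped to the dataset size.
subset = ds.sample(500, seed=7)
```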

select_columns(columns)

Selects a subset of columns to be returned.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `columns` | `List[str]` | List of column names to keep. | required |

Returns:

| Type | Description |
|------|-------------|
| `'IndexedParquetDataset'` | A new IndexedParquetDataset instance with updated schema. |

Source code in src/indexed_parquet_dataset/dataset.py
def select_columns(self, columns: List[str]) -> 'IndexedParquetDataset':
    """Selects a subset of columns to be returned.

    Args:
        columns: List of column names to keep.

    Returns:
        A new IndexedParquetDataset instance with updated schema.
    """
    # Validate columns exist in current schema
    current_schema = self.schema
    for col in columns:
        if col not in current_schema:
            warnings.warn(f"Column '{col}' not found in current schema.")

    ds = self.copy()
    ds.selected_columns = columns
    return ds
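A sketch of narrowing the visible schema (path and column names hypothetical). A name missing from the current schema triggers a warning rather than an error:

```python
from indexed_parquet_dataset.dataset import IndexedParquetDataset

ds = IndexedParquetDataset.from_folder("data/")   # hypothetical path

# Keep only two columns ("text" and "label" are hypothetical names).
narrow = ds.select_columns(["text", "label"])
print(narrow.schema)
```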

map(fn, *, remove_columns=None, output_schema=None, is_batch=False, batch_size=1024)

Applies a transformation to the dataset.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `fn` | `Callable` | A function that takes a row (dict) and returns a transformed row (dict), or, if is_batch=True, takes a List[dict] and returns a List[dict]. | required |
| `remove_columns` | `Optional[List[str]]` | Optional list of columns to remove from the result. | `None` |
| `output_schema` | `Optional[List[str]]` | Optional explicit list of columns for the new schema. | `None` |
| `is_batch` | `bool` | If True, fn is treated as a batch transformation. | `False` |
| `batch_size` | `int` | Suggested batch size for processing (if is_batch=True). | `1024` |
Source code in src/indexed_parquet_dataset/dataset.py
def map(
    self, 
    fn: Callable, 
    *, 
    remove_columns: Optional[List[str]] = None,
    output_schema: Optional[List[str]] = None,
    is_batch: bool = False,
    batch_size: int = 1024
) -> 'IndexedParquetDataset':
    """Applies a transformation to the dataset.

    Args:
        fn: A function that takes a row (dict) and returns a transformed row (dict),
            or if is_batch=True, takes a List[dict] and returns a List[dict].
        remove_columns: Optional list of columns to remove from the result.
        output_schema: Optional explicit list of columns for the new schema.
        is_batch: If True, fn is treated as a batch transformation.
        batch_size: Suggested batch size for processing (if is_batch=True).
    """
    new_mapper = SchemaMapper(
        mapping=self.mapper.mapping.copy(),
        file_mappings=self.mapper.file_mappings.copy(),
        transforms=self.mapper.transforms.copy(),
        row_transforms=self.mapper.row_transforms.copy(),
        batch_transforms=getattr(self.mapper, 'batch_transforms', []).copy(),
        batch_column_transforms=getattr(self.mapper, 'batch_column_transforms', {}).copy()
    )
    effective_fn = fn
    if remove_columns:
        if is_batch:
            def _wrapped_batch_fn(batch, _fn=fn, _cols=remove_columns):
                results = _fn(batch)
                for row in results:
                    for c in _cols:
                        row.pop(c, None)
                return results
            effective_fn = _wrapped_batch_fn
        else:
            def _wrapped_row_fn(row, _fn=fn, _cols=remove_columns):
                result = _fn(row)
                for c in _cols:
                    result.pop(c, None)
                return result
            effective_fn = _wrapped_row_fn

    if is_batch:
        new_mapper.batch_transforms.append((effective_fn, batch_size))
    else:
        new_mapper.row_transforms.append(effective_fn)

    ds = self._clone_with_mapper(new_mapper)
    if output_schema:
        ds.selected_columns = output_schema
    return ds
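A sketch of a row-level and a batch-level transform (path and column names hypothetical). Transforms are recorded in the mapper and applied lazily on access:

```python
from indexed_parquet_dataset.dataset import IndexedParquetDataset

ds = IndexedParquetDataset.from_folder("data/")   # hypothetical path

# Row-level: add a derived column and drop a raw one
# ("text" and "raw_html" are hypothetical column names).
ds2 = ds.map(lambda row: {**row, "text_len": len(row["text"])},
             remove_columns=["raw_html"])

# Batch-level: the function receives and returns a list of row dicts.
def add_rank(batch):
    for i, row in enumerate(batch):
        row["rank_in_batch"] = i
    return batch

ds3 = ds2.map(add_rank, is_batch=True, batch_size=512)
```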

alias(name, source, is_batch=False, batch_size=1024)

Creates a new alias for a column or a new computed column.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `name` | `str` | The target name of the column. | required |
| `source` | `Union[str, Callable]` | Either an original column name (string), a function `fn(row) -> value` (if is_batch=False), or a function `fn(batch) -> List[values]` (if is_batch=True). | required |
| `is_batch` | `bool` | If True, source is treated as a batch transformation. | `False` |
| `batch_size` | `int` | Suggested batch size for processing (if is_batch=True). | `1024` |

Returns:

| Type | Description |
|------|-------------|
| `'IndexedParquetDataset'` | A new IndexedParquetDataset instance. |

Source code in src/indexed_parquet_dataset/dataset.py
def alias(self, name: str, source: Union[str, Callable], is_batch: bool = False, batch_size: int = 1024) -> 'IndexedParquetDataset':
    """Creates a new alias for a column or a new computed column.

    Args:
        name: The target name of the column.
        source: Either an original column name (string), 
               a function `fn(row) -> value` (if is_batch=False),
               or a function `fn(batch) -> List[values]` (if is_batch=True).
        is_batch: If True, source is treated as a batch transformation.
        batch_size: Suggested batch size for processing (if is_batch=True).

    Returns:
        A new IndexedParquetDataset instance.
    """
    new_mapper = SchemaMapper(
        mapping=self.mapper.mapping.copy(),
        file_mappings=self.mapper.file_mappings.copy(),
        transforms=self.mapper.transforms.copy(),
        row_transforms=self.mapper.row_transforms.copy(),
        batch_transforms=getattr(self.mapper, 'batch_transforms', []).copy(),
        batch_column_transforms=getattr(self.mapper, 'batch_column_transforms', {}).copy()
    )
    if isinstance(source, str):
        new_mapper.mapping[source] = name
        # Remove transform if we are re-aliasing to a source column
        if name in new_mapper.transforms:
            del new_mapper.transforms[name]
        if name in new_mapper.batch_column_transforms:
            del new_mapper.batch_column_transforms[name]
    elif callable(source):
        if is_batch:
            new_mapper.batch_column_transforms[name] = (source, batch_size)
            if name in new_mapper.transforms:
                del new_mapper.transforms[name]
        else:
            new_mapper.transforms[name] = source
            if name in new_mapper.batch_column_transforms:
                del new_mapper.batch_column_transforms[name]
    else:
        raise TypeError("Alias source must be a string or a callable.")

    return self._clone_with_mapper(new_mapper)
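A sketch covering the three alias forms: a plain rename-style alias, a computed column from a row function, and a batch-computed column (path and column names hypothetical):

```python
from indexed_parquet_dataset.dataset import IndexedParquetDataset

ds = IndexedParquetDataset.from_folder("data/")   # hypothetical path

# 1. Expose the source column "txt" under the name "text" (hypothetical names).
ds = ds.alias("text", "txt")

# 2. Computed column derived from each row.
ds = ds.alias("text_len", lambda row: len(row["text"]))

# 3. Batch-computed column: the callable returns one value per row in the batch.
ds = ds.alias("mean_len_in_batch",
              lambda batch: [sum(len(r["text"]) for r in batch) / len(batch)] * len(batch),
              is_batch=True)
```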

rename(old_name, new_name)

Renames a column.

Source code in src/indexed_parquet_dataset/dataset.py
def rename(self, old_name: str, new_name: str) -> 'IndexedParquetDataset':
    """Renames a column."""
    return self.alias(new_name, old_name)

cast(column, target_type)

Changes the type of a column using an alias transformation.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `column` | `str` | The name of the column to cast. | required |
| `target_type` | `Union[type, str, Callable]` | The target type (int, float, str, etc.) or a callable. | required |
Source code in src/indexed_parquet_dataset/dataset.py
def cast(self, column: str, target_type: Union[type, str, Callable]) -> 'IndexedParquetDataset':
    """Changes the type of a column using an alias transformation.

    Args:
        column: The name of the column to cast.
        target_type: The target type (int, float, str, etc.) or a callable.
    """
    if isinstance(target_type, str):
        if target_type == 'int': cast_fn = int
        elif target_type in ('float', 'double'): cast_fn = float
        elif target_type in ('str', 'string'): cast_fn = str
        else: raise ValueError(f"Unsupported type string: {target_type}")
    elif callable(target_type):
        cast_fn = target_type # type: ignore
    else:
        raise TypeError("target_type must be a string (int, float, str) or a callable.")

    def transform(row):
        val = row.get(column)
        if val is None:
            return None
        try:
            return cast_fn(val)
        except (ValueError, TypeError):
            return val

    return self.alias(column, transform)
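A sketch of casting with a built-in type name and with a custom callable (path and column names hypothetical). Values that fail to convert are kept unchanged, and None stays None:

```python
from indexed_parquet_dataset.dataset import IndexedParquetDataset

ds = IndexedParquetDataset.from_folder("data/")   # hypothetical path

# Cast "user_id" to int using the built-in type name.
ds = ds.cast("user_id", "int")

# Cast with an arbitrary callable, e.g. rounding a float column to 2 decimals.
ds = ds.cast("score", lambda v: round(float(v), 2))
```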

filter(path_pattern=None, path_filter=None, column_conditions=None, filter_row=None, filter_batch=None, transform_batch=None, batch_size=1024, show_progress=False, limit=None, predicate=None, predicate_batch=None)

Filters the dataset based on file paths, column conditions, or a custom filter.

This method is lazy and returns a new IndexedParquetDataset instance with updated indices. It supports three levels of filtering:

1. File-level (fastest): via path_pattern or path_filter.
2. Column-level (fast, PyArrow-based): via column_conditions.
3. Row-level/Batch-level (flexible, Python-based): via filter_row or filter_batch.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `path_pattern` | `Optional[Union[str, Callable]]` | A string to search for as a substring in the absolute file path. If a callable is provided, it is automatically treated as the `filter_row` argument. | `None` |
| `path_filter` | `Optional[Union[str, List[str]]]` | A glob pattern (e.g., `"**/2023/*.parquet"`) or a list of glob patterns. Only files matching at least one pattern will be kept. | `None` |
| `column_conditions` | `Optional[Dict[str, Any]]` | A dictionary mapping column names to filter conditions. | `None` |
| `filter_row` | `Optional[Callable[[Dict[str, Any]], bool]]` | A callable that takes a row (dict) and returns a boolean. Example: `lambda row: len(row["text"]) > 100` | `None` |
| `show_progress` | `bool` | If True, displays progress bars (using tqdm) during file processing for column conditions and predicates. | `False` |

Returns:

| Type | Description |
|------|-------------|
| `'IndexedParquetDataset'` | A new IndexedParquetDataset instance containing only the matching rows. |

Note

If multiple filter types are provided, they are applied in the following order: File filters -> Column conditions -> Predicate.

Source code in src/indexed_parquet_dataset/dataset.py
def filter(
    self, 
    path_pattern: Optional[Union[str, Callable]] = None,
    path_filter: Optional[Union[str, List[str]]] = None,
    column_conditions: Optional[Dict[str, Any]] = None,
    filter_row: Optional[Callable[[Dict[str, Any]], bool]] = None,
    filter_batch: Optional[Callable[[List[Dict[str, Any]]], List[bool]]] = None,
    transform_batch: Optional[Callable[[List[Dict[str, Any]]], List[Dict[str, Any]]]] = None,
    batch_size: int = 1024,
    show_progress: bool = False,
    limit: Optional[int] = None,
    # Legacy names
    predicate: Optional[Callable[[Dict[str, Any]], bool]] = None,
    predicate_batch: Optional[Callable[[List[Dict[str, Any]]], List[bool]]] = None
) -> 'IndexedParquetDataset':
    """Filters the dataset based on file paths, column conditions, or a custom filter.

    This method is lazy and returns a new IndexedParquetDataset instance with updated indices.
    It supports three levels of filtering:
    1. File-level (fastest): via `path_pattern` or `path_filter`.
    2. Column-level (fast, PyArrow-based): via `column_conditions`.
    3. Row-level/Batch-level (flexible, Python-based): via `filter_row` or `filter_batch`.

    Args:
        path_pattern: A string to search for as a substring in the absolute file path.
            If a callable is provided, it is automatically treated as the `filter_row` argument.
        path_filter: A glob pattern (e.g., `"**/2023/*.parquet"`) or a list of glob patterns.
            Only files matching at least one pattern will be kept.
        column_conditions: A dictionary mapping column names to filter conditions.
        filter_row: A callable that takes a row (dict) and returns a boolean.
            Example: `lambda row: len(row["text"]) > 100`
        show_progress: If True, displays progress bars (using tqdm) during file processing 
            for column conditions and predicates.

    Returns:
        A new IndexedParquetDataset instance containing only the matching rows.

    Note:
        If multiple filter types are provided, they are applied in the following order:
        File filters -> Column conditions -> Predicate.
    """
    if callable(path_pattern): filter_row = path_pattern; path_pattern = None

    # Handle legacy names
    f_row = filter_row or predicate
    f_batch = filter_batch or predicate_batch

    if predicate or predicate_batch:
        warnings.warn("Arguments 'predicate' and 'predicate_batch' are deprecated. "
                      "Use 'filter_row' and 'filter_batch' instead.", DeprecationWarning, stacklevel=2)

    # Lazy implementation: 
    # We don't filter immediately. We return a clone that will filter on first access.
    # This allows operator fusion with .limit()
    pending = {
        'filter_row': f_row,
        'filter_batch': f_batch,
        'transform_batch': transform_batch,
        'column_conditions': column_conditions,
        'path_pattern': path_pattern,
        'path_filter': path_filter,
        'limit': limit,
        'batch_size': batch_size,
        'show_progress': show_progress,
        'input_indices': self.indices.copy(), # Snapshot of current active indices
        'mapper_snapshot': self.mapper # Logic isolation
    }

    return self._clone_with_indices(None, _pending_filter=pending)
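A sketch combining the three filtering levels (paths, patterns and column names are hypothetical, and the `column_conditions` value is assumed here to express simple equality). All calls return lazily; the actual scan happens on first access:

```python
from indexed_parquet_dataset.dataset import IndexedParquetDataset

ds = IndexedParquetDataset.from_folder("data/")   # hypothetical path

# 1. File-level: keep only files under a 2023 directory (glob pattern).
ds_2023 = ds.filter(path_filter="**/2023/*.parquet")

# 2. Column-level: PyArrow-evaluated conditions ("lang" is a hypothetical column;
#    the exact condition format is assumed to be a plain equality value here).
ds_en = ds_2023.filter(column_conditions={"lang": "en"})

# 3. Row-level: arbitrary Python predicate, with progress bars enabled.
ds_long = ds_en.filter(filter_row=lambda row: len(row["text"]) > 100,
                       show_progress=True)
```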

merge(other)

Merges this dataset with another one, deduplicating identical rows.

A row is considered identical if it comes from the same file and has the same local row index.

Source code in src/indexed_parquet_dataset/dataset.py
def merge(self, other: 'IndexedParquetDataset') -> 'IndexedParquetDataset':
    """Merges this dataset with another one, deduplicating identical rows.

    A row is considered identical if it comes from the same file and has
    the same local row index.
    """
    # 1. Unified unique files
    self_paths = {f.path: i for i, f in enumerate(self.index.files)}
    other_paths = {f.path: i for i, f in enumerate(other.index.files)}

    all_unique_paths = sorted(list(set(self_paths.keys()) | set(other_paths.keys())))
    path_to_new_idx = {path: i for i, path in enumerate(all_unique_paths)}

    new_files_info = []
    for path in all_unique_paths:
        if path in self_paths:
            new_files_info.append(self.index.files[self_paths[path]])
        else:
            new_files_info.append(other.index.files[other_paths[path]])

    # 2. Map indices to row identity (new_file_idx, local_idx)
    def get_row_identities(ds, path_map):
        ids = []
        for i in range(len(ds)):
            f_idx, l_idx = ds._get_file_and_local_idx(i)
            f_path = ds.index.files[f_idx].path
            new_f_idx = path_map[f_path]
            ids.append((new_f_idx, l_idx))
        return ids

    self_ids = get_row_identities(self, path_to_new_idx)
    other_ids = get_row_identities(other, path_to_new_idx)

    # 3. Deduplicate preserving order (self first, then new from other)
    # We use a dictionary as an ordered set
    seen = {}
    unified_ids = []
    for row_id in (self_ids + other_ids):
        if row_id not in seen:
            seen[row_id] = True
            unified_ids.append(row_id)

    # 4. Create new BaseIndex metadata
    new_total_rows_meta = sum(f.num_rows for f in new_files_info)
    new_all_columns = sorted(list(set(self.index.all_columns) | set(other.index.all_columns)))

    # 5. Type merging logic (upcasting conflicts)
    new_column_types = self.index.column_types.copy()
    type_casts = self._type_casts.copy()
    for col, o_type in other.index.column_types.items():
        if col in new_column_types:
            s_type = new_column_types[col]
            if s_type != o_type:
                common_type = 'string'
                s_is_float = 'float' in s_type or 'double' in s_type
                o_is_float = 'float' in o_type or 'double' in o_type
                s_is_int = 'int' in s_type
                o_is_int = 'int' in o_type

                if (s_is_int and o_is_float) or (s_is_float and o_is_int) or (s_is_float and o_is_float):
                    common_type = 'double'

                warnings.warn(f"Type mismatch for column '{col}': {s_type} vs {o_type}. Upcasting to {common_type}.")
                new_column_types[col] = common_type
                cast_fn = str if common_type == 'string' else (float if common_type == 'double' else None)
                if cast_fn: type_casts[col] = cast_fn
        else:
            new_column_types[col] = o_type

    new_index = BaseIndex(new_files_info, new_total_rows_meta, new_all_columns, new_column_types)

    # 6. Re-calculate global indices based on new file sequence
    new_file_offsets = np.zeros(len(new_files_info) + 1, dtype=int)
    current_offset = 0
    for i, f in enumerate(new_files_info):
        new_file_offsets[i] = current_offset
        current_offset += f.num_rows
    new_file_offsets[-1] = current_offset

    new_indices = np.array([new_file_offsets[f_idx] + l_idx for f_idx, l_idx in unified_ids], dtype=int)

    # 7. Merge mappers
    new_mapper = self.mapper.merge(
        other.mapper,
        list(self_paths.keys()),
        list(other_paths.keys())
    )

    return IndexedParquetDataset(
        new_index, new_indices, new_mapper,
        self.include_source_column, self.source_column_name,
        self.default_fill_value, self.fill_values_by_type, self.fill_values_by_column,
        max_open_files=self.max_open_files,
        _type_casts=type_casts,
        selected_columns=self.selected_columns
    )
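A sketch of merging two views over overlapping data; rows pointing at the same (file, local row index) pair appear only once in the result (paths hypothetical):

```python
from indexed_parquet_dataset.dataset import IndexedParquetDataset

a = IndexedParquetDataset.from_folder("data/part_a")   # hypothetical paths
b = IndexedParquetDataset.from_folder("data/part_b")

merged = a.merge(b)

# Deduplication by row identity: the merged length is at most len(a) + len(b).
assert len(merged) <= len(a) + len(b)
```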

copy()

Returns a copy of the dataset.

Source code in src/indexed_parquet_dataset/dataset.py
def copy(self) -> 'IndexedParquetDataset':
    """Returns a copy of the dataset."""
    return self._clone_with_indices(self.indices.copy())

clone(path, optimize_by_reorder=True, progress=True)

Materializes all computations and returns a new dataset instance.

Source code in src/indexed_parquet_dataset/dataset.py
def clone(
    self, 
    path: str, 
    optimize_by_reorder: bool = True, 
    progress: bool = True
) -> 'IndexedParquetDataset':
    """Materializes all computations and returns a new dataset instance."""
    # to_parquet will append .parquet if needed, but we need to know the effective path
    effective_path = str(path)
    if not effective_path.lower().endswith('.parquet'):
        effective_path += '.parquet'

    self.to_parquet(effective_path, optimize_by_reorder=optimize_by_reorder, progress=progress)
    return IndexedParquetDataset.from_folder(effective_path)
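A sketch of materializing a chain of lazy operations into a fresh dataset (path and column name hypothetical):

```python
from indexed_parquet_dataset.dataset import IndexedParquetDataset

ds = IndexedParquetDataset.from_folder("data/")   # hypothetical path

# All pending filters/maps are executed once, written to snapshot.parquet
# (".parquet" is appended automatically), and a new dataset backed by
# that file is returned.
snapshot = (ds.filter(filter_row=lambda row: row["label"] == 1)
              .clone("snapshot", progress=True))
```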

to_parquet(path, chunk_size=1024, shard_size=None, optimize_by_reorder=True, progress=True)

Materializes the dataset to one or more Parquet files.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `path` | `str` | Output file path or directory (if sharding). | required |
| `chunk_size` | `int` | Cache size for intermediate batch collection. | `1024` |
| `shard_size` | `Optional[int]` | If set, splits the dataset into multiple files of this many rows. | `None` |
| `optimize_by_reorder` | `bool` | If True, reads data in source-linear order (fastest), but potentially changes the row order in the output file. | `True` |
| `progress` | `bool` | Whether to show a progress bar. | `True` |
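A sketch of writing a single file versus sharded output (paths hypothetical):

```python
from indexed_parquet_dataset.dataset import IndexedParquetDataset

ds = IndexedParquetDataset.from_folder("data/")   # hypothetical path

# Single file: ".parquet" is appended automatically if missing.
ds.to_parquet("export/all_rows")

# Sharded output: "export/shards" becomes a directory containing
# part_0000.parquet, part_0001.parquet, ... of at most 100,000 rows each.
# optimize_by_reorder=False preserves the current row order at the cost of speed.
ds.to_parquet("export/shards", shard_size=100_000, optimize_by_reorder=False)
```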
Source code in src/indexed_parquet_dataset/dataset.py
def to_parquet(
    self, 
    path: str, 
    chunk_size: int = 1024, 
    shard_size: Optional[int] = None,
    optimize_by_reorder: bool = True,
    progress: bool = True
):
    """Materializes the dataset to one or more Parquet files.

    Args:
        path: Output file path or directory (if sharding).
        chunk_size: Cache size for intermediate batch collection.
        shard_size: If set, splits the dataset into multiple files of this many rows.
        optimize_by_reorder: If True, reads data in source-linear order (fastest),
            but potentially changes the row order in the output file.
        progress: Whether to show a progress bar.
    """
    # Ensure .parquet extension for single file output
    effective_path = str(path)
    if not shard_size and not effective_path.lower().endswith('.parquet'):
        effective_path += '.parquet'

    if shard_size:
        os.makedirs(effective_path, exist_ok=True)

    writer = None
    rows_in_current_shard = 0
    shard_idx = 0

    # Derive target schema upfront to avoid type inference issues
    target_schema = self.get_arrow_schema()

    def get_shard_path():
        if shard_size:
            return os.path.join(effective_path, f"part_{shard_idx:04d}.parquet")
        return effective_path

    def write_table_with_shards(table):
        nonlocal writer, rows_in_current_shard, shard_idx

        if not shard_size:
            if writer is None:
                writer = pq.ParquetWriter(get_shard_path(), target_schema)
            writer.write_table(table)
            rows_in_current_shard += len(table)
            return

        offset = 0
        while offset < len(table):
            # If current shard is full, move to next
            if rows_in_current_shard >= shard_size:
                if writer: writer.close()
                shard_idx += 1
                writer = None
                rows_in_current_shard = 0

            remaining_in_shard = shard_size - rows_in_current_shard
            to_write = min(len(table) - offset, remaining_in_shard)

            chunk = table.slice(offset, to_write)
            if writer is None:
                writer = pq.ParquetWriter(get_shard_path(), target_schema)

            writer.write_table(chunk)
            rows_in_current_shard += to_write
            offset += to_write

    total_rows = len(self)
    pbar = tqdm(total=total_rows, desc="Writing Parquet", disable=not progress)

    try:
        if not optimize_by_reorder:
            # Original slow path (preserves order)
            effective_chunk_size = chunk_size 
            for i in range(0, total_rows, effective_chunk_size):
                batch_indices = list(range(i, min(i + effective_chunk_size, total_rows)))
                batch_data = self[batch_indices]
                if not batch_data: continue

                table = pa.Table.from_pylist(batch_data, schema=target_schema)
                write_table_with_shards(table)
                pbar.update(len(batch_data))
        else:
            # Optimized path: Group by (file, RG) to minimize reads
            # ...
            from collections import defaultdict
            file_to_rg_to_rows = defaultdict(lambda: defaultdict(list))

            # Pre-calculate RG boundaries for speed
            file_rg_bounds = [np.cumsum([0] + f.row_groups) for f in self.index.files]

            # Process active indices
            for idx in self.indices:
                f_idx = np.searchsorted(self.file_offsets, idx, side='right') - 1
                l_idx = idx - self.file_offsets[f_idx]
                rg_idx = np.searchsorted(file_rg_bounds[f_idx], l_idx, side='right') - 1
                l_idx_in_rg = l_idx - file_rg_bounds[f_idx][rg_idx]
                file_to_rg_to_rows[f_idx][rg_idx].append(l_idx_in_rg)

            # 2. Iterate and write
            buffer = []

            # Sort files and RGs for linear access
            for f_idx in sorted(file_to_rg_to_rows.keys()):
                pf = self._get_file_handle(f_idx)
                file_info = self.index.files[f_idx]

                # Columns optimization
                requested_columns = None
                if self.selected_columns is not None and not self.mapper.transforms:
                    requested_columns = []
                    for col in self.schema:
                        if col == self.source_column_name: continue
                        src_col = self.mapper.get_source_column(col, file_info.path)
                        if src_col in file_info.columns: requested_columns.append(src_col)
                    if not requested_columns and self.schema: requested_columns = None

                rgs = file_to_rg_to_rows[f_idx]
                for rg_idx in sorted(rgs.keys()):
                    rows_in_rg = rgs[rg_idx]
                    table = pf.read_row_group(rg_idx, columns=requested_columns)

                    # Batch conversion for speed
                    is_sequential = len(rows_in_rg) == len(table) and all(rows_in_rg[i] == i for i in range(len(rows_in_rg)))
                    if not is_sequential:
                        batch_data = table.take(pa.array(rows_in_rg)).to_pylist()
                    else:
                        batch_data = table.to_pylist()

                    processed_batch = []
                    for l_idx_in_rg, item in zip(rows_in_rg, batch_data):
                        for col in self.index.all_columns:
                            if col not in item: item[col] = self._get_fill_value(self.mapper.mapping.get(col, col))
                        if self.include_source_column: item[self.source_column_name] = file_info.path
                        item = self.mapper.map_columns(item, file_info.path)

                        mapped_item = {}
                        for col in self.schema:
                            val = item.get(col, self._get_fill_value(col))
                            if val is None: val = self._get_fill_value(col)
                            if col in self._type_casts and val is not None:
                                try: val = self._type_casts[col](val)
                                except: pass
                            if isinstance(val, (dict, list)):
                                val = self._deep_fill_nones(val, self._get_fill_value(col))
                            mapped_item[col] = val
                        for row_fn in self.mapper.row_transforms:
                            mapped_item = row_fn(mapped_item)

                        processed_batch.append(mapped_item)

                    # Apply batch column transforms
                    if hasattr(self.mapper, 'batch_column_transforms') and self.mapper.batch_column_transforms:
                        for col_name, (batch_fn, _) in self.mapper.batch_column_transforms.items():
                            col_values = batch_fn(processed_batch)
                            for row, val in zip(processed_batch, col_values):
                                row[col_name] = val

                    # Apply whole-batch transforms
                    if hasattr(self.mapper, 'batch_transforms') and self.mapper.batch_transforms:
                        for batch_fn, _ in self.mapper.batch_transforms:
                            processed_batch = batch_fn(processed_batch)

                    buffer.extend(processed_batch)

                    if len(buffer) >= chunk_size:
                        out_table = pa.Table.from_pylist(buffer, schema=target_schema)
                        write_table_with_shards(out_table)
                        pbar.update(len(buffer))
                        buffer = []

            if buffer:
                out_table = pa.Table.from_pylist(buffer, schema=target_schema)
                write_table_with_shards(out_table)
                pbar.update(len(buffer))
    finally:
        if writer: writer.close()
        pbar.close()
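
A minimal usage sketch for to_parquet (not taken from the library source; the input folder, output paths and sizes are illustrative):

from indexed_parquet_dataset.dataset import IndexedParquetDataset

ds = IndexedParquetDataset.from_folder("data/")  # hypothetical input folder

# Single-file export; ".parquet" is appended automatically if missing.
ds.to_parquet("export/all_rows")

# Sharded export: the path is treated as a directory and files are written
# as part_0000.parquet, part_0001.parquet, ...
ds.to_parquet("export/shards", shard_size=100_000)

# Preserve the original row order at the cost of slower, index-by-index reads.
ds.to_parquet("export/ordered.parquet", optimize_by_reorder=False)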

iter_batches(batch_size, shuffle=False, seed=None)

Yields batches of data from the dataset.

Parameters:

    batch_size (int, required): Number of rows per batch.
    shuffle (bool, default False): Whether to shuffle the dataset before iterating.
    seed (Optional[int], default None): Random seed for shuffling.
Source code in src/indexed_parquet_dataset/dataset.py
def iter_batches(self, batch_size: int, shuffle: bool = False, seed: Optional[int] = None):
    """Yields batches of data from the dataset.

    Args:
        batch_size: Number of rows per batch.
        shuffle: Whether to shuffle before iterating.
        seed: Random seed for shuffling.
    """
    ds = self.shuffle(seed) if shuffle else self
    n = len(ds)
    for i in range(0, n, batch_size):
        end = min(i + batch_size, n)
        yield ds[i:end]
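
A short sketch of batch iteration, continuing the hypothetical ds from the to_parquet example above; each yielded batch is whatever the slice ds[i:end] returns:

for batch in ds.iter_batches(batch_size=256, shuffle=True, seed=42):
    handle_batch(batch)  # handle_batch is a placeholder for user code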

generate_collate_fn(on_none='raise')

Returns a DataLoader-compatible collate function with robust None handling.

Parameters:

    on_none (str, default 'raise'): Strategy for handling None values.
        'raise': Raises a descriptive TypeError.
        'drop': Drops samples containing None from the batch.
        'fill': Replaces None with 0/"" based on the fill_values configuration.

Raises:

    ImportError: If PyTorch is not installed.

Source code in src/indexed_parquet_dataset/dataset.py
def generate_collate_fn(self, on_none: str = 'raise') -> Callable:
    """Returns a DataLoader-compatible collate function with robust None handling.

    Args:
        on_none: Strategy for handling None values.
            'raise' (default): Raises a descriptive TypeError.
            'drop': Drops samples containing None from the batch.
            'fill': Replaces None with 0/"" based on fill_values configuration.

    Raises:
        ImportError: If PyTorch is not installed.
    """
    if not _TORCH_AVAILABLE:
        raise ImportError(
            "PyTorch is required for generate_collate_fn. "
            "Install it with: pip install torch"
        )
    # Pre-compute fill map: {column_name -> fill_value} for all known columns.
    # This avoids storing a reference to the full dataset inside CollateHandler.
    fill_map = {col: self._get_fill_value(col) for col in self.schema}
    return CollateHandler(fill_map, on_none)
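
A sketch of wiring the collate function into a PyTorch DataLoader (requires torch; batch size and worker count are illustrative):

from torch.utils.data import DataLoader

collate_fn = ds.generate_collate_fn(on_none='fill')
loader = DataLoader(ds, batch_size=64, num_workers=4, collate_fn=collate_fn)

for batch in loader:
    # With dict samples, default_collate produces a dict of collated values;
    # None entries have been replaced according to the dataset's fill values.
    ...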

CollateHandler

A helper class for correct batch collation in a DataLoader. Obtain an instance via the ds.generate_collate_fn() method.

indexed_parquet_dataset.dataset.CollateHandler

Picklable helper for batch collation.

Stores only a pre-computed fill_map dict instead of a reference to the full dataset, so DataLoader workers (num_workers > 0) receive a minimal copy rather than the entire dataset index.

Source code in src/indexed_parquet_dataset/dataset.py
class CollateHandler:
    """Picklable helper for batch collation.

    Stores only a pre-computed fill_map dict instead of a reference to the
    full dataset, so DataLoader workers (num_workers > 0) receive a minimal
    copy rather than the entire dataset index.
    """

    def __init__(self, fill_map: Dict[str, Any], on_none: str):
        self.fill_map = fill_map  # {column_name -> fill_value}
        self.on_none = on_none

    def __call__(self, batch):
        from torch.utils.data._utils.collate import default_collate

        clean_batch = batch
        if self.on_none in ('drop', 'fill'):
            new_batch = []
            for item in batch:
                has_none = any(v is None for v in item.values())
                if has_none:
                    if self.on_none == 'drop':
                        continue
                    else:  # fill
                        item = item.copy()
                        for k, v in item.items():
                            if v is None:
                                item[k] = self.fill_map.get(k)
                new_batch.append(item)
            clean_batch = new_batch

        if not clean_batch:
            return {}

        try:
            return default_collate(clean_batch)
        except TypeError as e:
            if 'NoneType' in str(e):
                for i, item in enumerate(batch):
                    for k, v in item.items():
                        if v is None:
                            raise TypeError(
                                f"Batch collation failed: column '{k}' contains None at batch index {i}.\n"
                                f"PyTorch DataLoader cannot handle None values.\n"
                                f"Fix: Set 'auto_fill=True' or provide 'fill_values' when initializing IndexedParquetDataset."
                            ) from None
            raise e

SchemaMapper

The internal class responsible for schema transformations and calculated columns.

indexed_parquet_dataset.schema.SchemaMapper

Handles column mapping and aliasing for the dataset.

Source code in src/indexed_parquet_dataset/schema.py
class SchemaMapper:
    """Handles column mapping and aliasing for the dataset."""

    def __init__(
        self, 
        mapping: Optional[Dict[str, str]] = None, 
        file_mappings: Optional[Dict[str, Dict[str, str]]] = None,
        transforms: Optional[Dict[str, Callable]] = None,
        row_transforms: Optional[List[Callable[[dict], dict]]] = None,
        batch_transforms: Optional[List[tuple[Callable[[List[dict]], List[dict]], int]]] = None,
        batch_column_transforms: Optional[Dict[str, tuple[Callable[[List[dict]], List[Any]], int]]] = None
    ):
        """Initializes the SchemaMapper.

        Args:
            mapping: Global mapping (original name -> target name).
            file_mappings: File-specific mappings (file path -> {original -> target}).
            transforms: Global transformations (target name -> function(row)).
            row_transforms: Row-level transformations (list of functions function(row) -> row).
            batch_transforms: Batch-level transformations (list of (function, batch_size) tuples).
            batch_column_transforms: Batch-level column transformations (target name -> (function(batch), batch_size)).
        """
        self.mapping = mapping if mapping is not None else {}
        self.file_mappings = file_mappings if file_mappings is not None else {}
        self.transforms = transforms if transforms is not None else {}
        self.row_transforms = row_transforms if row_transforms is not None else []
        self.batch_transforms = batch_transforms if batch_transforms is not None else []
        self.batch_column_transforms = batch_column_transforms if batch_column_transforms is not None else {}
        self._rebuild_reverse_mapping()

    def _rebuild_reverse_mapping(self) -> None:
        """Rebuilds the reverse mapping for fast lookups."""
        self.reverse_mapping = {v: k for k, v in self.mapping.items()}

    def map_columns(self, data: Dict[str, Any], file_path: Optional[str] = None) -> Dict[str, Any]:
        """Renames columns in the input data according to global and file-specific mappings.

        Args:
            data: Raw dictionary of column values.
            file_path: Optional path to the file from which data was read.

        Returns:
            A dictionary with mapped column names.
        """
        # Apply file-specific mapping first if available
        current_data = data
        if file_path and file_path in self.file_mappings:
            f_map = self.file_mappings[file_path]
            current_data = {f_map.get(k, k): v for k, v in current_data.items()}

        if not self.mapping:
            mapped_data = current_data.copy()
        else:
            mapped_data = {}
            for col, val in current_data.items():
                target_name = self.mapping.get(col, col)
                mapped_data[target_name] = val

        # Apply transforms (computed columns)
        if not self.transforms:
            return mapped_data

        for target_name, transform in self.transforms.items():
            try:
                mapped_data[target_name] = transform(mapped_data)
            except Exception:
                pass

        return mapped_data

    def get_source_column(self, target_column: str, file_path: Optional[str] = None) -> str:
        """Returns the original column name for a given target name.

        Args:
            target_column: The mapped name of the column.
            file_path: Optional path to the file.

        Returns:
            The original column name.
        """
        return self.reverse_mapping.get(target_column, target_column)

    def select_source_columns(self, target_columns: List[str]) -> List[str]:
        """Returns a list of original column names required for the requested target columns."""
        return [self.get_source_column(col) for col in target_columns]

    def to_dict(self) -> Dict[str, Any]:
        """Converts the mapper state to a dictionary."""
        return {
            "mapping": self.mapping,
            "file_mappings": self.file_mappings,
            "transforms": self.transforms,
            "row_transforms": self.row_transforms,
            "batch_transforms": self.batch_transforms,
            "batch_column_transforms": self.batch_column_transforms
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'SchemaMapper':
        """Creates a SchemaMapper from a dictionary."""
        return cls(
            mapping=data.get("mapping"),
            file_mappings=data.get("file_mappings"),
            transforms=data.get("transforms"),
            row_transforms=data.get("row_transforms"),
            batch_transforms=data.get("batch_transforms"),
            batch_column_transforms=data.get("batch_column_transforms")
        )

    def merge(self, other: 'SchemaMapper', self_files: List[str], other_files: List[str]) -> 'SchemaMapper':
        """Merges this mapper with another one, preserving conflicting aliases via file-specific mappings.

        Args:
            other: The other SchemaMapper to merge.
            self_files: List of absolute file paths belonging to the current dataset.
            other_files: List of absolute file paths belonging to the other dataset.

        Returns:
            A new merged SchemaMapper.
        """
        new_file_mappings = self.file_mappings.copy()
        new_file_mappings.update(other.file_mappings)

        new_global_mapping = self.mapping.copy()

        for src_col, target_col in other.mapping.items():
            if src_col in new_global_mapping:
                if new_global_mapping[src_col] != target_col:
                    # Conflict: same source column, different targets.
                    # We preserve the alias for 'other' files by moving it to file_mappings.
                    for f_path in other_files:
                        if f_path not in new_file_mappings:
                            new_file_mappings[f_path] = {}
                        # Only set if not already present in file_mappings
                        if src_col not in new_file_mappings[f_path]:
                            new_file_mappings[f_path][src_col] = target_col
                # If the targets match, no conflict at global level.
            else:
                # No conflict with current global mapping, can add safely.
                new_global_mapping[src_col] = target_col

        new_transforms = self.transforms.copy()
        new_transforms.update(other.transforms)

        new_row_transforms = self.row_transforms + other.row_transforms
        new_batch_transforms = self.batch_transforms + other.batch_transforms

        new_batch_column_transforms = self.batch_column_transforms.copy()
        new_batch_column_transforms.update(other.batch_column_transforms)

        return SchemaMapper(
            new_global_mapping, 
            new_file_mappings, 
            new_transforms, 
            new_row_transforms,
            new_batch_transforms,
            new_batch_column_transforms
        )

    def __repr__(self) -> str:
        return f"SchemaMapper(mapping={self.mapping}, file_mappings={self.file_mappings})"

Functions

__init__(mapping=None, file_mappings=None, transforms=None, row_transforms=None, batch_transforms=None, batch_column_transforms=None)

Initializes the SchemaMapper.

Parameters:

    mapping (Optional[Dict[str, str]], default None): Global mapping (original name -> target name).
    file_mappings (Optional[Dict[str, Dict[str, str]]], default None): File-specific mappings (file path -> {original -> target}).
    transforms (Optional[Dict[str, Callable]], default None): Global transformations (target name -> function(row)).
    row_transforms (Optional[List[Callable[[dict], dict]]], default None): Row-level transformations (list of functions function(row) -> row).
    batch_transforms (Optional[List[tuple[Callable[[List[dict]], List[dict]], int]]], default None): Batch-level transformations (list of (function, batch_size) tuples).
    batch_column_transforms (Optional[Dict[str, tuple[Callable[[List[dict]], List[Any]], int]]], default None): Batch-level column transformations (target name -> (function(batch), batch_size)).
Source code in src/indexed_parquet_dataset/schema.py
def __init__(
    self, 
    mapping: Optional[Dict[str, str]] = None, 
    file_mappings: Optional[Dict[str, Dict[str, str]]] = None,
    transforms: Optional[Dict[str, Callable]] = None,
    row_transforms: Optional[List[Callable[[dict], dict]]] = None,
    batch_transforms: Optional[List[tuple[Callable[[List[dict]], List[dict]], int]]] = None,
    batch_column_transforms: Optional[Dict[str, tuple[Callable[[List[dict]], List[Any]], int]]] = None
):
    """Initializes the SchemaMapper.

    Args:
        mapping: Global mapping (original name -> target name).
        file_mappings: File-specific mappings (file path -> {original -> target}).
        transforms: Global transformations (target name -> function(row)).
        row_transforms: Row-level transformations (list of functions function(row) -> row).
        batch_transforms: Batch-level transformations (list of (function, batch_size) tuples).
        batch_column_transforms: Batch-level column transformations (target name -> (function(batch), batch_size)).
    """
    self.mapping = mapping if mapping is not None else {}
    self.file_mappings = file_mappings if file_mappings is not None else {}
    self.transforms = transforms if transforms is not None else {}
    self.row_transforms = row_transforms if row_transforms is not None else []
    self.batch_transforms = batch_transforms if batch_transforms is not None else []
    self.batch_column_transforms = batch_column_transforms if batch_column_transforms is not None else {}
    self._rebuild_reverse_mapping()

from_dict(data) classmethod

Creates a SchemaMapper from a dictionary.

Source code in src/indexed_parquet_dataset/schema.py
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'SchemaMapper':
    """Creates a SchemaMapper from a dictionary."""
    return cls(
        mapping=data.get("mapping"),
        file_mappings=data.get("file_mappings"),
        transforms=data.get("transforms"),
        row_transforms=data.get("row_transforms"),
        batch_transforms=data.get("batch_transforms"),
        batch_column_transforms=data.get("batch_column_transforms")
    )

get_source_column(target_column, file_path=None)

Returns the original column name for a given target name.

Parameters:

    target_column (str, required): The mapped name of the column.
    file_path (Optional[str], default None): Optional path to the file.

Returns:

    str: The original column name.

Source code in src/indexed_parquet_dataset/schema.py
def get_source_column(self, target_column: str, file_path: Optional[str] = None) -> str:
    """Returns the original column name for a given target name.

    Args:
        target_column: The mapped name of the column.
        file_path: Optional path to the file.

    Returns:
        The original column name.
    """
    return self.reverse_mapping.get(target_column, target_column)
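
A tiny reverse-lookup sketch, continuing the mapper example above:

mapper.get_source_column("text")    # -> "txt" (global alias reversed)
mapper.get_source_column("label")   # -> "label" (no global alias, name returned unchanged)
mapper.select_source_columns(["text", "label"])  # -> ["txt", "label"]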

map_columns(data, file_path=None)

Renames columns in the input data according to global and file-specific mappings.

Parameters:

    data (Dict[str, Any], required): Raw dictionary of column values.
    file_path (Optional[str], default None): Optional path to the file from which the data was read.

Returns:

    Dict[str, Any]: A dictionary with mapped column names.

Source code in src/indexed_parquet_dataset/schema.py
def map_columns(self, data: Dict[str, Any], file_path: Optional[str] = None) -> Dict[str, Any]:
    """Renames columns in the input data according to global and file-specific mappings.

    Args:
        data: Raw dictionary of column values.
        file_path: Optional path to the file from which data was read.

    Returns:
        A dictionary with mapped column names.
    """
    # Apply file-specific mapping first if available
    current_data = data
    if file_path and file_path in self.file_mappings:
        f_map = self.file_mappings[file_path]
        current_data = {f_map.get(k, k): v for k, v in current_data.items()}

    if not self.mapping:
        mapped_data = current_data.copy()
    else:
        mapped_data = {}
        for col, val in current_data.items():
            target_name = self.mapping.get(col, col)
            mapped_data[target_name] = val

    # Apply transforms (computed columns)
    if not self.transforms:
        return mapped_data

    for target_name, transform in self.transforms.items():
        try:
            mapped_data[target_name] = transform(mapped_data)
        except Exception:
            pass

    return mapped_data

merge(other, self_files, other_files)

Merges this mapper with another one, preserving conflicting aliases via file-specific mappings.

Parameters:

    other (SchemaMapper, required): The other SchemaMapper to merge.
    self_files (List[str], required): List of absolute file paths belonging to the current dataset.
    other_files (List[str], required): List of absolute file paths belonging to the other dataset.

Returns:

    SchemaMapper: A new merged SchemaMapper.

Source code in src/indexed_parquet_dataset/schema.py
def merge(self, other: 'SchemaMapper', self_files: List[str], other_files: List[str]) -> 'SchemaMapper':
    """Merges this mapper with another one, preserving conflicting aliases via file-specific mappings.

    Args:
        other: The other SchemaMapper to merge.
        self_files: List of absolute file paths belonging to the current dataset.
        other_files: List of absolute file paths belonging to the other dataset.

    Returns:
        A new merged SchemaMapper.
    """
    new_file_mappings = self.file_mappings.copy()
    new_file_mappings.update(other.file_mappings)

    new_global_mapping = self.mapping.copy()

    for src_col, target_col in other.mapping.items():
        if src_col in new_global_mapping:
            if new_global_mapping[src_col] != target_col:
                # Conflict: same source column, different targets.
                # We preserve the alias for 'other' files by moving it to file_mappings.
                for f_path in other_files:
                    if f_path not in new_file_mappings:
                        new_file_mappings[f_path] = {}
                    # Only set if not already present in file_mappings
                    if src_col not in new_file_mappings[f_path]:
                        new_file_mappings[f_path][src_col] = target_col
            # If the targets match, no conflict at global level.
        else:
            # No conflict with current global mapping, can add safely.
            new_global_mapping[src_col] = target_col

    new_transforms = self.transforms.copy()
    new_transforms.update(other.transforms)

    new_row_transforms = self.row_transforms + other.row_transforms
    new_batch_transforms = self.batch_transforms + other.batch_transforms

    new_batch_column_transforms = self.batch_column_transforms.copy()
    new_batch_column_transforms.update(other.batch_column_transforms)

    return SchemaMapper(
        new_global_mapping, 
        new_file_mappings, 
        new_transforms, 
        new_row_transforms,
        new_batch_transforms,
        new_batch_column_transforms
    )
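
A sketch of the conflict-resolution behaviour: when two mappers alias the same source column to different targets, the current mapper's alias stays global and the other's is preserved per file (file paths are hypothetical):

a = SchemaMapper(mapping={"txt": "text"})
b = SchemaMapper(mapping={"txt": "content"})   # same source column, different target

merged = a.merge(b, self_files=["/data/a.parquet"], other_files=["/data/b.parquet"])

merged.mapping        # {"txt": "text"}   (the current dataset's alias remains global)
merged.file_mappings  # {"/data/b.parquet": {"txt": "content"}}   (preserved for the other dataset's files)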

select_source_columns(target_columns)

Returns a list of original column names required for the requested target columns.

Source code in src/indexed_parquet_dataset/schema.py
def select_source_columns(self, target_columns: List[str]) -> List[str]:
    """Returns a list of original column names required for the requested target columns."""
    return [self.get_source_column(col) for col in target_columns]

to_dict()

Converts the mapper state to a dictionary.

Source code in src/indexed_parquet_dataset/schema.py
def to_dict(self) -> Dict[str, Any]:
    """Converts the mapper state to a dictionary."""
    return {
        "mapping": self.mapping,
        "file_mappings": self.file_mappings,
        "transforms": self.transforms,
        "row_transforms": self.row_transforms,
        "batch_transforms": self.batch_transforms,
        "batch_column_transforms": self.batch_column_transforms
    }
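
A round-trip sketch, continuing the mapper example above. Note that to_dict returns live Python objects (including the callables in transforms), so this is state hand-off rather than JSON serialization:

state = mapper.to_dict()
restored = SchemaMapper.from_dict(state)

assert restored.mapping == mapper.mapping
assert restored.file_mappings == mapper.file_mappings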

BaseIndex

The dataclass that stores index metadata: the list of indexed files, the total row count, and the unified column names and types.

indexed_parquet_dataset.indexer.BaseIndex dataclass

Metadata about the entire dataset.

Attributes:

    files (List[FileInfo]): List of FileInfo objects for all files in the dataset.
    total_rows (int): Combined number of rows across all files.
    all_columns (List[str]): Sorted list of all unique columns found across all files.
    column_types (Dict[str, str]): Dict mapping column names to their PyArrow type as a string.

Source code in src/indexed_parquet_dataset/indexer.py
@dataclass
class BaseIndex:
    """Metadata about the entire dataset.

    Attributes:
        files: List of FileInfo objects for all files in the dataset.
        total_rows: Combined number of rows across all files.
        all_columns: Sorted list of all unique columns found across all files.
        column_types: Dict mapping column names to their PyArrow type as a string.
    """
    files: List[FileInfo]
    total_rows: int
    all_columns: List[str]
    column_types: Dict[str, str]
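
A small inspection sketch; the index is reachable as ds.index (the attribute used by to_parquet above), and the column name queried here is illustrative:

idx = ds.index
print(idx.total_rows)                # combined row count across all files
print(idx.all_columns)               # sorted union of column names
print(len(idx.files))                # number of indexed Parquet files
print(idx.column_types.get("text"))  # PyArrow type as a string, e.g. "string"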