from collections import defaultdict
import copy
from packaging.version import Version
from functools import lru_cache
import io
import struct
import os
import operator
import re
import numbers
import zoneinfo

import numpy as np
import pandas as pd
import fsspec

from fastparquet import parquet_thrift
from fastparquet.cencoding import ThriftObject
from fastparquet import __version__

PANDAS_VERSION = Version(pd.__version__)
created_by = f"fastparquet-python version {__version__} (build 0)"


class ParquetException(Exception):
    """Generic Exception related to unexpected data format when
    reading parquet file."""


def default_mkdirs(f):
    """Create directory ``f`` (and parents); do nothing if it exists."""
    os.makedirs(f, exist_ok=True)


# strptime format used as the fallback when parsing datetime values
# in ``val_from_meta``.
PATH_DATE_FMT = '%Y%m%d_%H%M%S.%f'


def path_string(o):
    """Render ``o`` as text: ISO format for ``pd.Timestamp``, else ``str``."""
    if isinstance(o, pd.Timestamp):
        return o.isoformat()
    return str(o)


default_open = open


def default_remove(paths):
    """Best-effort removal of every path in ``paths``.

    Paths that cannot be unlinked are silently skipped.
    """
    for path in paths:
        try:
            os.unlink(path)
        except IOError:
            # deliberate best-effort: a file that is already gone (or cannot
            # be removed) must not abort removal of the remaining paths
            pass


def val_from_meta(x, meta):
    """Convert ``x`` according to a column metadata mapping.

    Parameters
    ----------
    x : object
        Raw value to convert.
    meta : mapping
        Must provide the keys ``'pandas_type'`` and ``'numpy_type'``.

    Returns
    -------
    ``x`` unchanged for categorical columns, a bool for boolean columns,
    otherwise ``x`` converted to the scalar type of ``meta['numpy_type']``.

    Raises
    ------
    ValueError
        If the conversion fails and the column is not ``datetime64[ns]``.
    """
    try:
        if meta['pandas_type'] == 'categorical':
            return x
        t = np.dtype(meta['numpy_type'])
        if t == "bool":
            return x in [True, "true", "True", 't', "T", 1, "1"]
        return np.dtype(t).type(x)
    except ValueError:
        if meta['numpy_type'] == 'datetime64[ns]':
            # second chance for datetimes written with PATH_DATE_FMT
            return pd.to_datetime(x, format=PATH_DATE_FMT)
        else:
            raise


def val_to_num(x, meta=None):
    """Parse a string as a number, date or timedelta if possible

    When ``meta`` is truthy the conversion is driven by it (see
    ``val_from_meta``); otherwise the type is inferred from the value.
    """
    if meta:
        return val_from_meta(x, meta)
    return _val_to_num(x)


@lru_cache(1000)
def _val_to_num(x):
    """Infer a typed value from ``x`` without metadata (results are cached).

    Tried in order: real numbers and a few reserved strings are returned
    unchanged; ``"True"``/``"False"`` become bools; then base-10 int, float,
    ``pd.Timestamp`` and ``pd.Timedelta``. If everything fails ``x`` is
    returned unchanged.
    """
    if isinstance(x, numbers.Real):
        return x
    if x in ['now', 'NOW', 'TODAY', '']:
        return x
    if isinstance(x, str) and x.lower() == 'nan':
        return x
    if x == "True":
        return True
    if x == "False":
        return False
    # ``except Exception`` (rather than a bare ``except``) keeps the
    # "try the next parser" behaviour for every parsing error while letting
    # KeyboardInterrupt / SystemExit propagate.
    try:
        return int(x, base=10)
    except Exception:
        pass
    try:
        return float(x)
    except Exception:
        pass
    try:
        return pd.Timestamp(x)
    except Exception:
        pass
    try:
        # TODO: determine the valid usecases for this, then try to limit the
        #  set of strings which may get inadvertently converted to timedeltas
        return pd.Timedelta(x)
    except Exception:
        return x


def ensure_bytes(s):
    """Return ``s`` UTF-8 encoded if it is a ``str``, else ``s`` unchanged."""
    return s.encode('utf-8') if isinstance(s, str) else s


def ensure_str(b, *, ignore_error=False):
    """Return ``b`` as ``str``, decoding it as UTF-8 if necessary.

    Parameters
    ----------
    b : str or object with a ``decode`` method
    ignore_error : bool (False)
        If True, return ``b`` unchanged when it cannot be decoded instead of
        raising.

    Raises
    ------
    UnicodeDecodeError, AttributeError
        When decoding fails and ``ignore_error`` is False.
    """
    if isinstance(b, str):
        return b
    try:
        return b.decode('utf-8')
    except (UnicodeDecodeError, AttributeError):
        if not ignore_error:
            raise
        return b


def check_column_names(columns, *args):
    """Ensure that parameters listing column names have corresponding columns

    Only arguments that are tuples or lists are checked; anything else is
    ignored.

    Raises
    ------
    ValueError
        If a checked argument names a column absent from ``columns``.
    """
    for arg in args:
        if isinstance(arg, (tuple, list)):
            missing = set(arg) - set(columns)
            if missing:
                raise ValueError("Following columns were requested but are "
                                 "not available: %s.\n"
                                 "All requested columns: %s\n"
                                 "Available columns: %s"
                                 "" % (missing, arg, columns))


def reset_row_idx(data: pd.DataFrame) -> pd.DataFrame:
    """Reset row (multi-)index as column(s) of the DataFrame.

    Multi-index are stored in columns, one per index level.

    Parameters
    ----------
    data : dataframe

    Returns
    -------
    dataframe
    """
    if isinstance(data.index, pd.MultiIndex):
        for name, cats, codes in zip(data.index.names, data.index.levels,
                                     data.index.codes):
            data = data.assign(
                **{name: pd.Categorical.from_codes(codes, cats)}
            )
        # ``reset_index`` returns a new frame; the result must be kept,
        # otherwise the MultiIndex would survive alongside the new columns.
        data = data.reset_index(drop=True)
    else:
        data = data.reset_index()
    return data


def metadata_from_many(file_list, verify_schema=False, open_with=default_open,
                       root=False, fs=None):
    """
    Given list of parquet files, make a FileMetaData that points to them

    Parameters
    ----------
    file_list: list of paths of parquet files
    verify_schema: bool (False)
        Whether to assert that the schemas in each file are identical
    open_with: function
        Use this to open each path.
    root: str
        Top of the dataset's directory tree, for cases where it can't be
        automatically inferred.
    fs: fsspec.AbstractFileSystem
        Used in preference to open_with, if given

    Returns
    -------
    basepath: the root path that other paths are relative to
    fmd: metadata thrift structure

    Raises
    ------
    ValueError
        If ``file_list`` mixes ParquetFile instances with other objects, or
        if ``verify_schema`` is set and the schemas differ.
    """
    # imported here rather than at module level
    # (presumably to avoid a circular import - confirm)
    from fastparquet import api

    legacy = True
    if all(isinstance(pf, api.ParquetFile) for pf in file_list):
        pfs = file_list
        file_list = [pf.fn for pf in pfs]
    elif all(not isinstance(pf, api.ParquetFile) for pf in file_list):
        if verify_schema or fs is None or len(file_list) < 3:
            pfs = [api.ParquetFile(fn, open_with=open_with)
                   for fn in file_list]
        else:
            # activate new code path here
            f0 = file_list[0]
            pf0 = api.ParquetFile(f0, open_with=open_with)
            if pf0.file_scheme not in ['empty', 'simple']:
                # set of directories, revert
                pfs = [pf0] + [api.ParquetFile(fn, open_with=open_with)
                               for fn in file_list[1:]]
            else:
                # permits concurrent fetch of footers; needs fsspec >= 2021.6
                size = int(1.4 * pf0._head_size)
                pieces = fs.cat(file_list[1:], start=-size)
                # bytes [-8:-4] of each file hold the footer length; +8 for
                # the trailing 8 bytes that follow the footer itself
                sizes = {path: int.from_bytes(piece[-8:-4], "little") + 8
                         for path, piece in pieces.items()}
                not_bigenough = [path for path, s in sizes.items()
                                 if s > size]
                if not_bigenough:
                    # first guess was too small for some files: fetch again
                    # with a tail big enough for the largest footer
                    new_pieces = fs.cat(not_bigenough,
                                        start=-max(sizes.values()))
                    pieces.update(new_pieces)
                pieces = {k: _get_fmd(v) for k, v in pieces.items()}
                # recover ordering
                pieces = [(fn, pieces[fn]) for fn in file_list[1:]]
                legacy = False
    else:
        raise ValueError("Merge requires all ParquetFile instances or none")
    basepath, file_list = analyse_paths(file_list, root=root)

    if legacy:
        # legacy code path
        if verify_schema:
            for pf in pfs[1:]:
                if pf._schema != pfs[0]._schema:
                    raise ValueError('Incompatible schemas')

        fmd = copy.copy(pfs[0].fmd)  # we inherit "created by" field
        rgs = []

        for pf, fn in zip(pfs, file_list):
            nested = pf.file_scheme not in ['simple', 'empty']
            for rg in pf.row_groups:
                # shallow copies, so the source ParquetFile objects are
                # left untouched
                rg = copy.copy(rg)
                rg.columns = [copy.copy(c) for c in rg.columns]
                for chunk in rg.columns:
                    if nested:
                        # the chunk already carries a path relative to its
                        # own file: prefix it with that file's path
                        chunk.file_path = '/'.join(
                            [fn, ensure_str(chunk.file_path)]
                        )
                    else:
                        chunk.file_path = fn
                rgs.append(rg)

        fmd.row_groups = rgs
        fmd.num_rows = sum(rg.num_rows for rg in fmd.row_groups)
        return basepath, fmd

    for rg in pf0.fmd.row_groups:
        # chunks of first file, which would have file_path=None
        rg.columns[0].file_path = f0[len(basepath):].lstrip("/")
    rgs0 = pf0.fmd.row_groups
    for k, v in pieces:
        # Set file paths on other files
        if len(v.schema) > len(pf0.fmd.schema):
            # or was UPDATED with supercast
            pf0.fmd.schema = v.schema
        rgs = v.row_groups or []
        for rg in rgs:
            rg.columns[0].file_path = k[len(basepath):].lstrip("/")
        rgs0.extend(rgs)
    pf0.fmd.row_groups = rgs0
    pf0.fmd.num_rows = sum(rg.num_rows for rg in pf0.fmd.row_groups)
    return basepath, pf0.fmd


def _get_fmd(inbytes):
    """Parse the file metadata contained in the tail bytes of a parquet file.

    Parameters
    ----------
    inbytes : bytes
        The end of a parquet file; must contain the whole footer.

    Returns
    -------
    The object produced by ``cencoding.from_buffer`` for the footer bytes.
    """
    from .cencoding import from_buffer

    f = io.BytesIO(inbytes)
    f.seek(-8, 2)
    # NOTE(review): the reviewed text of this function was cut off inside the
    # struct format string below. The remainder is reconstructed from the
    # footer layout that ``metadata_from_many`` relies on (4-byte
    # little-endian footer length stored 8 bytes before the end of the file)
    # - confirm against version control, including the ``from_buffer`` call.
    head_size = struct.unpack('<I', f.read(4))[0]
    f.seek(-(head_size + 8), 2)
    data = f.read(head_size)
    return from_buffer(data, "FileMetaData")


# NOTE(review): the reviewed text is truncated between the body of
# ``_get_fmd`` and this point. Whatever was defined there - including the
# ``analyse_paths`` helper that ``metadata_from_many`` calls - is not visible
# and must be restored from version control.
#
# The statements below are the surviving tail of a function whose header and
# opening lines were lost (it classifies a set of split paths as 'other',
# 'flat', 'hive' or 'drill'). They are kept verbatim as a comment because
# they cannot run without the missing ``lens`` / ``parts`` definitions:
#
#     ... > 1:
#         return 'other'
#     if set(lens) == {1}:
#         return 'flat'
#     matches = all(all("=" in p[1:-1] for p in part[:-1]) for part in parts)
#     return "hive" if matches else "drill"


def join_path(*path):
    """Join path components with "/".

    Falsy components are dropped; backslashes are converted to forward
    slashes and trailing slashes are stripped from each component.
    """
    return "/".join([str(p).replace("\\", "/").rstrip("/")
                     for p in path if p])


def _strip_path_tail(paths) -> set:
    """Return the set of paths with their last "/"-separated part removed.

    A path without any "/" contributes the empty string.
    """
    return {path.rsplit("/", 1)[0] if "/" in path else "" for path in paths}


# Comparison operators, keyed by their textual form.
ops = {
    "==": operator.eq,
    "=": operator.eq,
    "!=": operator.ne,
    ">": operator.gt,
    ">=": operator.ge,
    "<": operator.lt,
    "<=": operator.le
}


def norm_col_name(name, is_index:bool=None):
    """Normalise a column name that may be a tuple.

    Tuples become their first element when ``is_index`` is truthy, otherwise
    their ``str`` representation. Any other name is returned unchanged.
    """
    if isinstance(name, tuple):
        if is_index:
            return name[0]
        else:
            return str(name)
    return name


def get_fs(fn, open_with, mkdirs):
    """Derive an fsspec filesystem from a closure-based ``open_with``.

    When ``open_with`` is recognised (by its repr) as the lambda created in
    ``FastParquetImpl.write``, its captured ``storage_options`` are used to
    build a filesystem for ``fn``, and ``open_with`` / ``mkdirs`` are
    replaced by filesystem-backed equivalents. Otherwise the arguments are
    passed through with ``fs`` set to None.

    Returns
    -------
    fs, fn, open_with, mkdirs
    """
    fs = None
    # The repr of a nested function contains its qualified name, whose parts
    # are "<locals>" and "<lambda>" for a lambda defined inside a method.
    # The text under review read "FastParquetImpl.write..", which can never
    # occur in such a repr (the angle-bracket parts look stripped), so the
    # full marker is restored here.
    if "FastParquetImpl.write.<locals>.<lambda>" in str(open_with):
        import inspect
        so = (inspect.getclosurevars(open_with).nonlocals["storage_options"]
              or {})
        fs, fn = fsspec.core.url_to_fs(fn, **so)
        open_with = fs.open
        mkdirs = mkdirs or (lambda d: fs.mkdirs(d, exist_ok=True))
    return fs, fn, open_with, mkdirs