Import python venv for stability

This commit is contained in:
2026-02-15 21:24:16 -08:00
parent 1343e93a59
commit 7d784705c9
4997 changed files with 1628270 additions and 0 deletions
@@ -0,0 +1,8 @@
"""parquet - read parquet files."""
from fastparquet._version import __version__
from fastparquet.writer import write, update_file_custom_metadata
from fastparquet import core, schema, converted_types, api
from fastparquet.api import ParquetFile
from fastparquet.util import ParquetException
@@ -0,0 +1,34 @@
# file generated by setuptools-scm
# don't change, don't track in version control
__all__ = [
"__version__",
"__version_tuple__",
"version",
"version_tuple",
"__commit_id__",
"commit_id",
]
TYPE_CHECKING = False
if TYPE_CHECKING:
from typing import Tuple
from typing import Union
VERSION_TUPLE = Tuple[Union[int, str], ...]
COMMIT_ID = Union[str, None]
else:
VERSION_TUPLE = object
COMMIT_ID = object
version: str
__version__: str
__version_tuple__: VERSION_TUPLE
version_tuple: VERSION_TUPLE
commit_id: COMMIT_ID
__commit_id__: COMMIT_ID
__version__ = version = '2025.12.0'
__version_tuple__ = version_tuple = (2025, 12, 0)
__commit_id__ = commit_id = 'gb94076c40'
File diff suppressed because it is too large Load Diff
File diff suppressed because one or more lines are too long
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,114 @@
import cramjam
import numpy as np
from fastparquet import parquet_thrift
# TODO: use stream/direct-to-buffer conversions instead of memcopy
compressions = {
'UNCOMPRESSED': lambda x: x
}
decompressions = {
'UNCOMPRESSED': lambda x, y: x
}
# Gzip is present regardless
COMPRESSION_LEVEL = 6
def gzip_compress_v3(data, compresslevel=COMPRESSION_LEVEL):
return cramjam.gzip.compress(data, level=compresslevel)
def gzip_decompress(data, uncompressed_size):
return cramjam.gzip.decompress(data, output_len=uncompressed_size)
compressions['GZIP'] = gzip_compress_v3
decompressions['GZIP'] = gzip_decompress
compressions['SNAPPY'] = cramjam.snappy.compress_raw
decompressions['SNAPPY'] = cramjam.snappy.decompress_raw
try:
import lzo
def lzo_decompress(data, uncompressed_size):
return lzo.decompress(data)
compressions['LZO'] = lzo.compress
decompressions['LZO'] = lzo_decompress
except ImportError:
pass
compressions['BROTLI'] = cramjam.brotli.compress
decompressions['BROTLI'] = cramjam.brotli.decompress
def lz4_compress(data, **kwargs):
kwargs['store_size'] = False
return cramjam.lz4.compress_block(data, **kwargs)
def lz4_decomp(data, size):
return cramjam.lz4.decompress_block(np.frombuffer(data, 'uint8'), size)
compressions['LZ4'] = lz4_compress
decompressions['LZ4'] = lz4_decomp
# LZ4 is actually LZ4 block, aka "raw", see
# https://github.com/apache/parquet-format/commit/7f06e838cbd1b7dbd722ff2580b9c2525e37fc46
compressions['LZ4_RAW'] = lz4_compress
decompressions['LZ4_RAW'] = lz4_decomp
compressions['ZSTD'] = cramjam.zstd.compress
decompressions['ZSTD'] = cramjam.zstd.decompress
decom_into = {
"GZIP": cramjam.gzip.decompress_into,
"SNAPPY": cramjam.snappy.decompress_raw_into,
"ZSTD": cramjam.zstd.decompress_into,
"BROTLI": cramjam.brotli.decompress_into
}
compressions = {k.upper(): v for k, v in compressions.items()}
decompressions = {k.upper(): v for k, v in decompressions.items()}
rev_map = {getattr(parquet_thrift.CompressionCodec, key): key for key in
dir(parquet_thrift.CompressionCodec) if key in
['UNCOMPRESSED', 'SNAPPY', 'GZIP', 'LZO', 'BROTLI', 'LZ4', 'ZSTD', 'LZ4_RAW']}
def compress_data(data, compression='gzip'):
if isinstance(compression, dict):
algorithm = compression.get('type', 'gzip')
if isinstance(algorithm, int):
algorithm = rev_map[compression]
args = compression.get('args', None)
else:
algorithm = compression
args = None
if isinstance(algorithm, int):
algorithm = rev_map[compression]
if algorithm.upper() not in compressions:
raise RuntimeError("Compression '%s' not available. Options: %s" %
(algorithm, sorted(compressions)))
if args is None:
return compressions[algorithm.upper()](data)
else:
if not isinstance(args, dict):
raise ValueError("args dict entry is not a dict")
return compressions[algorithm.upper()](data, **args)
def decompress_data(data, uncompressed_size, algorithm='gzip'):
if isinstance(algorithm, int):
algorithm = rev_map[algorithm]
if algorithm.upper() not in decompressions:
raise RuntimeError(
"Decompression '%s' not available. Options: %s" %
(algorithm.upper(), sorted(decompressions))
)
if algorithm.upper() in decom_into:
# ensures writable buffer from cramjam
x = np.empty(uncompressed_size, dtype='uint8')
decom_into[algorithm.upper()](np.frombuffer(data, dtype=np.uint8), x)
return x
return decompressions[algorithm.upper()](data, uncompressed_size)
@@ -0,0 +1,248 @@
# -#- coding: utf-8 -#-
"""
Deal with parquet logical types (aka converted types), higher-order things built from primitive types.
The implementations in this class are pure python for the widest compatibility,
but they're not necessarily the most performant.
"""
import logging
import numpy as np
import pandas as pd
from fastparquet import parquet_thrift
from fastparquet.cencoding import time_shift
from fastparquet.json import json_decoder
logger = logging.getLogger('parquet') # pylint: disable=invalid-name
try:
from bson import BSON
unbson = BSON.decode
tobson = BSON.encode
except ImportError: # pragma: no cover
try:
import bson
unbson = bson.loads
tobson = bson.dumps
except:
def unbson(x):
raise ImportError("BSON not found")
def tobson(x):
raise ImportError("BSON not found")
# Explicitly use numpy type in order to avoid promotion errors due to NEP 50 in numpy >= 2
DAYS_TO_NANOS = np.int64(86400000000000)
"""Number of nanoseconds in a day. Used to convert a Date to a date"""
nat = np.datetime64('NaT').view('int64')
simple = {
parquet_thrift.Type.INT32: np.dtype('int32'),
parquet_thrift.Type.INT64: np.dtype('int64'),
parquet_thrift.Type.FLOAT: np.dtype('float32'),
parquet_thrift.Type.DOUBLE: np.dtype('float64'),
parquet_thrift.Type.BOOLEAN: np.dtype('bool'),
parquet_thrift.Type.INT96: np.dtype('S12'),
parquet_thrift.Type.BYTE_ARRAY: np.dtype("O"),
parquet_thrift.Type.FIXED_LEN_BYTE_ARRAY: np.dtype("O")
}
complex = {
parquet_thrift.ConvertedType.UTF8: np.dtype("O"),
parquet_thrift.ConvertedType.DECIMAL: np.dtype('float64'),
parquet_thrift.ConvertedType.UINT_8: np.dtype("uint8"),
parquet_thrift.ConvertedType.UINT_16: np.dtype("uint16"),
parquet_thrift.ConvertedType.UINT_32: np.dtype('uint32'),
parquet_thrift.ConvertedType.UINT_64: np.dtype('uint64'),
parquet_thrift.ConvertedType.INT_8: np.dtype("int8"),
parquet_thrift.ConvertedType.INT_16: np.dtype("int16"),
parquet_thrift.ConvertedType.INT_32: np.dtype('int32'),
parquet_thrift.ConvertedType.INT_64: np.dtype('int64'),
parquet_thrift.ConvertedType.TIME_MILLIS: np.dtype('<m8[ms]'),
parquet_thrift.ConvertedType.DATE: np.dtype('<M8[ns]'),
parquet_thrift.ConvertedType.TIMESTAMP_MILLIS: np.dtype('<M8[ms]'),
parquet_thrift.ConvertedType.TIME_MICROS: np.dtype('<m8[us]'),
parquet_thrift.ConvertedType.TIMESTAMP_MICROS: np.dtype('<M8[us]')
}
nullable = {
np.dtype('int8'): pd.Int8Dtype(),
np.dtype('int16'): pd.Int16Dtype(),
np.dtype('int32'): pd.Int32Dtype(),
np.dtype('int64'): pd.Int64Dtype(),
np.dtype('uint8'): pd.UInt8Dtype(),
np.dtype('uint16'): pd.UInt16Dtype(),
np.dtype('uint32'): pd.UInt32Dtype(),
np.dtype('uint64'): pd.UInt64Dtype(),
np.dtype('bool'): pd.BooleanDtype()
}
pandas_nullable = {
"Int8": pd.Int8Dtype(),
"Int16": pd.Int16Dtype(),
"Int32": pd.Int32Dtype(),
"Int64": pd.Int64Dtype(),
"UInt8": pd.UInt8Dtype(),
"UInt16": pd.UInt16Dtype(),
"UInt32": pd.UInt32Dtype(),
"UInt64": pd.UInt64Dtype(),
"boolean": pd.BooleanDtype()
}
def _logical_to_time_dtype(logical_timestamp_type):
if getattr(logical_timestamp_type.unit, "NANOS", None) is not None:
unit = "ns"
elif getattr(logical_timestamp_type.unit, "MICROS", None) is not None:
unit = "us"
elif getattr(logical_timestamp_type.unit, "MILLIS", None) is not None:
unit = "ms"
else:
raise ValueError("Timestamp ")
return np.dtype(f"<M8[{unit}]")
def typemap(se, md=None):
"""Get the final dtype - no actual conversion"""
md = md or {}
md = md.get(se.name, {})
if md and ("Int" in md["numpy_type"] or md["numpy_type"] == "boolean"):
# arrow has numpy and pandas types swapped
return pandas_nullable[md["numpy_type"]]
if md and ("Int" in md["pandas_type"] or md["pandas_type"] == "boolean"):
return pandas_nullable[md["pandas_type"]]
if se.logicalType is not None and se.logicalType.TIMESTAMP is not None:
return _logical_to_time_dtype(se.logicalType.TIMESTAMP)
if se.converted_type is None:
if se.type in simple:
return simple[se.type]
else:
return np.dtype("S%i" % se.type_length)
if md and "time" in md.get("numpy_type", ""):
return np.dtype(md["numpy_type"])
if se.converted_type in complex:
return complex[se.converted_type]
return np.dtype("O")
def converts_inplace(se):
"""when converting, reuses input array"""
if se.type == parquet_thrift.Type.BOOLEAN:
return False # always needs unpacking
ctype = se.converted_type
if ctype is None:
return True
if se.type == parquet_thrift.Type.BYTE_ARRAY:
return ctype == parquet_thrift.ConvertedType.UTF8
if ctype in [
parquet_thrift.ConvertedType.DATE,
parquet_thrift.ConvertedType.TIME_MILLIS,
parquet_thrift.ConvertedType.TIMESTAMP_MILLIS,
parquet_thrift.ConvertedType.TIME_MICROS,
parquet_thrift.ConvertedType.TIMESTAMP_MICROS
]:
return True
if getattr(se.logicalType, "TIMESTAMP", None) is not None:
# this will be nanos, since micro and milli hit block above
return True
return False
def convert(data, se, timestamp96=True, dtype=None):
"""Convert known types from primitive to rich.
Parameters
----------
data: pandas series of primitive type
se: a schema element.
timestamp96: convert int96 as if it were written by mr-parquet
"""
ctype = se.converted_type
if se.type == parquet_thrift.Type.INT96 and timestamp96:
data2 = data.view([('ns', 'i8'), ('day', 'i4')])
# TODO: this should be ms unit, now that we can?
return ((data2['day'] - np.int64(2440588)) * DAYS_TO_NANOS +
data2['ns']).view('M8[ns]')
if se.logicalType is not None and se.logicalType.TIMESTAMP is not None:
dt = _logical_to_time_dtype(se.logicalType.TIMESTAMP)
return data.view(dt)
if ctype is None:
return data
if ctype == parquet_thrift.ConvertedType.UTF8:
if data.dtype != "O" or (len(data) == 1 and not isinstance(data[0], str)):
# fixed string
import pandas as pd
return pd.Series(data).str.decode("utf8").values
# already converted in speedups.unpack_byte_array
return data
if ctype == parquet_thrift.ConvertedType.DECIMAL:
scale_factor = 10**-se.scale
if data.dtype.kind in ['i', 'f']:
return data * scale_factor
else: # byte-string
# NB: general but slow method
# could optimize when data.dtype.itemsize <= 8
# TODO: easy cythonize (but rare)
# TODO: extension point for pandas-decimal (no conversion needed)
return np.array([
int.from_bytes(
data.data[i:i + 1], byteorder='big', signed=True
) * scale_factor
for i in range(len(data))
])
elif ctype == parquet_thrift.ConvertedType.DATE:
data = data * DAYS_TO_NANOS
return data.view('datetime64[ns]')
elif ctype == parquet_thrift.ConvertedType.TIME_MILLIS:
# this was not covered by new pandas time units
data = data.astype('int64', copy=False)
time_shift(data, 1000000)
return data.view('timedelta64[ns]')
elif ctype == parquet_thrift.ConvertedType.TIMESTAMP_MILLIS:
return data.view('datetime64[ms]')
elif ctype == parquet_thrift.ConvertedType.TIME_MICROS:
return data.view('timedelta64[us]')
elif ctype == parquet_thrift.ConvertedType.TIMESTAMP_MICROS:
return data.view('datetime64[us]')
elif ctype == parquet_thrift.ConvertedType.UINT_8:
# TODO: return strided views?
# data.view('uint8')[::data.itemsize].view(out_dtype)
return data.astype(np.uint8)
elif ctype == parquet_thrift.ConvertedType.UINT_16:
return data.astype(np.uint16)
elif ctype == parquet_thrift.ConvertedType.UINT_32:
return data.astype(np.uint32)
elif ctype == parquet_thrift.ConvertedType.UINT_64:
return data.astype(np.uint64)
elif ctype == parquet_thrift.ConvertedType.INT_8:
return data.astype(np.int8)
elif ctype == parquet_thrift.ConvertedType.INT_16:
return data.astype(np.int16)
elif ctype == parquet_thrift.ConvertedType.INT_32:
return data.astype(np.int32)
elif ctype == parquet_thrift.ConvertedType.INT_64:
return data.astype(np.int64)
elif ctype == parquet_thrift.ConvertedType.JSON:
if isinstance(data, list) or data.dtype != "O":
out = np.empty(len(data), dtype="O")
else:
out = data
# TODO: unnecessary list - loop would save memory, and can cythonize
decoder = json_decoder()
out[:] = [decoder(d) for d in data]
return out
elif ctype == parquet_thrift.ConvertedType.BSON:
if isinstance(data, list) or data.dtype != "O":
out = np.empty(len(data), dtype="O")
else:
out = data
# TODO: unnecessary list - loop would save memory, and can cythonize
# and could use better BSON lib (bson-numpy, python-bsonjs)?
out[:] = [unbson(d) for d in data]
return out
elif ctype == parquet_thrift.ConvertedType.INTERVAL:
# for those that understand, output is month, day, ms
# maybe should convert to timedelta
return data.view('<u4').reshape((len(data), -1))
else:
logger.info("Converted type '%s'' not handled",
parquet_thrift.ConvertedType._VALUES_TO_NAMES[ctype]) # pylint:disable=protected-access
return data
@@ -0,0 +1,658 @@
import numpy as np
import pandas as pd
from fastparquet import encoding
from fastparquet.encoding import read_plain
import fastparquet.cencoding as encoding
from fastparquet.compression import decompress_data, rev_map, decom_into
from fastparquet.converted_types import convert, simple, converts_inplace
from fastparquet.schema import _is_list_like, _is_map_like
from fastparquet.speedups import unpack_byte_array
from fastparquet import parquet_thrift
from fastparquet.cencoding import ThriftObject
from fastparquet.util import val_to_num
def _read_page(file_obj, page_header, column_metadata):
"""Read the data page from the given file-object and convert it to raw,
uncompressed bytes (if necessary)."""
raw_bytes = file_obj.read(page_header.compressed_page_size)
raw_bytes = decompress_data(
raw_bytes,
page_header.uncompressed_page_size,
column_metadata.codec,
)
if column_metadata.codec:
assert len(raw_bytes) == page_header.uncompressed_page_size, \
"found {0} raw bytes (expected {1})".format(
len(raw_bytes),
page_header.uncompressed_page_size)
return raw_bytes
def read_data(fobj, coding, count, bit_width, out=None):
"""For definition and repetition levels
Reads with RLE/bitpacked hybrid, where length is given by first byte.
out: potentially provide a len(count) uint8 array to reuse
"""
out = out or np.empty(count, dtype=np.uint8)
o = encoding.NumpyIO(out)
if coding == parquet_thrift.Encoding.RLE:
while o.tell() < count:
encoding.read_rle_bit_packed_hybrid(fobj, bit_width, 0, o, itemsize=1)
else:
raise NotImplementedError('Encoding %s' % coding)
return out
def read_def(io_obj, daph, helper, metadata, out=None):
"""
Read the definition levels from this page, if any.
"""
definition_levels = None
num_nulls = 0
if not helper.is_required(metadata.path_in_schema):
max_definition_level = helper.max_definition_level(
metadata.path_in_schema)
bit_width = encoding.width_from_max_int(max_definition_level)
if bit_width:
# NB: num_values is index 1 for either type of page header
definition_levels = read_data(
io_obj, parquet_thrift.Encoding.RLE,
daph.num_values, bit_width, out=out)
if False and (
daph.statistics is not None
and getattr(daph.statistics, "null_count", None) is not None
):
num_nulls = daph.statistics.null_count
elif False and (
daph.num_values == metadata.num_values
and metadata.statistics
and getattr(metadata.statistics, "null_count", None) is not None
):
num_nulls = metadata.statistics.null_count
else:
num_nulls = daph.num_values - (definition_levels ==
max_definition_level).sum()
if num_nulls == 0:
definition_levels = None
return definition_levels, num_nulls
def read_rep(io_obj, daph, helper, metadata, out=None):
"""
Read the repetition levels from this page, if any.
"""
repetition_levels = None
if len(metadata.path_in_schema) > 1:
max_repetition_level = helper.max_repetition_level(
metadata.path_in_schema)
if max_repetition_level == 0:
repetition_levels = None
else:
bit_width = encoding.width_from_max_int(max_repetition_level)
# NB: num_values is index 1 for either type of page header
repetition_levels = read_data(io_obj, parquet_thrift.Encoding.RLE,
daph.num_values,
bit_width,
out=out)
return repetition_levels
def read_data_page(f, helper, header, metadata, skip_nulls=False,
selfmade=False):
"""Read a data page: definitions, repetitions, values (in order)
Only values are guaranteed to exist, e.g., for a top-level, required
field.
"""
daph = header.data_page_header
raw_bytes = _read_page(f, header, metadata)
io_obj = encoding.NumpyIO(raw_bytes)
repetition_levels = read_rep(io_obj, daph, helper, metadata)
if skip_nulls and not helper.is_required(metadata.path_in_schema):
num_nulls = 0
definition_levels = None
skip_definition_bytes(io_obj, daph.num_values)
else:
definition_levels, num_nulls = read_def(io_obj, daph, helper, metadata)
nval = daph.num_values - num_nulls
se = helper.schema_element(metadata.path_in_schema)
if daph.encoding == parquet_thrift.Encoding.PLAIN:
width = se.type_length
values = read_plain(io_obj.read(),
metadata.type,
int(daph.num_values - num_nulls),
width=width,
utf=se.converted_type == 0)
elif daph.encoding in [parquet_thrift.Encoding.PLAIN_DICTIONARY,
parquet_thrift.Encoding.RLE_DICTIONARY,
parquet_thrift.Encoding.RLE]:
# bit_width is stored as single byte.
if metadata.type == parquet_thrift.Type.BOOLEAN:
bit_width = 1
elif daph.encoding == parquet_thrift.Encoding.RLE:
bit_width = se.type_length
else:
bit_width = io_obj.read_byte()
if bit_width in [8, 16, 32] and selfmade:
num = (encoding.read_unsigned_var_int(io_obj) >> 1) * 8
values = np.frombuffer(io_obj.read(num * bit_width // 8),
dtype='int%i' % bit_width)
elif bit_width:
if bit_width > 8:
values = np.empty(daph.num_values-num_nulls, dtype=np.int32)
o = encoding.NumpyIO(values.view('uint8'))
encoding.read_rle_bit_packed_hybrid(
io_obj, bit_width, io_obj.len-io_obj.tell(), o=o, itemsize=4)
else:
values = np.empty(daph.num_values-num_nulls, dtype=np.uint8)
o = encoding.NumpyIO(values)
encoding.read_rle_bit_packed_hybrid(
io_obj, bit_width, io_obj.len-io_obj.tell(), o=o, itemsize=1)
if isinstance(values, np.ndarray):
values = values[:nval]
else:
values = values.data[:nval]
else:
values = np.zeros(nval, dtype=np.int8)
elif daph.encoding == parquet_thrift.Encoding.DELTA_BINARY_PACKED:
values = np.empty(daph.num_values - num_nulls,
dtype=np.int64 if metadata.type == 2 else np.int32)
o = encoding.NumpyIO(values.view('uint8'))
encoding.delta_binary_unpack(io_obj, o, longval=metadata.type == 2)
else:
raise NotImplementedError('Encoding %s' % daph.encoding)
return definition_levels, repetition_levels, values[:nval]
def skip_definition_bytes(io_obj, num):
io_obj.seek(6, 1)
n = num // 64
while n:
io_obj.seek(1, 1)
n //= 128
def read_dictionary_page(file_obj, schema_helper, page_header, column_metadata, utf=False):
"""Read a page containing dictionary data.
Consumes data using the plain encoding and returns an array of values.
"""
raw_bytes = _read_page(file_obj, page_header, column_metadata)
if column_metadata.type == parquet_thrift.Type.BYTE_ARRAY:
values = unpack_byte_array(
raw_bytes, page_header.dictionary_page_header.num_values, utf=utf)
else:
width = schema_helper.schema_element(
column_metadata.path_in_schema).type_length
values = read_plain(
raw_bytes, column_metadata.type,
page_header.dictionary_page_header.num_values, width)
return values
def read_data_page_v2(infile, schema_helper, se, data_header2, cmd,
dic, assign, num, use_cat, file_offset, ph, idx=None,
selfmade=False, row_filter=None):
"""
:param infile: open file
:param schema_helper:
:param se: schema element
:param data_header2: page header struct
:param cmd: column metadata
:param dic: any dictionary labels encountered
:param assign: output array (all of it)
:param num: offset, rows so far
:param use_cat: output is categorical?
:return: None
test data "/Users/mdurant/Downloads/datapage_v2.snappy.parquet"
a b c d e
0 abc 1 2.0 True [1, 2, 3]
1 abc 2 3.0 True None
2 abc 3 4.0 True None
3 None 4 5.0 False [1, 2, 3]
4 abc 5 2.0 True [1, 2]
b is delta encoded; c is dict encoded
"""
if data_header2.encoding not in [parquet_thrift.Encoding.PLAIN_DICTIONARY,
parquet_thrift.Encoding.RLE_DICTIONARY,
parquet_thrift.Encoding.RLE,
parquet_thrift.Encoding.PLAIN,
parquet_thrift.Encoding.DELTA_BINARY_PACKED
]:
raise NotImplementedError
size = (ph.compressed_page_size - data_header2.repetition_levels_byte_length -
data_header2.definition_levels_byte_length)
data = infile.tell() + data_header2.definition_levels_byte_length + data_header2.repetition_levels_byte_length
n_values = data_header2.num_values - data_header2.num_nulls
max_rep = schema_helper.max_repetition_level(cmd.path_in_schema)
if max_rep:
# TODO: probably not functional
bit_width = encoding.width_from_max_int(max_rep)
io_obj = encoding.NumpyIO(infile.read(data_header2.repetition_levels_byte_length))
repi = np.empty(data_header2.num_values, dtype="uint8")
encoding.read_rle_bit_packed_hybrid(io_obj, bit_width, data_header2.num_values,
encoding.NumpyIO(repi), itemsize=1)
max_def = schema_helper.max_definition_level(cmd.path_in_schema)
nullable = isinstance(assign.dtype, pd.core.arrays.masked.BaseMaskedDtype)
if max_def and data_header2.num_nulls:
bit_width = encoding.width_from_max_int(max_def)
# not the same as read_data(), because we know the length
io_obj = encoding.NumpyIO(infile.read(data_header2.definition_levels_byte_length))
if nullable:
defi = assign._mask
else:
# TODO: in tabular data, nulls arrays could be reused for each column
defi = np.empty(data_header2.num_values, dtype=np.uint8)
encoding.read_rle_bit_packed_hybrid(io_obj, bit_width, data_header2.num_values,
encoding.NumpyIO(defi), itemsize=1)
if max_rep:
# assemble_objects needs both arrays
nulls = defi != max_def
else:
np.not_equal(defi.view("uint8"), max_def, out=defi)
nulls = defi.view(np.bool_)
infile.seek(data)
# input and output element sizes match
see = se.type_length == assign.dtype.itemsize * 8 or simple.get(se.type).itemsize == assign.dtype.itemsize
# can read-into
into0 = ((use_cat or converts_inplace(se) and see)
and data_header2.num_nulls == 0
and max_rep == 0 and assign.dtype.kind != "O" and row_filter is None
and assign.dtype.kind not in "Mm") # TODO: this can be done in place but is complex
if row_filter is None:
row_filter = Ellipsis
# can decompress-into
if data_header2.is_compressed is None:
data_header2.is_compressed = True
into = (data_header2.is_compressed and rev_map[cmd.codec] in decom_into
and into0)
if nullable:
assign = assign._data
uncompressed_page_size = (ph.uncompressed_page_size - data_header2.definition_levels_byte_length -
data_header2.repetition_levels_byte_length)
if into0 and data_header2.encoding == parquet_thrift.Encoding.PLAIN and (
not data_header2.is_compressed or cmd.codec == parquet_thrift.CompressionCodec.UNCOMPRESSED
):
# PLAIN read directly into output (a copy for remote files)
assign[num:num+n_values].view('uint8')[:] = infile.read(size)
convert(assign[num:num+n_values], se)
elif into and data_header2.encoding == parquet_thrift.Encoding.PLAIN:
# PLAIN decompress directly into output
decomp = decom_into[rev_map[cmd.codec]]
decomp(np.frombuffer(infile.read(size), dtype="uint8"),
assign[num:num+data_header2.num_values].view('uint8'))
convert(assign[num:num+n_values], se)
elif data_header2.encoding == parquet_thrift.Encoding.PLAIN:
# PLAIN, but with nulls or not in-place conversion
codec = cmd.codec if data_header2.is_compressed else "UNCOMPRESSED"
raw_bytes = decompress_data(np.frombuffer(infile.read(size), "uint8"),
uncompressed_page_size, codec)
values = read_plain(raw_bytes,
cmd.type,
n_values,
width=se.type_length,
utf=se.converted_type == 0)
if data_header2.num_nulls:
if nullable:
assign[num:num+data_header2.num_values][~nulls[row_filter]] = convert(values, se)[row_filter]
else:
assign[num:num+data_header2.num_values][nulls[row_filter]] = None # or nan or nat
if row_filter is Ellipsis:
assign[num:num+data_header2.num_values][~nulls] = convert(values, se)
else:
assign[num:num+data_header2.num_values][~nulls[row_filter]] = convert(values, se)[row_filter[~nulls]]
else:
assign[num:num+data_header2.num_values] = convert(values, se)[row_filter]
elif (use_cat and data_header2.encoding in [
parquet_thrift.Encoding.PLAIN_DICTIONARY,
parquet_thrift.Encoding.RLE_DICTIONARY,
]) or (data_header2.encoding == parquet_thrift.Encoding.RLE):
# DICTIONARY or BOOL direct decode RLE into output (no nulls)
codec = cmd.codec if data_header2.is_compressed else "UNCOMPRESSED"
raw_bytes = np.frombuffer(infile.read(size), dtype='uint8')
raw_bytes = decompress_data(raw_bytes, uncompressed_page_size, codec)
pagefile = encoding.NumpyIO(raw_bytes)
if data_header2.encoding != parquet_thrift.Encoding.RLE:
# TODO: check this bit; is the varint read only row byte-exact fastpath?
bit_width = pagefile.read_byte()
encoding.read_unsigned_var_int(pagefile)
else:
bit_width = 1
pagefile.seek(4, 1)
if bit_width in [8, 16, 32] and selfmade:
# special fastpath for cats
outbytes = raw_bytes[pagefile.tell():]
if len(outbytes) == assign[num:num+data_header2.num_values].nbytes:
assign[num:num+data_header2.num_values].view('uint8')[row_filter] = outbytes[row_filter]
else:
if data_header2.num_nulls == 0:
assign[num:num+data_header2.num_values][row_filter] = outbytes[row_filter]
else:
if row_filter is Ellipsis:
assign[num:num+data_header2.num_values][~nulls] = outbytes
else:
assign[num:num + data_header2.num_values][~nulls[row_filter]] = outbytes[~nulls * row_filter]
assign[num:num+data_header2.num_values][nulls[row_filter]] = -1
else:
if data_header2.num_nulls == 0:
encoding.read_rle_bit_packed_hybrid(
pagefile,
bit_width,
uncompressed_page_size,
encoding.NumpyIO(assign[num:num+data_header2.num_values].view('uint8')),
itemsize=bit_width
)
else:
temp = np.empty(data_header2.num_values, assign.dtype)
encoding.read_rle_bit_packed_hybrid(
pagefile,
bit_width,
uncompressed_page_size,
encoding.NumpyIO(temp.view('uint8')),
itemsize=bit_width
)
if not nullable:
assign[num:num+data_header2.num_values][nulls[row_filter]] = None
assign[num:num+data_header2.num_values][~nulls[row_filter]] = temp[row_filter]
elif data_header2.encoding in [
parquet_thrift.Encoding.PLAIN_DICTIONARY,
parquet_thrift.Encoding.RLE_DICTIONARY
]:
# DICTIONARY to be de-referenced, with or without nulls
codec = cmd.codec if data_header2.is_compressed else "UNCOMPRESSED"
compressed_bytes = np.frombuffer(infile.read(size), "uint8")
raw_bytes = decompress_data(compressed_bytes, uncompressed_page_size, codec)
out = np.empty(n_values, dtype='uint32')
pagefile = encoding.NumpyIO(raw_bytes)
bit_width = pagefile.read_byte()
encoding.read_rle_bit_packed_hybrid(
pagefile,
bit_width,
uncompressed_page_size,
encoding.NumpyIO(out.view("uint8")),
itemsize=4
)
if max_rep:
# num_rows got filled, but consumed num_values data entries
encoding._assemble_objects(
assign[idx[0]:idx[0]+data_header2.num_rows], defi, repi, out, dic, d=True,
null=True, null_val=False, max_defi=max_def, prev_i=0
)
idx[0] += data_header2.num_rows
elif data_header2.num_nulls:
if not nullable and assign.dtype != "O":
assign[num:num+data_header2.num_values][nulls] = None # may be unnecessary
assign[num:num+data_header2.num_values][~nulls[row_filter]] = dic[out][row_filter]
else:
assign[num:num+data_header2.num_values][row_filter] = dic[out][row_filter]
elif data_header2.encoding == parquet_thrift.Encoding.DELTA_BINARY_PACKED:
assert data_header2.num_nulls == 0, "null delta-int not implemented"
codec = cmd.codec if data_header2.is_compressed else "UNCOMPRESSED"
raw_bytes = decompress_data(np.frombuffer(infile.read(size), "uint8"),
uncompressed_page_size, codec)
if converts_inplace(se):
encoding.delta_binary_unpack(
encoding.NumpyIO(raw_bytes),
encoding.NumpyIO(assign[num:num+data_header2.num_values].view('uint8'))
)
convert(assign[num:num+data_header2.num_values], se)
else:
out = np.empty(data_header2.num_values, dtype='int32')
encoding.delta_binary_unpack(
encoding.NumpyIO(raw_bytes), encoding.NumpyIO(out.view('uint8'))
)
assign[num:num+data_header2.num_values][row_filter] = convert(out, se)[row_filter]
else:
# codec = cmd.codec if data_header2.is_compressed else "UNCOMPRESSED"
# raw_bytes = decompress_data(infile.read(size),
# ph.uncompressed_page_size, codec)
raise NotImplementedError
return data_header2.num_values
def read_col(column, schema_helper, infile, use_cat=False,
selfmade=False, assign=None, catdef=None,
row_filter=None):
"""Using the given metadata, read one column in one row-group.
Parameters
----------
column: thrift structure
Details on the column
schema_helper: schema.SchemaHelper
Based on the schema for this parquet data
infile: open file or string
If a string, will open; if an open object, will use as-is
use_cat: bool (False)
If this column is encoded throughout with dict encoding, give back
a pandas categorical column; otherwise, decode to values
row_filter: bool array or None
if given, selects which of the values read are to be written
into the output. Effectively implies NULLs, even for a required
column.
"""
cmd = column.meta_data
try:
se = schema_helper.schema_element(cmd.path_in_schema)
except KeyError:
# column not present in this row group
assign[:] = None
return
off = min((cmd.dictionary_page_offset or cmd.data_page_offset,
cmd.data_page_offset))
infile.seek(off)
column_binary = infile.read(cmd.total_compressed_size)
infile = encoding.NumpyIO(column_binary)
rows = row_filter.sum() if isinstance(row_filter, np.ndarray) else cmd.num_values
if use_cat:
my_nan = -1
else:
if assign.dtype.kind in ['i', 'u', 'b']:
my_nan = pd.NA
elif assign.dtype.kind == 'f':
my_nan = np.nan
elif assign.dtype.kind in ["M", 'm']:
# GH#489 use a NaT representation compatible with ExtensionArray
my_nan = assign.dtype.type("NaT")
else:
my_nan = None
num = 0 # how far through the output we are
row_idx = [0] # map/list objects
dic = None
index_off = 0 # how far through row_filter we are
while num < rows:
off = infile.tell()
ph = ThriftObject.from_buffer(infile, "PageHeader")
if ph.type == parquet_thrift.PageType.DICTIONARY_PAGE:
dic2 = read_dictionary_page(infile, schema_helper, ph, cmd, utf=se.converted_type == 0)
dic2 = convert(dic2, se)
if use_cat and dic is not None and (dic2 != dic).any():
raise RuntimeError("Attempt to read as categorical a column"
"with multiple dictionary pages.")
dic = dic2
if use_cat and dic is not None:
# fastpath skips the check the number of categories hasn't changed.
# In this case, they may change, if the default RangeIndex was used.
ddt = [kv.value.decode() for kv in (cmd.key_value_metadata or [])
if kv.key == b"label_dtype"]
ddt = ddt[0] if ddt else None
catdef._set_categories(pd.Index(dic, dtype=ddt), fastpath=True)
if np.iinfo(assign.dtype).max < len(dic):
raise RuntimeError('Assigned array dtype (%s) cannot accommodate '
'number of category labels (%i)' %
(assign.dtype, len(dic)))
continue
elif use_cat and dic is None and getattr(catdef, "_multiindex", False) is False:
raise TypeError("Attempt to load as categorical a column with no dictionary")
if ph.type == parquet_thrift.PageType.DATA_PAGE_V2:
num += read_data_page_v2(infile, schema_helper, se, ph.data_page_header_v2, cmd,
dic, assign, num, use_cat, off, ph, row_idx, selfmade=selfmade,
row_filter=row_filter)
continue
if (selfmade and hasattr(cmd, 'statistics') and
getattr(cmd.statistics, 'null_count', 1) == 0):
skip_nulls = True
else:
skip_nulls = False
defi, rep, val = read_data_page(infile, schema_helper, ph, cmd,
skip_nulls, selfmade=selfmade)
max_defi = schema_helper.max_definition_level(cmd.path_in_schema)
if isinstance(row_filter, np.ndarray):
io = index_off + len(val) # will be new index_off
if row_filter[index_off:index_off+len(val)].sum() == 0:
num += len(defi) if defi is not None else len(val)
continue
if defi is not None:
val = val[row_filter[index_off:index_off+len(defi)][defi == max_defi]]
defi = defi[row_filter[index_off:index_off+len(defi)]]
else:
val = val[row_filter[index_off:index_off+len(val)]]
rep = rep[row_filter[index_off:index_off+len(defi)]] if rep is not None else rep
index_off = io
if rep is not None and assign.dtype.kind != 'O': # pragma: no cover
# this should never get called
raise ValueError('Column contains repeated value, must use object '
'type, but has assumed type: %s' % assign.dtype)
d = ph.data_page_header.encoding in [parquet_thrift.Encoding.PLAIN_DICTIONARY,
parquet_thrift.Encoding.RLE_DICTIONARY]
if use_cat and not d:
if not hasattr(catdef, '_set_categories'):
raise ValueError('Returning category type requires all chunks'
' to use dictionary encoding; column: %s',
cmd.path_in_schema)
if rep is not None:
null = not schema_helper.is_required(cmd.path_in_schema[0])
null_val = (se.repetition_type !=
parquet_thrift.FieldRepetitionType.REQUIRED)
row_idx[0] = 1 + encoding._assemble_objects(
assign, defi, rep, val, dic, d,
null, null_val, max_defi, row_idx[0]
)
elif defi is not None:
part = assign[num:num+len(defi)]
if isinstance(part.dtype, pd.core.arrays.masked.BaseMaskedDtype):
# TODO: could have read directly into array
part._mask[:] = defi != max_defi
part = part._data
elif part.dtype.kind != "O":
part[defi != max_defi] = my_nan
if d and not use_cat:
part[defi == max_defi] = dic[val]
elif not use_cat:
part[defi == max_defi] = convert(val, se, dtype=assign.dtype)
else:
part[defi == max_defi] = val
else:
piece = assign[num:num+len(val)]
if isinstance(piece.dtype, pd.core.arrays.masked.BaseMaskedDtype):
piece = piece._data
if use_cat and not d:
# only possible for multi-index
val = convert(val, se, dtype=assign.dtype)
try:
i = pd.Categorical(val)
except:
i = pd.Categorical(val.tolist())
catdef._set_categories(pd.Index(i.categories), fastpath=True)
piece[:] = i.codes
elif d and not use_cat:
piece[:] = dic[val]
elif not use_cat:
piece[:] = convert(val, se, dtype=assign.dtype)
else:
piece[:] = val
num += len(defi) if defi is not None else len(val)
def read_row_group_arrays(file, rg, columns, categories, schema_helper, cats,
selfmade=False, assign=None, row_filter=False):
"""
Read a row group and return as a dict of arrays
Note that categorical columns (if appearing in the parameter categories)
will be pandas Categorical objects: the codes and the category labels
are arrays.
"""
out = assign
remains = set(_ for _ in out if not _.endswith("-catdef") and not _ + "-catdef" in out)
maps = {}
for column in rg.columns:
if (_is_list_like(schema_helper, column) or
_is_map_like(schema_helper, column)):
name = ".".join(column.meta_data.path_in_schema[:-2])
else:
name = ".".join(column.meta_data.path_in_schema)
if name not in columns or name in cats:
continue
remains.discard(name)
read_col(column, schema_helper, file, use_cat=name+'-catdef' in out,
selfmade=selfmade, assign=out[name],
catdef=out.get(name+'-catdef', None),
row_filter=row_filter)
if _is_map_like(schema_helper, column):
# TODO: could be done in fast loop in _assemble_objects?
if name not in maps:
maps[name] = out[name].copy()
else:
if column.meta_data.path_in_schema[0] == 'key':
key, value = out[name], maps[name]
else:
value, key = out[name], maps[name]
out[name][:] = [dict(zip(k, v)) if k is not None else None
for k, v in zip(key, value)]
del maps[name]
for k in remains:
out[k][:] = None
def read_row_group(file, rg, columns, categories, schema_helper, cats,
selfmade=False, index=None, assign=None,
scheme='hive', partition_meta=None, row_filter=False):
"""
Access row-group in a file and read some columns into a data-frame.
"""
partition_meta = partition_meta or {}
if assign is None:
raise RuntimeError('Going with pre-allocation!')
read_row_group_arrays(file, rg, columns, categories, schema_helper,
cats, selfmade, assign=assign, row_filter=row_filter)
for cat in cats:
if cat not in assign:
# do no need to have partition columns in output
continue
if scheme == 'hive':
partitions = [s.split("=") for s in rg.columns[0].file_path.split("/")]
else:
partitions = [('dir%i' % i, v) for (i, v) in enumerate(
rg.columns[0].file_path.split('/')[:-1])]
key, val = [p for p in partitions if p[0] == cat][0]
val = val_to_num(val, meta=partition_meta.get(key))
assign[cat][:] = cats[cat].index(val)
@@ -0,0 +1,272 @@
import re
from collections import OrderedDict
from packaging.version import Version
import numpy as np
from pandas import (
Categorical, DataFrame, Series,
CategoricalIndex, RangeIndex, Index, MultiIndex,
DatetimeIndex, CategoricalDtype,
DatetimeTZDtype
)
from pandas.core.arrays.masked import BaseMaskedDtype
import warnings
from fastparquet.util import PANDAS_VERSION
class Dummy(object):
pass
def empty(types, size, cats=None, cols=None, index_types=None, index_names=None,
timezones=None, columns_dtype=None):
"""
Create empty DataFrame to assign into
In the simplest case, will return a Pandas dataframe of the given size,
with columns of the given names and types. The second return value `views`
is a dictionary of numpy arrays into which you can assign values that
show up in the dataframe.
For categorical columns, you get two views to assign into: if the
column name is "col", you get both "col" (the category codes) and
"col-catdef" (the category labels).
For a single categorical index, you should use the `.set_categories`
method of the appropriate "-catdef" columns, passing an Index of values
``views['index-catdef'].set_categories(pd.Index(newvalues), fastpath=True)``
Multi-indexes work a lot like categoricals, even if the types of each
index are not themselves categories, and will also have "-catdef" entries
in the views. However, these will be Dummy instances, providing only a
``.set_categories`` method, to be used as above.
Parameters
----------
types: like np record structure, 'i4,u2,f4,f2,f4,M8,m8', or using tuples
applies to non-categorical columns. If there are only categorical
columns, an empty string of None will do.
size: int
Number of rows to allocate
cats: dict {col: labels}
Location and labels for categorical columns, e.g., {1: ['mary', 'mo]}
will create column index 1 (inserted amongst the numerical columns)
with two possible values. If labels is an integers, `{'col': 5}`,
will generate temporary labels using range. If None, or column name
is missing, will assume 16-bit integers (a reasonable default).
cols: list of labels
assigned column names, including categorical ones.
index_types: list of str
For one of more index columns, make them have this type. See general
description, above, for caveats about multi-indexing. If None, the
index will be the default RangeIndex.
index_names: list of str
Names of the index column(s), if using
timezones: dict {col: timezone_str}
for timestamp type columns, apply this timezone to the pandas series;
the numpy view will be UTC.
file_has_columns: bool, default False
for files that are filtered but had columns before
Returns
-------
- dataframe with correct shape and data-types
- list of numpy views, in order, of the columns of the dataframe. Assign
to this.
"""
views = {}
timezones = timezones or {}
if isinstance(types, str):
types = types.split(',')
cols = cols if cols is not None else range(len(types))
def cat(col):
if cats is None or col not in cats:
return RangeIndex(0, 2**14)
elif isinstance(cats[col], int):
return RangeIndex(0, cats[col])
else: # explicit labels list
return cats[col]
df = OrderedDict()
for t, col in zip(types, cols):
if str(t) == 'category':
df[str(col)] = Categorical.from_codes([], categories=cat(col))
elif isinstance(t, BaseMaskedDtype):
# pandas masked types
arr_type = t.construct_array_type()
df[str(col)] = arr_type(
values=np.empty(0, dtype=t.numpy_dtype),
mask=np.empty(0, dtype=np.bool_),
copy=False
)
else:
if hasattr(t, 'base'):
# funky pandas not-dtype
t = t.base
if ("M" in str(t) or "time" in str(t)) and "[" not in str(t):
t = str(t) + "[ns]"
d = np.empty(0, dtype=t)
if d.dtype.kind == "M" and str(col) in timezones:
try:
z = tz_to_dt_tz(timezones[str(col)])
d = Series(d).dt.tz_localize(z)
except:
warnings.warn("Inferring time-zone from %s in column %s "
"failed, using time-zone-agnostic"
"" % (timezones[str(col)], col))
df[str(col)] = d
columns = Index(df.keys(), dtype=columns_dtype) if columns_dtype is not None else None
df = DataFrame(df, columns=columns)
if not index_types:
index = RangeIndex(size)
elif len(index_types) == 1:
t, col = index_types[0], index_names[0]
if col is None:
raise ValueError('If using an index, must give an index name')
if str(t) == 'category':
# https://github.com/dask/fastparquet/issues/576#issuecomment-805579337
temp = Categorical.from_codes([], categories=cat(col))
vals = np.zeros(size, dtype=temp.codes.dtype)
c = Categorical.from_codes(vals, dtype=temp.dtype)
index = CategoricalIndex(c)
views[col] = vals
views[col+'-catdef'] = index._data
else:
if hasattr(t, 'base'):
# funky pandas not-dtype
t = t.base
# Initialize datetime index to zero: uninitialized data might fail
# validation due to being an out-of-bounds datetime. xref
# https://github.com/dask/fastparquet/issues/778
dtype = np.dtype(t)
if dtype.kind == "M":
d = np.zeros(size, dtype=dtype)
# 1) create the DatetimeIndex in UTC as no datetime conversion is needed and
# it works with d uninitialised data (no NonExistentTimeError or AmbiguousTimeError)
# 2) convert to timezone (if UTC=noop, if None=remove tz, if other=change tz)
if str(col) in timezones:
index = DatetimeIndex(d, tz="UTC").tz_convert(
tz_to_dt_tz(timezones[str(col)]))
else:
index = DatetimeIndex(d, tz=None)
d = index._data._ndarray
else:
d = np.empty(size, dtype=dtype)
index = Index(d)
views[col] = d
else:
index = MultiIndex([[]], [[]])
# index = MultiIndex.from_arrays(indexes)
index._levels = list()
index._labels = list()
index._codes = list()
index._names = list(index_names)
for i, col in enumerate(index_names):
index._levels.append(Index([None]))
def set_cats(values, i=i, col=col, **kwargs):
values.name = col
if index._levels[i][0] is None:
index._levels[i] = values
elif not index._levels[i].equals(values):
raise RuntimeError("Different dictionaries encountered"
" while building categorical")
x = Dummy()
x._set_categories = set_cats
x._multiindex = True
d = np.zeros(size, dtype=int)
if PANDAS_VERSION >= Version("0.24.0"):
index._codes = list(index._codes) + [d]
else:
index._labels.append(d)
views[col] = d
views[col+'-catdef'] = x
# Patch our blocks with desired-length arrays. Kids: don't try this at home.
mgr = df._mgr
for block in mgr.blocks:
bvalues = block.values
shape = list(bvalues.shape)
shape[-1] = size
if isinstance(bvalues, Categorical):
code = np.full(fill_value=-1, shape=shape, dtype=bvalues.codes.dtype)
values = Categorical.from_codes(codes=code, dtype=bvalues.dtype)
elif isinstance(bvalues.dtype, DatetimeTZDtype):
dt = "M8[ns]" if PANDAS_VERSION.major < 2 else f'M8[{bvalues.dtype.unit}]'
values = np.zeros(shape=shape, dtype=dt)
values = type(bvalues)._from_sequence(values.view("int64"), copy=False, dtype=bvalues.dtype)
else:
if not isinstance(bvalues, np.ndarray):
# e.g. DatetimeLikeBlock backed by DatetimeArray/TimedeltaArray
if bvalues.dtype.kind == "m":
dt = "m8[ns]" if PANDAS_VERSION.major < 2 else bvalues.dtype
values = np.zeros(shape=shape, dtype=dt)
values = type(bvalues)._from_sequence(values.view("int64"), copy=False, dtype=bvalues.dtype)
elif bvalues.dtype.kind == "M":
dt = "M8[ns]" if PANDAS_VERSION.major < 2 else bvalues.dtype
values = np.zeros(shape=shape, dtype=dt)
values = type(bvalues)._from_sequence(values.view("int64"), copy=False, dtype=bvalues.dtype)
elif str(bvalues.dtype)[0] in {"I", "U"} or str(bvalues.dtype) == "boolean":
arr_type = bvalues.dtype.construct_array_type()
values = arr_type(
values=np.empty(size, dtype=bvalues.dtype.numpy_dtype),
mask=np.zeros(size, dtype=np.bool_)
)
else:
raise NotImplementedError
else:
values = np.empty(shape=shape, dtype=bvalues.dtype)
block.values = values
mgr.axes[-1] = index
# create views
for block in df._mgr.blocks:
dtype = block.dtype
inds = block.mgr_locs.indexer
if isinstance(inds, slice):
inds = list(range(inds.start, inds.stop, inds.step))
for i, ind in enumerate(inds):
col = df.columns[ind]
if isinstance(dtype, CategoricalDtype):
views[col] = block.values._codes
views[col+'-catdef'] = block.values
elif getattr(block.dtype, 'tz', None):
arr = block.values._ndarray
if len(arr.shape) > 1:
# pandas >= 1.3 does this for some reason
arr = arr.squeeze(axis=0)
views[col] = arr
elif str(dtype)[0] in {"I", "U"} or str(dtype) == "boolean":
views[col] = block.values
else:
views[col] = block.values[i]
if index_names:
df.index.names = [
None if re.match(r'__index_level_\d+__', n) else n
for n in index_names
]
return df, views
def tz_to_dt_tz(z):
if ":" in z:
import datetime
hours, mins = z.split(":", 1)
sign = z.startswith("-")
z = int(hours) * 3600
z += (1, -1)[sign] * int(mins) * 60
z = datetime.timezone(datetime.timedelta(seconds=z))
return z
@@ -0,0 +1,41 @@
"""encoding.py - methods for reading parquet encoded data blocks."""
import numpy as np
from fastparquet.cencoding import read_bitpacked1, NumpyIO
from fastparquet.speedups import unpack_byte_array
from fastparquet import parquet_thrift
def read_plain_boolean(raw_bytes, count, out=None):
data = np.frombuffer(raw_bytes, dtype='uint8')
out = out or np.empty(count, dtype=bool)
read_bitpacked1(NumpyIO(data), count, NumpyIO(out.view('uint8')))
return out[:count]
DECODE_TYPEMAP = {
parquet_thrift.Type.INT32: np.int32,
parquet_thrift.Type.INT64: np.int64,
parquet_thrift.Type.INT96: np.dtype('S12'),
parquet_thrift.Type.FLOAT: np.float32,
parquet_thrift.Type.DOUBLE: np.float64,
}
def read_plain(raw_bytes, type_, count, width=0, utf=False, stat=False):
if type_ in DECODE_TYPEMAP:
dtype = DECODE_TYPEMAP[type_]
return np.frombuffer(memoryview(raw_bytes), dtype=dtype, count=count)
if type_ == parquet_thrift.Type.FIXED_LEN_BYTE_ARRAY:
if count == 1:
width = len(raw_bytes)
dtype = np.dtype('S%i' % width)
return np.frombuffer(memoryview(raw_bytes), dtype=dtype, count=count)
if type_ == parquet_thrift.Type.BOOLEAN:
return read_plain_boolean(raw_bytes, count)
if type_ == parquet_thrift.Type.BYTE_ARRAY:
if stat:
if utf:
return np.array([bytes(raw_bytes).decode()], dtype='O')
else:
return np.array([bytes(raw_bytes)], dtype='O')
return unpack_byte_array(raw_bytes, count, utf=utf)
@@ -0,0 +1,146 @@
import logging
import os
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional
logger = logging.getLogger("parquet")
class JsonCodecError(Exception):
"""Exception raised when trying to load an invalid json codec."""
class BaseImpl(ABC):
@abstractmethod
def dumps(self, data):
"""Serialize ``obj`` to a JSON formatted bytes instance containing UTF-8 data."""
@abstractmethod
def loads(self, s):
"""Deserialize ``s`` (str, bytes or bytearray containing JSON) to a Python object."""
class OrjsonImpl(BaseImpl):
def __init__(self):
import orjson
logger.debug("Using orjson encoder/decoder")
self.api = orjson
def dumps(self, data):
return self.api.dumps(data, option=self.api.OPT_SERIALIZE_NUMPY)
def loads(self, s):
return self.api.loads(s)
class UjsonImpl(BaseImpl):
def __init__(self):
import ujson
logger.debug("Using ujson encoder/decoder")
self.api = ujson
def dumps(self, data):
return self.api.dumps(
data,
ensure_ascii=False,
escape_forward_slashes=False,
).encode("utf-8")
def loads(self, s):
return self.api.loads(s)
class RapidjsonImpl(BaseImpl):
def __init__(self):
import rapidjson
logger.debug("Using rapidjson encoder/decoder")
self.api = rapidjson
def dumps(self, data):
return self.api.dumps(data, ensure_ascii=False).encode("utf-8")
def loads(self, s):
return self.api.loads(s)
class JsonImpl(BaseImpl):
def __init__(self):
import json
logger.debug("Using json encoder/decoder")
self.api = json
def dumps(self, data):
return self.api.dumps(data, separators=(",", ":")).encode("utf-8")
def loads(self, s):
return self.api.loads(s)
@dataclass
class CodecCache:
env: Optional[str] = None
instance: Optional[BaseImpl] = None
def clear(self):
self.env = None
self.instance = None
def update(self, env, instance):
self.env = env
self.instance = instance
def _get_specific_codec(codec):
try:
return _codec_classes[codec]()
except KeyError:
raise JsonCodecError(
f"Unsupported json codec {codec!r}. Please use one of {list(_codec_classes)}"
) from None
except ImportError:
raise JsonCodecError(
f"Unavailable json codec {codec!r}. Please install the required library."
) from None
def _get_cached_codec():
"""Return the requested or first available json encoder/decoder implementation."""
env = os.getenv("FASTPARQUET_JSON_CODEC", "")
# return the cached codec instance only if the env variable didn't change
if _codec_cache.env == env:
return _codec_cache.instance
if env:
_codec_cache.update(env=env, instance=_get_specific_codec(env))
return _codec_cache.instance
for codec in _codec_classes:
try:
_codec_cache.update(env=env, instance=_get_specific_codec(codec))
return _codec_cache.instance
except JsonCodecError:
pass
raise JsonCodecError("No available json codecs.")
def json_encoder():
"""Return the first available json encoder function."""
return _get_cached_codec().dumps
def json_decoder():
"""Return the first available json decoder function."""
return _get_cached_codec().loads
# module_name -> implementation_class
_codec_classes = {
"orjson": OrjsonImpl,
"ujson": UjsonImpl,
"rapidjson": RapidjsonImpl,
"json": JsonImpl, # it should be the last
}
_codec_cache = CodecCache()
@@ -0,0 +1,10 @@
from functools import partial
from .parquet.ttypes import *
def __getattr__(name):
# for compatability with coe that calls, e.g., parquet_thrift.RowGroup(...)
from fastparquet.cencoding import ThriftObject
if name[0].isupper():
return partial(ThriftObject.from_fields, thrift_name=name)
raise AttributeError(name)
@@ -0,0 +1 @@
__all__ = ['ttypes', 'constants']
@@ -0,0 +1,272 @@
class Type(object):
"""
Types supported by Parquet. These types are intended to be used in combination
with the encodings to control the on disk storage format.
For example INT16 is not included as a type since a good encoding of INT32
would handle this.
"""
BOOLEAN = 0
INT32 = 1
INT64 = 2
INT96 = 3
FLOAT = 4
DOUBLE = 5
BYTE_ARRAY = 6
FIXED_LEN_BYTE_ARRAY = 7
_VALUES_TO_NAMES = {
0: "BOOLEAN",
1: "INT32",
2: "INT64",
3: "INT96",
4: "FLOAT",
5: "DOUBLE",
6: "BYTE_ARRAY",
7: "FIXED_LEN_BYTE_ARRAY",
}
_NAMES_TO_VALUES = {
"BOOLEAN": 0,
"INT32": 1,
"INT64": 2,
"INT96": 3,
"FLOAT": 4,
"DOUBLE": 5,
"BYTE_ARRAY": 6,
"FIXED_LEN_BYTE_ARRAY": 7,
}
class ConvertedType(object):
"""
DEPRECATED: Common types used by frameworks(e.g. hive, pig) using parquet.
ConvertedType is superseded by LogicalType. This enum should not be extended.
See LogicalTypes.md for conversion between ConvertedType and LogicalType.
"""
UTF8 = 0
MAP = 1
MAP_KEY_VALUE = 2
LIST = 3
ENUM = 4
DECIMAL = 5
DATE = 6
TIME_MILLIS = 7
TIME_MICROS = 8
TIMESTAMP_MILLIS = 9
TIMESTAMP_MICROS = 10
UINT_8 = 11
UINT_16 = 12
UINT_32 = 13
UINT_64 = 14
INT_8 = 15
INT_16 = 16
INT_32 = 17
INT_64 = 18
JSON = 19
BSON = 20
INTERVAL = 21
_VALUES_TO_NAMES = {
0: "UTF8",
1: "MAP",
2: "MAP_KEY_VALUE",
3: "LIST",
4: "ENUM",
5: "DECIMAL",
6: "DATE",
7: "TIME_MILLIS",
8: "TIME_MICROS",
9: "TIMESTAMP_MILLIS",
10: "TIMESTAMP_MICROS",
11: "UINT_8",
12: "UINT_16",
13: "UINT_32",
14: "UINT_64",
15: "INT_8",
16: "INT_16",
17: "INT_32",
18: "INT_64",
19: "JSON",
20: "BSON",
21: "INTERVAL",
}
_NAMES_TO_VALUES = {
"UTF8": 0,
"MAP": 1,
"MAP_KEY_VALUE": 2,
"LIST": 3,
"ENUM": 4,
"DECIMAL": 5,
"DATE": 6,
"TIME_MILLIS": 7,
"TIME_MICROS": 8,
"TIMESTAMP_MILLIS": 9,
"TIMESTAMP_MICROS": 10,
"UINT_8": 11,
"UINT_16": 12,
"UINT_32": 13,
"UINT_64": 14,
"INT_8": 15,
"INT_16": 16,
"INT_32": 17,
"INT_64": 18,
"JSON": 19,
"BSON": 20,
"INTERVAL": 21,
}
class FieldRepetitionType(object):
"""
Representation of Schemas
"""
REQUIRED = 0
OPTIONAL = 1
REPEATED = 2
_VALUES_TO_NAMES = {
0: "REQUIRED",
1: "OPTIONAL",
2: "REPEATED",
}
_NAMES_TO_VALUES = {
"REQUIRED": 0,
"OPTIONAL": 1,
"REPEATED": 2,
}
class Encoding(object):
"""
Encodings supported by Parquet. Not all encodings are valid for all types. These
enums are also used to specify the encoding of definition and repetition levels.
See the accompanying doc for the details of the more complicated encodings.
"""
PLAIN = 0
PLAIN_DICTIONARY = 2
RLE = 3
BIT_PACKED = 4
DELTA_BINARY_PACKED = 5
DELTA_LENGTH_BYTE_ARRAY = 6
DELTA_BYTE_ARRAY = 7
RLE_DICTIONARY = 8
BYTE_STREAM_SPLIT = 9
_VALUES_TO_NAMES = {
0: "PLAIN",
2: "PLAIN_DICTIONARY",
3: "RLE",
4: "BIT_PACKED",
5: "DELTA_BINARY_PACKED",
6: "DELTA_LENGTH_BYTE_ARRAY",
7: "DELTA_BYTE_ARRAY",
8: "RLE_DICTIONARY",
9: "BYTE_STREAM_SPLIT",
}
_NAMES_TO_VALUES = {
"PLAIN": 0,
"PLAIN_DICTIONARY": 2,
"RLE": 3,
"BIT_PACKED": 4,
"DELTA_BINARY_PACKED": 5,
"DELTA_LENGTH_BYTE_ARRAY": 6,
"DELTA_BYTE_ARRAY": 7,
"RLE_DICTIONARY": 8,
"BYTE_STREAM_SPLIT": 9,
}
class CompressionCodec(object):
"""
Supported compression algorithms.
Codecs added in format version X.Y can be read by readers based on X.Y and later.
Codec support may vary between readers based on the format version and
libraries available at runtime.
See Compression.md for a detailed specification of these algorithms.
"""
UNCOMPRESSED = 0
SNAPPY = 1
GZIP = 2
LZO = 3
BROTLI = 4
LZ4 = 5
ZSTD = 6
LZ4_RAW = 7
_VALUES_TO_NAMES = {
0: "UNCOMPRESSED",
1: "SNAPPY",
2: "GZIP",
3: "LZO",
4: "BROTLI",
5: "LZ4",
6: "ZSTD",
7: "LZ4_RAW",
}
_NAMES_TO_VALUES = {
"UNCOMPRESSED": 0,
"SNAPPY": 1,
"GZIP": 2,
"LZO": 3,
"BROTLI": 4,
"LZ4": 5,
"ZSTD": 6,
"LZ4_RAW": 7,
}
class PageType(object):
DATA_PAGE = 0
INDEX_PAGE = 1
DICTIONARY_PAGE = 2
DATA_PAGE_V2 = 3
_VALUES_TO_NAMES = {
0: "DATA_PAGE",
1: "INDEX_PAGE",
2: "DICTIONARY_PAGE",
3: "DATA_PAGE_V2",
}
_NAMES_TO_VALUES = {
"DATA_PAGE": 0,
"INDEX_PAGE": 1,
"DICTIONARY_PAGE": 2,
"DATA_PAGE_V2": 3,
}
class BoundaryOrder(object):
"""
Enum to annotate whether lists of min/max elements inside ColumnIndex
are ordered and if so, in which direction.
"""
UNORDERED = 0
ASCENDING = 1
DESCENDING = 2
_VALUES_TO_NAMES = {
0: "UNORDERED",
1: "ASCENDING",
2: "DESCENDING",
}
_NAMES_TO_VALUES = {
"UNORDERED": 0,
"ASCENDING": 1,
"DESCENDING": 2,
}
@@ -0,0 +1,203 @@
"""Utils for working with the parquet thrift models."""
from collections import OrderedDict
from fastparquet import parquet_thrift
def schema_tree(schema, i=0):
root = schema[i]
root["children"] = OrderedDict()
while len(root["children"]) < root.num_children:
i += 1
s = schema[i]
root["children"][s.name] = s
if s.num_children not in [None, 0]:
i = schema_tree(schema, i)
if root.num_children:
return i
else:
return i + 1
def schema_to_text(root, indent=[]):
text = "".join(indent) + '- ' + root.name + ": "
parts = []
if root.type is not None:
parts.append(parquet_thrift.Type._VALUES_TO_NAMES[root.type])
if root.logicalType is not None:
for key in dir(root.logicalType):
if getattr(root.logicalType, key) is not None:
if key == "TIMESTAMP":
unit = [k for k in dir(root.logicalType.TIMESTAMP.unit) if getattr(
root.logicalType.TIMESTAMP.unit, k) is not None][0]
parts.append(f"TIMESTAMP[{unit}]")
else:
# extra parameters possible here
parts.append(key)
break
if root.converted_type is not None:
parts.append(parquet_thrift.ConvertedType._VALUES_TO_NAMES[
root.converted_type])
if root.repetition_type is not None:
parts.append(parquet_thrift.FieldRepetitionType._VALUES_TO_NAMES[
root.repetition_type])
text += ', '.join(parts)
indent.append('|')
if hasattr(root, 'children'):
indent[-1] = '| '
for i, child in enumerate(root["children"].values()):
if i == len(root["children"]) - 1:
indent[-1] = ' '
text += '\n' + schema_to_text(child, indent)
indent.pop()
return text
def flatten(schema, root, name_parts=[]):
if not hasattr(schema, 'children'):
return
if schema is not root:
name_parts = name_parts + [schema.name]
# root["children"].pop('.'.join(name_parts), None)
for name, item in schema["children"].copy().items():
if schema.repetition_type == parquet_thrift.FieldRepetitionType.REPEATED:
continue
if len(getattr(item, 'children', [])) == 0:
root["children"]['.'.join(name_parts + [name])] = item
elif item.converted_type in [parquet_thrift.ConvertedType.LIST,
parquet_thrift.ConvertedType.MAP]:
root["children"]['.'.join(name_parts + [name])] = item
else:
flatten(item, root, name_parts)
item["isflat"] = True
class SchemaHelper(object):
"""Utility providing convenience methods for schema_elements."""
def __init__(self, schema_elements):
"""Initialize with the specified schema_elements."""
self.schema_elements = schema_elements
for se in schema_elements:
try:
se.name = se.name.decode()
except AttributeError:
pass # already a str
self.root = schema_elements[0]
self.schema_elements_by_name = dict(
[(se.name, se) for se in schema_elements])
schema_tree(schema_elements)
self._text = None
flatten(self.root, self.root)
@property
def text(self):
if self._text is None:
self._text = schema_to_text(self.schema_elements[0])
return self._text
def __eq__(self, other):
return self.schema_elements == other.schema_elements
def __ne__(self, other):
return not self.__eq__(other)
def __str__(self):
return self.text
def __repr__(self):
return "<Parquet Schema with {} entries>".format(
len(self.schema_elements))
def schema_element(self, name):
"""Get the schema element with the given name or path"""
root = self.root
if isinstance(name, str):
name = name.split('.')
for part in name:
root = root["children"][part]
return root
def is_required(self, name):
"""Return true if the schema element with the given name is required."""
required = True
if isinstance(name, str):
name = name.split('.')
parts = []
for part in name:
parts.append(part)
s = self.schema_element(parts)
if s.repetition_type != parquet_thrift.FieldRepetitionType.REQUIRED:
required = False
break
return required
def max_repetition_level(self, parts):
"""Get the max repetition level for the given schema path."""
max_level = 0
if isinstance(parts, str):
parts = parts.split('.')
for i in range(len(parts)):
element = self.schema_element(parts[:i+1])
if element.repetition_type == parquet_thrift.FieldRepetitionType.REPEATED:
max_level += 1
return max_level
def max_definition_level(self, parts):
"""Get the max definition level for the given schema path."""
max_level = 0
if isinstance(parts, str):
parts = parts.split('.')
for i in range(len(parts)):
element = self.schema_element(parts[:i+1])
if element.repetition_type != parquet_thrift.FieldRepetitionType.REQUIRED:
max_level += 1
return max_level
def _is_list_like(helper, column):
if len(column.meta_data.path_in_schema) < 3:
return False
se = helper.schema_element(
column.meta_data.path_in_schema[:-2])
ct = se.converted_type
if ct != parquet_thrift.ConvertedType.LIST:
return False
if len(se["children"]) > 1:
return False
se2 = list(se["children"].values())[0]
if len(se2["children"]) > 1:
return False
if se2.repetition_type != parquet_thrift.FieldRepetitionType.REPEATED:
return False
se3 = list(se2["children"].values())[0]
if se3.repetition_type == parquet_thrift.FieldRepetitionType.REPEATED:
return False
return True
def _is_map_like(helper, column):
if len(column.meta_data.path_in_schema) < 3:
return False
se = helper.schema_element(
column.meta_data.path_in_schema[:-2])
ct = se.converted_type
if ct != parquet_thrift.ConvertedType.MAP:
return False
if len(se["children"]) > 1:
return False
se2 = list(se["children"].values())[0]
if len(se2["children"]) != 2:
return False
if se2.repetition_type != parquet_thrift.FieldRepetitionType.REPEATED:
return False
if set(se2["children"]) != {'key', 'value'}:
return False
se3 = se2["children"]['key']
if se3.repetition_type != parquet_thrift.FieldRepetitionType.REQUIRED:
return False
se3 = se2["children"]['value']
if se3.repetition_type == parquet_thrift.FieldRepetitionType.REPEATED:
return False
return True
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,117 @@
"""
Native accelerators for Parquet encoding and decoding.
"""
# cython: profile=False
# cython: linetrace=False
# cython: binding=False
# cython: language_level=3
# cython: initializedcheck=False
# cython: boundscheck=False
# cython: wraparound=False
# cython: overflowcheck=False
# cython: initializedcheck=False
# cython: cdivision=True
# cython: always_allow_keywords=False
from libc.string cimport memcpy
from cpython cimport (PyUnicode_AsUTF8String, PyUnicode_DecodeUTF8,
PyBytes_CheckExact, PyBytes_FromStringAndSize,
PyBytes_GET_SIZE, PyBytes_AS_STRING)
from cpython.unicode cimport PyUnicode_DecodeUTF8
import numpy as np
cimport numpy as np
import cython
_obj_dtype = np.dtype('object')
def array_encode_utf8(inp):
"""
utf-8 encode all elements of a 1d ndarray of "object" dtype.
A new ndarray of bytes objects is returned.
"""
# TODO: combine with pack_byte_array as is done for unpack
cdef:
Py_ssize_t i, n
np.ndarray[object, ndim=1] arr
np.ndarray[object] result
arr = np.array(inp, copy=False)
n = arr.shape[0]
# TODO: why not inplace?
result = np.empty(n, dtype=object)
for i in range(n):
# Fast utf-8 encoding, avoiding method call and codec lookup indirection
result[i] = PyUnicode_AsUTF8String(arr[i])
return result
def pack_byte_array(list items):
"""
Pack a variable length byte array column.
A bytes object is returned.
"""
cdef:
Py_ssize_t i, n, itemlen, total_size
unsigned char *start
unsigned char *data
object val, out
# Strategy: compute the total output size and allocate it in one go.
n = len(items)
total_size = 0
for i in range(n):
val = items[i]
if not PyBytes_CheckExact(val):
raise TypeError("expected list of bytes")
total_size += 4 + PyBytes_GET_SIZE(val)
out = PyBytes_FromStringAndSize(NULL, total_size)
start = data = <unsigned char *> PyBytes_AS_STRING(out)
# Copy data to output.
for i in range(n):
val = items[i]
# `itemlen` should be >= 0, so no signed extension issues
itemlen = PyBytes_GET_SIZE(val)
(<int*> data)[0] = itemlen
data += 4
memcpy(data, PyBytes_AS_STRING(val), itemlen)
data += itemlen
assert (data - start) == total_size
return out
@cython.boundscheck(False)
def unpack_byte_array(const unsigned char[::1] raw_bytes, Py_ssize_t n, const char utf=False):
"""
Unpack a variable length byte array column.
An array of bytes objects is returned.
"""
cdef:
Py_ssize_t i = 0
char* ptr = <char*>&raw_bytes[0]
int itemlen, bytecount
np.ndarray[object, ndim=1, mode="c"] out = np.empty(n, dtype="object")
assert out is not None
bytecount = raw_bytes.shape[0]
while i < n and bytecount > 0:
itemlen = (<int*> ptr)[0]
ptr += 4
if utf:
out[i] = PyUnicode_DecodeUTF8(ptr, itemlen, "ignore")
else:
out[i] = PyBytes_FromStringAndSize(ptr, itemlen)
ptr += itemlen
bytecount -= 4 + itemlen
i += 1
return out
@@ -0,0 +1,5 @@
from fastparquet import parquet_thrift
from fastparquet.cencoding import ThriftObject
__all__ = ["ThriftObject", "parquet_thrift"]
@@ -0,0 +1,557 @@
from collections import defaultdict
import copy
from packaging.version import Version
from functools import lru_cache
import io
import struct
import os
import operator
import re
import numbers
import zoneinfo
import numpy as np
import pandas as pd
import fsspec
from fastparquet import parquet_thrift
from fastparquet.cencoding import ThriftObject
from fastparquet import __version__
PANDAS_VERSION = Version(pd.__version__)
created_by = f"fastparquet-python version {__version__} (build 0)"
class ParquetException(Exception):
"""Generic Exception related to unexpected data format when
reading parquet file."""
pass
def default_mkdirs(f):
os.makedirs(f, exist_ok=True)
PATH_DATE_FMT = '%Y%m%d_%H%M%S.%f'
def path_string(o):
if isinstance(o, pd.Timestamp):
return o.isoformat()
return str(o)
default_open = open
def default_remove(paths):
for path in paths:
try:
os.unlink(path)
except IOError:
pass
def val_from_meta(x, meta):
try:
if meta['pandas_type'] == 'categorical':
return x
t = np.dtype(meta['numpy_type'])
if t == "bool":
return x in [True, "true", "True", 't', "T", 1, "1"]
return np.dtype(t).type(x)
except ValueError:
if meta['numpy_type'] == 'datetime64[ns]':
return pd.to_datetime(x, format=PATH_DATE_FMT)
else:
raise
def val_to_num(x, meta=None):
"""Parse a string as a number, date or timedelta if possible"""
if meta:
return val_from_meta(x, meta)
return _val_to_num(x)
@lru_cache(1000)
def _val_to_num(x):
if isinstance(x, numbers.Real):
return x
if x in ['now', 'NOW', 'TODAY', '']:
return x
if type(x) == str and x.lower() == 'nan':
return x
if x == "True":
return True
if x == "False":
return False
try:
return int(x, base=10)
except:
pass
try:
return float(x)
except:
pass
try:
return pd.Timestamp(x)
except:
pass
try:
# TODO: determine the valid usecases for this, then try to limit the set
# ofstrings which may get inadvertently converted to timedeltas
return pd.Timedelta(x)
except:
return x
def ensure_bytes(s):
return s.encode('utf-8') if isinstance(s, str) else s
def ensure_str(b, *, ignore_error=False):
if isinstance(b, str):
return b
else:
try:
return b.decode('utf-8')
except (UnicodeDecodeError, AttributeError):
if not ignore_error:
raise
return b
def check_column_names(columns, *args):
"""Ensure that parameters listing column names have corresponding columns"""
for arg in args:
if isinstance(arg, (tuple, list)):
missing = set(arg) - set(columns)
if missing:
raise ValueError("Following columns were requested but are "
"not available: %s.\n"
"All requested columns: %s\n"
"Available columns: %s"
"" % (missing, arg, columns))
def reset_row_idx(data: pd.DataFrame) -> pd.DataFrame:
"""Reset row (multi-)index as column(s) of the DataFrame.
Multi-index are stored in columns, one per index level.
Parameters
----------
data : dataframe
Returns
-------
dataframe
"""
if isinstance(data.index, pd.MultiIndex):
for name, cats, codes in zip(data.index.names, data.index.levels,
data.index.codes):
data = data.assign(**{name: pd.Categorical.from_codes(codes,
cats)})
data.reset_index(drop=True)
else:
data = data.reset_index()
return data
def metadata_from_many(file_list, verify_schema=False, open_with=default_open,
root=False, fs=None):
"""
Given list of parquet files, make a FileMetaData that points to them
Parameters
----------
file_list: list of paths of parquet files
verify_schema: bool (False)
Whether to assert that the schemas in each file are identical
open_with: function
Use this to open each path.
root: str
Top of the dataset's directory tree, for cases where it can't be
automatically inferred.
fs: fsspsec.AbstractFileSystem
Used in preference to open_with, if given
Returns
-------
basepath: the root path that other paths are relative to
fmd: metadata thrift structure
"""
from fastparquet import api
legacy = True
if all(isinstance(pf, api.ParquetFile) for pf in file_list):
pfs = file_list
file_list = [pf.fn for pf in pfs]
elif all(not isinstance(pf, api.ParquetFile) for pf in file_list):
if verify_schema or fs is None or len(file_list) < 3:
pfs = [api.ParquetFile(fn, open_with=open_with) for fn in file_list]
else:
# activate new code path here
f0 = file_list[0]
pf0 = api.ParquetFile(f0, open_with=open_with)
if pf0.file_scheme not in ['empty', 'simple']:
# set of directories, revert
pfs = [pf0] + [api.ParquetFile(fn, open_with=open_with) for fn in file_list[1:]]
else:
# permits concurrent fetch of footers; needs fsspec >= 2021.6
size = int(1.4 * pf0._head_size)
pieces = fs.cat(file_list[1:], start=-size)
sizes = {path: int.from_bytes(piece[-8:-4], "little") + 8 for
path, piece in pieces.items()}
not_bigenough = [path for path, s in sizes.items() if s > size]
if not_bigenough:
new_pieces = fs.cat(not_bigenough, start=-max(sizes.values()))
pieces.update(new_pieces)
pieces = {k: _get_fmd(v) for k, v in pieces.items()}
pieces = [(fn, pieces[fn]) for fn in file_list[1:]] # recover ordering
legacy = False
else:
raise ValueError("Merge requires all ParquetFile instances or none")
basepath, file_list = analyse_paths(file_list, root=root)
if legacy:
# legacy code path
if verify_schema:
for pf in pfs[1:]:
if pf._schema != pfs[0]._schema:
raise ValueError('Incompatible schemas')
fmd = copy.copy(pfs[0].fmd) # we inherit "created by" field
rgs = []
for pf, fn in zip(pfs, file_list):
if pf.file_scheme not in ['simple', 'empty']:
for rg in pf.row_groups:
rg = copy.copy(rg)
rg.columns = [copy.copy(c) for c in rg.columns]
for chunk in rg.columns:
chunk.file_path = '/'.join(
[fn, chunk.file_path if isinstance(chunk.file_path, str) else chunk.file_path.decode()]
)
rgs.append(rg)
else:
for rg in pf.row_groups:
rg = copy.copy(rg)
rg.columns = [copy.copy(c) for c in rg.columns]
for chunk in rg.columns:
chunk.file_path = fn
rgs.append(rg)
fmd.row_groups = rgs
fmd.num_rows = sum(rg.num_rows for rg in fmd.row_groups)
return basepath, fmd
for rg in pf0.fmd.row_groups:
# chunks of first file, which would have file_path=None
rg.columns[0].file_path = f0[len(basepath):].lstrip("/")
rgs0 = pf0.fmd.row_groups
for k, v in pieces:
# Set file paths on other files
if len(v.schema) > len(pf0.fmd.schema):
# or was UPDATED with supercast
pf0.fmd.schema = v.schema
rgs = v.row_groups or []
for rg in rgs:
rg.columns[0].file_path = k[len(basepath):].lstrip("/")
rgs0.extend(rgs)
pf0.fmd.row_groups = rgs0
pf0.fmd.num_rows = sum(rg.num_rows for rg in pf0.fmd.row_groups)
return basepath, pf0.fmd
def _get_fmd(inbytes):
from .cencoding import from_buffer
f = io.BytesIO(inbytes)
f.seek(-8, 2)
head_size = struct.unpack('<i', f.read(4))[0]
f.seek(-(head_size + 8), 2)
data = f.read(head_size)
return from_buffer(data, "FileMetaData")
def update_custom_metadata(obj, custom_metadata : dict):
"""Update custom metadata stored in thrift object or parquet file.
Update strategy depends if key found in new custom metadata is also found
in already existing custom metadata within thrift object, as well as its
value.
- If not found in existing, it is added.
- If found in existing, it is updated.
- If its value is `None`, it is not added, and if found in existing,
it is removed from existing.
Parameters
----------
obj : metadata ThriftObject or parquet file
Thrift object or parquet file which metadata is to update.
custom_metadata : dict
Key-value metadata to update in thrift object.
The values must be strings or binary. To pass a dictionary, serialize it as json string then encode it in binary.
Notes
-----
Key-value metadata are expected binary encoded. This function ensures it
is.
"""
kvm = (obj.key_value_metadata if isinstance(obj, ThriftObject)
else obj.fmd.key_value_metadata)
if kvm is None:
kvm = []
# Spare list of keys.
kvm_keys = [item.key for item in kvm]
for key, value in custom_metadata.items():
key_b = ensure_bytes(key)
if key_b in kvm_keys:
idx = kvm_keys.index(key_b)
if value is None:
# Remove item.
del kvm[idx]
# Update 'kvm_keys' as well, for keeping indexing
# up-to-date.
del kvm_keys[idx]
else:
# Replace item.
kvm[idx] = parquet_thrift.KeyValue(key=key_b,
value=ensure_bytes(value))
elif value is not None:
kvm.append(parquet_thrift.KeyValue(key=key_b,
value=ensure_bytes(value)))
if isinstance(obj, ThriftObject):
obj.key_value_metadata = kvm
else:
obj.fmd.key_value_metadata = kvm
# Reset '_kvm' to refresh 'key_value_metadata' cached property.
obj._kvm = None
# simple cache to avoid re compile every time
seps = {}
def ex_from_sep(sep):
"""Generate regex for category folder matching"""
if sep not in seps:
if sep in r'\^$.|?*+()[]':
s = re.compile(r"([a-zA-Z_0-9]+)=([^\\{}]+)".format(sep))
else:
s = re.compile("([a-zA-Z_0-9]+)=([^{}]+)".format(sep))
seps[sep] = s
return seps[sep]
def analyse_paths(file_list, root=False):
"""Consolidate list of file-paths into parquet relative paths"""
path_parts_list = [join_path(fn).split('/') for fn in file_list]
if root is False:
basepath = path_parts_list[0][:-1]
for i, path_parts in enumerate(path_parts_list):
j = len(path_parts) - 1
for k, (base_part, path_part) in enumerate(
zip(basepath, path_parts)):
if base_part != path_part:
j = k
break
basepath = basepath[:j]
l = len(basepath)
else:
basepath = join_path(root).split('/')
l = len(basepath)
assert all(p[:l] == basepath for p in path_parts_list
), "All paths must begin with the given root"
out_list = []
for path_parts in path_parts_list:
out_list.append('/'.join(path_parts[l:])) # use '/'.join() instead of join_path to be consistent with split('/')
return '/'.join(basepath), out_list # use '/'.join() instead of join_path to be consistent with split('/')
def infer_dtype(column):
try:
return pd.api.types.infer_dtype(column, skipna=False)
except AttributeError:
return pd.lib.infer_dtype(column)
def groupby_types(iterable):
groups = defaultdict(list)
for x in iterable:
groups[type(x)].append(x)
return groups
def get_column_metadata(column, name, object_dtype=None):
"""Produce pandas column metadata block"""
inferred_dtypes = {
"utf8": "unicode",
"bytes": "bytes",
"bool": "bool",
"int": "int",
"json": "object",
"bson": "object"
}
dtype = column.dtype
if object_dtype in inferred_dtypes and dtype == "object":
inferred_dtype = inferred_dtypes.get(object_dtype, "mixed")
else:
inferred_dtype = infer_dtype(column)
if str(dtype) == "bool":
# pandas accidentally calls this "boolean"
inferred_dtype = "bool"
if isinstance(dtype, pd.CategoricalDtype):
extra_metadata = {
'num_categories': len(column.cat.categories),
'ordered': column.cat.ordered,
}
dtype = column.cat.codes.dtype
elif isinstance(dtype, pd.DatetimeTZDtype):
if isinstance(dtype.tz, zoneinfo.ZoneInfo):
extra_metadata = {'timezone': dtype.tz.key}
else:
try:
stz = str(dtype.tz)
if "UTC" in stz and ":" in stz:
extra_metadata = {'timezone': stz.strip("UTC")}
elif len(str(stz)) == 3: # like "UTC", "CET", ...
extra_metadata = {'timezone': str(stz)}
elif getattr(dtype.tz, "zone", False):
extra_metadata = {'timezone': dtype.tz.zone}
elif "pytz" not in stz:
pd.Series([pd.to_datetime('now', utc=True)]).dt.tz_localize(stz)
extra_metadata = {'timezone': stz}
elif "Offset" in stz:
extra_metadata = {'timezone': f"{dtype.tz._minutes // 60:+03}:00"}
else:
raise KeyError
except Exception as e:
raise ValueError("Time-zone information could not be serialised: "
"%s, please use another" % str(dtype.tz)) from e
else:
extra_metadata = None
if isinstance(name, tuple):
name = str(name)
elif not isinstance(name, str):
raise TypeError(
'Column name must be a string. Got column {} of type {}'.format(
name, type(name).__name__
)
)
return {
'name': name,
'field_name': name,
'pandas_type': {
'string': 'unicode',
'datetime64': (
'datetimetz' if hasattr(dtype, 'tz')
else 'datetime'
),
'integer': str(dtype),
'floating': str(dtype),
}.get(inferred_dtype, inferred_dtype),
'numpy_type': get_numpy_type(dtype),
'metadata': extra_metadata,
}
def get_numpy_type(dtype):
if isinstance(dtype, pd.CategoricalDtype):
return 'category'
elif "Int" in str(dtype):
return str(dtype).lower()
elif str(dtype) == "boolean":
return "bool"
elif str(dtype) == "string":
return "object"
else:
return str(dtype)
def get_file_scheme(paths):
"""For the given row groups, figure out if the partitioning scheme
Parameters
----------
paths: list of str
normally from row_group.columns[0].file_path
Returns
-------
'empty': no rgs at all
'simple': all rgs in a single file
'flat': multiple files in one directory
'hive': directories are all `key=value`; all files are at the same
directory depth
'drill': assume directory names are labels, and field names are of the
form dir0, dir1; all files are at the same directory depth
'other': none of the above, assume no partitioning
"""
if not paths:
return 'empty'
if set(paths) == {None}:
return 'simple'
if None in paths:
return 'other'
parts = [p.split('/') for p in paths]
lens = [len(p) for p in parts]
if len(set(lens)) > 1:
return 'other'
if set(lens) == {1}:
return 'flat'
matches = all(all("=" in p[1:-1] for p in part[:-1]) for part in parts)
return "hive" if matches else "drill"
def join_path(*path):
return "/".join([str(p).replace("\\", "/").rstrip("/") for p in path if p])
def _strip_path_tail(paths) -> set:
return {path.rsplit("/", 1)[0] if "/" in path else "" for path in paths}
ops = {
"==": operator.eq,
"=": operator.eq,
"!=": operator.ne,
">": operator.gt,
">=": operator.ge,
"<": operator.lt,
"<=": operator.le
}
def norm_col_name(name, is_index:bool=None):
if isinstance(name, tuple):
if is_index:
return name[0]
else:
return str(name)
return name
def get_fs(fn, open_with, mkdirs):
fs = None
if "FastParquetImpl.write.<locals>.<lambda>" in str(open_with):
import inspect
so = inspect.getclosurevars(open_with).nonlocals["storage_options"] or {}
fs, fn = fsspec.core.url_to_fs(fn, **so)
open_with = fs.open
mkdirs = mkdirs or (lambda d: fs.mkdirs(d, exist_ok=True))
return fs, fn, open_with, mkdirs
File diff suppressed because it is too large Load Diff