codehaus


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: How to append to parquet file periodically and read intermediate data - pyarrow.lib.ArrowIOError: Invalid parquet file. Corrupt footer.


This turns out to be a very common problem (landing incremental
updates, dealing with compaction and small files). It's part of the
reason that systems like Apache Kudu were developed, e.g.

https://blog.cloudera.com/blog/2015/11/how-to-ingest-and-query-fast-data-with-impala-without-kudu/

If you have to use file storage, then figuring out a scheme to compact
Parquet files (e.g. once per hour, once per day) will definitely be
worth it compared with using a slower file format (like Avro)

- Wes

On Wed, Dec 19, 2018 at 7:37 AM Joel Pfaff <joel.pfaff@xxxxxxxxx> wrote:
>
> Hello,
>
> For my company's usecases, we have found that the number of files was a
> critical part of the time spent doing the execution plan, so we found the
> idea of very regularly writing small parquet files to be rather inefficient.
>
> There are some formats that support an `append` semantic (I have tested
> successfully with avro, but there are a couple others that could be used
> similarly).
> So we had a few cases where we were aggregating data in a `current table`
> in set of avro files, and rewriting all of it in few parquet files at the
> end of the day.
> This allowed us to have files that have been prepared to optimize their
> querying performance (file size, row group size, sorting per column) by
> maximizing the ability to benefit from the statistics.
> And our queries were doing an UNION between "optimized for speed" history
> tables and "optimized for latency" current tables, when the query timeframe
> was crossing the boundaries of the current day.
>
> Regards, Joel
>
> On Wed, Dec 19, 2018 at 2:14 PM Francois Saint-Jacques <
> fsaintjacques@xxxxxxxxxxxxxxx> wrote:
>
> > Hello Darren,
> >
> > what Uwe suggests is usually the way to go, your active process writes to a
> > new file every time. Then you have a parallel process/thread that does
> > compaction of smaller files in the background such that you don't have too
> > many files.
> >
> > On Wed, Dec 19, 2018 at 7:59 AM Uwe L. Korn <uwelk@xxxxxxxxxx> wrote:
> >
> > > Hello Darren,
> > >
> > > you're out of luck here. Parquet files are immutable and meant for batch
> > > writes. Once they're written you cannot modify them anymore. To load
> > them,
> > > you need to know their metadata which is in the footer. The footer is
> > > always at the end of the file and written once you call close.
> > >
> > > Your use case is normally fulfilled by continously starting new files and
> > > reading them back in using the ParquetDataset class
> > >
> > > Cheers
> > > Uwe
> > >
> > > Am 18.12.2018 um 21:03 schrieb Darren Gallagher <dazzag@xxxxxxxxx>:
> > >
> > > >> [Cross posted from https://github.com/apache/arrow/issues/3203]
> > > >>
> > > >> I'm adding new data to a parquet file every 60 seconds using this
> > code:
> > > >>
> > > >> import os
> > > >> import json
> > > >> import time
> > > >> import requests
> > > >> import pandas as pd
> > > >> import numpy as np
> > > >> import pyarrow as pa
> > > >> import pyarrow.parquet as pq
> > > >>
> > > >> api_url = 'https://opensky-network.org/api/states/all'
> > > >>
> > > >> cols = ['icao24', 'callsign', 'origin', 'time_position',
> > > >>        'last_contact', 'longitude', 'latitude',
> > > >>        'baro_altitude', 'on_ground', 'velocity', 'true_track',
> > > >>        'vertical_rate', 'sensors', 'geo_altitude', 'squawk',
> > > >>        'spi', 'position_source']
> > > >>
> > > >> def get_new_flight_info(writer):
> > > >>    print("Requesting new data")
> > > >>    req = requests.get(api_url)
> > > >>    content = req.json()
> > > >>
> > > >>    states = content['states']
> > > >>    df = pd.DataFrame(states, columns = cols)
> > > >>    df['timestamp'] = content['time']
> > > >>    print("Found {} new items".format(len(df)))
> > > >>
> > > >>    table = pa.Table.from_pandas(df)
> > > >>    if writer is None:
> > > >>        writer = pq.ParquetWriter('openskyflights.parquet',
> > table.schema)
> > > >>    writer.write_table(table=table)
> > > >>    return writer
> > > >>
> > > >> if __name__ == '__main__':
> > > >>    writer = None
> > > >>    while (not os.path.exists('opensky.STOP')):
> > > >>        writer = get_new_flight_info(writer)
> > > >>        time.sleep(60)
> > > >>
> > > >>    if writer:
> > > >>        writer.close()
> > > >>
> > > >> This is working fine and the file grows every 60 seconds.
> > > >> However unless I force the loop to exit I am unable to use the parquet
> > > >> file. In a separate terminal I try to access the parquet file using
> > this
> > > >> code:
> > > >>
> > > >> import pandas as pd
> > > >> import pyarrow.parquet as pq
> > > >>
> > > >> table = pq.read_table("openskyflights.parquet")
> > > >> df = table.to_pandas()
> > > >> print(len(df))
> > > >>
> > > >> which results in this error:
> > > >>
> > > >> Traceback (most recent call last):
> > > >>  File "checkdownloadsize.py", line 7, in <module>
> > > >>    table = pq.read_table("openskyflights.parquet")
> > > >>  File
> > >
> > "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py",
> > > line 1074, in read_table
> > > >>    use_pandas_metadata=use_pandas_metadata)
> > > >>  File
> > >
> > "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/filesystem.py",
> > > line 182, in read_parquet
> > > >>    filesystem=self)
> > > >>  File
> > >
> > "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py",
> > > line 882, in __init__
> > > >>    self.validate_schemas()
> > > >>  File
> > >
> > "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py",
> > > line 895, in validate_schemas
> > > >>    self.schema = self.pieces[0].get_metadata(open_file).schema
> > > >>  File
> > >
> > "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py",
> > > line 453, in get_metadata
> > > >>    return self._open(open_file_func).metadata
> > > >>  File
> > >
> > "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py",
> > > line 459, in _open
> > > >>    reader = open_file_func(self.path)
> > > >>  File
> > >
> > "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py",
> > > line 984, in open_file
> > > >>    common_metadata=self.common_metadata)
> > > >>  File
> > >
> > "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py",
> > > line 102, in __init__
> > > >>    self.reader.open(source, metadata=metadata)
> > > >>  File "pyarrow/_parquet.pyx", line 639, in
> > > pyarrow._parquet.ParquetReader.open
> > > >>  File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
> > > >> pyarrow.lib.ArrowIOError: Invalid parquet file. Corrupt footer.
> > > >>
> > > >> Is there a way to achieve this?
> > > >> I'm assuming that if I call writer.close() in the while loop then it
> > > will
> > > >> prevent any further data being written to the file? Is there some kind
> > > of
> > > >> "flush" operation that can be used to ensure all data is written to
> > disk
> > > >> and available to other processes or threads that want to read the
> > data?
> > > >>
> > > >> Thanks
> > > >>
> > >
> > >
> >
> > --
> > Sent from my jetpack.
> >