|
"""Support for reading delimiter-separated value files. |
|
|
|
This module contains unicode aware replacements for :func:`csv.reader` |
|
and :func:`csv.writer`. It was stolen/extracted from the ``csvkit`` |
|
project to allow re-use when the whole ``csvkit`` package isn't |
|
required. |
|
|
|
The original implementations were largely copied from |
|
`examples in the csv module documentation <http://docs.python.org/library/csv.html\ |
|
#examples>`_. |
|
|
|
.. seealso:: http://en.wikipedia.org/wiki/Delimiter-separated_values |
|
""" |
|
import io |
|
import csv |
|
import codecs |
|
import shutil |
|
import typing |
|
import pathlib |
|
import tempfile |
|
import warnings |
|
import functools |
|
import collections |
|
|
|
from . import utils |
|
from .dsv_dialects import Dialect |
|
|
|
__all__ = [
    'UnicodeWriter',
    'UnicodeReader', 'UnicodeReaderWithLineNumber', 'UnicodeDictReader', 'NamedTupleReader',
    'iterrows',
    'rewrite', 'add_rows', 'filter_rows_as_dict',
]

#: Accepted "source" types for readers in this module: a local path (`str` or
#: `pathlib.Path`), an open file-like object, or an iterable of text lines.
LINES_OR_PATH = typing.Union[str, pathlib.Path, typing.IO, typing.Iterable[str]]
|
|
|
|
|
def normalize_encoding(encoding: str) -> str:
    """Map *encoding* to its canonical name in Python's codec registry."""
    codec_info = codecs.lookup(encoding)
    return codec_info.name
|
|
|
|
|
class UnicodeWriter:
    """
    Write Unicode data to a csv file.

    :param f: The target to which to write the data; a local path specified as `str` or \
    `pathlib.Path` or `None`, in which case the data, formatted as DSV can be retrieved \
    via :meth:`~UnicodeWriter.read`
    :param dialect: Either a dialect name as recognized by `csv.writer` or a \
    :class:`~Dialect` instance for dialect customization beyond what can be done with \
    `csv.writer`.
    :param kw: Keyword arguments passed through to `csv.writer`.

    .. code-block:: python

        >>> from csvw import UnicodeWriter
        >>> with UnicodeWriter('data.tsv', delimiter='\t') as writer:
        ...     writer.writerow(['ä', 'ö', 'ü'])
    """

    def __init__(
            self,
            f: typing.Optional[typing.Union[str, pathlib.Path]] = None,
            dialect: typing.Optional[typing.Union[Dialect, str]] = None,
            **kw):
        self.f = f
        self.encoding = kw.pop('encoding', 'utf-8')
        if isinstance(dialect, Dialect):
            # A csvw Dialect determines the encoding and supplies csv formatting
            # parameters; explicit keyword arguments still take precedence.
            self.encoding = dialect.python_encoding
            self.kw = dialect.as_python_formatting_parameters()
            self.kw.update(kw)
        else:
            self.kw = kw
            if dialect:
                # Plain dialect names are handed through to csv.writer.
                self.kw['dialect'] = dialect
        self.encoding = normalize_encoding(self.encoding)
        self.escapechar = self.kw.get('escapechar')
        # When quoting is in effect, csv.writer does not escape the escapechar
        # itself, so we double it up-front in writerow.
        self._double_escapechar = bool(
            self.escapechar) and self.kw.get('quoting') != csv.QUOTE_NONE
        # Whether __exit__ is responsible for closing self.f (i.e. we opened it).
        self._close = False

    def _escapedoubled(self, row):
        # Double every occurrence of the escapechar in string cells; leave
        # non-string cells untouched.
        if not self._double_escapechar:
            return row
        esc = self.escapechar
        return [
            cell.replace(esc, esc + esc) if isinstance(cell, str) else cell
            for cell in row]

    def __enter__(self):
        if isinstance(self.f, (str, pathlib.Path)):
            # A local path: open the file ourselves and close it on exit.
            self.f = io.open(str(self.f), 'wt', encoding=self.encoding, newline='')
            self._close = True
        elif self.f is None:
            # No target given: buffer everything in memory for read().
            self.f = io.StringIO(newline='')
        self.writer = csv.writer(self.f, **self.kw)
        return self

    def read(self) -> typing.Optional[bytes]:
        """
        If the writer has been initialized passing `None` as target, the CSV data as `bytes` can be
        retrieved calling this method.
        """
        if hasattr(self.f, 'seek'):
            self.f.seek(0)
        if hasattr(self.f, 'read'):
            return self.f.read().encode('utf-8')

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self._close:
            self.f.close()

    def writerow(self, row: typing.Union[tuple, list]):
        self.writer.writerow(self._escapedoubled(row))

    def writerows(self, rows: typing.Iterable[typing.Union[tuple, list]]):
        for single_row in rows:
            self.writerow(single_row)
|
|
|
|
|
class UnicodeReader:
    """
    Read Unicode data from a csv file.

    :param f: The source from which to read the data; a local path specified as `str` or \
    `pathlib.Path`, a file-like object or a `list` of lines.
    :param dialect: Either a dialect name as recognized by `csv.reader` or a \
    :class:`~Dialect` instance for dialect customization beyond what can be done with \
    `csv.writer`.
    :param kw: Keyword arguments passed through to `csv.reader`.

    .. code-block:: python

        >>> with UnicodeReader('tests/fixtures/frictionless-data.csv', delimiter='|') as reader:
        ...     for row in reader:
        ...         print(row)
        ...         break
        ...
        ['FK', 'Year', 'Location name', 'Value', 'binary', 'anyURI', 'email', 'boolean', 'array',
        'geojson']
    """
    def __init__(
            self,
            f: LINES_OR_PATH,
            dialect: typing.Optional[typing.Union[Dialect, str]] = None,
            **kw):
        self.f = f
        self.encoding = normalize_encoding(kw.pop('encoding', 'utf-8-sig'))
        self.newline = kw.pop('lineterminator', None)
        # Only csvw Dialect instances trigger the extended row handling in __next__
        # (comment rows, skipped rows/columns, trimming); plain dialect names are
        # passed through to csv.reader instead.
        self.dialect = dialect if isinstance(dialect, Dialect) else None
        if self.dialect:
            self.encoding = self.dialect.python_encoding
            self.kw = dialect.as_python_formatting_parameters()
            # Explicit keyword arguments override dialect-derived parameters.
            self.kw.update(kw)
        else:
            self.kw = kw
            if dialect:
                self.kw['dialect'] = dialect
        # Whether __exit__ must close self.f (set in __enter__ if we opened it).
        self._close = False
        # Collects (lineno, text) pairs for comment rows and skipped rows
        # encountered while iterating (see __next__).
        self.comments = []

        # Read plain utf-8 as utf-8-sig, so that a leading BOM - if present - is
        # stripped rather than showing up in the first cell.
        if self.encoding == 'utf-8':
            self.encoding = 'utf-8-sig'

        # Encoding used to decode any `bytes` cell values in _next_row.
        self._reader_encoding = self.encoding

    def __enter__(self):
        if isinstance(self.f, (str, pathlib.Path)):
            if isinstance(self.f, pathlib.Path):
                self.f = str(self.f)

            self.f = io.open(self.f, mode='rt', encoding=self.encoding, newline=self.newline or '')
            self._close = True
        elif not hasattr(self.f, 'read'):
            # An iterable of lines; decode any bytes lines up-front.
            lines = []
            for line in self.f:
                lines.append(line.decode(self.encoding) if isinstance(line, bytes) else line)
            self.f = lines
        self.reader = csv.reader(self.f, **self.kw)
        # 0-based physical-line counter, advanced by _next_row; -1 means
        # "nothing read yet".
        self.lineno = -1
        return self

    def _next_row(self):
        self.lineno += 1
        row = [
            s if isinstance(s, str) else s.decode(self._reader_encoding)
            for s in next(self.reader)]
        # Account for physical lines consumed by embedded newlines in
        # multi-line (quoted) cell values.
        self.lineno += sum([list(s).count('\n') for s in row])
        return row

    def __next__(self):
        row = self._next_row()
        if self.dialect:
            # Skip (1) comment rows, (2) blank rows if the dialect says so, and
            # (3) rows within the dialect's skipRows prefix.
            while (row and self.dialect.commentPrefix and
                   row[0].startswith(self.dialect.commentPrefix)) or \
                    ((not row or set(row) == {''}) and self.dialect.skipBlankRows) or \
                    (self.lineno < self.dialect.skipRows):
                # Comment rows and non-empty skipped rows are preserved in
                # self.comments for later inspection.
                if (row and self.dialect.commentPrefix and
                        row[0].startswith(self.dialect.commentPrefix)) or \
                        (row and self.lineno < self.dialect.skipRows):
                    # NOTE(review): lstrip treats commentPrefix as a character
                    # set, not a literal prefix - fine for the typical
                    # single-character prefix, e.g. '#'.
                    self.comments.append((
                        self.lineno,
                        self.dialect.delimiter.join(row).lstrip(self.dialect.commentPrefix).strip(),
                    ))
                row = self._next_row()
            # Apply per-cell trimming and drop the first skipColumns columns.
            row = [self.dialect.trimmer(s) for s in row][self.dialect.skipColumns:]
        return row

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self._close:
            self.f.close()

    def __iter__(self):
        return self
|
|
|
|
|
class UnicodeReaderWithLineNumber(UnicodeReader):
    """
    A `UnicodeReader` yielding (lineno, row) pairs, where "lineno" is the 1-based number of the
    **text line** where the (possibly multi-line) row data starts in the DSV file.
    """

    def __next__(self):
        """
        :return: a pair (1-based line number in the input, row)
        """
        # Read first: the base class updates self.lineno as a side effect, so the
        # row must be fetched before the counter is consulted.
        row = super().__next__()
        return self.lineno + 1, row
|
|
|
|
|
class UnicodeDictReader(UnicodeReader):
    """
    A `UnicodeReader` yielding one `dict` per row.

    :param f: As for :class:`UnicodeReader`
    :param fieldnames:

    .. code-block:: python

        >>> with UnicodeDictReader(
        ...         'tests/fixtures/frictionless-data.csv',
        ...         dialect=Dialect(delimiter='|', header=False),
        ...         fieldnames=[str(i) for i in range(1, 11)]) as reader:
        ...     for row in reader:
        ...         print(row)
        ...         break
        ...
        OrderedDict([('1', 'FK'), ('2', 'Year'), ('3', 'Location name'), ('4', 'Value'),
        ('5', 'binary'), ('6', 'anyURI'), ('7', 'email'), ('8', 'boolean'), ('9', 'array'),
        ('10', 'geojson')])

    """

    def __init__(self, f, fieldnames=None, restkey=None, restval=None, **kw):
        self._fieldnames = fieldnames
        self.restkey = restkey  # key under which surplus cell values are collected
        self.restval = restval  # fill value for missing trailing cells
        self.line_num = 0
        super().__init__(f, **kw)

    @property
    def fieldnames(self):
        if self._fieldnames is None:
            # Lazily consume the first data row as the header.
            try:
                self._fieldnames = super().__next__()
            except StopIteration:
                pass
        self.line_num = self.reader.line_num
        if self._fieldnames and len(self._fieldnames) != len(set(self._fieldnames)):
            warnings.warn('Duplicate column names!')
        return self._fieldnames

    def __next__(self):
        if self.line_num == 0:
            # First iteration: trigger header detection via the property's
            # side effect.
            self.fieldnames
        row = super().__next__()
        self.line_num = self.reader.line_num

        # Unlike the plain reader, drop completely empty rows here - zipping
        # them with the header would only yield a dict full of fill values.
        while not row:
            row = super().__next__()
        return self.item(row)

    def item(self, row):
        names = self.fieldnames
        d = collections.OrderedDict(zip(names, row))
        if len(row) > len(names):
            # Surplus cells go into a list under restkey.
            d[self.restkey] = row[len(names):]
        else:
            # Pad missing trailing cells with restval.
            for name in names[len(row):]:
                d[name] = self.restval
        return d
|
|
|
|
|
class NamedTupleReader(UnicodeDictReader):
    """
    A `UnicodeReader` yielding one `namedtuple` per row.

    .. note::

        This reader has some limitations, notably that fieldnames must be normalized to be
        admissible Python names, but also bad performance (compared with `UnicodeDictReader`).
    """

    _normalize_fieldname = staticmethod(utils.normalize_name)

    @functools.cached_property
    def cls(self):
        # The row class is computed once, from the normalized header.
        return collections.namedtuple(
            'Row', [self._normalize_fieldname(name) for name in self.fieldnames])

    def item(self, row):
        data = UnicodeDictReader.item(self, row)
        values = {}
        for name in self.fieldnames:
            # Only actual fields are kept; surplus cells collected under
            # restkey are dropped here.
            values[self._normalize_fieldname(name)] = data.get(name)
        return self.cls(**values)
|
|
|
|
|
def iterrows(lines_or_file: LINES_OR_PATH,
             namedtuples: typing.Optional[bool] = False,
             dicts: typing.Optional[bool] = False,
             encoding: typing.Optional[str] = 'utf-8',
             **kw) -> typing.Generator:
    """Convenience factory function for csv reader.

    :param lines_or_file: Content to be read. Either a file handle, a file path or a list\
    of strings.
    :param namedtuples: Yield namedtuples.
    :param dicts: Yield dicts.
    :param encoding: Encoding of the content.
    :param kw: Keyword parameters are passed through to csv.reader.
    :return: A generator over the rows.
    """
    if namedtuples and dicts:
        raise ValueError('either namedtuples or dicts can be chosen as output format')

    # Pick the reader class matching the requested row type.
    if namedtuples:
        reader_cls = NamedTupleReader
    elif dicts:
        reader_cls = UnicodeDictReader
    else:
        reader_cls = UnicodeReader

    with reader_cls(lines_or_file, encoding=encoding, **kw) as r:
        yield from r


#: Backwards compatible alias for :func:`iterrows`.
reader = iterrows
|
|
|
|
|
def rewrite(fname: typing.Union[str, pathlib.Path],
            visitor: typing.Callable[[int, typing.List[str]], typing.Union[None, typing.List[str]]],
            **kw):
    """Utility function to rewrite rows in dsv files.

    :param fname: Path of the dsv file to operate on.
    :param visitor: A callable that takes a line-number and a row as input and returns a \
    (modified) row or None to filter out the row.
    :param kw: Keyword parameters are passed through to csv.reader/csv.writer.
    """
    fname = utils.ensure_path(fname)
    assert fname.is_file()
    # Write to a temporary file first, then replace the original.
    with tempfile.NamedTemporaryFile(delete=False) as fp:
        tmp = pathlib.Path(fp.name)

    with UnicodeReader(fname, **kw) as source, UnicodeWriter(tmp, **kw) as sink:
        for lineno, row in enumerate(source):
            result = visitor(lineno, row)
            if result is not None:
                sink.writerow(result)
    shutil.move(str(tmp), str(fname))
|
|
|
|
|
def add_rows(fname: typing.Union[str, pathlib.Path], *rows: typing.List[str]):
    """Append *rows* to the dsv file at *fname*, creating the file if necessary."""
    # Assemble the new content in a temporary file, then replace the original.
    with tempfile.NamedTemporaryFile(delete=False) as fp:
        tmp = pathlib.Path(fp.name)

    fname = utils.ensure_path(fname)
    with UnicodeWriter(tmp) as writer:
        if fname.exists():
            # Copy over the existing content first.
            with UnicodeReader(fname) as existing:
                writer.writerows(existing)
        writer.writerows(rows)
    shutil.move(str(tmp), str(fname))
|
|
|
|
|
def filter_rows_as_dict(fname: typing.Union[str, pathlib.Path],
                        filter_: typing.Callable[[dict], bool],
                        **kw) -> int:
    """Rewrite a dsv file, filtering the rows.

    :param fname: Path to dsv file
    :param filter_: callable which accepts a `dict` with a row's data as single argument\
    returning a `Boolean` indicating whether to keep the row (`True`) or to discard it \
    `False`.
    :param kw: Keyword arguments to be passed `UnicodeReader` and `UnicodeWriter`.
    :return: The number of rows that have been removed.
    """
    dict_filter = DictFilter(filter_)
    rewrite(fname, dict_filter, **kw)
    return dict_filter.removed
|
|
|
|
|
class DictFilter(object):
    """
    Adapter turning a dict-based row predicate into a visitor for :func:`rewrite`.

    The first row passed in is treated as the header and always kept; subsequent
    rows are zipped with the header and handed to the predicate. Rows the
    predicate rejects are dropped and counted in `removed`.
    """

    def __init__(self, filter_):
        self.header = None  # set from the first row seen by __call__
        self.filter = filter_
        self.removed = 0  # number of data rows discarded so far

    def __call__(self, i, row):
        if i == 0:
            # Header row: remember it and keep it.
            self.header = row
            return row
        if not row:
            # Empty rows are dropped without being counted.
            return None
        if self.filter(dict(zip(self.header, row))):
            return row
        self.removed += 1
|
|