longling.lib.stream 源代码

# coding: utf-8

"""此模块用以进行流处理"""
# todo 完善注释

from __future__ import print_function

import json
from _io import TextIOWrapper
from contextlib import contextmanager
from tqdm import tqdm
import codecs
import os
import sys
from pathlib import PurePath
import fileinput
from typing import BinaryIO, TextIO

__all__ = ['to_io', 'as_io', 'as_out_io', 'rf_open', 'wf_open', 'rf_group_open', 'wf_group_open', 'close_io',
           'flush_print', 'build_dir', 'json_load',
           'as_io_group', 'as_out_io_group',
           'pickle_load', 'AddPrinter', 'AddObject', 'StreamError', 'check_file',
           'PATH_TYPE', 'encode', 'IO_TYPE', "PATH_IO_TYPE", "block_std"
           ]


[文档]class StreamError(Exception): pass
PATH_TYPE = (str, PurePath) IO_TYPE = (TextIOWrapper, TextIO, BinaryIO, codecs.StreamReaderWriter, fileinput.FileInput) PATH_IO_TYPE = (PATH_TYPE, IO_TYPE)
[文档]@contextmanager def block_std(): """ Examples -------- >>> print("hello world") hello world >>> with block_std(): ... print("hello world") """ sys.stdout = open(os.devnull, "w") sys.stderr = open(os.devnull, "w") yield sys.stdout = sys.__stdout__ sys.stderr = sys.__stderr__
[文档]def flush_print(*values, **kwargs): """刷新打印函数""" sep = kwargs.pop('sep', "") end = kwargs.pop('end', "") print('\r', *values, sep=sep, end=end, flush=True, **kwargs)
[文档]def build_dir(path, mode=0o775, parse_dir=True): """ 创建目录,从path中解析出目录路径,如果目录不存在,创建目录 Parameters ---------- path: str mode: int parse_dir: bool """ if parse_dir: dirname = os.path.dirname(path) else: dirname = path if not dirname or os.path.exists(dirname): return path os.makedirs(dirname, mode) return path
[文档]def check_file(filepath, size=None): """ 检查文件是否存在,size给定时,检查文件大小是否一致 Parameters ---------- filepath: str size: int Returns ------- file exist or not: bool """ if os.path.exists(filepath): return size == os.path.getsize(filepath) if size is not None else True return False
def rf_open(stream_name: (PATH_IO_TYPE, list, None) = None, encoding='utf-8', **kwargs): if stream_name is None: return sys.stdin if isinstance(stream_name, list): if encoding is None: return fileinput.input(files=stream_name, **kwargs) else: return fileinput.input( files=stream_name, openhook=fileinput.hook_encoded(encoding) ) elif isinstance(stream_name, IO_TYPE): return stream_name elif isinstance(stream_name, PATH_TYPE): encoding = None if kwargs.get("mode") == "rb" else encoding return open(stream_name, encoding=encoding, **kwargs) else: raise TypeError("unknown type: %s(%s)" % (stream_name, type(stream_name))) def rf_group_open(*files, encoding="utf-8", **kwargs): return [rf_open(_file, encoding=encoding, **kwargs) for _file in files] def json_load(fp: (IO_TYPE, PurePath), **kwargs): with as_io(fp) as f: return json.load(f, **kwargs) def pickle_load(fp, encoding='utf-8', mode='rb', **kwargs): import pickle if isinstance(fp, PATH_TYPE): fp = open(fp, mode=mode, **kwargs) datas = pickle.load(fp, encoding=encoding, **kwargs) fp.close() return datas else: return pickle.load(fp, encoding=encoding, **kwargs)
[文档]def wf_open(stream_name: (PATH_IO_TYPE, None) = None, mode="w", encoding="utf-8", **kwargs): r""" Simple wrapper to codecs for writing. stream_name为空时 mode - w 返回标准错误输出 stderr; 否则,返回标准输出 stdout stream_name不为空时,返回文件流 Parameters ---------- stream_name: str, PurePath or None mode: str encoding: str 编码方式,默认为 utf-8 Returns ------- write_stream: StreamReaderWriter 返回打开的流 Examples -------- >>> wf = wf_open(mode="stdout") >>> print("hello world", file=wf) hello world """ if stream_name is None: if stream_name is None and mode in {"w", "wb", "a", "ab"}: return sys.stdout elif mode == "stdout": return sys.stdout elif mode == "stderr": return sys.stderr else: raise TypeError("Unknown mode for std mode, only `stdout` and `stderr` are supported.") elif isinstance(stream_name, PATH_TYPE): build_dir(stream_name) if mode == "wb": return open(stream_name, mode=mode, **kwargs) return codecs.open(stream_name, mode=mode, encoding=encoding, **kwargs) elif isinstance(stream_name, IO_TYPE): return stream_name else: raise TypeError("unknown type: %s(%s)" % (stream_name, type(stream_name)))
def wf_group_open(*files, encoding="utf-8", **kwargs): return [wf_open(_file, encoding=encoding, **kwargs) for _file in files]
[文档]def to_io(stream: (TextIOWrapper, TextIO, BinaryIO, PATH_TYPE, list, None) = None, mode='r', encoding='utf-8', **kwargs): """ Convert an object as an io stream, could be a path to file or an io stream. Examples -------- .. code-block:: python to_io("demo.txt") # equal to open("demo.txt") to_io(open("demo.txt")) # equal to open("demo.txt") a = to_io() # equal to a = sys.stdin b = to_io(mode="w) # equal to a = sys.stdout """ if isinstance(stream, IO_TYPE): return stream if 'r' in mode: return rf_open(stream, encoding=encoding, mode=mode, **kwargs) elif 'w' in mode or 'a' in mode or 'stderr' in mode or 'stdout' in mode: return wf_open(stream_name=stream, mode=mode, encoding=encoding) else: raise ValueError("cannot handle the mode %s" % mode)
def to_io_group(*streams, mode='r', encoding='utf-8', **kwargs): return [to_io(stream, mode, encoding, **kwargs) for stream in streams]
[文档]def close_io(stream): """关闭文件流,忽略 sys.stdin, sys.stdout, sys.stderr""" if isinstance(stream, list): for _stream in stream: close_io(_stream) elif stream in {sys.stdin, sys.stdout, sys.stderr}: pass else: try: stream.close() except Exception: raise StreamError('io_close: %s' % stream)
[文档]@contextmanager def as_io(src: (TextIOWrapper, TextIO, BinaryIO, PATH_TYPE, list, None) = None, mode='r', encoding='utf-8', **kwargs): """ with wrapper for to_io function, default mode is "r" Examples -------- .. code-block:: python with as_io("demo.txt") as f: for line in f: pass # equal to with open(demo.txt) as src: with as_io(src) as f: for line in f: pass # from several files with as_io(["demo1.txt", "demo2.txt"]) as f: for line in f: pass # from sys.stdin with as_io() as f: for line in f: pass """ _io_object = to_io(src, mode, encoding, **kwargs) yield _io_object close_io(_io_object)
@contextmanager def as_io_group(*src, mode='r', encoding='utf-8', **kwargs): ios = to_io_group(*src, mode=mode, encoding=encoding, **kwargs) yield ios close_io(ios)
[文档]@contextmanager def as_out_io(tar: (TextIOWrapper, TextIO, BinaryIO, PATH_TYPE, list, None) = None, mode='w', encoding='utf-8', **kwargs): """ with wrapper for to_io function, default mode is "w" Examples -------- .. code-block:: python with as_out_io("demo.txt") as wf: print("hello world", file=wf) # equal to with open(demo.txt) as tar: with as_out_io(tar) as f: print("hello world", file=wf) # to sys.stdout with as_out_io() as wf: print("hello world", file=wf) # to sys.stderr with as_out_io(mode="stderr) as wf: print("hello world", file=wf) """ with as_io(tar, mode, encoding, **kwargs) as out_io: yield out_io
@contextmanager def as_out_io_group(*src, mode='w', encoding='utf-8', **kwargs): out_ios = to_io_group(*src, mode=mode, encoding=encoding, **kwargs) yield out_ios close_io(out_ios)
[文档]def encode(src, src_encoding, tar, tar_encoding): """ Convert a file in source encoding to target encoding Parameters ---------- src src_encoding tar tar_encoding Returns ------- """ with rf_open(src, encoding=src_encoding) as f, wf_open(tar, encoding=tar_encoding) as wf: for line in tqdm(f, "encoding %s[%s] --> %s[%s]" % (src, src_encoding, tar, tar_encoding)): print(line, end='', file=wf)
class AddObject(object): def add(self, *args, **kwargs): # pragma: no cover raise NotImplementedError
[文档]class AddPrinter(AddObject): """ 以add方法添加文件内容的打印器 Examples -------- >>> import sys >>> printer = AddPrinter(sys.stdout, ensure_io=True) >>> printer.add("hello world") hello world """ def __init__(self, fp, values_wrapper=lambda x: x, to_io_params=None, ensure_io=False, **kwargs): super(AddPrinter, self).__init__() self.fp = wf_open(fp, **to_io_params if to_io_params is not None else {}) if not ensure_io else fp self.value_wrapper = values_wrapper self.kwargs = kwargs def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() def add(self, value): print(self.value_wrapper(value), file=self.fp, **self.kwargs) def close(self): close_io(self.fp) @property def name(self): return self.fp.name