Source code for lasio.las

from __future__ import print_function

# Standard library packages
import codecs

try:  # will work in Python 3
    from collections.abc import Sequence
except ImportError:  # Support Python 2.7
    from collections import Sequence

import csv
import json
import logging
import re
import sys

# get basestring in py3

try:
    unicode = unicode
except NameError:
    # 'unicode' is undefined, must be Python 3
    unicode = str
    basestring = (str, bytes)
else:
    # 'unicode' exists, must be Python 2
    bytes = str
    basestring = basestring

# Required third-party packages available on PyPi:

import numpy as np

# internal lasio imports

from . import exceptions
from .las_items import HeaderItem, CurveItem, SectionItems, OrderedDict
from . import defaults
from . import reader
from . import writer

logger = logging.getLogger(__name__)


[docs]class LASFile(object): """LAS file object. Keyword Arguments: file_ref (file-like object, str): either a filename, an open file object, or a string containing the contents of a file. See these routines for additional keyword arguments you can use when reading in a LAS file: * :func:`lasio.reader.open_with_codecs` - manage issues relate to character encodings * :meth:`lasio.LASFile.read` - control how NULL values and errors are handled during parsing Attributes: encoding (str or None): the character encoding used when reading the file in from disk """ def __init__(self, file_ref=None, **read_kwargs): super(LASFile, self).__init__() self._text = "" self.index_unit = None default_items = defaults.get_default_items() self.sections = { "Version": default_items["Version"], "Well": default_items["Well"], "Curves": default_items["Curves"], "Parameter": default_items["Parameter"], "Other": str(default_items["Other"]), } if not (file_ref is None): self.read(file_ref, **read_kwargs)
[docs] def read( self, file_ref, ignore_data=False, read_policy="default", null_policy="strict", ignore_header_errors=False, ignore_comments=("#",), mnemonic_case="upper", index_unit=None, remove_data_line_filter="#", **kwargs ): """Read a LAS file. Arguments: file_ref (file-like object, str): either a filename, an open file object, or a string containing the contents of a file. Keyword Arguments: null_policy (str or list): see http://lasio.readthedocs.io/en/latest/data-section.html#handling-invalid-data-indicators-automatically ignore_data (bool): if True, do not read in any of the actual data, just the header metadata. False by default. ignore_header_errors (bool): ignore LASHeaderErrors (False by default) ignore_comments (tuple/str): ignore comments beginning with characters e.g. ``("#", '"')`` in header sections mnemonic_case (str): 'preserve': keep the case of HeaderItem mnemonics 'upper': convert all HeaderItem mnemonics to uppercase 'lower': convert all HeaderItem mnemonics to lowercase index_unit (str): Optionally force-set the index curve's unit to "m" or "ft" remove_data_line_filter (str, func): string or function for removing/ignoring lines in the data section e.g. a function which accepts a string (a line from the data section) and returns either True (do not parse the line) or False (parse the line). If this argument is a string it will instead be converted to a function which rejects all lines starting with that value e.g. ``"#"`` will be converted to ``lambda line: line.strip().startswith("#")`` See :func:`lasio.reader.open_with_codecs` for additional keyword arguments which help to manage issues relate to character encodings. """ logger.debug("Reading {}...".format(str(file_ref))) file_obj = "" try: file_obj, self.encoding = reader.open_file(file_ref, **kwargs) logger.debug( "Fetching substitutions for read_policy {} and null policy {}".format( read_policy, null_policy ) ) regexp_subs, value_null_subs, version_NULL = reader.get_substitutions( read_policy, null_policy ) provisional_version = 2.0 provisional_wrapped = "YES" provisional_null = None section_positions = reader.find_sections_in_file(file_obj) logger.debug("Found {} sections".format(len(section_positions))) if len(section_positions) == 0: raise KeyError("No ~ sections found. Is this a LAS file?") data_section_indices = [] # This is a transitional data_section_indicies till the las30 data # reading can handle 1.2 and 2.0 data, then it will be merged back # into data_section_indices las3_data_section_indices = [] las3_section_indicators = ["_DATA", "_PARAMETER", "_DEFINITION"] for i, (k, first_line, last_line, section_title) in enumerate( section_positions ): section_type = reader.determine_section_type(section_title) logger.debug( "Parsing {typ} section at lines {first_line}-{last_line} ({k} bytes) {title}".format( typ=section_type, title=section_title, first_line=first_line + 1, last_line=last_line + 1, k=k, ) ) # Read traditional LAS header item section if section_type == "Header items": file_obj.seek(k) sct_items = reader.parse_header_items_section( file_obj, line_nos=(first_line, last_line), version=provisional_version, ignore_header_errors=ignore_header_errors, mnemonic_case=mnemonic_case, ignore_comments=ignore_comments, ) # Update provisional statuses if "VERS" in sct_items: provisional_version = sct_items.VERS.value if "WRAP" in sct_items: provisional_wrapped = sct_items.WRAP.value if "NULL" in sct_items: provisional_null = sct_items.NULL.value # las3 sections can contain _Data, _Parameter or _Definition las3_section = any( [ section_str in section_title[1:].upper() for section_str in las3_section_indicators ] ) if provisional_version == 3.0 and las3_section: self.sections[section_title[1:]] = sct_items elif section_title[1] == "V": self.sections["Version"] = sct_items elif section_title[1] == "W": self.sections["Well"] = sct_items elif section_title[1] == "C": self.sections["Curves"] = sct_items elif section_title[1] == "P": self.sections["Parameter"] = sct_items else: self.sections[section_title[1:]] = sct_items # Read free-text LAS header section elif section_type == "Header (other)": file_obj.seek(k) line_no = first_line contents = [] for line in file_obj: if line.startswith("~"): continue line_no += 1 contents.append(line.strip("\n").strip()) if line_no == last_line: break sct_contents = "\n".join(contents) if section_title[1] == "O": self.sections["Other"] = sct_contents else: self.sections[section_title[1:]] = sct_contents elif section_type == "Data": logger.debug("Storing reference and returning later...") data_section_indices.append(i) # Initial stub for parsing las3 data. This is probably a # transitional section that will merge with 1.2/2.0 data # parsing once fully functional elif section_type == "Las3_Data": logger.debug("Storing Las3_Data reference and returning later...") las3_data_section_indices.append(i) if not ignore_data: for k, first_line, last_line, section_title in [ section_positions[i] for i in data_section_indices ]: logger.debug("Reading data section {}".format(section_title)) file_obj.seek(k) n_columns = reader.inspect_data_section( file_obj, (first_line, last_line), regexp_subs, remove_line_filter=remove_data_line_filter, ) file_obj.seek(k) arr = reader.read_data_section_iterative( file_obj, (first_line, last_line), regexp_subs, value_null_subs, remove_line_filter=remove_data_line_filter, ) logger.debug("Read ndarray {arrshape}".format(arrshape=arr.shape)) # This is so we can check data size and use self.set_data(data, truncate=False) # in cases of data.size is zero. data = arr if data.size > 0: # TODO: check whether this treatment of NULLs is correct logger.debug("~A data {}".format(arr)) if version_NULL: arr[arr == provisional_null] = np.nan logger.debug("~A after NULL replacement data {}".format(arr)) # Provisionally, assume that the number of columns represented # by the data section's array is equal to the number of columns # defined in the Curves/Definition section. n_columns_in_arr = len(self.curves) # If we are told the file is unwrapped, then we assume that each # column detected is a column, and we ignore the Curves/Definition # section's number of columns instead. if provisional_wrapped == "NO": n_columns_in_arr = n_columns # --------------------------------------------------------------------- # TODO: # This enables tests/test_read.py::test_barebones_missing_all_sections # to pass, but may not be the complete or final solution. # --------------------------------------------------------------------- if len(self.curves) == 0 and n_columns > 0: n_columns_in_arr = n_columns logger.debug( "Data array (size {}) assumed to have {} columns " "({} curves defined)".format( arr.shape, n_columns_in_arr, len(self.curves) ) ) # We attempt to reshape the 1D array read in from # the data section so that it can be assigned to curves. try: data = np.reshape(arr, (-1, n_columns_in_arr)) except ValueError as exception: error_message = "Cannot reshape ~A data size {0} into {1} columns".format( arr.shape, n_columns_in_arr ) if sys.version_info.major < 3: exception.message = error_message raise exception else: raise ValueError(error_message).with_traceback( exception.__traceback__ ) self.set_data(data, truncate=False) finally: if hasattr(file_obj, "close"): file_obj.close() # TODO: reimplement these warnings!! ###### logger.warning("No data section (regexp='~A') found") ###### logger.warning("No numerical data found inside ~A section") # Understand the depth/index unit. if "m" in str(index_unit): index_unit = "m" if index_unit: self.index_unit = index_unit else: check_units_on = [] for mnemonic in ("STRT", "STOP", "STEP"): if mnemonic in self.well: check_units_on.append(self.well[mnemonic]) if len(self.curves) > 0: check_units_on.append(self.curves[0]) matches = [] for index_unit, possibilities in defaults.DEPTH_UNITS.items(): for check_unit in check_units_on: if any([check_unit.unit == p for p in possibilities]) or any( [check_unit.unit.upper() == p for p in possibilities] ): matches.append(index_unit) matches = set(matches) if len(matches) == 1: self.index_unit = tuple(matches)[0] elif len(matches) == 0: self.index_unit = None else: logger.warning("Conflicting index units found: {}".format(matches)) self.index_unit = None
[docs] def write(self, file_ref, **kwargs): """Write LAS file to disk. Arguments: file_ref (open file-like object or str): a file-like object opening for writing, or a filename. All ``**kwargs`` are passed to :func:`lasio.writer.write` -- please check the docstring of that function for more keyword arguments you can use here! Examples: >>> import lasio >>> las = lasio.read("tests/examples/sample.las") >>> with open('test_output.las', mode='w') as f: ... las.write(f, version=2.0) # <-- this method """ opened_file = False if isinstance(file_ref, basestring) and not hasattr(file_ref, "write"): opened_file = True file_ref = open(file_ref, "w") writer.write(self, file_ref, **kwargs) if opened_file: file_ref.close()
[docs] def to_excel(self, filename): """Export LAS file to a Microsoft Excel workbook. This function will raise an :exc:`ImportError` if ``openpyxl`` is not installed. Arguments: filename (str) """ from . import excel converter = excel.ExcelConverter(self) converter.write(filename)
[docs] def to_csv(self, file_ref, mnemonics=True, units=True, units_loc="line", **kwargs): """Export to a CSV file. Arguments: file_ref (open file-like object or str): a file-like object opening for writing, or a filename. Keyword Arguments: mnemonics (list, True, False): write mnemonics as a header line at the start. If list, use the supplied items as mnemonics. If True, use the curve mnemonics. units (list, True, False): as for mnemonics. units_loc (str or None): either 'line', '[]' or '()'. 'line' will put units on the line following the mnemonics (good for WellCAD). '[]' and '()' will put the units in either brackets or parentheses following the mnemonics, on the single header line (better for Excel) **kwargs: passed to :class:`csv.writer`. Note that if ``lineterminator`` is **not** specified here, then it will be sent to :class:`csv.writer` as ``lineterminator='\\n'``. """ opened_file = False if isinstance(file_ref, basestring) and not hasattr(file_ref, "write"): opened_file = True file_ref = open(file_ref, "w") if not "lineterminator" in kwargs: kwargs["lineterminator"] = "\n" writer = csv.writer(file_ref, **kwargs) if mnemonics is True: mnemonics = [c.original_mnemonic for c in self.curves] if units is True: units = [c.unit for c in self.curves] if mnemonics: if units_loc in ("()", "[]") and units: mnemonics = [ m + " " + units_loc[0] + u + units_loc[1] for m, u in zip(mnemonics, units) ] writer.writerow(mnemonics) if units: if units_loc == "line": writer.writerow(units) for i in range(self.data.shape[0]): writer.writerow(self.data[i, :]) if opened_file: file_ref.close()
[docs] def match_raw_section(self, pattern, re_func="match", flags=re.IGNORECASE): """Find raw section with a regular expression. Arguments: pattern (str): regular expression (you need to include the tilde) Keyword Arguments: re_func (str): either "match" or "search", see python ``re`` module. flags (int): flags for :func:`re.compile` Returns: dict Intended for internal use only. """ for title in self.raw_sections.keys(): title = title.strip() p = re.compile(pattern, flags=flags) if re_func == "match": re_func = re.match elif re_func == "search": re_func = re.search m = re_func(p, title) if m: return self.raw_sections[title]
[docs] def get_curve(self, mnemonic): """Return CurveItem object. Arguments: mnemonic (str): the name of the curve Returns: :class:`lasio.CurveItem` (not just the data array) """ for curve in self.curves: if curve.mnemonic == mnemonic: return curve
[docs] def __getitem__(self, key): """Provide access to curve data. Arguments: key (str, int): either a curve mnemonic or the column index. Returns: 1D :class:`numpy.ndarray` (the data for the curve) """ # TODO: If I implement 2D arrays, need to check here for :1 :2 :3 etc. curve_mnemonics = [c.mnemonic for c in self.curves] if isinstance(key, int): return self.curves[key].data elif key in curve_mnemonics: return self.curves[key].data else: raise KeyError("{} not found in curves ({})".format(key, curve_mnemonics))
[docs] def __setitem__(self, key, value): """Append a curve. Arguments: key (str): the curve mnemonic value (1D data or CurveItem): either the curve data, or a CurveItem See :meth:`lasio.LASFile.append_curve_item` or :meth:`lasio.LASFile.append_curve` for more details. """ if isinstance(value, CurveItem): if key != value.mnemonic: raise KeyError( "key {} does not match value.mnemonic {}".format( key, value.mnemonic ) ) self.append_curve_item(value) else: # Assume value is an ndarray self.append_curve(key, value)
[docs] def keys(self): """Return curve mnemonics.""" return [c.mnemonic for c in self.curves]
[docs] def values(self): """Return data for each curve.""" return [c.data for c in self.curves]
[docs] def items(self): """Return mnemonics and data for all curves.""" return [(c.mnemonic, c.data) for c in self.curves]
def iterkeys(self): return iter(list(self.keys())) def itervalues(self): return iter(list(self.values())) def iteritems(self): return iter(list(self.items())) @property def version(self): """Header information from the Version (~V) section. Returns: :class:`lasio.SectionItems` object. """ return self.sections["Version"] @version.setter def version(self, section): self.sections["Version"] = section @property def well(self): """Header information from the Well (~W) section. Returns: :class:`lasio.SectionItems` object. """ return self.sections["Well"] @well.setter def well(self, section): self.sections["Well"] = section @property def curves(self): """Curve information and data from the Curves (~C) and data section.. Returns: :class:`lasio.SectionItems` object. """ return self.sections["Curves"] @curves.setter def curves(self, section): self.sections["Curves"] = section @property def curvesdict(self): """Curve information and data from the Curves (~C) and data section.. Returns: dict """ d = {} for curve in self.curves: d[curve["mnemonic"]] = curve return d @property def params(self): """Header information from the Parameter (~P) section. Returns: :class:`lasio.SectionItems` object. """ return self.sections["Parameter"] @params.setter def params(self, section): self.sections["Parameter"] = section @property def other(self): """Header information from the Other (~O) section. Returns: str """ return self.sections["Other"] @other.setter def other(self, section): self.sections["Other"] = section @property def header(self): """All header information Returns: dict """ return self.sections
[docs] def df(self): """Return data as a :class:`pandas.DataFrame` structure. The first Curve of the LASFile object is used as the pandas DataFrame's index. """ import pandas as pd from pandas.api.types import is_object_dtype df = pd.DataFrame(self.data, columns=[c.mnemonic for c in self.curves]) for column in df.columns: if is_object_dtype(df[column].dtype): try: df[column] = df[column].astype(np.float64) except ValueError: pass if len(self.curves) > 0: df = df.set_index(self.curves[0].mnemonic) return df
@property def data(self): return np.vstack([c.data for c in self.curves]).T @data.setter def data(self, value): return self.set_data(value)
[docs] def set_data(self, array_like, names=None, truncate=False): """Set the data for the LAS; actually sets data on individual curves. Arguments: array_like (array_like or :class:`pandas.DataFrame`): 2-D data array Keyword Arguments: names (list, optional): used to replace the names of the existing :class:`lasio.CurveItem` objects. truncate (bool): remove any columns which are not included in the Curves (~C) section. Note: you can pass a :class:`pandas.DataFrame` to this method. """ try: import pandas as pd except ImportError: pass else: if isinstance(array_like, pd.DataFrame): return self.set_data_from_df( array_like, **dict(names=names, truncate=False) ) data = np.asarray(array_like) # Truncate data array if necessary. if truncate: data = data[:, len(self.curves)] # Extend curves list if necessary. while data.size > 0 and (data.shape[1] > len(self.curves)): self.curves.append(CurveItem("")) if not names: names = [c.original_mnemonic for c in self.curves] else: # Extend names list if necessary. while len(self.curves) > len(names): names.append("") logger.debug("set_data. names to use: {}".format(names)) if data.size > 0: for i, curve in enumerate(self.curves): curve.mnemonic = names[i] curve.data = data[:, i] self.curves.assign_duplicate_suffixes()
[docs] def set_data_from_df(self, df, **kwargs): """Set the LAS file data from a :class:`pandas.DataFrame`. Arguments: df (pandas.DataFrame): curve mnemonics are the column names. The depth column for the curves must be the index of the DataFrame. Keyword arguments are passed to :meth:`lasio.LASFile.set_data`. """ df_values = np.vstack([df.index.values, df.values.T]).T if (not "names" in kwargs) or (not kwargs["names"]): kwargs["names"] = [df.index.name] + [ str(name) for name in df.columns.values ] self.set_data(df_values, **kwargs)
[docs] def stack_curves(self, mnemonic, sort_curves=True): """Stack multi-channel curve data to a numpy 2D ndarray. Provide a stub name (prefix shared by all curves that will be stacked) or a list of curve mnemonic strings. Keyword Arguments: mnemonic (str or list): Supply the first several characters of the channel set to be stacked. Alternatively, supply a list of the curve names (mnemonics strings) to be stacked. sort_curves (bool): Natural sort curves based on mnemonic prior to stacking. Returns: 2-D numpy array """ if isinstance(mnemonic, np.ndarray): mnemonic = list(mnemonic) if (not mnemonic) or (not all([i for i in mnemonic])): raise ValueError("`mnemonic` must not contain empty element") keys = self.curves.keys() if isinstance(mnemonic, str): channels = [i for i in keys if i.startswith(mnemonic)] or [mnemonic] elif isinstance(mnemonic, Sequence): channels = list(mnemonic) else: raise TypeError("`mnemonic` argument must be string or sequence") print(channels) if not set(keys).issuperset(set(channels)): missing = ", ".join(set(channels).difference(set(keys))) raise KeyError("{} not found in LAS curves.".format(missing)) if sort_curves: nat_sort = lambda x: [ int(i) if i.isdigit() else i for i in re.split(r"(\d+)", x) ] channels.sort(key=nat_sort) indices = [keys.index(i) for i in channels] return self.data[:, indices]
@property def index(self): """Return data from the first column of the LAS file data (depth/time). """ return self.curves[0].data @property def depth_m(self): """Return the index as metres.""" if self._index_unit_contains("M"): return self.index elif self._index_unit_contains("F"): return self.index * 0.3048 elif self._index_unit_contains(".1IN"): return (self.index / 120) * 0.3048 else: raise exceptions.LASUnknownUnitError("Unit of depth index not known") @property def depth_ft(self): """Return the index as feet.""" if self._index_unit_contains("M"): return self.index / 0.3048 elif self._index_unit_contains("F"): return self.index elif self._index_unit_contains(".1IN"): return self.index / 120 else: raise exceptions.LASUnknownUnitError("Unit of depth index not known") def _index_unit_contains(self, unit_code): """Check value of index_unit string, ignoring case Args: index unit code (string) e.g. 'M' or 'FT' """ return self.index_unit and (unit_code.upper() in self.index_unit.upper()) def add_curve_raw(self, mnemonic, data, unit="", descr="", value=""): """Deprecated. Use append_curve_item() or insert_curve_item() instead.""" return self.append_curve_item(self, mnemonic, data, unit, descr, value)
[docs] def append_curve_item(self, curve_item): """Add a CurveItem. Args: curve_item (lasio.CurveItem) """ self.insert_curve_item(len(self.curves), curve_item)
[docs] def insert_curve_item(self, ix, curve_item): """Insert a CurveItem. Args: ix (int): position to insert CurveItem i.e. 0 for start curve_item (lasio.CurveItem) """ assert isinstance(curve_item, CurveItem) self.curves.insert(ix, curve_item)
def add_curve(self, *args, **kwargs): """Deprecated. Use append_curve() or insert_curve() instead.""" return self.append_curve(*args, **kwargs)
[docs] def append_curve(self, mnemonic, data, unit="", descr="", value=""): """Add a curve. Arguments: mnemonic (str): the curve mnemonic data (1D ndarray): the curve data Keyword Arguments: unit (str): curve unit descr (str): curve description value (int/float/str): value e.g. API code. """ return self.insert_curve(len(self.curves), mnemonic, data, unit, descr, value)
[docs] def insert_curve(self, ix, mnemonic, data, unit="", descr="", value=""): """Insert a curve. Arguments: ix (int): position to insert curve at i.e. 0 for start. mnemonic (str): the curve mnemonic data (1D ndarray): the curve data Keyword Arguments: unit (str): curve unit descr (str): curve description value (int/float/str): value e.g. API code. """ curve = CurveItem(mnemonic, unit, value, descr, data) self.insert_curve_item(ix, curve)
[docs] def delete_curve(self, mnemonic=None, ix=None): """Delete a curve. Keyword Arguments: ix (int): index of curve in LASFile.curves. mnemonic (str): mnemonic of curve. The index takes precedence over the mnemonic. """ if ix is None: ix = self.curves.keys().index(mnemonic) self.curves.pop(ix)
@property def json(self): """Return object contents as a JSON string.""" return self.to_json() def to_json_old(self): """ deprecated: to_json_old version=0.25.1 since=20200507 remove=20210508 replacement_options: to_json() """ obj = OrderedDict() for name, section in self.sections.items(): try: obj[name] = section.json except AttributeError: obj[name] = json.dumps(section) return json.dumps(obj)
[docs] def to_json(self): return json.dumps(self, cls=JSONEncoder)
@json.setter def json(self, value): raise Exception("Cannot set objects from JSON")
class Las(LASFile): """LAS file object. Retained for backwards compatibility. """ pass class JSONEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, LASFile): d = {"metadata": {}, "data": {}} for name, section in obj.sections.items(): if isinstance(section, basestring): d["metadata"][name] = section else: try: d["metadata"][name] = section.dictview() except: for item in section: d["metadata"][name].append(dict(item)) for curve in obj.curves: d["data"][curve.mnemonic] = [ None if np.isnan(x) else x for x in curve.data ] return d