from __future__ import print_function
# Standard library packages
import codecs
try: # will work in Python 3
from collections.abc import Sequence
except ImportError: # Support Python 2.7
from collections import Sequence
import csv
import json
import logging
import os
import re
import sys
import textwrap
import traceback
# Python 2/3 compatibility shim: make ``unicode``, ``bytes`` and ``basestring``
# available as module-level names regardless of the interpreter version.
try:
    # Both names are builtins on Python 2; on Python 3 evaluating the
    # right-hand side raises NameError before anything is bound.
    unicode, basestring = unicode, basestring
except NameError:
    # Python 3: there is a single text type and no ``basestring``.
    unicode = str
    basestring = (str, bytes)
else:
    # Python 2: ``bytes`` is simply the native ``str`` type.
    bytes = str
# Required third-party packages available on PyPI:
import numpy as np
# internal lasio imports
from . import exceptions
from .las_items import HeaderItem, CurveItem, SectionItems, OrderedDict
from . import defaults
from . import reader
from . import writer
logger = logging.getLogger(__name__)
class LASFile(object):
    """LAS file object.

    Keyword Arguments:
        file_ref (file-like object, str): either a filename, an open file
            object, or a string containing the contents of a file.

    See these routines for additional keyword arguments you can use when
    reading in a LAS file:

    * :func:`lasio.reader.open_with_codecs` - manage issues related to
      character encodings
    * :meth:`lasio.las.LASFile.read` - control how NULL values and errors are
      handled during parsing

    Attributes:
        encoding (str or None): the character encoding used when reading the
            file in from disk
        index_unit (str or None): unit of the index curve, either forced by
            the caller or inferred from the header while reading
        sections (dict): header sections keyed by name ("Version", "Well",
            "Curves", "Parameter", "Other" plus any nonstandard sections)

    """

    def __init__(self, file_ref=None, **read_kwargs):
        """Create an empty LAS object and optionally read *file_ref* into it.

        Arguments:
            file_ref (file-like object, str): see the class docstring. When
                None, only the default header sections are created.

        Keyword arguments are passed on to :meth:`read`.

        """
        super(LASFile, self).__init__()
        self._text = ""
        self.index_unit = None
        # Defined up front so the attribute also exists on objects that were
        # never read from disk.
        self.encoding = None

        default_items = defaults.get_default_items()
        self.sections = {
            "Version": default_items["Version"],
            "Well": default_items["Well"],
            "Curves": default_items["Curves"],
            "Parameter": default_items["Parameter"],
            "Other": str(default_items["Other"]),
        }

        if file_ref is not None:
            self.read(file_ref, **read_kwargs)

    def read(
        self,
        file_ref,
        ignore_data=False,
        read_policy="default",
        null_policy="strict",
        ignore_header_errors=False,
        mnemonic_case="upper",
        index_unit=None,
        **kwargs
    ):
        """Read a LAS file.

        Arguments:
            file_ref (file-like object, str): either a filename, an open file
                object, or a string containing the contents of a file.

        Keyword Arguments:
            null_policy (str or list): see
                http://lasio.readthedocs.io/en/latest/data-section.html#handling-invalid-data-indicators-automatically
            read_policy (str): passed together with ``null_policy`` to
                :func:`lasio.reader.get_substitutions`.
            ignore_data (bool): if True, do not read in any of the actual data,
                just the header metadata. False by default.
            ignore_header_errors (bool): ignore LASHeaderErrors (False by
                default)
            mnemonic_case (str): 'preserve': keep the case of HeaderItem mnemonics
                                 'upper': convert all HeaderItem mnemonics to uppercase
                                 'lower': convert all HeaderItem mnemonics to lowercase
            index_unit (str): Optionally force-set the index curve's unit to "m" or "ft"

        Raises:
            KeyError: if no ``~`` sections at all were found.
            ValueError: if the ~A data cannot be reshaped to the number of
                curves.

        See :func:`lasio.reader.open_with_codecs` for additional keyword
        arguments which help to manage issues related to character encodings.

        """
        # Resolve the substitutions before opening anything, so that an
        # invalid policy cannot leave an open file handle behind.
        regexp_subs, value_null_subs, version_NULL = reader.get_substitutions(
            read_policy, null_policy
        )

        file_obj, self.encoding = reader.open_file(file_ref, **kwargs)
        try:
            self.raw_sections = reader.read_file_contents(
                file_obj, regexp_subs, value_null_subs, ignore_data=ignore_data
            )
        finally:
            if hasattr(file_obj, "close"):
                file_obj.close()

        if len(self.raw_sections) == 0:
            raise KeyError("No ~ sections found. Is this a LAS file?")

        header_kws = dict(
            ignore_header_errors=ignore_header_errors, mnemonic_case=mnemonic_case
        )

        def add_section(pattern, name, **sect_kws):
            """Parse the raw section matching *pattern* into self.sections."""
            raw_section = self.match_raw_section(pattern)
            if raw_section:
                self.sections[name] = reader.parse_header_section(
                    raw_section, **sect_kws
                )
                # A parsed section is consumed so that it is not reported
                # as a nonstandard section further down.
                self.raw_sections.pop(raw_section["title"])
            else:
                logger.warning(
                    "Header section %s regexp=%s was not found.", name, pattern
                )

        add_section("~V", "Version", version=1.2, **header_kws)

        # Establish version and wrap values if possible.
        try:
            version = self.version["VERS"].value
        except KeyError:
            logger.warning("VERS item not found in the ~V section.")
            version = None

        try:
            wrap = self.version["WRAP"].value
        except KeyError:
            logger.warning("WRAP item not found in the ~V section")
            wrap = None

        # Validate version.
        #
        # If VERS was missing and version = None, then the file will be read in
        # as if version were 2.0. But there will be no VERS HeaderItem, meaning
        # that las.write(..., version=None) will fail with a KeyError. But
        # las.write(..., version=1.2) will work because a new VERS HeaderItem
        # will be created.
        #
        # Explicit conditionals are used rather than ``assert`` so that the
        # normalisation still happens when Python runs with -O.
        if version is None:
            logger.info("Assuming that LAS VERS is 2.0")
            version = 2
        elif version not in (1.2, 2):
            version = 1.2 if version < 2 else 2

        add_section("~W", "Well", version=version, **header_kws)

        # Establish NULL value if possible.
        try:
            null = self.well["NULL"].value
        except KeyError:
            logger.warning("NULL item not found in the ~W section")
            null = None

        add_section("~C", "Curves", version=version, **header_kws)
        add_section("~P", "Parameter", version=version, **header_kws)

        other = self.match_raw_section("~O")
        if other:
            self.sections["Other"] = "\n".join(other["lines"])
            self.raw_sections.pop(other["title"])

        # Deal with nonstandard sections that some operators and/or
        # service companies (eg IHS) insist on adding. Collect them first:
        # raw_sections must not change size while it is being iterated.
        nonstandard = [
            s for s in self.raw_sections.values() if s["section_type"] == "header"
        ]
        for s in nonstandard:
            logger.warning("Found nonstandard LAS section: %s", s["title"])
            self.sections[s["title"][1:]] = "\n".join(s["lines"])
            self.raw_sections.pop(s["title"])

        if not ignore_data:
            s = self.match_raw_section("~A")
            if s is None:
                logger.warning("No data section (regexp='~A') found")
            elif s.get("ncols") is None:
                logger.warning("No numerical data found inside ~A section")
            else:
                arr = s["array"]
                logger.debug("~A data.shape {}".format(arr.shape))
                if version_NULL and null is not None:
                    arr[arr == null] = np.nan
                logger.debug(
                    "~A after NULL replacement data.shape {}".format(arr.shape)
                )

                n_curves = len(self.curves)
                n_arr_cols = n_curves  # provisional pending below check
                logger.debug("n_curves=%d ncols=%d", n_curves, s["ncols"])
                # An unwrapped file may contain more data columns than there
                # are curve definitions; keep all of them in that case.
                if wrap == "NO" and s["ncols"] > n_curves:
                    n_arr_cols = s["ncols"]

                try:
                    data = np.reshape(arr, (-1, n_arr_cols))
                except ValueError as e:
                    err_msg = (
                        "cannot reshape ~A array of "
                        "size {arr_shape} into "
                        "{n_arr_cols} columns".format(
                            arr_shape=arr.shape, n_arr_cols=n_arr_cols
                        )
                    )
                    if sys.version_info.major < 3:
                        e.message = err_msg
                        raise e
                    else:
                        raise ValueError(err_msg).with_traceback(e.__traceback__)
                self.set_data(data, truncate=False)
                self.raw_sections.pop(s["title"])

        if "m" in str(index_unit):
            index_unit = "m"

        if index_unit:
            self.index_unit = index_unit
        else:
            check_units_on = [
                self.well[mnemonic]
                for mnemonic in ("STRT", "STOP", "STEP")
                if mnemonic in self.well
            ]
            if len(self.curves) > 0:
                check_units_on.append(self.curves[0])
            # A distinct loop name is used so the ``index_unit`` argument is
            # not clobbered. There is deliberately no ``break``: the last
            # matching entry of DEPTH_UNITS wins, as before.
            for unit_code, possibilities in defaults.DEPTH_UNITS.items():
                if all(i.unit.upper() in possibilities for i in check_units_on):
                    self.index_unit = unit_code

    def write(self, file_ref, **kwargs):
        """Write LAS file to disk.

        Arguments:
            file_ref (open file-like object or str): a file-like object opening
                for writing, or a filename.

        All ``**kwargs`` are passed to :func:`lasio.writer.write` -- please
        check the docstring of that function for more keyword arguments you can
        use here!

        Examples:

            >>> import lasio
            >>> las = lasio.read("tests/examples/sample.las")
            >>> with open('test_output.las', mode='w') as f:
            ...     las.write(f, version=2.0)   # <-- this method

        """
        opened_file = False
        if isinstance(file_ref, basestring) and not hasattr(file_ref, "write"):
            opened_file = True
            file_ref = open(file_ref, "w")
        try:
            writer.write(self, file_ref, **kwargs)
        finally:
            # Only close what this method opened, even if writing failed.
            if opened_file:
                file_ref.close()

    def to_excel(self, filename):
        """Export LAS file to a Microsoft Excel workbook.

        This function will raise an :exc:`ImportError` if ``openpyxl`` is not
        installed.

        Arguments:
            filename (str)

        """
        from . import excel

        converter = excel.ExcelConverter(self)
        converter.write(filename)

    def to_csv(self, file_ref, mnemonics=True, units=True, units_loc="line", **kwargs):
        """Export to a CSV file.

        Arguments:
            file_ref (open file-like object or str): a file-like object opening
                for writing, or a filename.

        Keyword Arguments:
            mnemonics (list, True, False): write mnemonics as a header line at the
                start. If list, use the supplied items as mnemonics. If True,
                use the curve mnemonics.
            units (list, True, False): as for mnemonics.
            units_loc (str or None): either 'line', '[]' or '()'. 'line' will put
                units on the line following the mnemonics (good for WellCAD).
                '[]' and '()' will put the units in either brackets or
                parentheses following the mnemonics, on the single header line
                (better for Excel)
            **kwargs: passed to :class:`csv.writer`. Note that if
                ``lineterminator`` is **not** specified here, then it will be
                sent to :class:`csv.writer` as ``lineterminator='\\n'``.

        """
        opened_file = False
        if isinstance(file_ref, basestring) and not hasattr(file_ref, "write"):
            opened_file = True
            file_ref = open(file_ref, "w")

        try:
            kwargs.setdefault("lineterminator", "\n")
            # Named csv_writer so the module-level ``writer`` import is not
            # shadowed inside this method.
            csv_writer = csv.writer(file_ref, **kwargs)

            if mnemonics is True:
                mnemonics = [c.original_mnemonic for c in self.curves]
            if units is True:
                units = [c.unit for c in self.curves]

            if mnemonics:
                if units_loc in ("()", "[]") and units:
                    mnemonics = [
                        m + " " + units_loc[0] + u + units_loc[1]
                        for m, u in zip(mnemonics, units)
                    ]
                csv_writer.writerow(mnemonics)
            if units:
                if units_loc == "line":
                    csv_writer.writerow(units)

            # ``self.data`` rebuilds the full 2-D array on every access, so
            # fetch it once instead of once per row.
            csv_writer.writerows(self.data)
        finally:
            if opened_file:
                file_ref.close()

    def match_raw_section(self, pattern, re_func="match", flags=re.IGNORECASE):
        """Find raw section with a regular expression.

        Arguments:
            pattern (str): regular expression (you need to include the tilde)

        Keyword Arguments:
            re_func (str): either "match" or "search", see python ``re`` module.
            flags (int): flags for :func:`re.compile`

        Returns:
            dict, or None if no section title matches.

        Raises:
            ValueError: if ``re_func`` is neither "match", "search" nor a
                callable.

        Intended for internal use only.

        """
        if re_func == "match":
            matcher = re.match
        elif re_func == "search":
            matcher = re.search
        elif callable(re_func):
            matcher = re_func
        else:
            raise ValueError(
                "re_func must be 'match' or 'search', not {!r}".format(re_func)
            )

        compiled = re.compile(pattern, flags=flags)
        for title, section in self.raw_sections.items():
            # Match against the stripped title but return the section stored
            # under the original key, which may carry surrounding whitespace.
            if matcher(compiled, title.strip()):
                return section
        return None

    def get_curve(self, mnemonic):
        """Return CurveItem object.

        Arguments:
            mnemonic (str): the name of the curve

        Returns:
            :class:`lasio.las_items.CurveItem` (not just the data array), or
            None if there is no curve with that mnemonic.

        """
        for curve in self.curves:
            if curve.mnemonic == mnemonic:
                return curve
        return None

    def __getitem__(self, key):
        """Provide access to curve data.

        Arguments:
            key (str, int): either a curve mnemonic or the column index.

        Returns:
            1D :class:`numpy.ndarray` (the data for the curve)

        Raises:
            KeyError: if ``key`` is a mnemonic that does not exist.

        """
        # TODO: If I implement 2D arrays, need to check here for :1 :2 :3 etc.
        curve_mnemonics = [c.mnemonic for c in self.curves]
        if isinstance(key, int) or key in curve_mnemonics:
            return self.curves[key].data
        raise KeyError("{} not found in curves ({})".format(key, curve_mnemonics))

    def __setitem__(self, key, value):
        """Append a curve.

        Arguments:
            key (str): the curve mnemonic
            value (1D data or CurveItem): either the curve data, or a CurveItem

        Raises:
            KeyError: if ``value`` is a CurveItem whose mnemonic differs from
                ``key``.

        See :meth:`lasio.las.LASFile.append_curve_item` or
        :meth:`lasio.las.LASFile.append_curve` for more details.

        """
        if isinstance(value, CurveItem):
            if key != value.mnemonic:
                raise KeyError(
                    "key {} does not match value.mnemonic {}".format(
                        key, value.mnemonic
                    )
                )
            self.append_curve_item(value)
        else:
            # Assume value is an ndarray
            self.append_curve(key, value)

    def keys(self):
        """Return curve mnemonics."""
        return [c.mnemonic for c in self.curves]

    def values(self):
        """Return data for each curve."""
        return [c.data for c in self.curves]

    def items(self):
        """Return mnemonics and data for all curves."""
        return [(c.mnemonic, c.data) for c in self.curves]

    def iterkeys(self):
        """Return an iterator over the curve mnemonics."""
        return iter(self.keys())

    def itervalues(self):
        """Return an iterator over the data of each curve."""
        return iter(self.values())

    def iteritems(self):
        """Return an iterator over (mnemonic, data) pairs."""
        return iter(self.items())

    @property
    def version(self):
        """Header information from the Version (~V) section.

        Returns:
            :class:`lasio.las_items.SectionItems` object.

        """
        return self.sections["Version"]

    @version.setter
    def version(self, section):
        self.sections["Version"] = section

    @property
    def well(self):
        """Header information from the Well (~W) section.

        Returns:
            :class:`lasio.las_items.SectionItems` object.

        """
        return self.sections["Well"]

    @well.setter
    def well(self, section):
        self.sections["Well"] = section

    @property
    def curves(self):
        """Curve information and data from the Curves (~C) and data section.

        Returns:
            :class:`lasio.las_items.SectionItems` object.

        """
        return self.sections["Curves"]

    @curves.setter
    def curves(self, section):
        self.sections["Curves"] = section

    @property
    def curvesdict(self):
        """Curve information and data from the Curves (~C) and data section.

        Returns:
            dict mapping each curve's mnemonic to the curve item.

        """
        return dict((curve["mnemonic"], curve) for curve in self.curves)

    @property
    def params(self):
        """Header information from the Parameter (~P) section.

        Returns:
            :class:`lasio.las_items.SectionItems` object.

        """
        return self.sections["Parameter"]

    @params.setter
    def params(self, section):
        self.sections["Parameter"] = section

    @property
    def other(self):
        """Header information from the Other (~O) section.

        Returns:
            str

        """
        return self.sections["Other"]

    @other.setter
    def other(self, section):
        self.sections["Other"] = section

    @property
    def metadata(self):
        """All header information joined together.

        Free-text sections (stored as plain strings, such as ~O) contain no
        header items and are skipped.

        Returns:
            :class:`lasio.las_items.SectionItems` object.

        """
        s = SectionItems()
        # Iterate over the section objects, not over the dict keys (names).
        for section in self.sections.values():
            if isinstance(section, basestring):
                continue
            for item in section:
                s.append(item)
        return s

    @metadata.setter
    def metadata(self, value):
        raise NotImplementedError("Set values in the section directly")

    @property
    def header(self):
        """All header information

        Returns:
            dict

        """
        return self.sections

    def df(self):
        """Return data as a :class:`pandas.DataFrame` structure.

        The first Curve of the LASFile object is used as the pandas
        DataFrame's index.

        """
        import pandas as pd
        from pandas.api.types import is_object_dtype

        df = pd.DataFrame(self.data, columns=[c.mnemonic for c in self.curves])
        for column in df.columns:
            if is_object_dtype(df[column].dtype):
                # Best effort: columns that are not purely numeric are left
                # untouched.
                try:
                    df[column] = df[column].astype(np.float64)
                except ValueError:
                    pass
        if len(self.curves) > 0:
            df = df.set_index(self.curves[0].mnemonic)
        return df

    @property
    def data(self):
        """2-D array with one column per curve, rebuilt on every access."""
        return np.vstack([c.data for c in self.curves]).T

    @data.setter
    def data(self, value):
        return self.set_data(value)

    def set_data(self, array_like, names=None, truncate=False):
        """Set the data for the LAS; actually sets data on individual curves.

        Arguments:
            array_like (array_like or :class:`pandas.DataFrame`): 2-D data array

        Keyword Arguments:
            names (list, optional): used to replace the names of the existing
                :class:`lasio.las_items.CurveItem` objects. The list itself
                is not modified.
            truncate (bool): remove any columns which are not included in the
                Curves (~C) section.

        Note: you can pass a :class:`pandas.DataFrame` to this method.

        """
        try:
            import pandas as pd
        except ImportError:
            pass
        else:
            if isinstance(array_like, pd.DataFrame):
                return self.set_data_from_df(
                    array_like, names=names, truncate=truncate
                )
        data = np.asarray(array_like)

        # Truncate data array if necessary: keep only the leading columns
        # that have a curve definition.
        if truncate:
            data = data[:, : len(self.curves)]

        # Extend curves list if necessary.
        while data.shape[1] > len(self.curves):
            self.curves.append(CurveItem(""))

        if not names:
            names = [c.original_mnemonic for c in self.curves]
        else:
            # Work on a copy so the caller's list is left alone, and pad it
            # with empty names if necessary.
            names = list(names)
            names.extend([""] * (len(self.curves) - len(names)))
        logger.debug("set_data. names to use: {}".format(names))

        for i, curve in enumerate(self.curves):
            curve.mnemonic = names[i]
            curve.data = data[:, i]

        self.curves.assign_duplicate_suffixes()

    def set_data_from_df(self, df, **kwargs):
        """Set the LAS file data from a :class:`pandas.DataFrame`.

        Arguments:
            df (pandas.DataFrame): curve mnemonics are the column names.
                The depth column for the curves must be the index of the
                DataFrame.

        Keyword arguments are passed to :meth:`lasio.las.LASFile.set_data`.

        """
        df_values = np.vstack([df.index.values, df.values.T]).T
        if not kwargs.get("names"):
            kwargs["names"] = [df.index.name] + [
                str(name) for name in df.columns.values
            ]
        self.set_data(df_values, **kwargs)

    def stack_curves(self, mnemonic, sort_curves=True):
        """Stack multi-channel curve data to a numpy 2D ndarray. Provide a
        stub name (prefix shared by all curves that will be stacked) or a
        list of curve mnemonic strings.

        Keyword Arguments:
            mnemonic (str or list): Supply the first several characters of
                the channel set to be stacked. Alternatively, supply a list
                of the curve names (mnemonics strings) to be stacked.
            sort_curves (bool): Natural sort curves based on mnemonic prior
                to stacking.

        Returns:
            2-D numpy array

        Raises:
            ValueError: if ``mnemonic`` is empty or contains an empty element.
            TypeError: if ``mnemonic`` is neither a string nor a sequence.
            KeyError: if any requested curve does not exist.

        """
        if isinstance(mnemonic, np.ndarray):
            mnemonic = list(mnemonic)
        if (not mnemonic) or (not all(mnemonic)):
            raise ValueError("`mnemonic` must not contain empty element")

        keys = list(self.curves.keys())
        if isinstance(mnemonic, str):
            # Fall back to the stub itself so that the missing-curve check
            # below reports it when nothing starts with that prefix.
            channels = [i for i in keys if i.startswith(mnemonic)] or [mnemonic]
        elif isinstance(mnemonic, Sequence):
            channels = list(mnemonic)
        else:
            raise TypeError("`mnemonic` argument must be string or sequence")

        if not set(keys).issuperset(set(channels)):
            missing = ", ".join(set(channels).difference(set(keys)))
            raise KeyError("{} not found in LAS curves.".format(missing))

        if sort_curves:
            # Natural sort: digit runs compare as integers, so CH2 < CH10.
            def nat_sort(x):
                return [int(i) if i.isdigit() else i for i in re.split(r"(\d+)", x)]

            channels.sort(key=nat_sort)

        indices = [keys.index(i) for i in channels]
        return self.data[:, indices]

    @property
    def index(self):
        """Return data from the first column of the LAS file data (depth/time)."""
        return self.curves[0].data

    @property
    def depth_m(self):
        """Return the index as metres.

        Raises:
            lasio.exceptions.LASUnknownUnitError: if the index unit is unknown.

        """
        if self._index_unit_contains("M"):
            return self.index
        elif self._index_unit_contains("F"):
            return self.index * 0.3048
        else:
            raise exceptions.LASUnknownUnitError("Unit of depth index not known")

    @property
    def depth_ft(self):
        """Return the index as feet.

        Raises:
            lasio.exceptions.LASUnknownUnitError: if the index unit is unknown.

        """
        if self._index_unit_contains("M"):
            return self.index / 0.3048
        elif self._index_unit_contains("F"):
            return self.index
        else:
            raise exceptions.LASUnknownUnitError("Unit of depth index not known")

    def _index_unit_contains(self, unit_code):
        """Check value of index_unit string, ignoring case

        Args:
            index unit code (string) e.g. 'M' or 'FT'

        Returns:
            A falsy value when ``index_unit`` is unset, otherwise whether
            ``unit_code`` occurs in it.

        """
        return self.index_unit and (unit_code.upper() in self.index_unit.upper())

    def add_curve_raw(self, mnemonic, data, unit="", descr="", value=""):
        """Deprecated. Use append_curve_item() or insert_curve_item() instead."""
        # append_curve() builds the CurveItem from the raw pieces and appends it.
        return self.append_curve(mnemonic, data, unit=unit, descr=descr, value=value)

    def append_curve_item(self, curve_item):
        """Add a CurveItem.

        Args:
            curve_item (lasio.CurveItem)

        """
        self.insert_curve_item(len(self.curves), curve_item)

    def insert_curve_item(self, ix, curve_item):
        """Insert a CurveItem.

        Args:
            ix (int): position to insert CurveItem i.e. 0 for start
            curve_item (lasio.CurveItem)

        """
        assert isinstance(curve_item, CurveItem)
        self.curves.insert(ix, curve_item)

    def add_curve(self, *args, **kwargs):
        """Deprecated. Use append_curve() or insert_curve() instead."""
        return self.append_curve(*args, **kwargs)

    def append_curve(self, mnemonic, data, unit="", descr="", value=""):
        """Add a curve.

        Arguments:
            mnemonic (str): the curve mnemonic
            data (1D ndarray): the curve data

        Keyword Arguments:
            unit (str): curve unit
            descr (str): curve description
            value (int/float/str): value e.g. API code.

        """
        return self.insert_curve(len(self.curves), mnemonic, data, unit, descr, value)

    def insert_curve(self, ix, mnemonic, data, unit="", descr="", value=""):
        """Insert a curve.

        Arguments:
            ix (int): position to insert curve at i.e. 0 for start.
            mnemonic (str): the curve mnemonic
            data (1D ndarray): the curve data

        Keyword Arguments:
            unit (str): curve unit
            descr (str): curve description
            value (int/float/str): value e.g. API code.

        """
        curve = CurveItem(mnemonic, unit, value, descr, data)
        self.insert_curve_item(ix, curve)

    def delete_curve(self, mnemonic=None, ix=None):
        """Delete a curve.

        Keyword Arguments:
            ix (int): index of curve in LASFile.curves.
            mnemonic (str): mnemonic of curve.

        The index takes precedence over the mnemonic.

        """
        if ix is None:
            ix = self.curves.keys().index(mnemonic)
        self.curves.pop(ix)

    @property
    def json(self):
        """Return object contents as a JSON string."""
        obj = OrderedDict()
        for name, section in self.sections.items():
            # Sections without their own ``json`` attribute (e.g. plain
            # strings) are encoded directly.
            try:
                obj[name] = section.json
            except AttributeError:
                obj[name] = json.dumps(section)
        return json.dumps(obj)

    @json.setter
    def json(self, value):
        raise Exception("Cannot set objects from JSON")
class Las(LASFile):
    """LAS file object.

    Behaves exactly like :class:`LASFile`; the old class name is retained
    for backwards compatibility.

    """
class JSONEncoder(json.JSONEncoder):
    """JSON encoder which knows how to serialise :class:`LASFile` objects.

    Use it as ``json.dumps(las, cls=JSONEncoder)``.

    """

    @staticmethod
    def _nan_to_none(value):
        """Return None for NaN (encoded as JSON null), else *value* unchanged."""
        try:
            return None if np.isnan(value) else value
        except TypeError:
            # Non-numeric values (e.g. strings) cannot be NaN.
            return value

    def default(self, obj):
        """Return a JSON-serialisable version of *obj*.

        Arguments:
            obj: the object the standard encoder could not handle.

        Returns:
            A Python scalar for NumPy scalars; for a LASFile, a dict with the
            keys "metadata" (one entry per header section) and "data" (one
            list of values per curve, with NaN replaced by None).

        Raises:
            TypeError: for any other type (raised by the base class).

        """
        # Curve values such as np.int64 are not natively serialisable and end
        # up here; convert them to the equivalent Python scalar.
        if isinstance(obj, np.generic):
            return obj.item()

        if isinstance(obj, LASFile):
            d = {"metadata": {}, "data": {}}
            for name, section in obj.sections.items():
                if isinstance(section, basestring):
                    d["metadata"][name] = section
                else:
                    try:
                        d["metadata"][name] = section.dictview()
                    except AttributeError:
                        # Sections without dictview(): fall back to a list
                        # holding one dict per item.
                        d["metadata"][name] = [dict(item) for item in section]
            for curve in obj.curves:
                d["data"][curve.mnemonic] = [
                    self._nan_to_none(x) for x in curve.data
                ]
            return d

        # Let the base class raise TypeError for unsupported types instead
        # of silently encoding them as null.
        return super(JSONEncoder, self).default(obj)