import logging
import textwrap
import numpy as np
from copy import deepcopy
from .las_items import HeaderItem, CurveItem, SectionItems, OrderedDict
from . import defaults
from . import exceptions
logger = logging.getLogger(__name__)
[docs]def write(
las,
file_object,
version=None,
wrap=None,
STRT=None,
STOP=None,
STEP=None,
fmt="%.5f",
column_fmt=None,
len_numeric_field=None,
data_width=79,
header_width=60,
):
"""Write a LAS files.
Arguments:
las (:class:`lasio.LASFile`)
file_object (file-like object open for writing): output
version (float or None): version of written file, either 1.2 or 2.
If this is None, ``las.version.VERS.value`` will be used.
wrap (bool or None): whether to wrap the output data section.
If this is None, ``las.version.WRAP.value`` will be used.
STRT (float or None): value to use as STRT (note the data will not
be clipped). If this is None, the data value in the first column,
first row will be used.
STOP (float or None): value to use as STOP (note the data will not
be clipped). If this is None, the data value in the first column,
last row will be used.
STEP (float or None): value to use as STEP (note the data will not
be resampled and/or interpolated). If this is None, the STEP will
be estimated from the first two rows of the first column.
fmt (str): Python string formatting operator for numeric data to be
used.
column_fmt (dict or None): use this to set a different format string
for specific columns from the data ndarray. E.g. to use ``'%.3f'``
for the depth column and ``'%.2f'`` for all the other columns,
you would use ``fmt='%.2f', column_fmt={0: '%.3f'}``.
len_numeric_field (int): width of each numeric field column (must be
greater than than all the formatted numeric values in the file).
data_width (79): width of data field in characters
Creating an output file is not the only side-effect of this function. It
will also modify the STRT, STOP and STEP HeaderItems so that they correctly
reflect the ~Data section's units and the actual first, last, and interval
values.
However, passing a version to this write() function only changes the version
of the object written to. Example: las.write(myfile, version=2).
Lasio's internal-las-object version will remain separate and defined by
las.version.VERS.value
You should avoid calling this function directly - instead use the
:meth:`lasio.LASFile.write` method.
"""
if column_fmt is None:
column_fmt = {}
if wrap is None:
wrap = las.version["WRAP"] == "YES"
elif wrap is True:
las.version["WRAP"] = HeaderItem(
"WRAP", "", "YES", "Multiple lines per depth step"
)
elif wrap is False:
las.version["WRAP"] = HeaderItem("WRAP", "", "NO", "One line per depth step")
lines = []
version_section_to_write = deepcopy(las.version)
assert version in (1.2, 2, None)
if version is None:
version = las.version["VERS"].value
if version == 1.2:
version_section_to_write.VERS = HeaderItem(
"VERS", "", 1.2, "CWLS LOG ASCII STANDARD - VERSION 1.2"
)
elif version == 2:
version_section_to_write.VERS = HeaderItem(
"VERS", "", 2.0, "CWLS log ASCII Standard -VERSION 2.0"
)
if STRT is None:
STRT = las.index[0]
if STOP is None:
STOP = las.index[-1]
if STEP is None:
if (
STOP != STRT
): # prevents an error being thrown in the case of only a single sample being written
STEP = las.index[1] - las.index[0] # Faster than np.gradient
las.well["STRT"].value = STRT
las.well["STOP"].value = STOP
las.well["STEP"].value = STEP
# Check units
if las.curves[0].unit:
unit = las.curves[0].unit
else:
unit = las.well["STRT"].unit
las.well["STRT"].unit = unit
las.well["STOP"].unit = unit
las.well["STEP"].unit = unit
las.curves[0].unit = unit
# Write each section.
# get_formatter_function ( ** get_section_widths )
# ~Version
logger.debug("LASFile.write Version section, Version: %s" % (version))
lines.append("~Version ".ljust(header_width, "-"))
order_func = get_section_order_function("Version", version)
section_widths = get_section_widths("Version", version_section_to_write, version, order_func)
for header_item in version_section_to_write.values():
mnemonic = header_item.original_mnemonic
# logger.debug('LASFile.write ' + str(header_item))
order = order_func(mnemonic)
# logger.debug('LASFile.write order = %s' % (order, ))
logger.debug(
"LASFile.write %s order=%s section_widths=%s"
% (header_item, order, section_widths)
)
formatter_func = get_formatter_function(order, **section_widths)
line = formatter_func(header_item)
lines.append(line)
# ~Well
logger.debug("LASFile.write Well section")
lines.append("~Well ".ljust(header_width, "-"))
order_func = get_section_order_function("Well", version)
section_widths = get_section_widths("Well", las.well, version, order_func)
# logger.debug('LASFile.write well section_widths=%s' % section_widths)
for header_item in las.well.values():
header_item.value = standardize_value(header_item.value, header_item.unit)
mnemonic = header_item.original_mnemonic
order = order_func(mnemonic)
logger.debug(
"LASFile.write %s order=%s section_widths=%s"
% (header_item, order, section_widths)
)
formatter_func = get_formatter_function(order, **section_widths)
line = formatter_func(header_item)
lines.append(line)
# ~Curves
logger.debug("LASFile.write Curves section")
lines.append("~Curve Information ".ljust(header_width, "-"))
order_func = get_section_order_function("Curves", version)
section_widths = get_section_widths("Curves", las.curves, version, order_func)
for header_item in las.curves:
mnemonic = header_item.original_mnemonic
order = order_func(mnemonic)
formatter_func = get_formatter_function(order, **section_widths)
line = formatter_func(header_item)
lines.append(line)
# ~Params
logger.debug("LASFile.write Params section")
lines.append("~Params ".ljust(header_width, "-"))
order_func = get_section_order_function("Parameter", version)
section_widths = get_section_widths("Parameter", las.params, version, order_func)
for header_item in las.params.values():
header_item.value = standardize_value(header_item.value, header_item.unit)
mnemonic = header_item.original_mnemonic
order = order_func(mnemonic)
formatter_func = get_formatter_function(order, **section_widths)
line = formatter_func(header_item)
lines.append(line)
# ~Other
logger.debug("LASFile.write Other section")
lines.append("~Other ".ljust(header_width, "-"))
lines += las.other.splitlines()
logger.debug("LASFile.write ASCII section")
lines.append("~ASCII ".ljust(header_width, "-"))
file_object.write("\n".join(lines))
file_object.write("\n")
line_counter = len(lines)
# data_arr = np.column_stack([c.data for c in las.curves])
data_arr = las.data
nrows, ncols = data_arr.shape
logger.debug("Data section shape: {}".format((nrows, ncols)))
logger.debug("len_numeric_field = {}".format(len_numeric_field))
if len_numeric_field is None:
logger.debug("Calculating len_numeric_field. fmt = {}".format(fmt))
len_numeric_field = 10
test_fmt = fmt % np.pi
while len(test_fmt) > (len_numeric_field - 1):
logger.debug("test_fmt = {}".format(test_fmt))
len_numeric_field += 1
def format_data_section_line(n, fmt, l=len_numeric_field, spacer=" "):
try:
if np.isnan(n):
return spacer + str(las.well["NULL"].value).rjust(l)
else:
return spacer + (fmt % n).rjust(l)
except TypeError:
return spacer + str(n).rjust(l)
twrapper = textwrap.TextWrapper(width=data_width)
for i in range(nrows):
logger.debug("Writing data array row {} of {}".format(i + 1, nrows))
depth_slice = ""
for j in range(ncols):
col_fmt = fmt
if j in column_fmt:
col_fmt = column_fmt[j]
depth_slice += format_data_section_line(data_arr[i, j], col_fmt)
if wrap:
lines = twrapper.wrap(depth_slice)
logger.debug(
"LASFile.write Wrapped %d lines out of %s" % (len(lines), depth_slice)
)
else:
lines = [depth_slice]
for line in lines:
if version_section_to_write.VERS == 1.2 and len(line) > 255:
logger.warning(
"[v1.2] line #{} has {} chars (>256)".format(
line_counter + 1, len(line)
)
)
file_object.write(line + "\n")
line_counter += 1
def standardize_value(value, unit=None):
"""Ensure that 0 is written instead of 'None' for numeric header lines.
Args:
value (anything): object to be written into the value field
of the LAS header line.
unit (str): unit for header line.
Returns: either 0 (integer) or a string.
If an internal representation of a metadata mnemonic has a unit
indicator and a value of zero, an empty string, or None, then on
lasio.write(...) the value written should be 0 for the value field
instead of "None" or "".
"""
# value != 0 prevents overwriting a 0.0 value with 0.
if (unit) and (not value) and (value != 0):
value = 0
if value is None:
value = ""
return value
[docs]def get_section_order_function(
section, version, order_definitions=defaults.ORDER_DEFINITIONS
):
"""Get a function that returns the order per the mnemonic and section.
Arguments:
section (str): either 'well', 'params', 'curves', 'version'
version (float): either 1.2 and 2.0
Keyword Arguments:
order_definitions (dict): see source of defaults.py for more information
Returns:
A function which takes a mnemonic (str) as its only argument, and
in turn returns the order 'value:descr' or 'descr:value'.
"""
section_orders = order_definitions[version][section]
default_order = section_orders[0]
orders = {}
for order, mnemonics in section_orders[1:]:
for mnemonic in mnemonics:
orders[mnemonic] = order
return lambda mnemonic: orders.get(mnemonic, default_order)
[docs]def get_section_widths(section_name, items, version, order_func):
"""Find minimum section widths fitting the content in *items*.
Arguments:
section_name (str): either 'version', 'well', 'curves', or 'params'
items (SectionItems): section items
version (float): either 1.2 or 2.0
order_func (func): see :func:`lasio.writer.get_section_order_function`
"""
section_widths = {"left_width": None, "middle_width": None}
if len(items) > 0:
section_widths["left_width"] = max([len(i.original_mnemonic) for i in items])
middle_widths = []
for i in items:
order = order_func(i.mnemonic)
rhs_element = order.split(":")[0]
logger.debug(
"get_section_widths %s\n\torder=%s rhs_element=%s"
% (i, order, rhs_element)
)
middle_widths.append(len(str(i.unit)) + 1 + len(str(i[rhs_element])))
section_widths["middle_width"] = max(middle_widths)
return section_widths