"""QUIP driver.
The main program takes a QUIP Operation XML file from WEx.
Then, it starts Ginga with custom plugins.
For more information, see :ref:`running-quip-doc`.
"""
# STDLIB
import glob
import multiprocessing
import os
import platform
import shutil
import sys
import warnings
from functools import partial
# THIRD-PARTY
from astropy.io import fits
from astropy.utils.data import get_pkg_data_filename, get_pkg_data_filenames
from astropy.utils.introspection import minversion
from astropy.wcs import FITSFixedWarning
# GINGA and STGINGA
from ginga.rv import main as gmain
from ginga.misc.Bunch import Bunch
# LOCAL
from . import qio
from ..utils.recenter import recenter
from ..utils.io import output_xml
# Keep the logging module quiet about loggers that have no handlers
# (Ginga triggers that message otherwise).
import logging
logging.raiseExceptions = False
try:
    logging.lastResort = None
except AttributeError:
    pass

# Suppress warning from astropy.wcs
warnings.filterwarnings('ignore', category=FITSFixedWarning)

__all__ = [
    'main',
    'get_ginga_plugins',
    'copy_ginga_files',
    'set_ginga_config',
    'shrink_input_images',
]

__taskname__ = 'QUIP'
_operational = 'false'  # 'true' or 'false'
_tempdirname = 'quipcache'  # Sub-dir to store temporary intermediate files
_iswin = platform.system() == 'Windows'
QUIP_DIRECTIVE = None  # Store info from input XML
QUIP_LOG = None  # Store info for output log XML
STGINGA_GT_1_2 = minversion('stginga', '1.2.1')

# Resolve the HOME directory: the HOME environment variable wins; on Windows
# fall back to HOMEDRIVE + HOMEPATH; anywhere else give up at import time.
_home = os.environ.get('HOME')
if _home is None:
    if not _iswin:
        raise ValueError('Cannot find HOME directory')
    _home = os.path.join(os.environ['HOMEDRIVE'], os.environ['HOMEPATH'])
def _extract_quip_options(args):
    """Strip QUIP-only options out of ``args`` and return their values.

    The list is rebuilt in a single pass and then written back with one
    slice assignment. Removing items with ``pop`` while iterating over the
    same list would skip the element that follows every removed option.

    Parameters
    ----------
    args : list of str
        Command line arguments (without the input XML). Modified in place
        so that only the options meant for Ginga remain.

    Returns
    -------
    nocopy : bool
        `True` if ``--nocopy`` was given.

    thumb_width : int
        Value of ``--mosaic-thumb-size``; 500 if absent or not an integer.

    n_cores : int or `None`
        Value of ``--n-cores``; `None` if absent or not an integer.

    """
    nocopy = False
    thumb_width = 500
    n_cores = None
    passthrough = []

    for arg in args:
        if arg == '--nocopy':
            nocopy = True
        elif arg.startswith('--log='):
            # Ignore any custom log file provided by user
            continue
        elif arg.startswith('--mosaic-thumb-size='):
            # Custom width for THUMBNAIL mode
            try:
                thumb_width = int(arg.split('=')[1])
            except ValueError:
                pass  # Not an integer; keep the current value
        elif arg.startswith('--n-cores='):
            # Num cores for THUMBNAIL mode
            try:
                n_cores = int(arg.split('=')[1])
            except ValueError:
                pass  # Not an integer; keep the current value
        else:
            passthrough.append(arg)

    args[:] = passthrough
    return nocopy, thumb_width, n_cores


def main(args):
    """Driver for command line script.

    First argument must be the QUIP Operation XML file.
    Other command line options are as accepted by Ginga, *except* for:

    * ``--mosaic-thumb-size`` can be used to specify desired width in pixels
      for individual images to be mosaicked in ``THUMBNAIL`` mode.
      If not given, the default width is 500 pixels. For Segment ID,
      the value is 256 regardless of this setting.
    * ``--n-cores`` can be used to specify the number of CPU cores used when
      rescaling images in ``THUMBNAIL`` mode. If not given, all available
      cores will be used.
    * ``--nocopy`` can be used with QUIP to instruct
      it to *not* copy its Ginga files to user's HOME directory.
    * ``--log=filename``, if given in command line, will be ignored
      because QUIP always writes Ginga's log to ``ginga.log`` in the
      output directory provided by QUIP Operation XML file.

    Parameters
    ----------
    args : list of str
        Command line arguments. The list is consumed in place: the XML
        file name and all QUIP-specific options are removed from it.

    Raises
    ------
    OSError
        Input XML does not exist.

    ValueError
        Input XML fails to validate built-in schema.
        Validation is skipped for Windows.

    """
    global QUIP_DIRECTIVE, QUIP_LOG

    inputxml = args.pop(0)

    if not os.path.exists(inputxml):
        raise OSError(f'{inputxml} does not exist')

    # Validate input XML (compare return code and display stderr if fails).
    # Skipped for Windows because no xmllint.
    if not _iswin:
        schema_v = qio.validate_input_xml(inputxml)
        if schema_v[0] != 0:
            raise ValueError(schema_v[2])

    # Separate QUIP-only options from the ones passed through to Ginga.
    nocopy, thumb_width, n_cores = _extract_quip_options(args)

    # Copy Ginga files to HOME directory
    if not nocopy:
        copy_ginga_files()

    # Extract info from input XML
    QUIP_DIRECTIVE = qio.input_xml(inputxml)
    gingalog = os.path.join(
        QUIP_DIRECTIVE['OUTPUT']['OUTPUT_DIRECTORY'], 'ginga.log')
    images = QUIP_DIRECTIVE['IMAGES']['IMAGE_PATH']
    op_type = QUIP_DIRECTIVE['OPERATION_TYPE'].lower()

    # Create hidden temporary directory, in case we need it later.
    # This is NOT automatically deleted.
    tempdir = os.path.join(
        os.path.dirname(os.path.abspath(inputxml)), _tempdirname)
    if not os.path.exists(tempdir):
        os.mkdir(tempdir)

    # Initialize info for log XML.
    # Do this here for time stamp and avoid circular import.
    QUIP_LOG = qio.QUIPLog()

    # No point keeping Ginga log from last run
    if os.path.exists(gingalog):
        os.remove(gingalog)

    # Wavefront Maintenance triggers QUIP Automatic Mode: run a utility to
    # recenter the images if needed and do not launch Ginga.
    if op_type == 'wavefront_maintenance':
        output_images = recenter(images,
                                 QUIP_DIRECTIVE['OUTPUT']['OUTPUT_DIRECTORY'],
                                 doplot=False)
        quipout = QUIP_DIRECTIVE['OUTPUT']['OUT_FILE_PATH']
        output_xml(qio.quip_out_dict(output_images), quipout)
        return

    elif op_type == 'thumbnail':
        cfgmode = 'mosaicmode'
        ginga_config_py_sfx = op_type
        sci_ext = ('SCI', 1)

        # Science array can have different EXTNAME values:
        # SCI (JWST/HST) or IMAGE (test)
        # Assume first image is representative of all the rest.
        with fits.open(images[0]) as pf:
            if sci_ext not in pf:
                sci_ext = ('IMAGE', 1)

        # Auto guess the number of CPU cores needed.
        if n_cores is None:
            n_cores = min(multiprocessing.cpu_count(), len(images))

        # Keyword names differ depending on the installed stginga version.
        if STGINGA_GT_1_2:
            shrink_extra_kwargs = {'sci_ext': sci_ext, 'use_dq': True}
        else:
            shrink_extra_kwargs = {'ext': sci_ext}

        images = shrink_input_images(
            images, new_width=thumb_width, n_cores=n_cores,
            outpath=tempdir, **shrink_extra_kwargs)

    elif op_type == 'segment_id':
        cfgmode = 'mosaicmode'
        ginga_config_py_sfx = op_type
        images = _segid_mosaics(images, outpath=tempdir, sw_sca_size=256)

    else:  # different kinds of analysis
        cfgmode = 'normalmode'
        ginga_config_py_sfx = cfgmode

    # Add custom plugins.
    # NOTE: There was a bug with setting this in ginga_config.py,
    #       so we do this here instead.
    global_plugins, local_plugins = get_ginga_plugins(ginga_config_py_sfx)
    gmain.plugins += global_plugins
    gmain.plugins += local_plugins

    # Set Ginga config file(s)
    set_ginga_config(mode=cfgmode, gcfg_suffix=ginga_config_py_sfx)

    # Imported here because it is only needed when Ginga is launched;
    # validation and the wavefront_maintenance path do not use it.
    from stginga.gingawrapper import _locate_plugin

    # Auto start core global plugins
    for gplgname in ('ChangeHistory', ):
        gplg = _locate_plugin(gmain.plugins, gplgname)
        gplg.start = True

    # Start Ginga
    sys_args = ['ginga', f'--log={gingalog}'] + args + images
    gmain.reference_viewer(sys_args)
def get_ginga_plugins(op_type):
    """Build the custom Ginga plugin specs for a QUIP operation type.

    Plugins come from ``stginga`` and ``wss_tools``. ``AboutQUIP`` is always
    included as a global plugin; the rest depends on ``op_type``.

    Parameters
    ----------
    op_type : {'normalmode', 'segment_id', 'thumbnail'}
        QUIP operation type. Any value other than ``'segment_id'`` or
        ``'thumbnail'`` is handled as normal mode.

    Returns
    -------
    global_plugins : list
        List of custom Ginga global plugins to load.

    local_plugins : list
        List of custom Ginga local plugins to load.

    """
    stg_pfx = 'stginga.plugins'
    wss_pfx = 'wss_tools.quip.plugins'

    def _global_spec(name, workspace):
        # Every global plugin here lives in wss_tools and uses its own
        # module name as the tab name.
        return Bunch(module=name, tab=name, workspace=workspace,
                     category='Custom', ptype='global', pfx=wss_pfx)

    def _local_spec(name, pfx):
        return Bunch(module=name, workspace='dialogs',
                     category='Custom', ptype='local', pfx=pfx)

    global_plugins = [_global_spec('AboutQUIP', 'left')]

    if op_type == 'segment_id':
        # Special plugin for segment ID annotations; no local plugins.
        global_plugins.append(_global_spec('SegIDHelper', 'left'))
        return global_plugins, []

    if op_type == 'thumbnail':
        return global_plugins, [_local_spec('MosaicAuto', wss_pfx)]

    # normalmode
    global_plugins.append(_global_spec('SaveQUIP', 'right'))
    local_plugins = [
        _local_spec(name, stg_pfx)
        for name in ('BackgroundSub', 'BadPixCorr', 'DQInspect')]
    local_plugins.append(_local_spec('SNRCalc', wss_pfx))
    return global_plugins, local_plugins
def _do_copy(src, dst, verbose=False):
    """Copy ``src`` to ``dst``, removing any existing ``dst`` first.

    Parameters
    ----------
    src, dst : str
        Source and destination file names.

    verbose : bool
        Print the copy operation to screen.

    """
    already_there = os.path.exists(dst)
    if already_there:
        os.remove(dst)

    shutil.copyfile(src, dst)

    if not verbose:
        return
    print(src, '->', dst)
def copy_ginga_files(verbose=False):
    """Install the packaged Ginga configuration files into ``HOME/.ginga``.

    The destination directory is created when missing. Each file matching
    ``*.*`` in the package's ``config`` data directory is copied there
    under its own base name, replacing any file already present.

    Parameters
    ----------
    verbose : bool
        Print info to screen.

    """
    # NOTE: There is no need to copy plugins here anymore.

    ginga_dir = os.path.join(_home, '.ginga')
    if not os.path.exists(ginga_dir):
        os.makedirs(ginga_dir)

    for cfg_file in get_pkg_data_filenames('config', pattern='*.*'):
        target = os.path.join(ginga_dir, os.path.basename(cfg_file))
        _do_copy(cfg_file, target, verbose=verbose)
def set_ginga_config(mode='normalmode', gcfg_suffix='normalmode',
                     verbose=False):
    """Replace Ginga files in HOME directory with the
    appropriate version for the given mode.

    This must be run *after* :func:`copy_ginga_files`, not before.
    For a list of affected configuration files, see
    :ref:`quip-doc-ginga-files`.

    "normalmode" is set such that all images are always in cache.
    This is useful if you want to do background subtraction etc.
    in Ginga. However, it is not sustainable if there are too many
    images opened at the same time. The auto-levels behavior is
    same as Ginga default.

    "mosaicmode" is designed such that only certain number of images
    will be processed, although all images are still always cached.
    As a result, it is prone to memory problem if run in "normalmode".
    The auto-levels behavior is disabled by default to allow
    blinking the images all at the same scale.

    Parameters
    ----------
    mode : {'normalmode', 'mosaicmode'}
        Mode of analysis.

    gcfg_suffix : {'normalmode', 'thumbnail', 'segment_id'}
        Associated ``ginga_config.py`` to use. This is slightly
        different from ``mode`` because "mosaicmode" can have
        different requirements depending on operation type.

    verbose : bool
        Print info to screen.

    """
    path = os.path.join(_home, '.ginga')

    # Copy ginga_config.py. The destination is built directly rather than
    # derived from the source with str.replace(), which would also strip
    # the suffix text from any directory name that happens to contain it.
    dst = os.path.join(path, 'ginga_config.py')
    _do_copy(dst + '.' + gcfg_suffix, dst, verbose=verbose)

    # Copy other Ginga files. Every match ends with the suffix, so only
    # that trailing part is trimmed to obtain the destination name.
    sfx = '.' + mode
    for src in glob.iglob(os.path.join(path, '*' + sfx)):
        _do_copy(src, src[:-len(sfx)], verbose=verbose)
# Iterable (infile) must be last argument.
# NOTE(review): presumably so that the leading arguments can be frozen and
# the function mapped over file names; the caller is not shown here.
def _shrink_one(outpath, ext, new_width, debug, kwargs, infile):
    """Write a narrower copy of one image if it is wider than ``new_width``.

    Parameters
    ----------
    outpath : str
        Output directory. It is compared as-is against the absolute path
        of the input directory.

    ext
        Extension of ``infile`` whose data array provides the width.

    new_width : int
        Width above which the image is scaled down.

    debug : bool
        Print a message when the input is used unchanged.

    kwargs : dict
        Extra keywords passed to ``stginga.utils.scale_image``.

    infile : str
        Input image file.

    Returns
    -------
    outfile : str
        Scaled image in ``outpath``; ``infile`` itself if it is not wider
        than ``new_width``; empty string if it had to be skipped because
        input and output directories are the same.

    """
    from stginga.utils import scale_image

    with fits.open(infile) as hdulist:
        width = hdulist[ext].data.shape[1]  # (ny, nx)

    # Shrink it.
    if width > new_width:
        indir, basename = os.path.split(infile)

        if os.path.abspath(indir) != outpath:
            shrunk = os.path.join(outpath, basename)
            scale_image(infile, shrunk, new_width / width, **kwargs)
            return shrunk

        # Skipping instead of just returning the input image
        # because want to avoid mosaicking large images.
        print('Input and output directories are the same: '
              f'{outpath}; Skipping {basename}')
        return ''

    # Input already small enough.
    if debug:
        print(f'{infile} has width {width} <= {new_width}; '
              'Using input file')
    return infile
# Iterable (infile) must be last argument.
# NOTE(review): presumably so that the leading arguments can be frozen and
# the function mapped over file names; the caller is not shown here.
def _shrink_one_with_dq(outpath, sci_ext, new_width, dq_parser, debug, kwargs,
                        infile):
    """Like ``_shrink_one`` but scales via ``scale_image_with_dq``.

    Parameters
    ----------
    outpath : str
        Output directory. It is compared as-is against the absolute path
        of the input directory.

    sci_ext
        Extension of ``infile`` whose data array provides the width.

    new_width : int
        Width above which the image is scaled down.

    dq_parser
        Passed through to ``stginga.utils.scale_image_with_dq`` as its
        fourth positional argument.

    debug : bool
        Print a message when the input is used unchanged.

    kwargs : dict
        Extra keywords passed to ``stginga.utils.scale_image_with_dq``.

    infile : str
        Input image file.

    Returns
    -------
    outfile : str
        Scaled image in ``outpath``; ``infile`` itself if it is not wider
        than ``new_width``; empty string if it had to be skipped because
        input and output directories are the same.

    """
    from stginga.utils import scale_image_with_dq  # noqa

    with fits.open(infile) as hdulist:
        width = hdulist[sci_ext].data.shape[1]  # (ny, nx)

    # Shrink it.
    if width > new_width:
        indir, basename = os.path.split(infile)

        if os.path.abspath(indir) != outpath:
            shrunk = os.path.join(outpath, basename)
            scale_image_with_dq(infile, shrunk, new_width / width, dq_parser,
                                **kwargs)
            return shrunk

        # Skipping instead of just returning the input image
        # because want to avoid mosaicking large images.
        print('Input and output directories are the same: '
              f'{outpath}; Skipping {basename}')
        return ''

    # Input already small enough.
    if debug:
        print(f'{infile} has width {width} <= {new_width}; '
              'Using input file')
    return infile
def _segid_mosaics(images, sw_sca_size=256, **kwargs):
    """Generate a scaled-down NIRCam mosaic for each exposure.

    The mosaics are not deleted on exit;
    User has to remove them manually.

    Parameters
    ----------
    images : list
        List of input image files.

    sw_sca_size : int
        Forwarded to the ``NircamMosaic`` constructor under the same name.

    kwargs
        See :meth:`~wss_tools.utils.mosaic.NircamMosaic.make_mosaic`.

    Returns
    -------
    thumbnails : list
        List of scaled-down mosaics in output directory.

    """
    # Local import, as in the rest of this module's optional helpers.
    from ..utils.mosaic import NircamMosaic

    mosaic_builder = NircamMosaic(sw_sca_size=sw_sca_size)
    thumbnails = mosaic_builder.make_mosaic(images, **kwargs)
    return thumbnails
def _main():
    """Command line entry point.

    Prints a usage line when no argument is given, shows Ginga's help for
    ``--help``, prints the task name and version for ``--version``, and
    otherwise hands all arguments after the program name to :func:`main`.
    """
    argv = sys.argv

    if len(argv) <= 1:
        print('USAGE: quip operation_file.xml [--mosaic-thumb-size=500] '
              '[--n-cores=8] [--nocopy] [--help]')
        return

    if '--help' in argv:
        from ginga.rv.main import reference_viewer
        reference_viewer(['ginga', '--help'])
        return

    if '--version' in argv:
        try:
            from ..version import version
        except ImportError:
            version = 'unknown'
        print(f'{__taskname__} v{version}')
        return

    main(argv[1:])