"""
Defining Observables
--------------------
Module ``obsterms`` defines the observables on quantum systems. Each observable
contains the corresponding methods to write the fortran files and to read the
results from the fortran outputs. The following observables are defined
Observables on any quantum sytem in OSMPS are specified by instantiating an
object of the :py:class:`obsterms.Observables` class, for example
.. code-block:: python
myObservables = Observables(Operators)
One then adds observables using the :py:func:`obsterms.Observables.AddObservable`
method. Each observable is itself a python class and has the corresponding
read/write methods Examples of the use of
:py:func:`obsterms.Observables.AddObservable` are:
* ``site`` observables (:py:class:`obsterms.SiteObservable`). Example :
.. code-block:: python
myObservables.AddObservable('site', 'nbtotal', name='n')
will measure :math:`\langle \hat{n} \\rangle` at each site and return the result
as a length-:math:`L` vector with the tag ``'n'``. Recall that :math:`L` is
either the system size in the case of a finite MPS or the unit cell length in
the case of iMPS as specified in the table in the description of
:py:func:`WriteFiles`
* Site entropy measurement (:py:class:`obsterms.SiteEntropyObservable`).
* Bond entropy measurement (:py:class:`obsterms.BondEntropyObservable`).
* ``corr`` observables (:py:class:`obsterms.CorrObservable` and
:py:class:`obsterms.FCorrObservable`). Example:
.. code-block:: python
myObservables.AddObservable('corr', ['nbtotal', 'nbtotal'], name='nn')
will measure :math:`\langle \hat{n}_i \hat{n}_j \\rangle` at each independent
pair of sites :math:`i \le j` and return the result with the tag ``'nn'``.
For finite-size MPS simulations the result is returned as an :math:`L \times L`
matrix, and for infinite-size MPS simulations the result is returned as a
length-:math:`L` vector, where :math:`L` is specified by
the ``SpecifyCorrelationRange`` method, see Sec. :ref:`sec_imps`. For
observables involving odd numbers of fermionic operators on a given site,
there is an optional ``Phase`` argument such as existed for the
:py:func:`mpo.MPO.AddMPOTerm` method. An example is
.. code-block:: python
myObservables.AddObservable('corr', ['fdagger', 'f'], name='spdm',
Phase=True)
which measures the fermionic correlation function
:math:`\langle \hat{f}_i^{\dagger}\hat{f}_j \\rangle`.
* ``string`` observables (:py:class:`obsterms.StringCorrObservable`). Example
(from ``Infiniteheisenbergspinone.py``):
.. code-block:: python
myObservables.AddObservable('string', ['sz', 'sz', 'phaseString'],
name='StringOP')
will measure :math:`\langle \hat{S}^z_i\prod_{k=i+1}^{j-1}\left(-1 \\right)^{i\pi
\hat{S}^z_k}\hat{S}^z_j \\rangle` at each independent pair of sites :math:`i\le j`
and return the result with the tag ``'StringOP'``. Note that ``phaseString``
has been defined to be :math:`\left(-1 \\right)^{i\pi \hat{S}^z_k}` only in this
particular case.
* ``MPO`` observables (:py:class:`obsterms.MPOObservable`). Example:
.. code-block:: python
DefectDensity = mps.MPO(Operators)
DefectDensity.AddMPOTerm('bond', ['sz', 'sz'])
myStaticObservables.AddObservable('MPO', DefectDensity, 'Defect Density')
will measure :math:`\sum_{\langle i,j \\rangle}\hat{S}^z_i\hat{S}^z_j` and return
the result with the tag ``'Defect Density'``. In this way, essentially
arbitrary many-body operators may be measured.
* Reduced density matrix, limited to one (:py:class:`obsterms.RhoiObservable`)
or two (:py:class:`obsterms.RhoijObservable`) sites at present, can be
measured as well. They can be added once as single item or list. An
empty list indicates to report the singular values at all bipartitions.
If passed as list, entries have to sorted ascending.
.. code-block:: python
myObservables.AddObservables('DensityMatrix_i', [])
myObservables.AddObservables('DensityMatrix_ij', [])
* It is possible to measure the mutual information matrix without writing
all the information about the reduced density matrices to the result files
(:py:class:`obsterms.MutualInformationObservable`).
.. code-block:: python
myObservables.AddObservables('MI', True)
* The Schmidt values (:py:class:`obsterms.LambdaObservable`) at the
bipartitions are another measurement, which gives a more detailed
picture than the bond entropy. They can be added once as single item
or list. An empty list indicates to report the singular values
at all bipartitions.
.. code-block:: python
myObservables.AddObservables('Lambda', [])
* Distance to a pure state (:py:class:`obsterms.DistancePsiObs`)
* Save state independent from checkpoints etc. (MPS only,
:py:class:`obsterms.StateObservable`)
* Gate observables (ED only, :py:class:`obsterms.GateObservable`).
* Distance to a density matrix (ED only, :py:class:`obsterms.DistanceRhoObs`).
Observables for the different algorithms in OSMPS are specified as part of the
``parameters`` dictionary, see Table in :py:func:`tools.WriteFiles`.
**Developer's Guide for new observables**
* Implement new class in this module, i.e. :py:mod:`obsterms`. We need the
minimal class functions ``__init__``, ``add``, ``write``, and ``read``.
* Add new observable to the ``data`` set and ``mpslist`` in :py:class:``Observables``.
* Modify ``ObsOps_types_templates.f90`` by adding a new observable to ``OBS_TYPE``
and possibly defining a new derived type for this observable.
* Define read, write, and destroy methods as far as applicable.
* Carry out measurements in all ``observe`` subroutines.
"""
import numpy as np
#import numpy.linalg as la
import hashlib
#import os.path
from copy import deepcopy
#from scipy.optimize import curve_fit
#from .mpoterm import hash_dict, DecomposeOperator
#from .convergence import FiniteTConvParam
[docs]def check_restore_timeevo(Params):
"""
Returns default value.
**Arguments**
Params : dictionary
the dictionary containing the parameters for the simulation.
"""
if(not Params['timeevo_restore']): return False
return True
[docs]class _ObsTerm():
[docs] def get_hash(self):
"""
Generate hash for this object based on the class dictionary
provided as default by python.
"""
return hash_dict(self.__dict__)
[docs] def __getitem__(self, key):
"""
Return the corresponding variable from the set of class variabels.
**Arguments**
key : string
String corresponding to a class variable.
"""
return self.__dict__[key]
[docs] def line_to_floats(self, Obsfile, nn):
"""
Read line in file handle and convert to 1d numpy array of floats.
**Arguments**
Obsfile : file handle
read next line in this file handle
nn : int
number of floats in the line.
"""
vec = Obsfile.readline().replace('\n', '').split(' ')[1:nn + 1]
try:
return np.array(list(map(float, vec)))
except:
print('Filename', Obsfile.name)
# Raise original exception
raise
[docs] def read_rmatrix(self, fh, d1, d2):
"""
Read a matrix in file handle and convert to 2d numpy array of floats.
**Arguments**
Obsfile : file handle
read next line in this file handle
d1 : int
number of rows in the matrix.
d2 : int
number of floats in one line corresponding to the number of
columns
"""
mat = np.zeros((d1, d2))
for ii in range(d1):
mat[ii, :] = self.line_to_floats(fh, d2)
return mat
[docs] def ignored(self, optargs, kwargs):
"""
Print warnings about ignored optional keyword parameters:
**Arguments**
optargs : list of strings
contains all the string identifiers of the keyword arguments
considered.
kwargs : dictionary
contains all the keywords arguments
"""
for key in kwargs:
if(key not in optargs):
print("The optional argument " + key + " is not considered "
+ "for the observable " + type(self).__name__ + ".")
[docs] def required(self, args, kwargs):
"""
Raise exception for required arguments not present in the optional
arguments.
**Arguments**
args : list of strings
contains all the string identifiers of the keyword arguments
considered.
kwargs : dictionary
contains all the keywords arguments
"""
for elem in args:
if(elem not in kwargs):
raise Exception("Required argument not passed to "
+ "AddObservables for "
+ type(self).__name__ + ".")
[docs] def setargs(self, args, argsval, kwargs):
"""
Set non key-worded argument in corresponding dictionary.
**Arguments**
args : list of strings
contains all the string identifiers of the keyword arguments
considered in the correct order.
argsval : list
contains the values passed by the user as optional
non-keyword arguments. Order must be as indicated
in documentation.
kwargs : dict
Dictionary with all arguments to be constructed. Here, we
set the corresponding non-keyword arguments passed.
"""
if(len(args) < len(argsval)):
raise Exception('Too many non-keyword arguments passed.')
for ii in range(len(argsval)):
if(args[ii] in kwargs):
raise Exception('Cannot pass non-keyword argument '
+ 'and keyword arguments to set same '
+ 'variable.')
kwargs[args[ii]] = argsval[ii]
[docs] def default(self, defargs, kwargs):
"""
Set the default arguments for a term if not passed as optional
arguments.
"""
for key in defargs:
if(key not in kwargs):
kwargs[key] = defargs[key]
[docs]class SiteObservable(_ObsTerm):
"""
Measurement of local observables on each site.
**Details**
The arguments of the :py:func:`obsterms.Observables.AddObservable` are
temptype : str
Identification to add site observable. Must be ``site``.
term : str
Name of the operator to be measured.
name : str, keyword argument
Key to address measurement in dictionary of results.
Default to ``<`` + term + ``>``.
weight : float, keyword argument
Weighting the measurement.
Default to 1.0
**Example**
By default the bond entropy is measured. The measurement can be turned off
by calling the :py:func:`obsterms.Observables.AddObservable` function as follows.
>>> Obs = mps.Observables(Operators)
>>> Obs.AddObservable('site', 'nbtotal')
**Variables**
The following class variables are defined.
Op : list of strings
List contains the names of the operators to be measured.
weight : list of floats
The weights for each measurement.
name : list of strings
The key under which the results should be stored.
Ops : instance of :py:class:`Ops.OperatorList`
Contains the operators for checks.
"""
def __init__(self, Ops):
self.Op = []
self.weight = []
self.name = []
self.Ops = Ops
def __len__(self):
return len(self.Op)
def add(self, term, *args, **kwargs):
self.ignored(['name', 'weight'], kwargs)
self.setargs(['name', 'weight'], args, kwargs)
self.default({'weight' : 1.0, 'name' : '<' + term + '>'}, kwargs)
if(not term in self.Ops):
raise Exception("Operator passed to SiteObservable not found")
self.Op.append(term)
self.weight.append(kwargs['weight'])
self.name.append(kwargs['name'])
return
def write(self, Obsfile, **kwargs):
nn = len(self)
Obsfile.write('%16i\n'%(nn))
if(nn == 0): return
IntOp = kwargs['IntOp']
for ii in range(nn):
Hstring = '%30.15E'%(self.weight[ii])
Hstring += '%16i'%(IntOp[self.Op[ii]])
Obsfile.write(Hstring + '\n')
return
[docs] def read(self, fh, **kwargs):
"""
Read the results from file. The filehandle must be at the
corresponding position.
fh : filehandle
Open file with results. The next lines contain the
measurements of the site observables.
"""
if(len(self) == 0): return
param = kwargs['param']
for ii in range(len(self)):
val = self.line_to_floats(fh, param['L'])
yield self.name[ii], val
[docs] def read_hdf5(self, gh5, key, **kwargs):
"""
Read the results from an open HDF5 file.
**Arguments**
fh : id of HDF5 group
This is the HDF5 group containing the observable.
"""
if(key not in self.name): return
idx = self.name.index(key)
dh5 = gh5['site']
tmp = dh5[idx, :]
yield key, tmp
[docs]class SiteEntropyObservable(_ObsTerm):
"""
Measurement of the site entropy of each site along the chain.
**Details**
The arguments of the :py:func:`obsterms.Observables.AddObservable` are
term : bool
Status of the site entropy measurement' ``True`` for measuring,
``False`` for not measuring.
**Example**
By default the site entropy is measured. The measurement can be turned off
by calling the :py:func:`obsterms.Observables.AddObservable` function as follows.
>>> Obs = mps.Observables(Operators)
>>> Obs.AddObservable('SiteEntropy', False)
Replacing ``False`` with ``True``, the site entropy can be turned on.
**Variables**
The following class variables are defined.
has : bool
``False`` if site entropy should not be saved, ``True`` otherwise
measuring the site entropy at every splitting.
"""
def __init__(self):
self.has = True
[docs] def add(self, term, *args, **kwargs):
"""
Add functions as method to turn site entropy measure on and off. Last
call determines if the site entropy is measured or not. For arguments
see :py:class:`obsterms.SiteEntropyObservable`
"""
self.ignored([], kwargs)
self.setargs([], args, kwargs)
self.has = term
[docs] def write(self, Obsfile, **kwargs):
"""
Write the information to a given file handle.
**Arguments**
Obsfile : filehandle
Write to this file.
"""
if(self.has):
Obsfile.write('T\n')
else:
Obsfile.write('F\n')
return
[docs] def read(self, fh, **kwargs):
"""
Read the results from file. The filehandle must be at the
corresponding position.
**Arguments**
fh : filehandle
Open file with results. The next lines contain the
measurements of the Schmidt decomposition.
ll : int, keyword argument
The system size must be given as keyword argument.
"""
# Quick return (no measurement of site entropy)
if(not self.has): return
value = self.line_to_floats(fh, kwargs['ll'])
yield 'SiteEntropy', value
[docs] def read_hdf5(self, gh5, key, **kwargs):
"""
Read the results from an open HDF5 file.
**Arguments**
fh : id of HDF5 group
This is the HDF5 group containing the observable.
"""
if(key != 'SiteEntropy'): return
dh5 = gh5[key]
tmp = dh5[:]
yield key, tmp
[docs]class BondEntropyObservable(_ObsTerm):
"""
Measurement of the bond entropy at each splitting along the chain.
**Details**
The arguments of the :py:func:`obsterms.Observables.AddObservable` are
termtype : str
Identification to modify bond entropy measure. Must be
``BondEntropy``.
term : bool
Status of the bond entropy measurement' ``True`` for measuring,
``False`` for not measuring.
**Example**
By default the bond entropy is measured. The measurement can be turned off
by calling the :py:func:`obsterms.Observables.AddObservable` function as follows.
>>> Obs = mps.Observables(Operators)
>>> Obs.AddObservable('BondEntropy', False)
Replacing ``False`` with ``True``, the bond entropy can be turned on.
**Variables**
The following class variables are defined.
has : bool
``False`` if bond entropy should not be saved, ``True`` otherwise
measuring the bond entropy at every splitting.
"""
def __init__(self):
self.has = True
[docs] def add(self, term, *args, **kwargs):
"""
Add functions as method to turn bond entropy measure on and off. Last
call determines if the bond entropy is measured or not. For arguments
see :py:class:`obsterms.BondEntropyObservable`
"""
self.ignored([], kwargs)
self.setargs([], args, kwargs)
self.has = term
[docs] def write(self, Obsfile, **kwargs):
"""
Write the information to a given file handle.
**Arguments**
Obsfile : filehandle
Write to this file.
"""
if(self.has):
Obsfile.write('T\n')
else:
Obsfile.write('F\n')
return
[docs] def read(self, fh, **kwargs):
"""
Read the results from file. The filehandle must be at the
corresponding position.
**Arguments**
fh : filehandle
Open file with results. The next lines contain the
measurements of the Schmidt decomposition.
ll : int, keyword argument
The system size must be given as keyword argument.
"""
# Quick return (no measurement of bond entropy)
if(not self.has): return
value = self.line_to_floats(fh, kwargs['ll'] + 1)
yield 'BondEntropy', value
[docs] def read_hdf5(self, gh5, key, **kwargs):
"""
Read the results from an open HDF5 file.
**Arguments**
fh : id of HDF5 group
This is the HDF5 group containing the observable.
"""
if(key != 'BondEntropy'): return
dh5 = gh5[key]
tmp = dh5[:]
yield key, tmp
[docs]class CorrObservable(_ObsTerm):
"""
Measurement of correlation observables on all two-site combinations.
**Details**
The arguments of the :py:func:`obsterms.Observables.AddObservable` are
temptype : str
Identification to add correlation observable. Must be ``corr``.
term : list of two strings or numpy array
Name of the two operators to be measured or numpy 2d array
representing an operator on two sites in the Hilbert space
to be decomposed.
name : str, keyword argument
Key to address measurement in dictionary of results.
Required.
weight : float, keyword argument
Weighting the measurement.
Default to 1.0
Phase : bool, optional
If given, must be ``False``, otherwise Fermi correlation will
be measured instead.
Default to ``False`` if not present.
**Example**
A two site correlation measurement can be added to the observables as follows.
>>> Obs = mps.Observables(Operators)
>>> Obs.AddObservable('corr', ['nbtotal', 'nbtotal'], name='<nn>')
**Variables**
The following class variables are defined.
Op : list of strings
List contains the names of the operators to be measured.
intid : list of integers
Index to which correlation pair of operators should be added.
weight : list of floats
The weights for each measurement.
name : list of strings
The key under which the results should be stored.
isherm : list of bools
Checks if observable is hermitian. If not hermitian, the complex
part of the correlation is measured as well.
Ops : instance of :py:class:`Ops.OperatorList`
Contains the operators for checks.
"""
def __init__(self, Ops, FCorr):
self.count = 0
self.Op = []
self.intid = []
self.name = []
self.weight = []
self.isherm = []
self.Ops = Ops
self.FCorr = FCorr
def __len__(self):
return len(self.Op)
def add(self, term, *args, **kwargs):
self.ignored(['name', 'weight', 'Phase'], kwargs)
self.setargs(['name', 'weight', 'Phase'], args, kwargs)
self.required(['name'], kwargs)
self.default({'weight' : 1.0, 'Phase' : False}, kwargs)
if(kwargs['Phase']):
# Keep old convenient syntax - Fermi correlation is taken
# care of here.
self.FCorr.add(term, **kwargs)
return
self.count += 1
if(isinstance(term, list) or isinstance(term, tuple)):
if(not (term[0] in self.Ops) and (term[1] in self.Ops)):
raise Exception("Operator passed to CorrObservable not found")
tmp0 = self.Ops[term[0]]
tmp1 = self.Ops[term[1]]
isherm0 = (np.max(np.abs(tmp0 - tmp0.transpose().conj())) < 1e-14)
isherm1 = (np.max(np.abs(tmp1 - tmp1.transpose().conj())) < 1e-14)
self.Op.append(term)
self.weight.append(kwargs['weight'])
self.intid.append(self.count)
self.name.append(kwargs['name'])
self.isherm.append(isherm0 and isherm1)
else:
# Decomposition request
isherm = (np.max(np.abs(term - term.transpose().conj())) < 1e-14)
# Decompose can consider symmetries - not considered here for
# observables
OperList = DecomposeOperator(term, None)
for ii in OperList:
[Ol, weightl] = self.Operators.check(ii[0])
[Or, weightr] = self.Operators.check(ii[1])
self.Op.append([Ol, Or])
self.weight.append(kwargs['weight'] * weightl * weightr)
self.intid.apend(self.count)
self.name.append(kwargs['name'])
self.isherm.append(isherm)
return
def write(self, Obsfile, **kwargs):
nn = len(self)
Obsfile.write('%16i%16i\n'%(nn, len(set(self.intid))))
if(nn == 0): return
IntOp = kwargs['IntOp']
for ii in range(nn):
Hstring = '%30.15E'%(self.weight[ii])
Hstring += '%16i'%(IntOp[self.Op[ii][0]])
Hstring += '%16i'%(IntOp[self.Op[ii][1]])
Hstring += '%16i'%(self.intid[ii])
Hstring += 'T' if(self.isherm[ii]) else 'F'
Obsfile.write(Hstring + '\n')
return
[docs] def read(self, fh, **kwargs):
"""
Read the results for the correlations from an open file
handle at the corresponding position.
fh : filehandle
Open file with results. The next lines contain the
measurements of the correlation matrices.
"""
if(len(self) == 0): return
param = kwargs['param']
dim = param['L']
mat = None
for ii in range(len(self)):
cmplx = (fh.readline().replace('\n', '') == 'C')
if((mat is None) and cmplx):
mat = np.zeros((dim, dim), dtype=np.complex128)
elif(mat is None):
mat = np.zeros((dim, dim), dtype=np.float64)
elif(cmplx and (mat.dtype == np.float64)):
mat = np.array(mat, dtype=np.complex128)
for jj in range(dim):
mat[jj, :] += self.line_to_floats(fh, dim)
if(cmplx):
mat[jj, :] += 1j * self.line_to_floats(fh, dim)
if(ii + 1 == len(self)):
yield self.name[ii], mat
mat = None
continue
if(self.name[ii] != self.name[ii + 1]):
yield self.name[ii], mat
mat = None
continue
[docs] def read_imps(self, fh, corr_range):
"""
Read the results for the correlation for an iMPS simulations.
Filehandle must be already open.
fh : filehandle
Open file handle with results at right position.
corr_range : int
Determines number of entries in correlations. For iMPS,
the correlations are not a square matrix.
"""
if(len(self) == 0): return
dim = corr_range + 1
mat = None
for ii in range(len(self)):
cmplx = (fh.readline().replace('\n', '') == 'C')
if((mat is None) and cmplx):
mat = np.zeros((dim), dtype=np.complex128)
elif(mat is None):
mat = np.zeros((dim), dtype=np.float64)
elif(cmplx and (mat.dtype == np.float64)):
mat = np.array(mat, dtype=np.complex128)
mat[:] += self.line_to_floats(fh, dim)
if(cmplx):
mat[:] += 1j * self.line_to_floats(fh, dim)
if(ii + 1 == len(self)):
yield self.name[ii], mat
mat = None
continue
if(self.name[ii] != self.name[ii + 1]):
yield self.name[ii], mat
mat = None
continue
[docs] def read_hdf5(self, gh5, key, **kwargs):
"""
Read the results from an open HDF5 file.
**Arguments**
fh : id of HDF5 group
This is the HDF5 group containing the observable.
"""
if(key not in self.name): return
idx = self.name.index(key)
dh5 = gh5['corr_r']
tmp = np.transpose(dh5[idx, :, :])
has_c = gh5['corr_rc'][idx]
if(has_c):
dh5 = gh5['corr_c']
tmp += 1j * np.transpose(dh5[idx, :, :])
yield key, tmp
[docs]class Corr2NNObservable(_ObsTerm):
"""
Measure the 4-site correlator between two pairs of nearest neighbors.
**Details**
The arguments of the :py:func:`obsterms.Observables.AddObservable` are
temptype : str
Identification to add correlation observable. Must be ``corr2nn``.
term : list of four strings
Name of the operators acting on the sites i, i+1, j, j+1
name : str, keyword argument
Key to address measurement in dictionary of results.
Required.
weight : float, keyword argument
Weighting the measurement.
Default to 1.0
**Example**
This type of four-site correlator of two pairs of nearest neighbors can be
added to a measurement as
>>> Obs = mps.Observables(Operators)
>>> myObs.AddObservable('corr2nn', ['sigmaz', 'sigmaz', 'sigmaz', 'sigmaz'],
name='zzzz')
**Variables**
The following class variables are defined.
Op : list of strings
List contains the names of the operators to be measured
weight : list of floats
The weights for each measurement.
name : list of strings
The key under which the results should be stored.
isherm : list of bools
Checks if observable is hermitian. If not hermitian, the complex
part of the correlation is measured as well.
Ops : instance of :py:class:`Ops.OperatorList`
Contains the operators for checks.
**Details**
The resulting matrix is always upper triangular. The measurement of i, i+1, j, j+1
is stored in the matrix element [i, j] and i < j for all measurements. Therefore,
the first two columns and the last column is empty. Moreover, the last three rows
are empty.
"""
def __init__(self, Ops):
self.Op = []
self.name = []
self.weight = []
self.isherm = []
self.Ops = Ops
def __len__(self):
return len(self.Op)
def add(self, term, *args, **kwargs):
self.ignored(['name', 'weight'], kwargs)
self.setargs(['name', 'weight'], args, kwargs)
self.required(['name'], kwargs)
self.default({'weight' : 1.0}, kwargs)
self.Op.append(term)
self.weight.append(kwargs['weight'])
self.name.append(kwargs['name'])
isherm = True
for ii in range(4):
tmp = self.Ops[term[1]]
ishermii = (np.max(np.abs(tmp - tmp.transpose().conj())) < 1e-14)
isherm = isherm and ishermii
self.isherm.append(isherm)
return
def write(self, Obsfile, **kwargs):
nn = len(self)
Obsfile.write('%16i\n'%(nn))
if(nn == 0): return
IntOp = kwargs['IntOp']
for ii in range(nn):
Hstring = '%30.15E'%(self.weight[ii])
Hstring += '%16i'%(IntOp[self.Op[ii][0]])
Hstring += '%16i'%(IntOp[self.Op[ii][1]])
Hstring += '%16i'%(IntOp[self.Op[ii][2]])
Hstring += '%16i'%(IntOp[self.Op[ii][3]])
Hstring += 'T' if(self.isherm[ii]) else 'F'
Obsfile.write(Hstring + '\n')
return
[docs] def read(self, fh, **kwargs):
"""
Read the results for the correlations from an open file
handle at the corresponding position.
fh : filehandle
Open file with results. The next lines contain the
measurements of the correlation matrices.
"""
if(len(self) == 0): return
param = kwargs['param']
dim = param['L']
mat = None
for ii in range(len(self)):
cmplx = (fh.readline().replace('\n', '') == 'C')
if((mat is None) and cmplx):
mat = np.zeros((dim, dim), dtype=np.complex128)
elif(mat is None):
mat = np.zeros((dim, dim), dtype=np.float64)
elif(cmplx and (mat.dtype == np.float64)):
mat = np.array(mat, dtype=np.complex128)
for jj in range(dim):
mat[jj, :] += self.line_to_floats(fh, dim)
if(cmplx):
mat[jj, :] += 1j * self.line_to_floats(fh, dim)
if(ii + 1 == len(self)):
yield self.name[ii], mat
mat = None
continue
if(self.name[ii] != self.name[ii + 1]):
yield self.name[ii], mat
mat = None
continue
[docs] def read_hdf5(self, gh5, key, **kwargs):
"""
Read the results from an open HDF5 file.
**Arguments**
fh : id of HDF5 group
This is the HDF5 group containing the observable.
"""
if(key not in self.name): return
idx = self.name.index(key)
# Data handle
dh5 = gh5['corr2nn_r']
tmp = np.transpose(dh5[idx, :, :])
has_c = gh5['corr2nn_rc'][idx]
if(has_c):
dh5 = gh5['corr2nn_c']
tmp += 1j * np.transpose(dh5[idx, :, :])
yield key, tmp
[docs]class FCorrObservable(_ObsTerm):
"""
Measurement of correlation observables on all two-site combinations
with a Fermi phase yielded from a Jordan-Wigner transformation.
**Details**
The arguments of the :py:func:`obsterms.Observables.AddObservable` are
temptype : str
Identification to add correlation observable. Must be ``corr``.
term : list of two strings or numpy array
Name of the two operators to be measured or numpy 2d array
representing an operator on two sites in the Hilbert space
to be decomposed.
name : str, keyword argument
Key to address measurement in dictionary of results.
Required.
weight : float, keyword argument
Weighting the measurement.
Default to 1.0
Phase : bool, keyword argument
Must be given and must be true for Fermi correlation.
**Example**
By default the bond entropy is measured. The measurement can be turned off
by calling the :py:func:`obsterms.Observables.AddObservable` function as follows.
>>> Obs = mps.Observables(Operators)
>>> Obs.AddObservable('corr', ['nftotal', 'nftotal'], name='<nn>',
>>> Phase=True)
**Variables**
The following class variables are defined.
Op : list of strings
List contains the names of the operators to be measured.
intid : list of integers
Index to which correlation pair of operators should be added.
weight : list of floats
The weights for each measurement.
name : list of strings
The key under which the results should be stored.
isherm : list of bools
Checks if observable is hermitian. If not hermitian, the complex
part of the correlation is measured as well.
Ops : instance of :py:class:`Ops.OperatorList`
Contains the operators for checks.
"""
def __init__(self, Ops):
self.count = 0
self.Op = []
self.intid = []
self.name = []
self.weight = []
self.isherm = []
self.Ops = Ops
[docs] def __len__(self):
"""
Return the number of Fermi correlation observables.
"""
return len(self.Op)
def add(self, term, **kwargs):
# ignored, setargs, required and default are already called
# in CorrObservable, every other way to get here is not
# supported a priori and user should then know what is
# going on
self.count += 1
if(isinstance(term, list) or isinstance(term, tuple)):
if(not (term[0] in self.Ops) and (term[1] in self.Ops)):
raise Exception("Operator passed to FCorrObservable not found")
tmp0 = self.Ops[term[0]]
tmp1 = self.Ops[term[1]]
isherm0 = (np.max(np.abs(tmp0 - tmp0.transpose().conj())) < 1e-14)
isherm1 = (np.max(np.abs(tmp1 - tmp1.transpose().conj())) < 1e-14)
self.Op.append(term)
self.weight.append(kwargs['weight'])
self.intid.append(self.count)
self.name.append(kwargs['name'])
# Check for hermitian would have to include Fermi phase
self.isherm.append(False)
else:
# Decomposition request
print("Warning: phased operator is being decomposed - be careful!")
isherm = (np.max(np.abs(term - term.transpose().conj())) < 1e-14)
OperList = DecomposeOperator(term)
for ii in OperList:
[Ol, weightl] = self.Operators.check(ii[0])
[Or, weightr] = self.Operators.check(ii[1])
self.Op.append([Ol, Or])
self.weight.append(kwargs['weight'] * weightl * weightr)
self.intid.apend(self.count)
self.name.append(kwargs['name'])
# Check for hermitian would have to include Fermi phase
self.isherm.append(False)
return
def write(self, Obsfile, **kwargs):
nn = len(self)
Obsfile.write('%16i%16i\n'%(nn, len(set(self.intid))))
if(nn == 0): return
IntOp = kwargs['IntOp']
# Operator for Fermi phase
Obsfile.write('%16i\n'%(IntOp['FermiPhase']))
for ii in range(nn):
Hstring = '%30.15E'%(self.weight[ii])
Hstring += '%16i'%(IntOp[self.Op[ii][0]])
Hstring += '%16i'%(IntOp[self.Op[ii][1]])
Hstring += '%16i'%(self.intid[ii])
Hstring += 'T' if(self.isherm[ii]) else 'F'
Obsfile.write(Hstring + '\n')
return
[docs] def read(self, fh, **kwargs):
"""
Read the results for the correlations with Fermi phase from an
open file handle at the corresponding position.
fh : filehandle
Open file with results. The next lines contain the
measurements of the correlation matrices.
"""
if(len(self) == 0): return
param = kwargs['param']
dim = param['L']
mat = None
for ii in range(len(self)):
cmplx = (fh.readline().replace('\n', '') == 'C')
if((mat is None) and cmplx):
mat = np.zeros((dim, dim), dtype=np.complex128)
elif(mat is None):
mat = np.zeros((dim, dim), dtype=np.float64)
elif(cmplx and (mat.dtype == np.float64)):
mat = np.array(mat, dtype=np.complex128)
for jj in range(dim):
mat[jj, :] += self.line_to_floats(fh, dim)
if(cmplx):
mat[jj, :] += 1j * self.line_to_floats(fh, dim)
if(ii + 1 >= len(self)):
yield self.name[ii], mat
mat = None
continue
if(self.name[ii] != self.name[ii + 1]):
yield self.name[ii], mat
mat = None
continue
[docs] def read_imps(self, fh, corr_range):
"""
Read the results for the Fermi correlation for an iMPS simulations.
Filehandle must be already open.
fh : filehandle
Open file handle with results at right position.
corr_range : int
Determines number of entries in correlations. For iMPS,
the correlations are not a square matrix.
"""
if(len(self) == 0): return
dim = corr_range + 1
mat = None
for ii in range(len(self)):
cmplx = (fh.readline().replace('\n', '') == 'C')
if((mat is None) and cmplx):
mat = np.zeros((dim), dtype=np.complex128)
elif(mat is None):
mat = np.zeros((dim), dtype=np.float64)
elif(cmplx and (mat.dtype == np.float64)):
mat = np.array(mat, dtype=np.complex128)
mat[:] += self.line_to_floats(fh, dim)
if(cmplx):
mat[:] += 1j * self.line_to_floats(fh, dim)
if(ii + 1 >= len(self)):
yield self.name[ii], mat
mat = None
continue
if(self.name[ii] != self.name[ii + 1]):
yield self.name[ii], mat
mat = None
continue
[docs] def read_hdf5(self, gh5, key, **kwargs):
"""
Read the results from an open HDF5 file.
**Arguments**
fh : id of HDF5 group
This is the HDF5 group containing the observable.
"""
if(key not in self.name): return
idx = self.name.index(key)
# Data handle
dh5 = gh5['fcor_r']
tmp = np.transpose(dh5[idx, :, :])
has_c = gh5['fcor_rc'][idx]
if(has_c):
dh5 = gh5['fcor_c']
tmp += 1j * np.transpose(dh5[idx, :, :])
yield key, tmp
[docs]class StringCorrObservable(_ObsTerm):
"""
Measurement of a string correlation observables on all two-site
combinations with an additional operator applied to sites in
between, e.g., a Fermi phase yielded from a Jordan-Wigner
transformation.
**Details**
The arguments of the :py:func:`obsterms.Observables.AddObservable` are
temptype : str
Identification to add correlation observable. Must be ``corr``.
term : list of three strings
Name of the two operators to be measured. The third operator is
the phase operator.
name : str, keyword argument
Key to address measurement in dictionary of results.
Required.
weight : float, keyword argument
Weighting the measurement.
Default to 1.0
**Example**
You can add a string correlation observable by calling the
:py:func:`obsterms.Observables.AddObservable` function as follows.
>>> Obs = mps.Observables(Operators)
>>> Obs.AddObservable('string', ['sz', 'sz', 'phasestring'],
>>> name='StringOP', Phase=True)
**Variables**
The following class variables are defined.
Op : list of strings
List contains the names of the operators to be measured. It
is a listed list.
weight : list of floats
The weights for each measurement.
name : list of strings
The key under which the results should be stored.
Ops : instance of :py:class:`Ops.OperatorList`
Contains the operators for checks.
"""
def __init__(self, Ops):
self.Op = []
self.weight = []
self.name = []
self.Ops = Ops
def __len__(self):
return len(self.Op)
def add(self, term, *args, **kwargs):
self.ignored(['name', 'weight'], kwargs)
self.setargs(['name', 'weight'], args, kwargs)
self.required(['name'], kwargs)
self.default({'weight' : 1.0}, kwargs)
if(not (isinstance(term, list) or isinstance(term, tuple))):
raise Exception("Adding a string observable requires a "
+ "three-element list for term!")
if(len(term) != 3):
raise Exception("Adding a string observable requires a "
+ "three-element list for term!")
if(not ((term[0] in self.Ops) and (term[1] in self.Ops) and
(term[2] in self.Ops))):
raise Exception("Operator passed in to AddStringObservable "
+ "is not in Operator list.")
self.Op.append(term)
self.weight.append(kwargs['weight'])
self.name.append(kwargs['name'])
return
def write(self, Obsfile, **kwargs):
nn = len(self)
Obsfile.write('%16i\n'%(nn))
if(nn == 0): return
IntOp = kwargs['IntOp']
for ii in range(nn):
Hstring = '%30.15E'%(self.weight[ii])
Hstring += '%16i'%(IntOp[self.Op[ii][0]])
Hstring += '%16i'%(IntOp[self.Op[ii][1]])
Hstring += '%16i'%(IntOp[self.Op[ii][2]])
Obsfile.write(Hstring + '\n')
return
[docs] def read(self, fh, **kwargs):
"""
Read the results for the correlations with Fermi phase from an
open file handle at the corresponding position.
fh : filehandle
Open file with results. The next lines contain the
measurements of the correlation matrices.
"""
if(len(self) == 0): return
param = kwargs['param']
dim = param['L']
for ii in range(len(self)):
mat = np.zeros((dim, dim), dtype=np.float64)
for jj in range(dim):
mat[jj, :] = self.line_to_floats(fh, dim)
yield self.name[ii], mat
[docs] def read_imps(self, fh, corr_range):
"""
Read the results for the string correlation for an iMPS simulations.
Filehandle must be already open.
fh : filehandle
Open file handle with results at right position.
corr_range : int
Determines number of entries in correlations. For iMPS,
the correlations are not a square matrix.
"""
if(len(self) == 0): return
dim = corr_range + 1
for ii in range(len(self)):
mat = np.zeros((dim), dtype=np.float64)
mat[:] = self.line_to_floats(fh, dim)
yield self.name[ii], mat
[docs] def read_hdf5(self, gh5, key, **kwargs):
"""
Read the results from an open HDF5 file.
**Arguments**
fh : id of HDF5 group
This is the HDF5 group containing the observable.
"""
if(key not in self.name): return
idx = self.name.index(key)
# Data handle
dh5 = gh5['string_obs']
tmp = np.transpose(dh5[idx, :, :])
yield key, tmp
[docs]class MPOObservable(_ObsTerm):
"""
Measurement of an MPO beyond the default measurement of the Hamiltonian
MPO.
**Details**
The arguments of the :py:func:`obsterms.Observables.AddObservable` are
termtype : str
Identification as MPO measurement, string must be 'MPO'.
term : instance of class :py:class:`MPO.MPO`.
The MPO to be measured.
name : str, keyword arguments
The key to access the result in the result dictionary.
Must be present.
The MPO observables should not depend on time-dependent Hamiltonian parameters.
They are not constructed for each time step.
**Example**
You can add an MPO observable by calling the
:py:func:`obsterms.Observables.AddObservable` function as follows:
>>> DefectDensity = mps.MPO(Operators)
>>> DefectDensity.AddMPOTerm('bond', ['sz', 'sz'])
>>>
>>> Obs = mps.Observables(Operators)
>>> Obs.AddObservable('MPO', DefectDensity, name='DefectDensity')
**Variables**
Op : list of :py:class:`MPO.MPO`
Contains the MPO for each measurement in the list.
name : list of strings
The key under which the results should be stored.
weight : list of floats
The weights for each measurement.
"""
def __init__(self):
self.Op = []
self.name = []
self.weight = []
def __len__(self):
return len(self.Op)
def add(self, term, *args, **kwargs):
self.ignored(['name', 'weight'], kwargs)
self.setargs(['name', 'weight'], args, kwargs)
self.required(['name'], kwargs)
self.default({'weight' : 1.0}, kwargs)
self.Op.append(term)
self.name.append(kwargs['name'])
self.weight.append(kwargs['weight'])
return
def write(self, Obsfile, **kwargs):
nn = len(self)
Obsfile.write('%16i\n'%(nn))
if(nn == 0): return
filestub = kwargs['filestub']
param = kwargs['param']
IntOp = kwargs['IntOp']
IntHparam = kwargs['IntHparam']
for ii in range(nn):
mpofilename = filestub + 'ObsMPO_' + str(ii + 1) + '.dat'
param['int_files'].append(mpofilename)
fhmpo = open(mpofilename, 'w+')
self.Op[ii].write(fhmpo, param, IntOp, IntHparam)
fhmpo.close()
if('/' in mpofilename): mpofilename = mpofilename.split('/')[-1]
Obsfile.write(mpofilename + '\n')
return
def read(self, fh, **kwargs):
if(len(self) == 0): return
for ii in range(len(self)):
tmp = self.line_to_floats(fh, 1)[0]
yield self.name[ii], tmp * self.weight[ii]
[docs] def read_hdf5(self, gh5, key, **kwargs):
"""
Read the results from an open HDF5 file.
**Arguments**
fh : id of HDF5 group
This is the HDF5 group containing the observable.
"""
if(key not in self.name): return
idx = self.name.index(key)
# Data handle
dh5 = gh5['mpo_obs']
tmp = dh5[idx] * self.weight[idx]
yield key, tmp
[docs]class RhoiObservable(_ObsTerm):
"""
Defines the measurement of single site reduced density matrices.
**Details**
The arguments of the :py:func:`obsterms.Observables.AddObservable` are
termtype : str
Identification as single site reduced density matrig, string
must be 'DensityMatrix_i'.
term : list of integers
Indices of the sites to be measured. If list is empty, it is
filled with all sites.
**Example**
>>> Obs = mps.Observables(Operators)
>>> Obs.AddObservables('DensityMatrix_i', [])
**Variables**
has_rhoi : bool
Flag if any density matrix is measured
ij : ``None``, list of integers
Contains the indices of the reduced density matrices to be
measured.
ij_orig : ``None``, list of integers
Contains the original list past by the user; this needs to be
stored since empty list in 'ij' are overwritten with the sites
1 .. L.
"""
def __init__(self):
self.has_rhoi = False
self.ij = None
self.ij_orig = None
def add(self, term, *args, **kwargs):
self.ignored([], kwargs)
self.setargs([], args, kwargs)
if(self.has_rhoi):
raise Exception("Only call AddObservable once for Density"
+ "Matrices! If multiple density matrices are "
+ "desired, pass them as a list!")
self.has_rhoi = True
self.ij = term
self.i_orig = deepcopy(term)
[docs] def __len__(self):
"""
Length is either zero or one depending on boolean ``has_rhoi``.
"""
return int(self.has_rhoi)
[docs] def update(self, param):
"""
Update the observable for the current setting of the simulation
dictionary.
**Arguments**
param : dictionary
mainly passed to extract the number of sites for each simulation.
"""
if(not self.has_rhoi): return
self.ij = deepcopy(self.i_orig)
if(len(self.ij) == 0):
# Empty list for data means fill with all possible values
for ii in range(param['L']):
self.ij.append(ii + 1)
else:
# Check consistency of data only
if(any([not isinstance(x, int) for x in self.ij])):
raise Exception("Set of indices passed to DensityMatrix_i "
+ "should be integers!")
if(any([x < 1 for x in self.ij])):
raise Exception("Set of indices passed to DensityMatrix_i "
+ "should be in the range [1,L]!")
if(any([x > param['L'] for x in self.ij])):
raise Exception("Set of indices passed to DensityMatrix_i "
+ "should be in the range [1,L]!")
if(len(self.ij)!=len(set(self.ij))):
raise Exception("Set of indices passed to DensityMatrix_i "
+ "should not contain repetitions!")
return
def write(self, Obsfile, **kwargs):
if(not self.has_rhoi):
Obsfile.write('%16i\n'%(0))
return
param = kwargs['param']
self.update(param)
nn = len(self.ij)
Obsfile.write('%16i\n'%(nn))
if(nn == 0): return
Hstring = ''
for ii in sorted(self.ij):
Hstring += '%16i'%(ii)
Obsfile.write(Hstring + '\n')
return
[docs] def read(self, fh, **kwargs):
"""
Read the local density matrices.
**Arguments**
fh : filehandle
Open file with results. The next lines contain the
measurements of the correlation matrices.
param : dictionary (as keyword argument)
Contains the settings for the simulation.
dynamic : bool (as keyword argument)
Flag if dynamics (True) or statics (False) are read.
ld : int (as keyword argument)
Local dimension of the Hilbert space.
"""
if(not self.has_rhoi): return
param = kwargs['param']
self.update(param)
dynamics = kwargs['dynamic']
ld = kwargs['ld']
cmplx = dynamics or param['has_complex_op']
for ii in sorted(self.ij):
mat = self.line_to_floats(fh, ld**2)
mat = np.reshape(mat, (ld, ld)).transpose()
if(cmplx):
imat = self.line_to_floats(fh, ld**2)
imat = np.reshape(imat, (ld, ld)).transpose()
mat = mat + 1j * imat
yield 'rho_' + str(ii), mat
[docs] def read_hdf5(self, gh5, key, **kwargs):
"""
Read the results from an open HDF5 file.
**Arguments**
fh : id of HDF5 group
This is the HDF5 group containing the observable.
"""
if(len(key) < 5): return
if(key[:4] != 'rho_'): return
if(key.count('_') != 1): return
dh5 = gh5[key]
tmp = dh5[:, :]
if(tmp.shape[0] == 2):
tmp = tmp[0, :] + 1j * tmp[1, :]
dim = int(np.sqrt(tmp.shape[-1]))
tmp = np.reshape(tmp, [dim, dim])
tmp = np.transpose(tmp)
yield key, tmp
[docs]class RhoijObservable(_ObsTerm):
"""
Defines the measurement of two-site site reduced density matrices.
**Details**
The arguments of the :py:func:`obsterms.Observables.AddObservable` are
termtype : str
Identification as two-site reduced density matrix, string
must be 'DensityMatrix_ij'.
term : list of integers
Pairs of Indices of the sites to be measured. If list is empty,
it is filled with all sites. Indices must be ascending.
**Example**
>>> Obs = mps.Observables(Operators)
>>> Obs.AddObservables('DensityMatrix_ij', [])
**Variables**
has_rhoij : bool
Flag if any density matrix is measured
ij : ``None``, list of integers
Contains the pairs of indices of the reduced density matrices to be
measured.
ij_orig : ``None``, list of integers
Contains the original list past by the user; this needs to be
stored since empty list in 'ij' are overwritten.
"""
def __init__(self):
self.has_rhoij = False
self.ij = None
self.ij_orig = None
def add(self, term, *args, **kwargs):
self.ignored([], kwargs)
self.setargs([], args, kwargs)
if(self.has_rhoij):
raise Exception("Only call AddObservable once for Density"
+ "Matrices! If multiple density matrices are "
+ "desired, pass them as a list!")
self.has_rhoij = True
self.ij = term
self.ij_orig = deepcopy(term)
[docs] def __len__(self):
"""
Length is either zero or one depending on boolean ``has_rhoij``.
"""
return int(self.has_rhoij)
[docs] def update(self, param):
"""
Update the observable for the current setting of the simulation
dictionary.
**Arguments**
param : dictionary
mainly passed to extract the number of sites for each simulation.
"""
if(not self.has_rhoij): return
self.ij = deepcopy(self.ij_orig)
if(len(self.ij) == 0):
# Empty list for data means fill with all possible values
for ii in range(param['L'] - 1):
for jj in range(ii + 1, param['L']):
self.ij.append([ii + 1, jj + 1])
else:
if not isinstance(self.ij, list):
raise Exception("Set of indices passed into DensityMatrix_ij "
+ "should be a list of lists, e.g. [[1,2], "
+ "[1,3]]")
unrolledpairs = []
for pair in self.ij:
if(not isinstance(pair, list)):
raise Exception("Set of indices passed into "
+ "DensityMatrix_ij should be a list of "
+ "lists, e.g. [[1,2], [1,3]]")
if(pair[0] >= pair[1]):
raise Exception("Set of indices passed into "
+ "DensityMatrix_ij should have i<j!")
if(any([not isinstance(x, int) for x in pair])):
raise Exception("Set of indices passed to "
+ "DensityMatrix_ij should be integers!")
if(any([x < 1 for x in pair])):
raise Exception("Set of indices passed to "
+ "DensityMatrix_ij should be in the "
+ "range [1,L]!")
if(any([x > param['L'] for x in pair])):
raise Exception("Set of indices passed to "
+ "DensityMatrix_ij should be in the "
+ "range [1,L]!")
# generate identifier integer to check for unique elements
# and if sorted.
unrolledpairs.append((pair[0] - 1) * param['L'] + pair[1])
if(len(set(unrolledpairs)) != len(unrolledpairs)):
raise Exception("Set of indices passed to DensityMatrix_ij "
+ "should not contain repetitions!")
if(unrolledpairs != sorted(unrolledpairs)):
raise Exception("Set of indices passed to DensityMatrix_ij "
+ "should be sorted, e.g. [1,2] occurs before "
+ "[1,3] and [2,3]!")
return
def write(self, Obsfile, **kwargs):
if(not self.has_rhoij):
Obsfile.write('%16i\n'%(0))
return
param = kwargs['param']
self.update(param)
unrolledpairs = []
for pair in self.ij:
unrolledpairs.append((pair[0] - 1) * param['L'] + pair[1])
numis = len(set([pairs[0] for pairs in self.ij]))
Hstring = '%16i\n'%(numis)
qq = 0
while(qq < len(unrolledpairs)):
ival = self.ij[qq][0]
jvals = [self.ij[qq][1]]
if(qq == len(unrolledpairs) - 1):
Hstring += '%16i%16i\n'%(len(jvals), ival)
for jj in jvals:
Hstring += '%16i'%(jj)
break
while(self.ij[qq + 1][0] == self.ij[qq][0]):
jvals.append(self.ij[qq + 1][1])
qq += 1
if(qq == len(unrolledpairs) - 1):
break
Hstring += '%16i%16i\n'%(len(jvals), ival)
for jj in jvals:
Hstring += '%16i'%(jj)
Hstring += '\n'
qq += 1
Obsfile.write(Hstring + '\n')
return
[docs] def read(self, fh, **kwargs):
"""
Read the two-site density matrices.
**Arguments**
fh : filehandle
Open file with results. The next lines contain the
measurements of the correlation matrices.
param : dictionary (as keyword argument)
Contains the settings for the simulation.
dynamic : bool (as keyword argument)
Flag if dynamics (True) or statics (False) are read.
ld : int (as keyword argument)
Local dimension of the Hilbert space.
"""
if(not self.has_rhoij): return
param = kwargs['param']
self.update(param)
dynamic = kwargs['dynamic']
ld = kwargs['ld']
cmplx = dynamic or param['has_complex_op']
ldsq = ld**2
for pair in self.ij:
mat = self.line_to_floats(fh, ldsq**2)
mat = np.reshape(mat, (ldsq, ldsq)).transpose()
if(cmplx):
imat = self.line_to_floats(fh, ldsq**2)
imat = np.reshape(imat, (ldsq, ldsq)).transpose()
mat = mat + 1j * imat
# Account for column major ordering of sub-indices
mat = np.reshape(np.transpose(np.reshape(mat, 4 * [ld]),
[1, 0, 3, 2]), 2 * [ldsq])
yield 'rho_' + str(pair[0]) + '_' + str(pair[1]), mat
[docs] def read_hdf5(self, gh5, key, **kwargs):
"""
Read the results from an open HDF5 file.
**Arguments**
fh : id of HDF5 group
This is the HDF5 group containing the observable.
"""
if(len(key) < 7): return
if(key[:4] != 'rho_'): return
if(key.count('_') != 2): return
dh5 = gh5[key]
tmp = dh5[:, :]
if(tmp.shape[0] == 2):
tmp = tmp[0, :] + 1j * tmp[1, :]
dim = int(np.sqrt(np.sqrt(tmp.shape[-1])))
tmp = np.reshape(tmp, [dim] * 4)
tmp = np.transpose(tmp, [3, 2, 1, 0])
tmp = np.reshape(tmp, 2 * [dim**2])
yield key, tmp
[docs]class RhoredObservable(_ObsTerm):
"""
Defines the measurement of a reduced density matrix of an
arbitrary subsystem
**Details**
The arguments of the :py:func:`obsterms.Observables.AddObservable`
are
termtype : str
Identification as an arbitrary reduced density matrix must be
'DensityMatrix_red'.
term : list of integers
All sites to be included in the reduced density matrix. Python
indexing starting at 0 for first site.
maxdim : int, optional
Prevents memory problems by avoiding larger reduced density
matrices setting a maximal size of the total Hilbert space
of the subsystem.
Default to 4096 (corresponds to 12 qubits).
**Example**
>>> Obs = mps.Observables(Operators)
>>> Obs.AddObservables('DensityMatrix_red', [0, 1, 2])
Indices must be sorted ascending. The corresponding dictionary key
for accessing the reduced density matrix is the tuple ('rho', 0, 1, 2).
**Information**
The quick estimate is that the permutation of the indices to be kept
followed by a partial trace has a reasonable performance. For this
reason, the reduced density matrix involves truncations induced by
the swapping of sites.
**Developers note**
The swapping scales with :math:`O(\chi^3 d^3)`. The contraction
of the reduced density matrix can be done completely within the
ket-state (:math:`d^l \chi^2`) except the last contraction of
the ket with the bra (:math:`d^{2l} \chi`), where :math:`l` is
the dimension of the subsystem of the reduced density matrix.
If we do not permute sites, we have to trace out intermediate
sites, which pushes the contraction of bra-ket states to the
front of the algorithm. The contraction of the reduced density
matrix up to site k is contracted with the ket of site (k+1)
followed by the contraction of the bra of site (k+1) at costs
of :math:`O(d^{2 l_k + 1} \chi^2) + O(d^{2 l_k + 1} \chi^3)`,
where :math:`l_k < l` contains the number site remaining in
the reduced density matrix up to now. The final contraction
of the l-th site can use orthogonality and scales with
:math:`O(d^{2l} \chi^2)`.
Thus, we consider the swapping method as better scaling despite
the introduction of an error and possible overhead for doing
a significant amount of swapping operations.
"""
def __init__(self, Ops):
self.inds = []
self.ld = Ops['I'].shape[0]
def __len__(self):
return len(self.inds)
def add(self, term, *args, **kwargs):
self.ignored(['maxdim'], kwargs)
self.setargs(['maxdim'], args, kwargs)
self.default({'maxdim' : 4096}, kwargs)
if(self.ld**len(term) > kwargs['maxdim']):
raise Exception("Reduced density matrix exceeds maximal "
+ "dimension %5d."%(self.maxdim))
if(np.any(np.array(term)[:-1] >= np.array(term)[1:])):
raise Exception("Indices not sorted or not unique.")
for ii in range(len(self)):
if(np.all(np.array(self.inds[ii]) == np.array(term))):
# The indices are already added
return
self.inds.append(term)
return
def write(self, Obsfile, **kwargs):
nn = len(self)
Obsfile.write('%16i\n'%(nn))
if(nn == 0): return
for ii in range(nn):
mm = len(self.inds[ii])
Obsfile.write('%16i%16i\n'%(mm, self.continuous(ii)))
hstr = ''
for jj in range(mm):
# +1 for converting python index to fortran
hstr += '%16i'%(self.inds[ii][jj] + 1)
Obsfile.write(hstr + '\n')
return
[docs] def continuous(self, ii):
"""
Check if subsystem is a continuous set of sites, which can
avoid permutations. In this case, 0 is returned, otherwise 1.
"""
val = np.all(np.array(self.inds[ii])[1:] - np.array(self.inds[ii])[:-1] == 1)
if(val): return 0
return 1
[docs] def read(self, fh, **kwargs):
"""
Read the reduced density matrices.
**Arguments**
"""
nn = len(self)
if(nn == 0): return
param = kwargs['param']
dynamic = kwargs['dynamic']
cmplx = dynamic or param['has_complex_op']
for ii in range(nn):
mm = len(self.inds[ii])
dim = self.ld**mm
mat = self.line_to_floats(fh, dim**2)
mat = np.reshape(mat, (dim, dim)).transpose()
if(cmplx):
imat = self.line_to_floats(fh, dim**2)
imat = np.reshape(imat, (dim, dim)).transpose()
mat = mat + 1j * imat
# Account for column major ordering of sub-indices
inda = np.array(np.linspace(0, mm - 1, mm), dtype=np.int32)[::-1]
indb = inda + mm
ind = list(inda) + list(indb)
mat = np.reshape(np.transpose(np.reshape(mat, 2 * mm * [self.ld]),
ind), 2 * [dim])
key = tuple(['rho'] + self.inds[ii])
yield key, mat
[docs] def read_hdf5(self, gh5, key, **kwargs):
"""
Read the results from an open HDF5 file.
**Arguments**
fh : id of HDF5 group
This is the HDF5 group containing the observable.
"""
if(not isinstance(key, tuple)): return
keyp = tuple(list(key)[1:])
idx = -1
for ii in range(len(self)):
if(keyp == tuple(self.inds[ii])):
idx = ii
break
if(idx == -1): return
ll = len(self.inds[idx])
obskey = 'rho_ijk_' + str(idx + 1)
# Data handle
dh5 = gh5[obskey]
tmp = dh5[:, :]
if(tmp.shape[0] == 2):
tmp = tmp[0, :] + 1j * tmp[1, :]
dim = int(np.exp(np.log(tmp.shape[-1]) / (2.0 * ll)))
tmp = np.reshape(tmp, (2 * ll) * [dim])
perm = list(range(2 * ll))[::-1]
tmp = np.transpose(tmp)
tmp = np.reshape(tmp, 2 * [dim**ll])
yield key, tmp
[docs]class LambdaObservable(_ObsTerm):
"""
Lambda observable taking care of the measurements of the singular values
of the reduced density matrices of the bipartitions.
**Details**
The arguments of the :py:func:`obsterms.Observables.AddObservable` are
termType : str
string must be ``Lambda``.
term : list
Empty list to measure at all splittings or specifying the splittngs
to be saved.
**Example**
In order to indicate the measurement of the singular values, you call
:py:func:`obsterms.Observables.AddObservable` as follows:
>>> Obs = mps.Observables(Operators)
>>> Obs.AddObservable('Lambda', [])
**Variables**
has : bool
``False`` if no singular values should be saved, ``True`` otherwise.
List : list of indices
contains the list with the indices of bipartitions to be measured.
Orig : list of indices
contains the original list passed as argument. This original list is
references to generate the list for each simulations. This is
important in case of an empty list where the final argument depends
on the system size and may vary.
"""
def __init__(self):
self.has = False
self.List = []
self.Orig = []
[docs] def add(self, term, *args, **kwargs):
"""
Add term for measuring the singular values. Can only be called once.
For arguments see class description :py:class:`obsterms.LambdaObservable`
"""
self.ignored([], kwargs)
self.setargs([], args, kwargs)
if(self.List):
# List already set before
raise Exception("Only call AddObservable once for Lambda. If " +
"multiple Lambdas are desired, pass a list.")
self.has = True
if(hasattr(term, '__len__')):
# Tuple, list, etc: convert to list
self.List = list(term)
self
else:
# Single splitting: put into list
self.List = [term]
self.Orig = deepcopy(self.List)
[docs] def __len__(self):
"""
Length is either zero or one depending on boolean ``has``.
"""
return int(self.has)
[docs] def update(self, param):
"""
Update the observable for the current setting of the simulation
dictionary.
**Arguments**
param : dictionary
mainly passed to extract the number of sites for each simulation.
"""
# Quick return
if(not self.has): return
# Checks
if(param['simtype'] == 'Infinite'):
raise Exception("Output of Lambda not supported for iMPS!")
self.List = deepcopy(self.Orig)
if(self.List):
# List with data - check consistency
if len(self.List) != len(set(self.List)):
raise Exception("Set of indices passed to Lambda " +
"should not contain repetitions!")
else:
# Empty list - fill with all possible values
for ii in self.all_values(param['L']):
self.List.append(ii)
[docs] def all_values(self, ll):
"""
Iterate over all possible values which can be measured.
**Arguments**
ll : int
system size
"""
for ii in range(ll - 1):
yield (ii + 1)
[docs] def write(self, Obsfile, **kwargs):
"""
Write the information to a given file handle.
**Arguments**
Obsfile : filehandle
Write to this file.
param : dictionary
Contains the settings for the simulation.
"""
if(self.has):
Obsfile.write('T\n')
else:
Obsfile.write('F\n')
return
param = kwargs['param']
self.update(param)
Hstring = '%16i'%(len(self.List)) + '\n'
for ii in sorted(self.List):
Hstring += '%16i'%(ii)
Obsfile.write(Hstring + '\n')
return
[docs] def read(self, fh, **kwargs):
"""
Read the results from file. The filehandle must be at the
corresponding position.
**Arguments**
fh : filehandle
Open file with results. The next lines contain the
measurements of the Schmidt decomposition.
"""
# Quick return (empty iterator)
if(not self.has): return
param = kwargs['param']
self.update(param)
value = {}
for ii in self.List:
# Read number of singular values
chi = int(fh.readline().replace('\n', ''))
# Read singular values
value[ii] = self.line_to_floats(fh, chi)
yield 'Lambda', value
[docs] def read_hdf5(self, gh5, key, **kwargs):
"""
Read the results from an open HDF5 file.
**Arguments**
fh : id of HDF5 group
This is the HDF5 group containing the observable.
"""
if(key != 'Lambda'): return
if(not self.has): return
param = kwargs['param']
self.update(param)
value = {}
for ii in self.List:
h5key = 'Lambda_' + str(ii)
# Data handle
dh5 = gh5[h5key]
value[ii] = dh5[:]
yield 'Lambda', value
[docs]class DistancePsiObs(_ObsTerm):
"""
DistanceObs measures the distance to a pure state.
**Details**
term : list of strings
Contains the filename containing the state to measure the distance
to.
name : str, keyword argument
The key to access the result in the result dictionary.
Must be present.
dist : str, optional
Either ``Trace`` or ``Infidelity`` to specify which quantum
distance is used.
Default to ``Infidelity``.
**Example**
>>> Obs = mps.Observables(Operators)
>>> Obs.AddObservables('dist_psi', 'somempsstate.bin', name='distpsi')
**Variables**
term : list of strings
Contains the filename containing the state to measure the distance
to.
name : list of strings
Keys for access in the result dictionary.
"""
def __init__(self):
self.term = []
self.name = []
self.dist = []
[docs] def add(self, term, *args, **kwargs):
"""
Add a new distance measurement. The type of distance measure
can be choosen.
**Arguments**
term : str
Contains the filename containing the state to measure the distance
to.
dist : str
The type of distance measures used. Can be ``Trace`` or
``Infidelity``.
name : str
String identifier to address measurement in result dictionary.
"""
self.ignored(['name', 'dist'], kwargs)
self.setargs(['name', 'dist'], args, kwargs)
self.default({'dist' : 'Infidelity'}, kwargs)
self.required(['name'], kwargs)
if(not kwargs['dist'] in ['Trace', 'Infidelity']):
raise Exception('Unknown string specifier for distance measure.')
self.term.append(term)
self.name.append(kwargs['name'])
self.dist.append(kwargs['dist'])
[docs] def __len__(self):
"""
Return the number of distance measurements.
"""
return len(self.term)
[docs] def __getitem__(self, key):
"""
Return the corresponding variable.
"""
return self.__dict__[key]
[docs] def get_hash(self):
"""
Generate hash for this object.
"""
return hash_dict(self.__dict__)
def write(self, Obsfile, **kwargs):
nn = len(self)
Obsfile.write('%16i\n'%(nn))
for ii in range(nn):
Hstring = self.term[ii] + ' ' * (256 - len(self.term[ii])) + '\n'
Hstring += 'B' if(self.term[ii].split('.')[-1] == 'bin') else 'H'
Obsfile.write(Hstring + '\n')
return
[docs] def read(self, fh, **kwargs):
"""
Read the results from file. The filehandle must be at the
corresponding position.
**Arguments**
fh : filehandle
Open file with results. The next lines contain the
distance measurements.
"""
for ii in range(len(self)):
value = self.line_to_floats(fh, 1)
yield self.name[ii], value
[docs] def read_hdf5(self, gh5, key, **kwargs):
"""
Read the results from an open HDF5 file.
**Arguments**
fh : id of HDF5 group
This is the HDF5 group containing the observable.
"""
if(key not in self.name): return
idx = self.name.index(key)
# Data handle
dh5 = gh5['dist_psi_obs']
dim = dh5[:].shape[0] // 2
tmp = dh5[idx] + 1j * dh5[dim + idx]
yield key, tmp
[docs]class StateObservable(_ObsTerm):
"""
Write out the state as file to have access to it lateron.
**Details**
The arguments of :py:func:obsterms.Observables.AddObservable` for
setting the state observable are
term : bool
Status of writing the state. ``True`` for writing it to file,
``False`` for not measuring.
form : character, optional
Output format of the file which can be binary 'B' or 'H' for
human readable text.
Default to 'B' (binary).
**Example**
By default the states are not written to file as this measurement
can be memory intensive. In order to set this measurement, follow
>>> Obs = mps.Observables(Operators)
>>> Obs.AddObservable('State', True)
Replacing ``True`` with ``False``, the measurement can be turned
off again. (Currently not used in ED.)
**Variables**
The following class variables are defined.
has : bool
``False`` if states are not written out. ``True`` otherwise.
file_format : str
Defines if binary or text is written.
"""
def __init__(self):
self.has = False
self.file_format = 'B'
[docs] def add(self, term, *args, **kwargs):
"""
Add functions as method to turn state measurements on and off. Last
call determines if the site entropy is measured or not. For arguments
see :py:class:`obsterms.StateObservable`
"""
self.ignored(['form'], kwargs)
self.setargs(['form'], args, kwargs)
self.default({'form' : 'B'}, kwargs)
self.has = term
self.file_format = kwargs['form']
[docs] def __len__(self):
"""
Length is either zero or one depending on boolean ``has``.
"""
return int(self.has)
[docs] def write(self, Obsfile, **kwargs):
"""
Write the information to a given file handle.
**Arguments**
Obsfile : filehandle
Write to this file.
"""
if(self.has):
Obsfile.write(self.file_format + '\n')
else:
Obsfile.write('N\n')
return
[docs] def read(self, fh, **kwargs):
"""
This result is not stored in the result file, but a different file. Therefore,
read is empty. (Still have to make it an iterator.)
"""
if(True): return
yield (None, None)
[docs] def read_hdf5(self, gh5, key, **kwargs):
"""
This result is not stored in the result file, but a different file. Therefore,
read is empty. (Still have to make it an iterator.)
"""
if(True): return
yield (None, None)
[docs]class ETHObservable(_ObsTerm):
"""
Check the eigenstate thermalization hypothesis (ETH) for a subsystem.
This measurement is calculated a posteriori within the python frontend
based on the reduced density matrix. At present, it is for the first
sites of the system.
**Variables**
The arguments of the :py:func:`obsterms.Observables.AddObservable`
are
termtype : str
Identification as ETH measurement must be 'ETH'.
term : integer
ETH is checked for the first site until site ``term``.
Ham : instance of MPO
The Hamiltonian of the system. The rule sets are used to
build a truncated Hamiltonian on the subsystem.
kB : float, optional
Value of Boltzmann constant.
Default to 1.0
maxdim : int, optional
Prevents memory problems by avoiding larger reduced density
matrices and diagonalization of the Hamiltonian of the same
size.
Default to 4096 (corresponds to 12 qubits).
SymmSec : None or instance of SymmetrySector, optional
If symmetry sector is passed, ETH is checked in the corresponding
sector. As the symmetries in OSMPS are global symmetries, a
corresponding symmetry in a subsystem only occurs in special
cases.
sparse : bool, optional
Control for using sparse matrices when building the Hamiltonian.
Default to dense matrices.
**Example**
>>> Obs = mps.Observables(Operators)
>>> Obs.AddObservables('ETH', 10)
"""
def __init__(self, rhoijk):
self.rhoijk = rhoijk
self.inds = []
self.maxdim = []
self.SymmSec = []
self.sparse = []
self.kB = []
self.Ham = []
def __len__(self):
return len(self.inds)
def add(self, term, *args, **kwargs):
self.ignored(['Ham', 'maxdim', 'SymmSec', 'sparse', 'kB'], kwargs)
self.setargs(['Ham', 'maxdim', 'SymmSec', 'sparse', 'kB'], args, kwargs)
self.required(['Ham'], kwargs)
self.default({'maxdim' : 4096, 'SymmSec' : None,
'sparse' : False, 'kB' : 1.0}, kwargs)
# Add reduced density matrix
sites = list(range(term + 1))
self.rhoijk.add(sites, maxdim=kwargs['maxdim'])
self.inds.append(sites)
self.maxdim.append(kwargs['maxdim'])
self.SymmSec.append(kwargs['SymmSec'])
self.sparse.append(kwargs['sparse'])
self.kB.append(kwargs['kB'])
self.Ham.append(kwargs['Ham'])
return
[docs] def get(self, params, Res):
"""
Calculate the temperature and error from fit for ETH.
"""
# Array with number of measurements and its error
eth = np.zeros((len(self), 2))
for ii in range(len(self)):
# Get the density matrix
key = tuple(['rho'] + self.inds[ii])
rho = Res[key]
# Number of sites in the subsystem
lsub = len(self.inds[ii])
# Change system size in a copy of the parameters
subparams = deepcopy(params)
subparams['L'] = lsub
ld = self.rhoijk.ld
for key in subparams.keys():
elem = subparams[key]
if(not hasattr(elem, '__len__')): continue
if(len(elem) != params['L']): continue
subparams[key] = elem[:lsub]
# Check for time
time = Res['time'] if('time' in Res) else None
# Build Hamiltonian
Mat = self.Ham[ii].build_hamiltonian(subparams,
self.SymmSec[ii],
self.sparse[ii],
self.maxdim[ii],
quenchtime=time)
# Calculate eigenvalues / vectors
vals, vecs = la.eigh(Mat)
# Calculate probabilities on density matrix
pi = np.zeros((Mat.shape[0]))
for jj in range(Mat.shape[0]):
pi[jj] = np.dot(np.conj(vecs[:, jj]), np.dot(rho, vecs[:, jj]))
def fitfunc(xval, Tval, ll=lsub, ld=ld, kB=self.kB[ii]):
if(xval.shape[0] != ld**ll): raise Exception('Sanity check')
fx = np.exp(- (xval - np.min(xval)) / (kB * Tval))
fx /= np.sum(fx)
return fx
pval, pcov = curve_fit(fitfunc, vals, pi, p0=[0.1])
eth[ii, 0] = pval[0]
eth[ii, 1] = pcov[0, 0]
Res['ETH'] = eth
return
[docs] def write(self, Obsfile, **kwargs):
"""
Nothing to be done on fortran side. So write is empty.
"""
pass
[docs] def read(self, fh, **kwargs):
"""
Nothing written on fortran side. So read is empty. See
``get`` for how to obtain results.
"""
pass
[docs] def read_hdf5(self, gh5, key, **kwargs):
"""
Nothing written on fortran side. So read is empty. See
``get`` for how to obtain results.
"""
pass
[docs]class GateObservable(_ObsTerm):
"""
Measure the overlap between state :math:`| A \\rangle` and state
:math:`| B \\rangle`, where a series of gates :math:`G_i` is applied to
:math:`| A \\rangle` to obtain :math:`| B \\rangle`.
.. math::
\langle A | G_N \ldots G_1 | A \\rangle
At present this feature is only supported for pure states in the EDLib and
two site gates.
**Variables**
Gates : list
Defines the quantum circuits.
name : list of strings
String identifier of the meaurement.
weight : list of floats
Weight for the each measurement.
**Details**
In order to add a new gate measurement, call
:py:func:`obsterms.Observables.AddObservable` with the following arguments:
termType : str
string must be ``Gates``
term : tuple / list
The first entry contains the list of gates/matrices, the second
contains the list of indices.
name : str (optional keyword argument)
Identifier for this measurement in the results.
weight : float (optional keyword argument)
Overall weight for measurement.
"""
def __init__(self):
self.Gates = []
self.name = []
self.weight = []
[docs] def add(self, term, *args, **kwargs):
"""
Add a new gate measurement consisting of a list of gates and the
corresponding indices where it should be applied.
**Arguments**
term : list/tuple of np 2d arrays; single np 2d array
d^m x d^m matrices which are the gates to be applied to the state.
They should be defined on the local Hilbert spaces.
idx : list/tuple of integers; single int
specifies the left-most site where the gate should be applied.
name : str (optional keyword argument)
Identifier for this measurement in the results.
weight : float (optional keyword argument)
Overall weight for measurement.
"""
self.ignored(['name', 'weight'], kwargs)
self.setargs(['name', 'weight'], args, kwargs)
self.default({'weight' : 1.0}, kwargs)
self.required(['name'], kwargs)
self.Gates.append(term)
self.name.append(kwargs['name'])
self.weight.append(kwargs['weight'])
[docs] def __len__(self):
"""
Return the number of gate measurements.
"""
return len(self.Gates)
[docs] def __getitem__(self, key):
"""
Return the corresponding variable.
"""
return self.__dict__[key]
[docs] def get_hash(self):
"""
Generate hash for this object.
"""
return hash_dict(self.__dict__)
[docs] def get_swap(self, ld):
"""
Generate a SWAP gate for two sites.
**Arguments**
ld : int
specifies the local dimension of the Hilbert space
"""
swap = np.zeros((ld**2, ld**2))
for i1 in range(ld):
for i2 in range(ld):
j1 = i1 + i2 * ld
j2 = i2 + i1 * ld
swap[j1, j2] = 1.0
return swap
def write(self, Obsfile):
if(len(self) > 0):
print("Warning: GateObservable is not supported for MPS "
+ "simulations.")
return
[docs] def read(self, fh, **kwargs):
"""
Read the results from file. The filehandle must be at the
corresponding position.
**Arguments**
fh : filehandle
Open file with results. The next lines contain the
gate measurements.
"""
for ii in range(len(self)):
tmp = self.line_to_floats(fh, 2)
value = tmp[0] + 1j * tmp[1]
yield self.name[ii], value
[docs]class DistanceRhoObs(_ObsTerm):
"""
DistanceObs measures the distance to another density matrix.
**Variables**
term : list of strings
Contains the filename containing the state to measure the distance
to.
name : string
Label for the distance measurement. Required.
dist : list of strings
The type of distance measures used. Can be ``Trace`` or
``Infidelity``.
Default to ``Infideltiy``.
"""
def __init__(self):
self.term = []
self.dist = []
self.name = []
[docs] def add(self, term, *args, **kwargs):
"""
Add a new distance measurement. The type of distance measure
can be choosen.
**Arguments**
term : str
Contains the filename containing the state to measure the distance
to.
dist : str
The type of distance measures used. Can be ``Trace`` or
``Infidelity``.
name : str
String identifier to address measurement in result dictionary.
"""
self.ignored(['name', 'dist'], kwargs)
self.setargs(['name', 'dist'], args, kwargs)
self.default({'dist' : 'Infidelity'}, kwargs)
self.required(['name'], kwargs)
if(not kwargs['dist'] in ['Trace', 'Infidelity']):
raise Exception('Unknown string specifier for distance measure.')
self.term.append(term)
self.name.append(kwargs['name'])
self.dist.append(kwargs['dist'])
[docs] def __len__(self):
"""
Return the number of distance measurements.
"""
return len(self.term)
[docs] def __getitem__(self, key):
"""
Return the corresponding variable.
"""
return self.__dict__[key]
[docs] def get_hash(self):
"""
Generate hash for this object.
"""
return hash_dict(self.__dict__)
def write(self, Obsfile):
nn = len(self)
Obsfile.write('%16i\n'%(nn))
for ii in range(nn):
Hstring = self.term[ii] + '\n'
Hstring += 'B' if(self.term[ii].split('.')[-1] == 'bin') else 'H'
Obsfile.write(Hstring + '\n')
return
[docs] def read(self, fh, **kwargs):
"""
Read the results from file. The filehandle must be at the
corresponding position.
**Arguments**
fh : filehandle
Open file with results. The next lines contain the
distance measurements.
"""
for ii in range(len(self)):
value = self.line_to_floats(fh, 1)
yield self.name[ii], value
[docs]class Observables():
def __init__(self, Operators):
self.Operators = Operators
self.data = {}
self.data['site'] = SiteObservable(Operators)
self.data['SiteEntropy'] = SiteEntropyObservable()
self.data['BondEntropy'] = BondEntropyObservable()
self.data['fermicorr'] = FCorrObservable(Operators)
self.data['corr'] = CorrObservable(Operators, self.data['fermicorr'])
self.data['string'] = StringCorrObservable(Operators)
self.data['MPO'] = MPOObservable()
self.data['DensityMatrix_i'] = RhoiObservable()
self.data['DensityMatrix_ij'] = RhoijObservable()
self.data['DensityMatrix_red'] = RhoredObservable(Operators)
self.data['MI'] = MutualInformationObservable()
self.data['Lambda'] = LambdaObservable()
self.data['dist_psi'] = DistancePsiObs()
self.data['State'] = StateObservable()
self.data['corr2nn'] = Corr2NNObservable(Operators)
# MPS measurement mainly handled during post processing
self.data['ETH'] = ETHObservable(self.data['DensityMatrix_red'])
# ED only
self.data['Gates'] = GateObservable()
self.data['dist_rho'] = DistanceRhoObs()
# Only referenced for iMPS
self.correlation_range = 100
# List for iterations
self.mpslist = ['site', 'SiteEntropy', 'BondEntropy', 'corr',
'fermicorr', 'string', 'MPO', 'DensityMatrix_i',
'DensityMatrix_ij', 'DensityMatrix_red', 'MI',
'Lambda', 'dist_psi', 'State', 'corr2nn']
[docs] def __getitem__(self, key):
"""
Return the corresponding observable from the class.
"""
if(key == 'correlation_range'):
return self.correlation_range
else:
return self.data[key]
[docs] def AddObservable(self, termtype, term, *args, **kwargs):
"""
Add observable to be measured.
"""
if(termtype in self.data):
self.data[termtype].add(term, *args, **kwargs)
else:
raise Exception("termType = " + str(termType) + " is not "
+ "recognized for AddObservable!")
return
[docs] def write(self, filestub, Parameters, IntOperators, IntHparam,
PostProcess=False):
"""
Write the Observables object to a Fortran-readable file.
**Arguments**
fileStub : str
basis part of the filename for this simulation extended
with ``Obs.dat``.
Parameters : dictionary
dictionary representing the simulation to be written.
IntOperators : dictionary
mapping the string identifiers for the operators to integer
identifiers used within fortran. Needed to write MPO
measurements.
IntHparam : dictionary
mapping the string identifiers for Hamiltonian parameters to
integer identifiers used within fortran. Needed to write MPO
measurements.
PostProcess : bool, optional
The fortran files are only written if ``False``. Otherwise
only the necessary data is updated, e.g. the list containing
the settings for the reduced density matrices.
Default to ``False``
"""
Obsfilename = filestub + 'Obs.dat'
if(PostProcess):
self.data['Lambda'].update(Parameters)
self.data['DensityMatrix_i'].update(Parameters)
self.data['DensityMatrix_ij'].update(Parameters)
else:
Parameters['int_files'].append(Obsfilename)
Obsfile = open(Obsfilename, 'w')
Obsfile.write('%16i\n'%(self.correlation_range))
for elem in self.mpslist:
# rho_ij depends on MI - goes last
if(elem == 'DensityMatrix_ij'): continue
self.data[elem].write(Obsfile, filestub=filestub,
param=Parameters, IntOp=IntOperators,
IntHparam=IntHparam)
elem = 'DensityMatrix_ij'
self.data[elem].write(Obsfile, filestub=filestub,
param=Parameters, IntOp=IntOperators,
IntHparam=IntHparam)
#self.data['Gates'].write(Obsfile)
#self.data['rho_dist'].write(Obsfile)
Obsfile.close()
return Obsfilename
def read_hdf5(self, gh5, key, param, dynamic, qtid=None):
Res = {}
found = False
for key, val in self.read_hdf5_header(gh5, key, param=param,
dynamic=dynamic, qtid=qtid):
Res[key] = val
if(found): return Res
for elem in self.mpslist:
found = False
for key, val in self.data[elem].read_hdf5(gh5, key, param=param):
Res[key] = val
found = True
if(found):
break
return Res
def read_hdf5_header(self, gh5, key, **kwargs):
is_dyn = kwargs['dynamic']
if(is_dyn):
header_float = {'time': 0, 'energy': 1, 'Loschmidt_Echo': 2, 'error': 4}
header_int = {'bond_dimension': 0, 'kappa_dimension': 1}
else:
header_float = {'energy' : 0, 'variance' : 1, 'LdL_eigenvalue' : 2}
header_int = {'converged': 0, 'bond_dimension': 1}
if(key in header_float):
idx = header_float[key]
# Data handle
dh5 = gh5['header_float']
if(key == 'Loschmidt_Echo'):
tmp = dh5[idx] + 1j * dh5[idx + 1]
else:
tmp = dh5[idx]
yield key, tmp
elif(key in header_int):
idx = header_int[key]
# Data handle
dh5 = gh5['header_int']
if(key == 'converged'):
tmp = bool(dh5[idx])
else:
tmp = dh5[idx]
yield key, tmp
elif(key == 'CPU_time'):
tmp = get_runtime(param, qtid=qtid)
yield key, tmp
return
def read(self, fh, param, dynamic, qtid=None):
ld = self.Operators['I'].shape[0]
Res = {}
# The first line summarizing general information
line = fh.readline().replace('\n', '')
if(dynamic):
Res['time'] = float(line[0:30])
Res['energy'] = float(line[30:60])
Res['bond_dimension'] = int(line[60:76])
Res['kappa_dimension'] = int(line[76:92])
Res['Loschmidt_Echo'] = float(line[92:122]) \
+ 1j * float(line[122:152])
Res['error'] = float(line[152:182])
else:
Res['energy'] = float(line[0:30])
Res['variance'] = float(line[30:60])
Res['converged'] = (line[60:64].replace(" ","") == 'T')
Res['bond_dimension'] = int(line[64:80])
Res['LdL_eigenvalue'] = float(line[80:110])
Res['CPU_time'] = get_runtime(param, qtid=qtid)
for elem in self.mpslist:
for key, val in self.data[elem].read(fh, param=param, ld=ld,
dynamic=dynamic,
ll=param['L']):
Res[key] = val
# ETH sets its own dictionary entry
self.data['ETH'].get(param, Res)
edlist = ['Gates', 'dist_rho']
for elem in edlist:
for key, val in self.data[elem].read(fh, param=param, ld=ld,
dynamic=dynamic,
ll=param['L']):
Res[key] = val
return Res
[docs] def read_imps(self, fh, param):
"""
Read the results of an iMPS simulation for one convergence
parameter set. The possible degenerate states are accessible
as a nested dictionary, key is ``('DegMeas', ii)`` for the
ii-th degenerate state. The values of ``ii = 0`` are present
in the usual result dictionary.
**Arguments**
fh : filehandle
open filehandle at correct position to read iMPS results.
param : dictionary
Simulation setup as dictionary to retrieve settings.
"""
ld = self.Operators['I'].shape[0]
corr_range = self.correlation_range
Res = {}
Res['error_code'] = int(fh.readline().replace('\n',''))
if(Res['error_code'] != 0): return Res
ndeg = int(fh.readline().replace('\n',''))
ncorr = int(fh.readline().replace('\n',''))
bond_dimension = int(fh.readline().replace('\n',''))
raw_str = fh.readline().replace('\n','')
convvec = fh.readline().replace('\n', '')
envec = fh.readline().replace('\n', '')
Res['N_degeneracies'] = ndeg
Res['N_Ground_States'] = ncorr
Res['bond_dimension'] = bond_dimension
try:
corr_length = float(raw_str)
except:
print('warning: could not convert ' + raw_str + ' to a float. '
+ 'Converting to 0.0 instead!')
corr_length = 0.0
Res['Correlation_length'] = corr_length
# Convert convergence and energy values
convvec = list(map(float,convvec.split(' ')[1:3+1]))
envec = list(map(float,envec.split(' ')[1:2]))
converged = 'T' if((1.0 - convvec[0]) <= convvec[2]) else 'F'
Res['Orthogonality_Fidelity'] = convvec[0]
Res['Truncation_error'] = convvec[1]
Res['Fixed_Point_Tolerance'] = convvec[2]
Res['converged'] = converged
Res['energy_density'] = envec[0]
# Print at least some warning for measures not implemented for iMPS
for elem in ['MPO', 'DensityMatrix_i', 'DensityMatrix_ij',
'DensityMatrix_red', 'MI', 'Lambda', 'dist_psi',
'State']:
if(len(self.data[elem]) > 0):
print('Warning: measurement of ' + elem + ' not available '
+ 'for iMPS.')
for state in range(ncorr):
Resdeg = {}
for elem in ['site', 'SiteEntropy', 'BondEntropy']:
for key, val in self.data[elem].read(fh, param=param,
ld=ld, ll=param['L']):
Resdeg[key] = val
if(state == 0): Res[key] = val
for elem in ['corr', 'fermicorr', 'string']:
for key, val in self.data[elem].read_imps(fh, corr_range):
Resdeg[key] = val
if(state == 0): Res[key] = val
Res[('DegMeas', state)] = Resdeg
return Res
[docs] def copy(self):
"""
Return a deepcopy of self.
"""
return deepcopy(self)
[docs] def SpecifyCorrelationRange(self, corr_range):
"""
Set the range of the correlation functions for an iMPS simulation
**Arguments**
corr_range : Integer
distance for the correlation measurement in a iMPS simulation.
"""
self.correlation_range = corr_range
[docs] def get_hash(self):
"""
Generate the hash for this object
"""
hstr = str(self.correlation_range)
for term in self.mpslist:
hstr += self.data[term].get_hash()
for term in ['Gates' , 'dist_rho']:
hstr += self.data[term].get_hash()
hh = hashlib.sha512()
hh.update(str.encode(hstr))
return hh.hexdigest()
[docs]class Result():
"""
**Arguments**
param : dictionary
parameter dictionary setting up a single simulation
case : str
Specifies the measurement, either 'MPS' (ground state), 'eMPS'
(excited state), 'dynamics' (time evolution).
state : integer, optional
select between the states, especially for excited states.
For dynamics, store the time step in there.
default to 0
conv_ii : integer, optional
selects i-th element in the set of convergence parameters
counting from 1 on. It is taken into account by an offset that
python indices start at 0.
For dynamics, store the number of the quench in there.
default to 1
dynamic : logical, optional
Layout of the output differs from static to dynamic. Specify
`dynamic=True` for reading dynamic simulations.
default to False
FiniteT : logical, optional
To treat different formatting on convergence parameters in
regard to Finite T simulations.
Default to False.
"""
def __init__(self, param, case, state=0, conv_ii=1, FiniteT=False):
self.case = case
self.state = state
self.cp = conv_ii
self.FiniteT = FiniteT
self.imps = (param['simtype'] == 'Infinite')
self.data = {'state' : state, 'convergence_parameter' : conv_ii}
self.delkeys = None
dynamic = (self.case == 'dynamics')
for key in param:
if((key == 'MPSConvergenceParameters') and (state == 0)
and (not dynamic) and (not FiniteT)):
for subkey in param[key].data[conv_ii - 1]:
self.data[subkey] = \
param[key].data[conv_ii - 1][subkey]
elif((key == 'eMPSConvergenceParameters') and (state != 0)):
for subkey in param[key].data[conv_ii - 1]:
self.data[subkey] = \
param[key].data[conv_ii - 1][subkey]
elif((key == 'dynamics') and dynamic):
self.data[key] = param[key]
else:
# get all data from parameters list
self.data[key] = param[key]
def __getitem__(self, key):
if(key in self.data):
return self.data[key]
if(self.data['hdf5']):
# Read HDF5 file
try:
Res = self.read_hdf5(key)
return Res[key]
except OSMPS_H5File_Error:
# We cannot be sure that HDF5 was enabled on compiler side ...
# try reading the dat files
print('Warning: Could not locate HDF5 file.')
if(self.delkeys is None):
# Read the data
Res = self.read()
self.delkeys = list(Res.keys())
self.data.update(Res)
del Res
return self.data[key]
def __iter__(self):
for key in self.data.keys():
yield key
def keys(self):
return self.data.keys()
[docs] def deallocate(self):
"""
Delete all data from the measurement to keep memory usage down.
"""
for key in self.delkeys:
del self.data[key]
self.delkeys = None
return
[docs] def read(self):
"""
"""
has_qt = ('QT' in self.data) and (self.data['QT'] > 0)
if((self.case == 'MPS') and (not self.FiniteT) and (not self.imps)):
# Ground state results
# ....................
# Quick link to observables
Obs = self.data['MPSObservables']
# Construct filename
flnm = self.data['Output_Directory'] + self.data['job_ID']
flnm += self.data['unique_ID'] + 'ObsOut' + str(0)
flnm += '_' + str(self.cp) + '.dat'
if(os.path.isfile(flnm)):
# Result file under simulation name available
Obsfile = open(flnm, 'r')
else:
# Simulation with identic hash might exist - check
match = self.find_hash_in_map()
if(match is None):
raise Exception('Ground state file not found; ground'
+ ' state file with equal hash not'
+ ' found.')
match = match + 'ObsOut' + str(0)
match += '_' + str(self.cp) + '.dat'
Obsfile = open(match, 'r')
Tmp = Obs.read(Obsfile, self.data, False)
Obsfile.close()
elif((self.case == 'MPS') and (not self.imps)):
# Finite temperature simulation
# .............................
# Quick link to observables
Obs = self.data['MPSObservables']
# Construct filename
flnm = self.data['Output_Directory'] + self.data['job_ID']
flnm += self.data['unique_ID'] + 'FiniteT_'
flnm += str(self.cp) + '.dat'
if(os.path.isfile(flnm)):
# Result file under simulation name available
Obsfile = open(flnm, 'r')
else:
# Simulation with identic hash might exist - check
raise NotImplementedError
# Finite-T simulations use formatting from
# dynamics results --> dynamic=True
Tmp = Obs.read(Obsfile, self.data, True)
Obsfile.close()
elif((self.case == 'eMPS') and (not self.imps)):
# Excited state results
# .....................
# Quick link to observables
Obs = self.data['eMPSObservables']
# Construct filename
flnm = self.data['Output_Directory'] + self.data['job_ID']
flnm += self.data['unique_ID'] + 'ObsOut' + str(self.state)
flnm += '_' + str(self.cp) + '.dat'
if(os.path.isfile(flnm)):
# Result file under simulation name available
Obsfile = open(flnm, 'r')
else:
# Simulation with identic hash might exist - check
raise NotImplementedError
Tmp = Obs.read(Obsfile, self.data, False)
Obsfile.close()
elif(self.imps):
# Results from infinite MPS simulations
# -------------------------------------
# Quick link to observables
Obs = self.data['MPSObservables']
# Construct filename
flnm = self.data['Output_Directory'] + self.data['job_ID']
flnm += self.data['unique_ID'] + 'ObsOut' + str(0)
flnm += '_' + str(self.cp) + '.dat'
if(os.path.isfile(flnm)):
# Result file under simulation name available
Obsfile = open(flnm, 'r')
else:
# Simulation with identic hash might exist - check
raise Exception('Result file ' + flnm + ' for iMPS not found!')
# Finite-T simulations use formatting from
# dynamics results --> dynamic=True
Tmp = Obs.read_imps(Obsfile, self.data)
Obsfile.close()
elif((self.case == 'dynamics') and (not has_qt)):
# Results from time evolution
# ---------------------------
# Quick link to observables
Obs = self.data['DynamicsObservables']
# Construct filename
flnm = self.data['Output_Directory'] + self.data['job_ID']
flnm += self.data['unique_ID'] + 'ObsOutDynamics/Dynamics_'
flnm += str(self.cp + 1)
if('extradt' not in self.data):
flnm += 'step_%06d'%(self.state) + '.dat'
else:
flnm += 'step_extra.dat'
if(os.path.isfile(flnm)):
# Result file under simulation name available
Obsfile = open(flnm, 'r')
else:
# File not there
raise Exception('File not found for dynamic results ' +
flnm + '.')
Tmp = Obs.read(Obsfile, self.data, True)
Obsfile.close()
elif(self.case == 'dynamics'):
# Results from quantum trajectory time evolution
# -----------------------------------------------
# Quick link to observables
Obs = self.data['DynamicsObservables']
# Construct filename
base = self.data['Output_Directory'] + self.data['job_ID']
base += self.data['unique_ID'] + 'ObsOutDynamics/Dynamics_'
base += str(self.cp + 1)
if('extradt' not in self.data):
base += 'step_%06d'%(self.state) + '.dat'
else:
base += 'step_extra.dat'
# Read first trajectory
flnm = base.replace('.dat', '_qt%09d.dat'%(1))
if(os.path.isfile(flnm)):
# Result file under simulation name available
Obsfile = open(flnm, 'r')
else:
# File not there
raise Exception('File not found for dynamic results.')
Tmp = Obs.read(Obsfile, self.data, True, qtid=1)
Obsfile.close()
# Read all other trajectories
for jj in range(1, self.data['QT']):
flnm = base.replace('.dat', '_qt%09d.dat'%(jj + 1))
if(os.path.isfile(flnm)):
# Result file under simulation name available
Obsfile = open(flnm, 'r')
else:
# File not there
raise Exception('File not found for dynamic results.')
Tmpjj = Obs.read(Obsfile, self.data, True, qtid=jj + 1)
Obsfile.close()
self.add_qt(Tmp, Tmpjj)
self.average_qt(Tmp)
return Tmp
[docs] def read_hdf5(self, key):
"""
Read one variable from hdf5 file.
**Arguments**
key : str
Identifier of the measurement.
"""
import h5py # 'h5py' in sys.modules
has_qt = ('QT' in self.data) and (self.data['QT'] > 0)
if((self.case == 'MPS') and (not self.FiniteT) and (not self.imps)):
# Ground state results
# ....................
# Quick link to observables
Obs = self.data['MPSObservables']
# Construct filename
flnm = self.data['Output_Directory'] + self.data['job_ID']
flnm += self.data['unique_ID'] + '.h5'
if(os.path.isfile(flnm)):
fh5 = h5py.File(flnm, 'r')
else:
# Simulation with identic hash might exist - check
match = self.find_hash_in_map()
if(match is None):
raise OSMPS_H5File_Error(flnm + ' or equivalent hash')
match = match + '.h5'
if(os.path.isfile(match)):
fh5 = h5py.File(match, 'r')
else:
raise OSMPS_H5File_Error(flnm + ', i.e., its equivalent hash')
# Group handle
group = 'ObsOut' + str(0) + '_' + str(self.cp) + '.dat'
gh5 = fh5[group]
Tmp = Obs.read_hdf5(gh5, key, self.data, False)
fh5.close()
elif((self.case == 'MPS') and (not self.imps)):
# Finite temperature simulations
# ..............................
# Quick link to observables
Obs = self.data['MPSObservables']
# Construct filename
flnm = self.data['Output_Directory'] + self.data['job_ID']
flnm += self.data['unique_ID'] + '.h5'
if(os.path.isfile(flnm)):
# Result file under simulation name available
fh5 = h5py.File(flnm, 'r')
else:
# Simulation with identic hash might exist - check
raise OSMPS_H5File_Error(flnm)
# Group handle
group = 'ObsOutDynamics_%06d_%06d'%(self.cp, 0)
gh5 = fh5[group]
# Finite-T simulations use formatting from
# dynamics results --> dynamic=True
Tmp = Obs.read_hdf5(gh5, key, self.data, True)
fh5.close()
elif((self.case == 'eMPS') and (not self.imps)):
# Excited state results
# .....................
# Quick link to observables
Obs = self.data['eMPSObservables']
# Construct filename
flnm = self.data['Output_Directory'] + self.data['job_ID']
flnm += self.data['unique_ID'] + '.h5'
if(os.path.isfile(flnm)):
fh5 = h5py.File(flnm, 'r')
else:
# Simulation with identic hash might exist - check
print('Did not check for simulation with identic hash.')
raise OSMPS_H5File_Error(flnm)
# Group handle
group = 'ObsOut' + str(self.state) + '_' + str(self.cp) + '.dat'
gh5 = fh5[group]
Tmp = Obs.read_hdf5(gh5, key, self.data, False)
fh5.close()
elif(self.imps):
# Results from infinite MPS simulations
# -------------------------------------
# imps has its own methods to write results - they are not
# enabled for HDF5
raise OSMPS_H5Method_Error('iMPS')
elif((self.case == 'dynamics') and (not has_qt)):
# Results from time evolution
# ---------------------------
# Quick link to observables
Obs = self.data['DynamicsObservables']
# Construct filename
flnm = self.data['Output_Directory'] + self.data['job_ID']
flnm += self.data['unique_ID'] + '.h5'
if(os.path.isfile(flnm)):
fh5 = h5py.File(flnm, 'r')
else:
raise OSMPS_H5File_Error(flnm)
# Group handle
if('extradt' not in self.data):
group = 'ObsOutDynamics_%06d_%06d'%(self.cp + 1, self.state)
else:
group = 'ObsOutDynamics_%06d_extras'%(self.cp + 1)
gh5 = fh5[group]
Tmp = Obs.read_hdf5(gh5, key, self.data, True)
fh5.close()
elif(self.case == 'dynamics'):
# Results from quantum trajectory time evolution
# -----------------------------------------------
# Quick link to observables
Obs = self.data['DynamicsObservables']
# Construct filename
base = self.data['Output_Directory'] + self.data['job_ID']
base += self.data['unique_ID'] + 'ObsOutDynamics/Sim'
# Read first trajectory
flnm = base + '_qt%09d.h5'%(1)
if(os.path.isfile(flnm)):
# Result file under simulation name available
fh5 = h5py.File(flnm, 'r')
else:
# File not there
raise OSMPS_H5File_Error(flnm)
# Group handle
if('extradt' not in self.data):
group = 'ObsOutDynamics_%06d_%06d'%(self.cp + 1, self.state)
else:
group = 'ObsOutDynamics_%06d_extras'%(self.cp + 1)
gh5 = fh5[group]
Tmp = Obs.read_hdf5(gh5, key, self.data, True)
fh5.close()
# Read all the other trajectories
for jj in range(1, self.data['QT']):
flnm = base + '_qt%09d.h5'%(jj + 1)
if(os.path.isfile(flnm)):
# Result file under simulation name available
fh5 = h5py.File(flnm, 'r')
else:
# File not there
raise OSMPS_H5File_Error(flnm)
# Group handle
if('extradt' not in self.data):
group = 'ObsOutDynamics_%06d_%06d'%(self.cp + 1, self.state)
else:
group = 'ObsOutDynamics_%06d_extras'%(self.cp + 1)
gh5 = fh5[group]
Tmpjj = Obs.read_hdf5(gh5, key, self.data, True)
fh5.close()
self.add_qt(Tmp, Tmpjj)
self.average_qt(Tmp)
else:
raise Exception('Case not defined. Please report this to '
+ 'the OSMPS team.')
if(len(Tmp) == 0): raise OSMPS_H5Key_Error(key)
return Tmp
[docs] def add_qt(self, dic, dicjj):
"""
Combine results for quantum trajectories.
**Arguments**
dic : dictionary
Target dictionary with the overall results.
dicjj : dictionary
Dictionary with the results of one trajectory.
"""
for key in dicjj.keys():
if(key == 'error'):
dic['error'] = max(dic['error'], dicjj['error'])
else:
dic[key] = dic[key] + dicjj[key]
return
[docs] def average_qt(self, dic):
"""
Averaging procedure for quantum trajectories.
**Arguments**
dic : dictionary
Target dictionary with the overall results.
"""
for key in dic.keys():
if(key in ['error', 'CPU_time']):
# Error is maximum
# CPU should be added up
pass
else:
dic[key] = dic[key] / float(self.data['QT'])
return
[docs] def find_hash_in_map(self):
"""
Find where the observables can be found of identic ground states
"""
try:
flnm = self.data['Output_Directory']
except KeyError:
flnm = ''
cid = flnm + self.data['job_ID'] + self.data['unique_ID']
flnm += self.data['job_ID'] + "_static_mapping.dat"
# Set hash and match to None in case of empty file / no match
Hash = None
match = None
# Search for the hash
fh = open(flnm, 'r')
for line in fh:
if cid in line:
Hash = line.split()[1][:-1]
break
fh.close()
# Search for the file with the same hash
fh = open(flnm, 'r')
for line in fh:
if(Hash in line):
match = line.split()[0]
break
fh.close()
return match
[docs]def ReadStaticObservables(Parameters):
"""
Read the observables written out by Fortran program.
**Arguments**
Parameters : list
list of dictionaries where the dictionaries contain the
simulation settings.
**Details**
For infinite simulations see :py:func:`ReadInfiniteStaticObservables`.
The finite results consists of a list of dictionaries. Each dictionary
corresponds to one simulation. Simulation which could not be read
(missing/incomplete files) can either throw an exception or deliver ``None``
as entry in the list.
The default static finite system measurements are listed in the
following tables. In addition the measurments defined in
:py:class:`obsterms.Observables` are available:
+-----------------------+---------------------------------------+
| Tag | Meaning |
+=======================+=======================================+
| state | Eigenstate index increasing in energy |
| | with 0 the ground state. |
+-----------------------+---------------------------------------+
| convergence_parameter | Integer index indicating convergence |
| | criteria used. |
+-----------------------+---------------------------------------+
| energy | :math:`\langle \hat{H} \\rangle` |
+-----------------------+---------------------------------------+
| variance | :math:`\langle \hat{H}^2 - \langle` |
| | :math:`\hat{H} \\rangle^2 \\rangle` |
+-----------------------+---------------------------------------+
| converged | T (True) or F (False) according to |
| | whether the variance satisfies the |
| | desired tolerance. |
+-----------------------+---------------------------------------+
| bond_dimension | The maximal bond dimension of the |
| | MPS. |
+-----------------------+---------------------------------------+
The default static infinite system measurements can be found in the
corresponding class description :py:class:`obsterms.ResultsiMPS`. An instance
of this class is returned.
Errors in the process of reading can be catched upon the first
access to the any measurement. ``ReadStaticObservables`` just
generates an array of the result class, which will process the
requests to access results. An error handling could look as
follows
>>> Outputs = mps.ReadStaticObservables(params)
>>> energies = []
>>>
>>> for elem in Outputs:
>>> try:
>>> print(Outputs['energy'])
>>> energies.append(Outputs['energy'])
>>> except:
>>> print('Error processing result.')
>>> energies.append(np.nan)
The list energies has the expected length after reading the results
and non-existent values are set with nan (not a number).
"""
Outputs = []
for elem in Parameters:
# Read ground state / finite-T results
# ------------------------------------
if(elem['MPS'] and
isinstance(elem['MPSConvergenceParameters'], FiniteTConvParam)):
whichconv = 0
Tgrid = elem['MPSConvergenceParameters'].get_Tgrid()
for tt in Tgrid:
whichconv += 1
UnitObservable = Result(elem, 'MPS', conv_ii=whichconv,
FiniteT=True)
UnitObservable.data['Temperature'] = tt
Outputs.append(UnitObservable)
elif(elem['MPS']):
whichconv = 0
for ii in elem['MPSConvergenceParameters'].data:
whichconv += 1
Outputs.append(Result(elem, 'MPS', conv_ii=whichconv))
# Read excited states results
# ---------------------------
if(elem['eMPS']):
for ii in range(1, elem['n_excited_states'] + 1):
whichconv = 0
for qq in elem['eMPSConvergenceParameters'].data:
whichconv += 1
Outputs.append(Result(elem, 'eMPS', state=ii,
conv_ii=whichconv))
return Outputs
[docs]def ReadDynamicObservables(Parameters):
"""
Read the observables written out by Fortran program.
**Arguments**
Parameters : list
list of dictionaries where every dictionary contains the simulation
setting.
**Details**
The default dynamic measurements are listed in the following
tables. In addition the measurments defined in
:py:class:`obsterms.Observables` are available:
+-----------------------+---------------------------------------+
| Tag | Meaning |
+=======================+=======================================+
| time | current time |
+-----------------------+---------------------------------------+
| energy | :math:`\langle \hat{H} \\rangle` |
+-----------------------+---------------------------------------+
| Loschmidt_Echo | :math:`\langle \psi(0)` |
| | :math:`| \psi(t) \\rangle` |
+-----------------------+---------------------------------------+
| bond_dimension | The maximal bond dimension of the |
| | MPS. |
+-----------------------+---------------------------------------+
Errors in the process of reading can be catched upon the first
access to the any measurement. ``ReadDynamicObservables`` just
generates an array of the result class, which will process the
requests to access results. An error handling could look as
follows
>>> Outputs = mps.ReadDynamicObservables(params)
>>> energies = []
>>>
>>> for elem in Outputs:
>>> try:
>>> print(Outputs['energy'])
>>> energies.append(Outputs['energy'])
>>> except:
>>> print('Error processing result.')
>>> energies.append(np.nan)
The list energies has the expected length after reading the results
and non-existent values are set with nan (not a number).
"""
Outputs = []
for elem in Parameters:
if(elem['Quenches'].classname() == 'QuantumCircuits'):
Outputs.append(ReadCircuitsObservables(elem))
continue
if(not 'Dynamics' in elem):
raise Exception("No dynamics found in Parameters passed to " +
"ReadDynamicObservables!")
InnerOutputs = []
time = 0.0
Quenches = elem['Quenches'].data
for ii in range(elem['nquenches']):
myHparams = Quenches[ii]['Hparams']
nchanged = len(myHparams)
nsteps = int(np.floor(Quenches[ii]['time']
/ Quenches[ii]['deltat']))
extradt = Quenches[ii]['time'] - nsteps * Quenches[ii]['deltat']
hasextradt = (extradt > 0.000001)
for jj in range(1, nsteps + 1):
time += Quenches[ii]['deltat']
if(jj%Quenches[ii]['stepsforoutput'] == 0):
Res = Result(elem, 'dynamics', conv_ii=ii, state=jj)
cntr = 0
for hparams in myHparams:
Res.data[hparams] = Quenches[ii]['funcs'][cntr](time)
cntr += 1
InnerOutputs.append(Res)
if(hasextradt):
time += extradt
Res = Result(elem, 'dynamics', conv_ii=ii, state=jj)
Res.data['extradt'] = True
cntr = 0
for hparams in myHparams:
Res.data[hparams] = Quenches[ii]['funcs'][cntr](time)
cntr += 1
InnerOutputs.append(Res)
Outputs.append(InnerOutputs)
return Outputs
[docs]def ReadCircuitsObservables(elem):
"""
Read the observables from a quantum circuit simulation.
"""
if(('QT' in elem) and(elem['QT'] > 0)):
raise Exception("No QT in circuits.")
InnerOutputs = []
QC = elem['Quenches']
for jj in range(1, len(QC) + 1):
Res = Result(elem, 'dynamics', conv_ii=0, state=jj)
InnerOutputs.append(Res)
return InnerOutputs
[docs]def GetObservables(Outputs,tags,values):
"""
Find the set of all Outputs which have the values specified in tags.
**Arguments**
Outputs : list of results
This is a list containing all result dictionaries.
tags : variables or list of variables
These are the keys, i.e., the parameters you use for selection.
values : variable, list of variables
Values for the keys in the same order and of same length if list.
"""
if(not isinstance(tags, list)):
tags = [tags]
if(not isinstance(values, list)):
values = [values]
if(len(tags) != len(values)):
raise Exceptions("Exactly as many tags as values should be "
+ "specified in GetObservables!")
for tag in tags:
if tag not in Outputs[0]:
raise Exception("tag " + str(tag) + " not found in Outputs "
+ "passed to GetObservables!")
results = [match for match in Outputs \
if all(match[tags[i]]==values[i] for i in range(len(tags)))]
if(not results):
print('No results found with the desired values in GetObservables!')
return results
[docs]def get_runtime(param, qtid=None):
"""
Read runtime from log-file.
**Arguments**
param : dict
dictionary with simulation setup.
"""
flnm = param['Output_Directory'] + param['job_ID'] + param['unique_ID']
if(qtid is None):
flnm += '.log'
else:
flnm += '_qt%09d.log'%(qtid)
try:
fh = open(flnm, 'r')
for line in fh:
tmp = line
fh.close()
return float(tmp.split()[2])
except:
return 0.0
[docs]class OSMPS_H5Method_Error(Exception):
"""
Excpetion raised if an OSMPS method is not enabled for H5File at all,
but was trying to read with HDF5.
**Arguments __init__**
ret_val : str
Description of the case.
"""
def __init__(self, ret_val):
self.val = ret_val
return
[docs] def __str__(self):
"""
String representing error message.
"""
return "OSMPS cannot read the following with HDF5: " + str(self.val) + "!"
[docs]class OSMPS_H5File_Error(Exception):
"""
Excpetion raised if an H5File does not exist.
**Arguments __init__**
ret_val : str
The filename which does not exist
"""
def __init__(self, ret_val):
self.val = ret_val
return
[docs] def __str__(self):
"""
String representing error message.
"""
return "OSMPS did not find the following HDF5 file: " + str(self.val) + "!"
[docs]class OSMPS_H5Group_Error(Exception):
"""
Exception raised if an H5 group does not exist.
**Arguments __init__**
ret_val : str
Name of the group.
"""
def __init__(self, ret_val):
self.val = ret_val
return
[docs] def __str__(self):
"""
String representing error message.
"""
return "OSMPS did not find the following HDF5 file: " + str(self.val) + "!"
[docs]class OSMPS_H5Key_Error(Exception):
"""
Exception raised if an H5 did not find a key despite file, group, and data
set existing.
**Arguments __init__**
ret_val : str
Name of the key.
"""
def __init__(self, ret_val):
self.val = ret_val
return
[docs] def __str__(self):
"""
String representing error message.
"""
return "OSMPS did not find the following observable key with HDF5: " \
+ str(self.val) + "!"