Source code for oqupy.process_tensor

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Module for the process tensor (PT) object. This code is based on [Pollock2018].

**[Pollock2018]**
F.  A.  Pollock,  C.  Rodriguez-Rosario,  T.  Frauenheim,
M. Paternostro, and K. Modi, *Non-Markovian quantumprocesses: Complete
framework and efficient characterization*, Phys. Rev. A 97, 012127 (2018).
"""


from abc import ABC, abstractmethod
import os
import tempfile
from typing import Optional, Text, Union
import warnings

import numpy as np
from numpy import ndarray
import tensornetwork as tn
import h5py

from oqupy.base_api import BaseAPIClass
from oqupy.config import NpDtype
from oqupy import util
from oqupy.version import __version__

[docs]class BaseProcessTensor(BaseAPIClass, ABC): """ Abstract base class for process tensors in matrix product operator form (PT-MPO). """ def __init__( self, hilbert_space_dimension: int, dt: Optional[float] = None, transform_in: Optional[ndarray] = None, transform_out: Optional[ndarray] = None, name: Optional[Text] = None, description: Optional[Text] = None) -> None: """Constructor of BaseProcessTensor. """ self._hs_dim = hilbert_space_dimension self._dt = dt self._rho_dim = self._hs_dim**2 self._trace = (np.identity(self._hs_dim, dtype=NpDtype) \ / np.sqrt(float(self._hs_dim))).flatten() self._trace_square = self._trace**2 if transform_in is not None: tmp_transform_in = np.array(transform_in, dtype=NpDtype) assert len(tmp_transform_in.shape) == 2 assert tmp_transform_in.shape[0] == self._rho_dim self._in_dim = tmp_transform_in.shape[1] self._transform_in = tmp_transform_in self._trace_in = self._trace @ self._transform_in else: self._in_dim = self._rho_dim self._transform_in = None self._trace_in = self._trace if transform_out is not None: tmp_transform_out = np.array(transform_out, dtype=NpDtype) assert len(tmp_transform_out.shape) == 2 assert tmp_transform_out.shape[1] == self._rho_dim self._out_dim = tmp_transform_out.shape[0] self._transform_out = tmp_transform_out self._trace_out = self._transform_out @ self._trace else: self._out_dim = self._rho_dim self._transform_out = None self._trace_out = self._trace super().__init__(name, description) @abstractmethod def __len__(self) -> int: """Length of process tensor.""" pass @property def hilbert_space_dimension(self): """Dimension of the system Hilbert space.""" return self._hs_dim @property def dt(self): """Time step length.""" return self._dt @property def max_step(self) -> Union[int, float]: """Maximal number of time steps.""" return len(self) @property def transform_in(self): """ Super operator that transforms from the system basis to the process tensor basis. """ return self._transform_in @property def transform_out(self): """ Super operator that transforms from the process tensor basis to the system basis. """ return self._transform_out
[docs] def set_initial_tensor( self, initial_tensor: Optional[ndarray] = None) -> None: """ Set the (possibly correlated) initial system state. """ raise RuntimeError( f"Class {type(self).__name__} doesn't allow to set the initial "\ +"tensor.")
[docs] def set_mpo_tensor( self, step: int, tensor: Optional[ndarray] = None) -> None: """ Set the MPO tensor for time step `step`. The axes correspond to the following legs: [0] ... past bond leg, [1] ... future bond leg, [2] ... input (from system) leg, [3] ... output (to system) leg. If the tensor is rank-3 then it is implicitly assumed that there is a delta between the input and output leg. """ raise RuntimeError( f"Class {type(self).__name__} doesn't allow to set the mpo "\ +"tensors.")
[docs] def set_cap_tensor( self, step: int, tensor: Optional[ndarray] = None) -> None: """ Set the cap tensor (vector) to terminate the PT-MPO at time step `step`. """ raise RuntimeError( f"Class {type(self).__name__} doesn't allow to set the cap "\ +"tensors.")
[docs] @abstractmethod def get_initial_tensor(self) -> ndarray: """ Get the (possibly correlated) initial system state. """ pass
[docs] @abstractmethod def get_mpo_tensor( self, step: int, transformed: Optional[bool] = True) -> ndarray: """ Get the MPO tensor for time step `step`. The axes correspond to the following legs: [0] ... past bond leg, [1] ... future bond leg, [2] ... input (from system) leg, [3] ... output (to system) leg. Applies the transformation (stored in `.transform_in` and `.transform_out`) when `transformed` is true. """ pass
[docs] @abstractmethod def get_cap_tensor(self, step: int) -> ndarray: """ Get the cap tensor (vector) to terminate the PT-MPO at time step `step`. """ pass
[docs] @abstractmethod def get_bond_dimensions(self) -> ndarray: """Return the bond dimensions of the process tensor MPO.""" pass
[docs]class TrivialProcessTensor(BaseProcessTensor): """ Trivial process tensors in matrix product operator form (PT-MPO) for a non-existent environment. """ def __init__( self, hilbert_space_dimension: Optional[int] = 1, name: Optional[Text] = None, description: Optional[Text] = None) -> None: """Constructor for TrivialProcessTensor. """ super().__init__( hilbert_space_dimension=hilbert_space_dimension, name=name, description=description) def __len__(self) -> int: """Length of process tensor. """ return 0 @property def max_step(self) -> Union[int, float]: """Maximal number of time steps.""" return float('inf')
[docs] def get_initial_tensor(self) -> ndarray: """ Get the (possibly correlated) initial system state. """ return None
[docs] def get_mpo_tensor( self, step: int, transformed: Optional[bool] = True) -> ndarray: """ Get the MPO tensor for time step `step`. """ return None
[docs] def get_cap_tensor(self, step: int) -> ndarray: """ Get the cap tensor (vector) to terminate the PT-MPO at time step `step`. """ return np.array([1.0], dtype=NpDtype)
[docs] def get_bond_dimensions(self) -> ndarray: """Return the bond dimensions of the MPS form of the process tensor.""" return None
[docs]class SimpleProcessTensor(BaseProcessTensor): """ Simple process tensors in matrix product operator form (PT-MPO). """ def __init__( self, hilbert_space_dimension: int, dt: Optional[float] = None, transform_in: Optional[ndarray] = None, transform_out: Optional[ndarray] = None, name: Optional[Text] = None, description: Optional[Text] = None) -> None: """Constructor of SimpleProcessTensor. """ self._initial_tensor = None self._mpo_tensors = [] self._cap_tensors = [] self._lam_tensors = [] super().__init__( hilbert_space_dimension, dt, transform_in, transform_out, name, description) def __len__(self) -> int: """Length of process tensor. """ return len(self._mpo_tensors)
[docs] def set_initial_tensor( self, initial_tensor: Optional[ndarray] = None) -> None: """ Set the (possibly correlated) initial system state. """ if initial_tensor is None: self._initial_tensor = None self._initial_tensor = np.array(initial_tensor, dtype=NpDtype)
[docs] def set_mpo_tensor( self, step: int, tensor: Optional[ndarray] = None) -> None: """ Set the MPO tensor for time step `step`. The axes correspond to the following legs: [0] ... past bond leg, [1] ... future bond leg, [2] ... input (from system) leg, [3] ... output (to system) leg. If the tensor is rank-3 then it is implicitly assumed that there is a delta between the input and output leg. """ length = len(self._mpo_tensors) if step >= length: self._mpo_tensors.extend([None] * (step - length + 1)) self._mpo_tensors[step] = np.array(tensor, dtype=NpDtype)
[docs] def set_cap_tensor( self, step: int, tensor: Optional[ndarray] = None) -> None: """ Set the cap tensor (vector) to terminate the PT-MPO at time step `step`. """ length = len(self._cap_tensors) if step >= length: self._cap_tensors.extend([None] * (step - length + 1)) self._cap_tensors[step] = np.array(tensor, dtype=NpDtype)
[docs] def get_initial_tensor(self) -> ndarray: """ Get the (possibly correlated) initial system state. """ return self._initial_tensor
[docs] def get_mpo_tensor( self, step: int, transformed: Optional[bool] = True) -> ndarray: """ Get the MPO tensor for time step `step`. The axes correspond to the following legs: [0] ... past bond leg, [1] ... future bond leg, [2] ... input (from system) leg, [3] ... output (to system) leg. Applies the transformation (stored in `.transform_in` and `.transform_out`) when `transformed` is true. """ length = len(self._mpo_tensors) if step >= length or step < 0: raise IndexError("Process tensor index out of bound. ") tensor = self._mpo_tensors[step] if len(tensor.shape) == 3: tensor = util.create_delta(tensor, [0, 1, 2, 2]) if transformed is False: return tensor if self._transform_in is not None: tensor = np.dot(np.moveaxis(tensor, -2, -1), self._transform_in.T) tensor = np.moveaxis(tensor, -1, -2) if self._transform_out is not None: tensor = np.dot(tensor, self._transform_out) return tensor
[docs] def get_cap_tensor(self, step: int) -> ndarray: """ Get the cap tensor (vector) to terminate the PT-MPO at time step `step`. """ length = len(self._cap_tensors) if step >= length or step < 0: return None return self._cap_tensors[step]
[docs] def get_bond_dimensions(self) -> ndarray: """ Return the bond dimensions of the MPS form of the process tensor. """ bond_dims = [] for mpo in self._mpo_tensors: if mpo is None: bond_dims.append(0) else: bond_dims.append(mpo.shape[0]) bond_dims.append(self._mpo_tensors[-1].shape[1]) return np.array(bond_dims)
[docs] def compute_caps(self) -> None: """ Compute and store all caps from the PT-MPO. """ length = len(self) caps = [np.array([1.0], dtype=NpDtype)] last_cap = tn.Node(caps[-1]) for step in reversed(range(length)): trace_square = tn.Node(self._trace_square) trace_in = tn.Node(self._trace_in) trace_out = tn.Node(self._trace_out) ten = tn.Node(self._mpo_tensors[step]) if len(ten.shape) == 3: ten[1] ^ last_cap[0] ten[2] ^ trace_square[0] new_cap = ten @ last_cap @ trace_square else: ten[1] ^ last_cap[0] ten[2] ^ trace_in[0] ten[3] ^ trace_out[0] new_cap = ten @ last_cap @ trace_in @ trace_out caps.insert(0, new_cap.get_tensor()) last_cap = new_cap self._cap_tensors = caps
[docs] def export(self, filename: Text, overwrite: bool = False): """Export the process tensor as a FileProcessTensor.""" if overwrite: mode = "overwrite" else: mode = "write" pt_file = FileProcessTensor( mode=mode, filename=filename, hilbert_space_dimension=self._hs_dim, dt=self._dt, transform_in=self._transform_in, transform_out=self._transform_out, name=self.name, description=self.description) pt_file.set_initial_tensor(self._initial_tensor) for step, mpo in enumerate(self._mpo_tensors): pt_file.set_mpo_tensor(step, mpo) for step, cap in enumerate(self._cap_tensors): pt_file.set_cap_tensor(step, cap) pt_file.close()
HDF5None = [np.nan] def _is_hdf5_none(tensor: ndarray): """Check if a tensor is the 'None' substitute for HDF5.""" return np.array(tensor).shape == (1,) and np.isnan(tensor[0])
[docs]class FileProcessTensor(BaseProcessTensor): """ Process tensors in matrix product operator form (PT-MPO) stored in a HDF5 file. """ def __init__( self, mode: Text, filename: Optional[Text] = None, hilbert_space_dimension: Optional[int] = None, dt: Optional[float] = None, transform_in: Optional[ndarray] = None, transform_out: Optional[ndarray] = None, name: Optional[Text] = None, description: Optional[Text] = None) -> None: """Constructor of FileProcessTensor. """ if mode == "read": self._write = False self._overwrite = False elif mode == "write": self._write = True self._overwrite = False elif mode == "overwrite": self._write = True self._overwrite = True else: raise ValueError("Parameter 'mode' must be one of 'read'/"\ + "'write'/'overwrite'!") self._f = None self._initial_tensor_data = None self._initial_tensor_shape = None self._mpo_tensors_data = None self._mpo_tensors_shape = None self._cap_tensors_data = None self._cap_tensors_shape = None if self._write: if filename is None: self._removeable = True tmp_filename = tempfile._get_default_tempdir() + "/pt_" \ + next(tempfile._get_candidate_names()) + ".hdf5" else: self._removeable = self._overwrite tmp_filename = filename assert isinstance(hilbert_space_dimension, int) super().__init__( hilbert_space_dimension, dt, transform_in, transform_out, name, description) self._filename = tmp_filename self._create_file(tmp_filename) else: assert filename is not None self._filename = filename self._removeable = False dictionary = self._read_file(filename) super().__init__(**dictionary) def _create_file(self, filename: Text): """Create HDF5 file with process tensor meta data.""" if self._overwrite: self._f = h5py.File(filename, "w") else: self._f = h5py.File(filename, "x") self._f.attrs["oqupy_version"] = __version__ self._f.attrs["name"] = self.name self._f.attrs["description"] = self.description self._f.attrs["writing"] = True data_type = h5py.vlen_dtype(np.dtype('complex128')) shape_type = h5py.vlen_dtype(np.dtype('i')) self._f.create_dataset("hs_dim", (1,), dtype='i', data=[self._hs_dim]) if self._dt is None: self._f.create_dataset("dt", (1,), dtype='float64', data=HDF5None) else: self._f.create_dataset( "dt", (1,), dtype='float64', data=[self._dt]) if self._transform_in is None: self._f.create_dataset("transform_in", (1,), dtype='complex128', data=HDF5None) else: self._f.create_dataset("transform_in", self._transform_in.shape, dtype='complex128', data=self._transform_in) if self._transform_out is None: self._f.create_dataset("transform_out", (1,), dtype='complex128', data=HDF5None) else: self._f.create_dataset("transform_out", self._transform_out.shape, dtype='complex128', data=self._transform_out) self._initial_tensor_data = self._f.create_dataset( "initial_tensor_data", (1,), dtype=data_type) self._initial_tensor_shape = self._f.create_dataset( "initial_tensor_shape", (1,), dtype=shape_type) self._mpo_tensors_data = self._f.create_dataset( "mpo_tensors_data", (0,), dtype=data_type, maxshape=(None,)) self._mpo_tensors_shape = self._f.create_dataset( "mpo_tensors_shape", (0,), dtype=shape_type, maxshape=(None,)) self._cap_tensors_data = self._f.create_dataset( "cap_tensors_data", (0,), dtype=data_type, maxshape=(None,)) self._cap_tensors_shape = self._f.create_dataset( "cap_tensors_shape", (0,), dtype=shape_type, maxshape=(None,)) self.set_initial_tensor(initial_tensor=None) def _read_file(self, filename: Text): """Read in process tensor meta data from HDF5 file.""" self._create = False self._f = h5py.File(filename, "r") try: if self._f.attrs["oqupy_version"] != __version__: warnings.warn( "HDF5 file appears to have been created with version "\ +f"{self._f.attrs['oqupy_version']}, while this is "\ +f"version {__version__}. "\ +"Correct import is not guaranteed.", UserWarning) except KeyError: warnings.warn( "HDF5 file does not seem to have an 'oqupy_version' "\ +"attribute. Correct import is not guaranteed.", UserWarning) name = self._f.attrs["name"] description = self._f.attrs["description"] if self._f.attrs["writing"] is True: warnings.warn( "File was closed during writing process and hence " \ "may be corrupt.", UserWarning) # hilber space dimension hs_dim = int(self._f["hs_dim"][0]) # time step dt dt = self._f["dt"] if _is_hdf5_none(dt): dt = None else: dt = dt[0] # transforms transform_in = np.array(self._f["transform_in"]) if _is_hdf5_none(transform_in): transform_in = None transform_out = np.array(self._f["transform_out"]) if _is_hdf5_none(transform_out): transform_out = None # initial tensor and mpo/cap/lam tensors self._initial_tensor_data = self._f["initial_tensor_data"] self._initial_tensor_shape = self._f["initial_tensor_shape"] self._mpo_tensors_data = self._f["mpo_tensors_data"] self._mpo_tensors_shape = self._f["mpo_tensors_shape"] self._cap_tensors_data = self._f["cap_tensors_data"] self._cap_tensors_shape = self._f["cap_tensors_shape"] return { "hilbert_space_dimension":hs_dim, "dt":dt, "transform_in":transform_in, "transform_out":transform_out, "name":name, "description":description, } def __len__(self) -> int: """Length of process tensor.""" return self._mpo_tensors_shape.shape[0] @property def name(self): """Name of the object. """ return self._name @name.setter def name(self, new_name: Text = None): if new_name is None: new_name = "__unnamed__" else: assert isinstance(new_name, Text), "Name must be text." self._name = new_name if self._write and self._f is not None: self._f.attrs['name'] = self._name @property def description(self): """Detailed description of the object. """ return self._description @description.setter def description(self, new_description: Text = None): if new_description is None: new_description = "__no_description__" else: assert isinstance(new_description, Text), \ "Description must be text." self._description = new_description if self._write and self._f is not None: self._f.attrs['description'] = self._description @property def filename(self): """Filename of the HDF5 file. """ return self._filename
[docs] def close(self): """Close the HDF5 file.""" if self._f is not None: if self._f.attrs["writing"] is True: self._f.attrs["writing"] = False self._f.close()
[docs] def remove(self): """Delete the HDF5 file. """ self.close() if self._removeable: os.remove(self._filename) else: raise FileExistsError("This process tensor file cannot be removed.")
[docs] def set_initial_tensor( self, initial_tensor: Optional[ndarray] = None) -> None: """ Set the (possibly correlated) initial system state. """ _set_data_and_shape(step=0, data=self._initial_tensor_data, shape=self._initial_tensor_shape, tensor=initial_tensor)
[docs] def set_mpo_tensor( self, step: int, tensor: Optional[ndarray]=None) -> None: """ Set the MPO tensor for time step `step`. The axes correspond to the following legs: [0] ... past bond leg, [1] ... future bond leg, [2] ... input (from system) leg, [3] ... output (to system) leg. If the tensor is rank-3 then it is implicitly assumed that there is a delta between the input and output leg. """ _set_data_and_shape(step, data=self._mpo_tensors_data, shape=self._mpo_tensors_shape, tensor=tensor)
[docs] def set_cap_tensor( self, step: int, tensor: Optional[ndarray]=None) -> None: """ Set the cap tensor (vector) to terminate the PT-MPO at time step `step`. """ _set_data_and_shape(step, data=self._cap_tensors_data, shape=self._cap_tensors_shape, tensor=tensor)
[docs] def get_initial_tensor(self) -> ndarray: """ Get the (possibly correlated) initial system state. """ return _get_data_and_shape(step=0, data=self._initial_tensor_data, shape=self._initial_tensor_shape)
[docs] def get_mpo_tensor( self, step: int, transformed: Optional[bool] = True) -> ndarray: """ Get the MPO tensor for time step `step`. The axes correspond to the following legs: [0] ... past bond leg, [1] ... future bond leg, [2] ... input (from system) leg, [3] ... output (to system) leg. Applies the transformation (stored in `.transform_in` and `.transform_out`) when `transformed` is true. """ tensor = _get_data_and_shape(step, data=self._mpo_tensors_data, shape=self._mpo_tensors_shape) if transformed: if len(tensor.shape) == 3: tensor = util.create_delta(tensor, [0, 1, 2, 2]) if self._transform_in is not None: tensor = np.dot(np.moveaxis(tensor, -2, -1), self._transform_in.T) tensor = np.moveaxis(tensor, -1, -2) if self._transform_out is not None: tensor = np.dot(tensor, self._transform_out) return tensor
[docs] def get_cap_tensor(self, step: int) -> ndarray: """ Get the cap tensor (vector) to terminate the PT-MPO at time step `step`. """ try: tensor = _get_data_and_shape(step, data=self._cap_tensors_data, shape=self._cap_tensors_shape) except IndexError: tensor = None return tensor
[docs] def get_bond_dimensions(self) -> ndarray: """Return the bond dimensions of the MPS form of the process tensor.""" bond_dims = [] for tensor_shape in self._mpo_tensors_shape: bond_dims.append(tensor_shape[0]) bond_dims.append(self._mpo_tensors_shape[-1][1]) return np.array(bond_dims)
[docs] def compute_caps(self) -> None: """ Compute and store all caps from the PT-MPO. """ length = len(self) cap = np.array([1.0], dtype=NpDtype) self.set_cap_tensor(length, cap) last_cap = tn.Node(cap) for step in reversed(range(length)): trace_in = tn.Node(self._trace_in) trace_out = tn.Node(self._trace_out) ten = tn.Node(self.get_mpo_tensor(step)) ten[1] ^ last_cap[0] ten[2] ^ trace_in[0] ten[3] ^ trace_out[0] new_cap = ten @ last_cap @ trace_in @ trace_out self.set_cap_tensor(step, new_cap.get_tensor()) last_cap = new_cap
def _set_data_and_shape(step, data, shape, tensor): """Write arbitrarily shaped tensor into HDF5 file.""" if tensor is None: tensor = np.array(HDF5None) if step >= shape.shape[0]: shape.resize((step+1,)) if step >= data.shape[0]: data.resize((step+1,)) shape[step] = tensor.shape tensor = tensor.reshape(-1) data[step] = tensor def _get_data_and_shape(step, data, shape) -> ndarray: """Read arbitrarily shaped tensor from HDF5 file.""" if step >= shape.shape[0]: raise IndexError("Process tensor index out of bound!") tensor_shape = shape[step] tensor = data[step] tensor = tensor.reshape(tensor_shape) if _is_hdf5_none(tensor): tensor = None return tensor
[docs]def import_process_tensor( filename: Text, process_tensor_type: Text = None) -> BaseProcessTensor: """ Import a process tensor from a file. Parameters ---------- filename: Text Filepath to the .hdf5 file. process_tensor_type: Text Type of process tensor object to create. May be 'file' to create `FileProcessTensor`, or 'simple', to create a `SimpleProcessTensor`. Returns ------- process_tensor: BaseProcessTensor The process tensor object with the data from the .hdf5 file. """ pt_file = FileProcessTensor(mode="read", filename=filename) if process_tensor_type is None or process_tensor_type == "file": pt = pt_file elif process_tensor_type == "simple": pt = SimpleProcessTensor( hilbert_space_dimension=pt_file.hilbert_space_dimension, dt=pt_file.dt, transform_in=pt_file.transform_in, transform_out=pt_file.transform_out, name=pt_file.name, description=pt_file.description) pt.set_initial_tensor(pt_file.get_initial_tensor()) step = 0 while True: try: mpo = pt_file.get_mpo_tensor(step, transformed=False) except IndexError: break pt.set_mpo_tensor(step, mpo) step += 1 step = 0 while True: cap = pt_file.get_cap_tensor(step) if cap is None: break pt.set_cap_tensor(step, cap) step += 1 else: raise ValueError("Parameter 'process_tensor_type' must be "\ + "'file' or 'simple'!") return pt