# Copyright 2022 The TEMPO Collaboration
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Module for the process tensor (PT) object. This code is based on [Pollock2018].
**[Pollock2018]**
F. A. Pollock, C. Rodriguez-Rosario, T. Frauenheim,
M. Paternostro, and K. Modi, *Non-Markovian quantum processes: Complete
framework and efficient characterization*, Phys. Rev. A 97, 012127 (2018).
"""
import os
import tempfile
from typing import Optional, Text, Union
import numpy as np
from numpy import ndarray
import tensornetwork as tn
import h5py
from oqupy.base_api import BaseAPIClass
from oqupy.config import NpDtype
from oqupy import util
class BaseProcessTensor(BaseAPIClass):
    """
    Common base class of the process tensor classes in this module.

    The constructor stores the Hilbert space dimension, the optional time
    step and the optional input/output transformations, and precomputes the
    trace vectors derived from them. `__len__()` and all tensor setters and
    getters raise `NotImplementedError` here; the subclasses in this module
    override them.

    Parameters
    ----------
    hilbert_space_dimension: int
        Dimension of the system Hilbert space. The density matrix dimension
        used internally is its square.
    dt: float
        (Optional) time step.
    transform_in: ndarray
        (Optional) two-dimensional array whose first axis has length
        `hilbert_space_dimension**2`.
    transform_out: ndarray
        (Optional) two-dimensional array whose second axis has length
        `hilbert_space_dimension**2`.
    name: str
        (Optional) name, passed on to `BaseAPIClass`.
    description: str
        (Optional) description, passed on to `BaseAPIClass`.
    """

    def __init__(
            self,
            hilbert_space_dimension: int,
            dt: Optional[float] = None,
            transform_in: Optional[ndarray] = None,
            transform_out: Optional[ndarray] = None,
            name: Optional[Text] = None,
            description: Optional[Text] = None) -> None:
        """Store the parameters and precompute the trace vectors."""
        self._hs_dim = hilbert_space_dimension
        self._dt = dt
        self._rho_dim = self._hs_dim**2

        # Flattened identity matrix, scaled by 1/sqrt(hilbert space dim).
        scale = np.sqrt(float(self._hs_dim))
        self._trace = np.identity(self._hs_dim, dtype=NpDtype).flatten() \
            / scale
        self._trace_square = self._trace**2

        if transform_in is None:
            self._in_dim = self._rho_dim
            self._transform_in = None
            self._trace_in = self._trace
        else:
            matrix_in = np.array(transform_in, dtype=NpDtype)
            assert matrix_in.ndim == 2
            assert matrix_in.shape[0] == self._rho_dim
            self._in_dim = matrix_in.shape[1]
            self._transform_in = matrix_in
            self._trace_in = self._trace @ matrix_in

        if transform_out is None:
            self._out_dim = self._rho_dim
            self._transform_out = None
            self._trace_out = self._trace
        else:
            matrix_out = np.array(transform_out, dtype=NpDtype)
            assert matrix_out.ndim == 2
            assert matrix_out.shape[1] == self._rho_dim
            self._out_dim = matrix_out.shape[0]
            self._transform_out = matrix_out
            self._trace_out = matrix_out @ self._trace

        super().__init__(name, description)

    def _no_implementation(self, method_name: Text) -> NotImplementedError:
        """Build the error for a method that this class does not provide."""
        return NotImplementedError(
            "Class {} has no {}() implementation.".format(
                type(self).__name__, method_name))

    @property
    def hilbert_space_dimension(self):
        """Dimension of the system Hilbert space."""
        return self._hs_dim

    @property
    def dt(self):
        """Time step given at construction (may be `None`)."""
        return self._dt

    @property
    def transform_in(self):
        """Input transformation as an array, or `None` if there is none."""
        return self._transform_in

    @property
    def transform_out(self):
        """Output transformation as an array, or `None` if there is none."""
        return self._transform_out

    @property
    def max_step(self) -> Union[int, float]:
        """Largest step of the process tensor; here its length."""
        return len(self)

    def __len__(self) -> int:
        """Length of process tensor."""
        raise self._no_implementation("__len__")

    def set_initial_tensor(
            self,
            initial_tensor: Optional[ndarray] = None) -> None:
        """Set the initial tensor (not implemented in the base class)."""
        raise self._no_implementation("set_initial_tensor")

    def get_initial_tensor(self) -> ndarray:
        """Return the initial tensor (not implemented in the base class)."""
        raise self._no_implementation("get_initial_tensor")

    def set_mpo_tensor(
            self,
            step: int,
            tensor: Optional[ndarray] = None) -> None:
        """Set the MPO tensor of `step` (not implemented in the base class).
        """
        raise self._no_implementation("set_mpo_tensor")

    def get_mpo_tensor(
            self,
            step: int,
            transformed: Optional[bool] = True) -> ndarray:
        """Return the MPO tensor of `step` (not implemented in the base
        class)."""
        raise self._no_implementation("get_mpo_tensor")

    def set_cap_tensor(
            self,
            step: int,
            tensor: Optional[ndarray] = None) -> None:
        """Set the cap tensor of `step` (not implemented in the base class).
        """
        raise self._no_implementation("set_cap_tensor")

    def get_cap_tensor(self, step: int) -> ndarray:
        """Return the cap tensor of `step` (not implemented in the base
        class)."""
        raise self._no_implementation("get_cap_tensor")

    def set_lam_tensor(
            self,
            step: int,
            tensor: Optional[ndarray] = None) -> None:
        """Set the lam tensor of `step` (not implemented in the base class).
        """
        raise self._no_implementation("set_lam_tensor")

    def get_lam_tensor(self, step: int) -> ndarray:
        """Return the lam tensor of `step` (not implemented in the base
        class)."""
        raise self._no_implementation("get_lam_tensor")

    def get_bond_dimensions(self) -> ndarray:
        """Return the bond dimensions of the MPS form of the process tensor."""
        raise self._no_implementation("get_bond_dimensions")
class TrivialProcessTensor(BaseProcessTensor):
    """
    Process tensor without any stored tensors.

    Its length is zero and its `max_step` is infinite. Every getter returns
    `None`, except `get_cap_tensor()`, which returns a one-element array
    containing `1.0` for any step.

    Parameters
    ----------
    hilbert_space_dimension: int
        (Optional) dimension of the system Hilbert space (default 1).
    name: str
        (Optional) name, passed on to the base class.
    description: str
        (Optional) description, passed on to the base class.
    """

    def __init__(
            self,
            hilbert_space_dimension: Optional[int] = 1,
            name: Optional[Text] = None,
            description: Optional[Text] = None) -> None:
        """Create a trivial process tensor without time step or transforms.
        """
        super().__init__(
            hilbert_space_dimension=hilbert_space_dimension,
            name=name,
            description=description)

    @property
    def max_step(self) -> Union[int, float]:
        """Largest step of the process tensor; infinite for this class."""
        return float("inf")

    def __len__(self) -> int:
        """Length of process tensor, which is always zero here."""
        return 0

    def get_initial_tensor(self) -> ndarray:
        """Return `None`: there is no initial tensor."""
        return None

    def get_mpo_tensor(
            self,
            step: int,
            transformed: Optional[bool] = True) -> ndarray:
        """Return `None` for every step; both arguments are ignored."""
        return None

    def get_cap_tensor(self, step: int) -> ndarray:
        """Return a one-element array holding `1.0`, whatever `step` is."""
        return np.ones(1, dtype=NpDtype)

    def get_lam_tensor(self, step: int) -> ndarray:
        """Return `None` for every step."""
        return None

    def get_bond_dimensions(self) -> ndarray:
        """Return the bond dimensions of the MPS form of the process tensor.

        There are none for this class, so the result is `None`.
        """
        return None
class SimpleProcessTensor(BaseProcessTensor):
    """
    Process tensor that keeps all of its tensors in memory.

    The initial tensor is held in a single attribute; the MPO, cap and lam
    tensors are held in three Python lists that are indexed by step. Entries
    that have not been set are `None`.

    Parameters
    ----------
    hilbert_space_dimension: int
        Dimension of the system Hilbert space.
    dt: float
        (Optional) time step.
    transform_in: ndarray
        (Optional) input transformation, see `BaseProcessTensor`.
    transform_out: ndarray
        (Optional) output transformation, see `BaseProcessTensor`.
    name: str
        (Optional) name of the process tensor.
    description: str
        (Optional) description of the process tensor.
    """

    def __init__(
            self,
            hilbert_space_dimension: int,
            dt: Optional[float] = None,
            transform_in: Optional[ndarray] = None,
            transform_out: Optional[ndarray] = None,
            name: Optional[Text] = None,
            description: Optional[Text] = None) -> None:
        """Create an empty in-memory process tensor."""
        self._initial_tensor = None
        self._mpo_tensors = []
        self._cap_tensors = []
        self._lam_tensors = []
        super().__init__(
            hilbert_space_dimension,
            dt,
            transform_in,
            transform_out,
            name,
            description)

    @staticmethod
    def _store(tensors: list, step: int, tensor) -> None:
        """Write `tensor` to `tensors[step]`, padding the list with `None`.

        `None` is stored as `None`; anything else is converted to an array
        of dtype `NpDtype`.
        """
        missing = step - len(tensors) + 1
        if missing > 0:
            tensors.extend([None] * missing)
        if tensor is None:
            tensors[step] = None
        else:
            tensors[step] = np.array(tensor, dtype=NpDtype)

    @staticmethod
    def _fetch(tensors: list, step: int):
        """Return `tensors[step]`, or `None` if `step` is out of range."""
        if step < 0 or step >= len(tensors):
            return None
        return tensors[step]

    def __len__(self) -> int:
        """Length of process tensor, i.e. the number of MPO tensor slots."""
        return len(self._mpo_tensors)

    def set_initial_tensor(
            self,
            initial_tensor: Optional[ndarray] = None) -> None:
        """Set the initial tensor; `None` removes it."""
        if initial_tensor is None:
            self._initial_tensor = None
        else:
            self._initial_tensor = np.array(initial_tensor, dtype=NpDtype)

    def get_initial_tensor(self) -> ndarray:
        """Return the initial tensor, or `None` if none is set."""
        return self._initial_tensor

    def set_mpo_tensor(
            self,
            step: int,
            tensor: Optional[ndarray] = None) -> None:
        """Set the MPO tensor of `step`; `None` clears the entry.

        The list of MPO tensors grows as needed, which also changes the
        length of the process tensor.
        """
        self._store(self._mpo_tensors, step, tensor)

    def get_mpo_tensor(
            self,
            step: int,
            transformed: Optional[bool] = True) -> ndarray:
        """Return the MPO tensor of `step`.

        Parameters
        ----------
        step: int
            Index of the MPO tensor.
        transformed: bool
            If true (default), a tensor with three legs is first expanded
            with `util.create_delta` and the input/output transformations
            are applied where they exist. If false, the tensor is returned
            as it is stored.

        Returns
        -------
        tensor: ndarray
            The requested tensor, or `None` if the entry is unset.

        Raises
        ------
        IndexError
            If `step` is negative or not smaller than the length.
        """
        if step < 0 or step >= len(self._mpo_tensors):
            raise IndexError("Process tensor index out of bound. ")
        tensor = self._mpo_tensors[step]
        if tensor is None or not transformed:
            return tensor
        if len(tensor.shape) == 3:
            tensor = util.create_delta(tensor, [0, 1, 2, 2])
        if self._transform_in is not None:
            # Contract the second to last leg with the input transformation
            # and move the resulting leg back into its place.
            tensor = np.dot(np.moveaxis(tensor, -2, -1),
                            self._transform_in.T)
            tensor = np.moveaxis(tensor, -1, -2)
        if self._transform_out is not None:
            tensor = np.dot(tensor, self._transform_out)
        return tensor

    def set_cap_tensor(
            self,
            step: int,
            tensor: Optional[ndarray] = None) -> None:
        """Set the cap tensor of `step`; `None` clears the entry."""
        self._store(self._cap_tensors, step, tensor)

    def get_cap_tensor(self, step: int) -> ndarray:
        """Return the cap tensor of `step`, or `None` if there is none."""
        return self._fetch(self._cap_tensors, step)

    def set_lam_tensor(
            self,
            step: int,
            tensor: Optional[ndarray] = None) -> None:
        """Set the lam tensor of `step`; `None` clears the entry."""
        self._store(self._lam_tensors, step, tensor)

    def get_lam_tensor(self, step: int) -> ndarray:
        """Return the lam tensor of `step`, or `None` if there is none."""
        return self._fetch(self._lam_tensors, step)

    def get_bond_dimensions(self) -> ndarray:
        """Return the bond dimensions of the MPS form of the process tensor.

        The result holds the size of the first leg of every MPO tensor,
        followed by the size of the second leg of the last one. Unset
        entries contribute 0. An empty process tensor yields an empty array.
        """
        if not self._mpo_tensors:
            return np.array([], dtype=int)
        bond_dims = [0 if mpo is None else mpo.shape[0]
                     for mpo in self._mpo_tensors]
        last = self._mpo_tensors[-1]
        bond_dims.append(0 if last is None else last.shape[1])
        return np.array(bond_dims)

    def compute_caps(self) -> None:
        """Compute all cap tensors from the stored MPO tensors.

        Starting from a one-element cap `[1.0]` behind the last step, every
        MPO tensor is contracted with the cap to its right and with the
        trace vectors on its remaining legs. The previously stored cap
        tensors are replaced by the `len(self) + 1` new ones. All MPO
        tensors must be set.
        """
        caps = [np.array([1.0], dtype=NpDtype)]
        last_cap = tn.Node(caps[0])
        for step in reversed(range(len(self))):
            ten = tn.Node(self._mpo_tensors[step])
            ten[1] ^ last_cap[0]
            if len(ten.shape) == 3:
                trace_square = tn.Node(self._trace_square)
                ten[2] ^ trace_square[0]
                new_cap = ten @ last_cap @ trace_square
            else:
                trace_in = tn.Node(self._trace_in)
                trace_out = tn.Node(self._trace_out)
                ten[2] ^ trace_in[0]
                ten[3] ^ trace_out[0]
                new_cap = ten @ last_cap @ trace_in @ trace_out
            caps.append(new_cap.get_tensor())
            last_cap = new_cap
        # The caps were collected from the last step to the first.
        caps.reverse()
        self._cap_tensors = caps

    def export(self, filename: Text, overwrite: bool = False):
        """Write the process tensor to the file `filename`.

        Parameters
        ----------
        filename: str
            Path of the file to create.
        overwrite: bool
            If true the file is opened in mode "overwrite", otherwise in
            mode "write" (see `FileProcessTensor`).
        """
        pt_file = FileProcessTensor(
            mode="overwrite" if overwrite else "write",
            filename=filename,
            hilbert_space_dimension=self._hs_dim,
            dt=self._dt,
            transform_in=self._transform_in,
            transform_out=self._transform_out,
            name=self.name,
            description=self.description)
        try:
            pt_file.set_initial_tensor(self._initial_tensor)
            for step, mpo in enumerate(self._mpo_tensors):
                pt_file.set_mpo_tensor(step, mpo)
            for step, cap in enumerate(self._cap_tensors):
                pt_file.set_cap_tensor(step, cap)
            for step, lam in enumerate(self._lam_tensors):
                pt_file.set_lam_tensor(step, lam)
        finally:
            # Release the file handle even if one of the writes fails.
            pt_file.close()
class FileProcessTensor(BaseProcessTensor):
    """
    Process tensor whose tensors are stored in an HDF5 file.

    Every tensor is stored as a pair of entries: its flattened data in a
    `*_data` dataset and its shape in the matching `*_shape` dataset. An
    unset tensor is stored as the one-element array `[0.0]`.

    Parameters
    ----------
    mode: str
        One of "read" (open an existing file read-only), "write" (create a
        new file, fail if it exists) or "overwrite" (create a new file,
        replacing an existing one).
    filename: str
        Path of the file. Required in mode "read". If omitted in the other
        modes, a temporary file is created.
    hilbert_space_dimension: int
        Dimension of the system Hilbert space. Required unless the mode is
        "read", in which case it is taken from the file.
    dt: float
        (Optional) time step; in mode "read" it is taken from the file.
    transform_in: ndarray
        (Optional) input transformation; in mode "read" it is taken from
        the file.
    transform_out: ndarray
        (Optional) output transformation; in mode "read" it is taken from
        the file.
    name: str
        (Optional) name; not used in mode "read".
    description: str
        (Optional) description; not used in mode "read".
    """

    def __init__(
            self,
            mode: Text,
            filename: Optional[Text] = None,
            hilbert_space_dimension: Optional[int] = None,
            dt: Optional[float] = None,
            transform_in: Optional[ndarray] = None,
            transform_out: Optional[ndarray] = None,
            name: Optional[Text] = None,
            description: Optional[Text] = None) -> None:
        """Open an existing process tensor file or create a new one."""
        if mode == "read":
            write = False
            overwrite = False
        elif mode == "write":
            write = True
            overwrite = False
        elif mode == "overwrite":
            write = True
            overwrite = True
        else:
            raise ValueError("Parameter 'mode' must be one of 'read'/"
                             "'write'/'overwrite'!")

        self._f = None
        self._initial_tensor_data = None
        self._initial_tensor_shape = None
        self._mpo_tensors_data = None
        self._mpo_tensors_shape = None
        self._cap_tensors_data = None
        self._cap_tensors_shape = None
        self._lam_tensors_data = None
        self._lam_tensors_shape = None

        if write:
            # Checked first so that no temporary file is left behind.
            assert isinstance(hilbert_space_dimension, int)
            if filename is None:
                self._removeable = True
                # mkstemp() reserves a unique name by creating the file, so
                # the file must be truncated rather than created exclusively.
                file_descriptor, tmp_filename = tempfile.mkstemp(
                    prefix="pt_", suffix=".hdf5")
                os.close(file_descriptor)
                truncate = True
            else:
                self._removeable = overwrite
                tmp_filename = filename
                truncate = overwrite
            super().__init__(
                hilbert_space_dimension,
                dt,
                transform_in,
                transform_out,
                name,
                description)
            self._filename = tmp_filename
            self._create_file(tmp_filename, truncate)
        else:
            assert filename is not None
            self._filename = filename
            self._removeable = False
            dictionary = self._read_file(filename)
            super().__init__(**dictionary)

    def _create_file(self, filename: Text, overwrite: bool):
        """Create the HDF5 file and all of its datasets.

        With `overwrite` an existing file is truncated (h5py mode "w"),
        otherwise creation fails if the file exists (h5py mode "x"). A
        missing time step is stored as 0.0 and a missing transformation as
        the one-element array `[0.0]`.
        """
        self._f = h5py.File(filename, "w" if overwrite else "x")
        data_type = h5py.vlen_dtype(np.dtype('complex128'))
        shape_type = h5py.vlen_dtype(np.dtype('i'))

        self._f.create_dataset("hs_dim", (1,), dtype='i', data=[self._hs_dim])

        if self._dt is None:
            self._f.create_dataset("dt", (1,), dtype='float64', data=[0.0])
        else:
            self._f.create_dataset(
                "dt", (1,), dtype='float64', data=[self._dt])

        if self._transform_in is None:
            self._f.create_dataset("transform_in",
                                   (1,),
                                   dtype='complex128',
                                   data=[0.0])
        else:
            self._f.create_dataset("transform_in",
                                   self._transform_in.shape,
                                   dtype='complex128',
                                   data=self._transform_in)

        if self._transform_out is None:
            self._f.create_dataset("transform_out",
                                   (1,),
                                   dtype='complex128',
                                   data=[0.0])
        else:
            self._f.create_dataset("transform_out",
                                   self._transform_out.shape,
                                   dtype='complex128',
                                   data=self._transform_out)

        self._initial_tensor_data = self._f.create_dataset(
            "initial_tensor_data", (1,), dtype=data_type)
        self._initial_tensor_shape = self._f.create_dataset(
            "initial_tensor_shape", (1,), dtype=shape_type)

        # The per-step datasets start empty and are resized on demand.
        self._mpo_tensors_data = self._f.create_dataset(
            "mpo_tensors_data", (0,), dtype=data_type, maxshape=(None,))
        self._mpo_tensors_shape = self._f.create_dataset(
            "mpo_tensors_shape", (0,), dtype=shape_type, maxshape=(None,))
        self._cap_tensors_data = self._f.create_dataset(
            "cap_tensors_data", (0,), dtype=data_type, maxshape=(None,))
        self._cap_tensors_shape = self._f.create_dataset(
            "cap_tensors_shape", (0,), dtype=shape_type, maxshape=(None,))
        self._lam_tensors_data = self._f.create_dataset(
            "lam_tensors_data", (0,), dtype=data_type, maxshape=(None,))
        self._lam_tensors_shape = self._f.create_dataset(
            "lam_tensors_shape", (0,), dtype=shape_type, maxshape=(None,))

        self.set_initial_tensor(initial_tensor=None)

    def _read_file(self, filename: Text):
        """Open the HDF5 file read-only and bind its datasets.

        Returns
        -------
        dictionary: dict
            Keyword arguments for the base class constructor. A stored time
            step of 0.0 and a stored transformation that is all zeros are
            returned as `None`; name and description are always `None`.
        """
        self._create = False
        self._f = h5py.File(filename, "r")

        # hilbert space dimension
        hs_dim = int(self._f["hs_dim"][0])

        # time step dt
        dt = float(self._f["dt"][0])
        if dt == 0.0:
            dt = None

        # transforms
        transform_in = np.array(self._f["transform_in"])
        if np.allclose(transform_in, np.array([0.0])):
            transform_in = None
        transform_out = np.array(self._f["transform_out"])
        if np.allclose(transform_out, np.array([0.0])):
            transform_out = None

        # initial tensor and mpo/cap/lam tensors
        self._initial_tensor_data = self._f["initial_tensor_data"]
        self._initial_tensor_shape = self._f["initial_tensor_shape"]
        self._mpo_tensors_data = self._f["mpo_tensors_data"]
        self._mpo_tensors_shape = self._f["mpo_tensors_shape"]
        self._cap_tensors_data = self._f["cap_tensors_data"]
        self._cap_tensors_shape = self._f["cap_tensors_shape"]
        self._lam_tensors_data = self._f["lam_tensors_data"]
        self._lam_tensors_shape = self._f["lam_tensors_shape"]

        return {
            "hilbert_space_dimension": hs_dim,
            "dt": dt,
            "transform_in": transform_in,
            "transform_out": transform_out,
            "name": None,
            "description": None,
        }

    def __len__(self) -> int:
        """Length of process tensor, i.e. the number of MPO tensor slots."""
        return self._mpo_tensors_shape.shape[0]

    @property
    def filename(self):
        """Path of the underlying HDF5 file."""
        return self._filename

    def close(self):
        """Close the underlying HDF5 file if one has been opened."""
        if self._f is not None:
            self._f.close()

    def remove(self):
        """Close the file and delete it from disk.

        Raises
        ------
        FileExistsError
            If the file is not removable, i.e. it was opened in mode "read"
            or created in mode "write" under a given filename. The file is
            closed in that case, too.
        """
        self.close()
        if self._removeable:
            os.remove(self._filename)
        else:
            raise FileExistsError("This process tensor file cannot be removed.")

    def set_initial_tensor(
            self,
            initial_tensor: Optional[ndarray] = None) -> None:
        """Write the initial tensor to the file; `None` marks it as unset."""
        _set_data_and_shape(step=0,
                            data=self._initial_tensor_data,
                            shape=self._initial_tensor_shape,
                            tensor=initial_tensor)

    def get_initial_tensor(self) -> ndarray:
        """Read the initial tensor from the file (`None` if unset)."""
        return _get_data_and_shape(step=0,
                                   data=self._initial_tensor_data,
                                   shape=self._initial_tensor_shape)

    def set_mpo_tensor(
            self,
            step: int,
            tensor: Optional[ndarray] = None) -> None:
        """Write the MPO tensor of `step`; `None` marks the entry as unset.

        The datasets grow as needed, which also changes the length of the
        process tensor.
        """
        if tensor is None:
            tmp_tensor = np.array([0.0])
        else:
            tmp_tensor = tensor
        _set_data_and_shape(step,
                            data=self._mpo_tensors_data,
                            shape=self._mpo_tensors_shape,
                            tensor=tmp_tensor)

    def get_mpo_tensor(
            self,
            step: int,
            transformed: Optional[bool] = True) -> ndarray:
        """Read the MPO tensor of `step` from the file.

        Parameters
        ----------
        step: int
            Index of the MPO tensor.
        transformed: bool
            If true (default), a tensor with three legs is first expanded
            with `util.create_delta` and the input/output transformations
            are applied where they exist. If false, the tensor is returned
            as it is stored.

        Returns
        -------
        tensor: ndarray
            The requested tensor, or `None` if the entry is unset.

        Raises
        ------
        IndexError
            If `step` is not smaller than the length.
        """
        tensor = _get_data_and_shape(step,
                                     data=self._mpo_tensors_data,
                                     shape=self._mpo_tensors_shape)
        if tensor is None or not transformed:
            return tensor
        if len(tensor.shape) == 3:
            tensor = util.create_delta(tensor, [0, 1, 2, 2])
        if self._transform_in is not None:
            # Contract the second to last leg with the input transformation
            # and move the resulting leg back into its place.
            tensor = np.dot(np.moveaxis(tensor, -2, -1),
                            self._transform_in.T)
            tensor = np.moveaxis(tensor, -1, -2)
        if self._transform_out is not None:
            tensor = np.dot(tensor, self._transform_out)
        return tensor

    def set_cap_tensor(
            self,
            step: int,
            tensor: Optional[ndarray] = None) -> None:
        """Write the cap tensor of `step`; `None` marks the entry as unset.
        """
        _set_data_and_shape(step,
                            data=self._cap_tensors_data,
                            shape=self._cap_tensors_shape,
                            tensor=tensor)

    def get_cap_tensor(self, step: int) -> ndarray:
        """Read the cap tensor of `step`; `None` if unset or out of range.
        """
        try:
            return _get_data_and_shape(step,
                                       data=self._cap_tensors_data,
                                       shape=self._cap_tensors_shape)
        except IndexError:
            return None

    def set_lam_tensor(
            self,
            step: int,
            tensor: Optional[ndarray] = None) -> None:
        """Write the lam tensor of `step`; `None` marks the entry as unset.
        """
        _set_data_and_shape(step,
                            data=self._lam_tensors_data,
                            shape=self._lam_tensors_shape,
                            tensor=tensor)

    def get_lam_tensor(self, step: int) -> ndarray:
        """Read the lam tensor of `step`; `None` if unset or out of range.
        """
        try:
            return _get_data_and_shape(step,
                                       data=self._lam_tensors_data,
                                       shape=self._lam_tensors_shape)
        except IndexError:
            return None

    def get_bond_dimensions(self) -> ndarray:
        """Return the bond dimensions of the MPS form of the process tensor.

        The result holds the first entry of every stored MPO tensor shape,
        followed by the second entry of the last one. Unset entries, which
        are stored with the one-entry shape `(1,)`, contribute 0. A file
        without MPO tensors yields an empty array.
        """
        shapes = list(self._mpo_tensors_shape)
        if not shapes:
            return np.array([], dtype=int)
        bond_dims = [shape[0] if len(shape) > 1 else 0 for shape in shapes]
        last = shapes[-1]
        bond_dims.append(last[1] if len(last) > 1 else 0)
        return np.array(bond_dims)

    def compute_caps(self) -> None:
        """Compute all cap tensors and write them to the file.

        Starting from a one-element cap `[1.0]` behind the last step, every
        transformed MPO tensor is contracted with the cap to its right and
        with the input and output trace vectors. `len(self) + 1` cap
        tensors are written. All MPO tensors must be set.
        """
        length = len(self)
        cap = np.array([1.0], dtype=NpDtype)
        self.set_cap_tensor(length, cap)
        last_cap = tn.Node(cap)
        for step in reversed(range(length)):
            trace_in = tn.Node(self._trace_in)
            trace_out = tn.Node(self._trace_out)
            ten = tn.Node(self.get_mpo_tensor(step))
            ten[1] ^ last_cap[0]
            ten[2] ^ trace_in[0]
            ten[3] ^ trace_out[0]
            new_cap = ten @ last_cap @ trace_in @ trace_out
            self.set_cap_tensor(step, new_cap.get_tensor())
            last_cap = new_cap
def _set_data_and_shape(step, data, shape, tensor):
    """Store `tensor` at index `step` of a data/shape dataset pair.

    Parameters
    ----------
    step:
        Index to write to. Both datasets are resized to `step + 1` entries
        if they are shorter.
    data:
        Dataset that receives the flattened tensor.
    shape:
        Dataset that receives the shape of the tensor.
    tensor:
        Array or array-like to store. `None` is stored as the one-element
        array `[0.0]`, which is read back as `None`.
    """
    if tensor is None:
        tensor = np.array([0.0])
    else:
        # Accept array-likes such as nested lists, not only ndarrays.
        tensor = np.asarray(tensor)
    if step >= shape.shape[0]:
        shape.resize((step+1,))
    if step >= data.shape[0]:
        data.resize((step+1,))

    shape[step] = tensor.shape
    data[step] = tensor.reshape(-1)
def _get_data_and_shape(step, data, shape) -> ndarray:
    """Read the tensor at index `step` of a data/shape dataset pair.

    Parameters
    ----------
    step:
        Index to read from.
    data:
        Dataset that holds the flattened tensors.
    shape:
        Dataset that holds the shapes of the tensors.

    Returns
    -------
    tensor: ndarray
        The stored data reshaped to the stored shape, or `None` if the
        entry is the one-element array `[0.0]` that marks an unset tensor.

    Raises
    ------
    IndexError
        If `step` is negative or not smaller than the length of `shape`.
    """
    # Negative steps are rejected explicitly; otherwise they would be
    # interpreted as indices counted from the end.
    if step < 0 or step >= shape.shape[0]:
        raise IndexError("Process tensor index out of bound!")
    tensor = data[step].reshape(shape[step])
    if tensor.shape == (1,) and tensor[0] == 0.0:
        return None
    return tensor
def import_process_tensor(
        filename: Text,
        process_tensor_type: Text = None) -> BaseProcessTensor:
    """
    Load a process tensor from a file.

    Parameters
    ----------
    filename: str
        Path of the process tensor file to read.
    process_tensor_type: str
        (Optional) either "file" (default) or "simple". With "file" the
        returned object is a `FileProcessTensor` that keeps the file open
        in read mode. With "simple" all tensors are copied into a
        `SimpleProcessTensor` and the file is closed again.

    Returns
    -------
    pt: BaseProcessTensor
        The imported process tensor.

    Raises
    ------
    ValueError
        If `process_tensor_type` is neither `None`, "file" nor "simple".
    """
    # Validate before opening the file so that no handle is left open.
    if process_tensor_type not in (None, "file", "simple"):
        raise ValueError("Parameter 'process_tensor_type' must be "
                         "'file' or 'simple'!")

    pt_file = FileProcessTensor(mode="read", filename=filename)
    if process_tensor_type != "simple":
        return pt_file

    def copy_until_none(getter, setter):
        """Copy tensors step by step until the getter returns `None`."""
        step = 0
        while True:
            tensor = getter(step)
            if tensor is None:
                break
            setter(step, tensor)
            step += 1

    try:
        pt = SimpleProcessTensor(
            hilbert_space_dimension=pt_file.hilbert_space_dimension,
            dt=pt_file.dt,
            transform_in=pt_file.transform_in,
            transform_out=pt_file.transform_out,
            name=pt_file.name,
            description=pt_file.description)
        pt.set_initial_tensor(pt_file.get_initial_tensor())

        # MPO tensors are copied untransformed until the file runs out.
        step = 0
        while True:
            try:
                mpo = pt_file.get_mpo_tensor(step, transformed=False)
            except IndexError:
                break
            pt.set_mpo_tensor(step, mpo)
            step += 1

        copy_until_none(pt_file.get_cap_tensor, pt.set_cap_tensor)
        copy_until_none(pt_file.get_lam_tensor, pt.set_lam_tensor)
    finally:
        # Everything needed now lives in memory (or the copy failed).
        pt_file.close()
    return pt