Source code for dclab.lme4.wrapr

"""R lme4 wrapper"""
import logging
import numbers
import pathlib
import tempfile

import importlib_resources
import numpy as np

from .. import definitions as dfn
from ..rtdc_dataset.core import RTDCBase

from . import rsetup


logger = logging.getLogger(__name__)


[docs] class Rlme4(object): def __init__(self, model="lmer", feature="deform"): """Perform an R-lme4 analysis with RT-DC data Parameters ---------- model: str One of: - "lmer": linear mixed model using lme4's ``lmer`` - "glmer+loglink": generalized linear mixed model using lme4's ``glmer`` with an additional a log-link function via the ``family=Gamma(link='log'))`` keyword. feature: str Dclab feature for which to compute the model """ #: modeling method to use (e.g. "lmer") self.model = None #: dclab feature for which to perform the analysis self.feature = None #: list of [RTDCBase, column, repetition, chip_region] self.data = [] self.set_options(model=model, feature=feature) # Make sure that lme4 is available if not rsetup.has_lme4(): logger.info("Installing lme4, this may take a while!") rsetup.require_lme4()
[docs] def add_dataset(self, ds, group, repetition): """Add a dataset to the analysis list Parameters ---------- ds: RTDCBase Dataset group: str The group the measurement belongs to ("control" or "treatment") repetition: int Repetition of the measurement Notes ----- - For each repetition, there must be a "treatment" (``1``) and a "control" (``0``) group. - If you would like to perform a differential feature analysis, then you need to pass at least a reservoir and a channel dataset (with same parameters for `group` and `repetition`). """ assert group in ["treatment", "control"] assert isinstance(ds, RTDCBase) assert isinstance(repetition, numbers.Integral) region = ds.config["setup"]["chip region"] # make sure there are no doublets for ii, dd in enumerate(self.data): if dd[1] == group and dd[2] == repetition and dd[3] == region: raise ValueError("A dataset with group '{}', ".format(group) + "repetition '{}', and ".format(repetition) + "'{}' region has already ".format(region) + "been added (index {})!".format(ii)) self.data.append([ds, group, repetition, region])
[docs] def check_data(self): """Perform sanity checks on ``self.data``""" # Check that we have enough data if len(self.data) < 3: msg = "Linear mixed effects models require repeated " \ + "measurements. Please add more repetitions." raise ValueError(msg)
[docs] def fit(self, model=None, feature=None): """Perform (generalized) linear mixed-effects model fit The response variable is modeled using two linear mixed effect models: - model: "feature ~ group + (1 + group | repetition)" (random intercept + random slope model) - the null model: "feature ~ (1 + group | repetition)" (without the fixed effect introduced by the "treatment" group). Both models are compared in R using "anova" (from the R-package "stats" :cite:`Everitt1992`) which performs a likelihood ratio test to obtain the p-Value for the significance of the fixed effect (treatment). If the input datasets contain data from the "reservoir" region, then the analysis is performed for the differential feature. Parameters ---------- model: str (optional) One of: - "lmer": linear mixed model using lme4's ``lmer`` - "glmer+loglink": generalized linear mixed model using lme4's ``glmer`` with an additional log-link function via ``family=Gamma(link='log'))`` :cite:`lme4` feature: str (optional) dclab feature for which to compute the model Returns ------- results: dict Dictionary with the results of the fitting process: - "anova p-value": Anova likelihood ratio test (significance) - "feature": name of the feature used for the analysis ``self.feature`` - "fixed effects intercept": Mean of ``self.feature`` for all controls; In the case of the "glmer+loglink" model, the intercept is already back transformed from log space. - "fixed effects treatment": The fixed effect size between the mean of the controls and the mean of the treatments relative to "fixed effects intercept"; In the case of the "glmer+loglink" model, the fixed effect is already back transformed from log space. - "fixed effects repetitions": The effects (intercept and treatment) for each repetition. The first axis defines intercept/treatment; the second axis enumerates the repetitions; thus the shape is (2, number of repetitions) and ``np.mean(results["fixed effects repetitions"], axis=1)`` is equivalent to the tuple (``results["fixed effects intercept"]``, ``results["fixed effects treatment"]``) for the "lmer" model. This does not hold for the "glmer+loglink" model, because of the non-linear inverse transform back from log space. - "is differential": Boolean indicating whether or not the analysis was performed for the differential (bootstrapped and subtracted reservoir from channel data) feature - "model": model name used for the analysis ``self.model`` - "model converged": boolean indicating whether the model converged - "r model summary": Summary of the model - "r model coefficients": Model coefficient table - "r script": the R script used - "r output": full output of the R script """ self.set_options(model=model, feature=feature) self.check_data() # Assemble dataset if self.is_differential(): # bootstrap and compute differential features using reservoir features, groups, repetitions = self.get_differential_dataset() else: # regular feature analysis features = [] groups = [] repetitions = [] for dd in self.data: features.append(self.get_feature_data(dd[1], dd[2])) groups.append(dd[1]) repetitions.append(dd[2]) # concatenate and populate arrays for R features_c = np.concatenate(features) groups_c = np.zeros(len(features_c), dtype=str) repetitions_c = np.zeros(len(features_c), dtype=int) pos = 0 for ii in range(len(features)): size = len(features[ii]) groups_c[pos:pos+size] = groups[ii][0] repetitions_c[pos:pos+size] = repetitions[ii] pos += size # Run R with the given template script rscript = importlib_resources.read_text("dclab.lme4", "lme4_template.R") _, script_path = tempfile.mkstemp(prefix="dclab_lme4_", suffix=".R", text=True) script_path = pathlib.Path(script_path) rscript = rscript.replace("<MODEL_NAME>", self.model) rscript = rscript.replace("<FEATURES>", arr2str(features_c)) rscript = rscript.replace("<REPETITIONS>", arr2str(repetitions_c)) rscript = rscript.replace("<GROUPS>", arr2str(groups_c)) script_path.write_text(rscript, encoding="utf-8") result = rsetup.run_command((rsetup.get_r_script_path(), script_path)) ret_dict = self.parse_result(result) ret_dict["is differential"] = self.is_differential() ret_dict["feature"] = self.feature ret_dict["r script"] = rscript ret_dict["r output"] = result assert ret_dict["model"] == self.model return ret_dict
[docs] def get_differential_dataset(self): """Return the differential dataset for channel/reservoir data The most famous use case is differential deformation. The idea is that you cannot tell what the difference in deformation from channel to reservoir, because you never measure the same object in the reservoir and the channel. You usually just have two distributions. Comparing distributions is possible via bootstrapping. And then, instead of running the lme4 analysis with the channel deformation data, it is run with the differential deformation (subtraction of the bootstrapped deformation distributions for channel and reservoir). """ features = [] groups = [] repetitions = [] # compute differential features for grp in sorted(set([dd[1] for dd in self.data])): # repetitions per groups grp_rep = sorted(set([dd[2] for dd in self.data if dd[1] == grp])) for rep in grp_rep: feat_cha = self.get_feature_data(grp, rep, region="channel") feat_res = self.get_feature_data(grp, rep, region="reservoir") bs_cha, bs_res = bootstrapped_median_distributions(feat_cha, feat_res) # differential feature features.append(bs_cha - bs_res) groups.append(grp) repetitions.append(rep) return features, groups, repetitions
[docs] def get_feature_data(self, group, repetition, region="channel"): """Return array containing feature data Parameters ---------- group: str Measurement group ("control" or "treatment") repetition: int Measurement repetition region: str Either "channel" or "reservoir" Returns ------- fdata: 1d ndarray Feature data (Nans and Infs removed) """ assert group in ["control", "treatment"] assert isinstance(repetition, numbers.Integral) assert region in ["reservoir", "channel"] for dd in self.data: if dd[1] == group and dd[2] == repetition and dd[3] == region: ds = dd[0] break else: raise ValueError("Dataset for group '{}', repetition".format(group) + " '{}', and region".format(repetition) + " '{}' not found!".format(region)) fdata = ds[self.feature][ds.filter.all] fdata_valid = fdata[~np.logical_or(np.isnan(fdata), np.isinf(fdata))] return fdata_valid
[docs] def is_differential(self): """Return True if the differential feature is computed for analysis This effectively just checks the regions of the datasets and returns True if any one of the regions is "reservoir". See Also -------- get_differential_features: for an explanation """ for dd in self.data: if dd[3] == "reservoir": return True else: return False
[docs] def parse_result(self, result): resd = result.split("OUTPUT") ret_dict = {} for item in resd: string = item.split("#*#")[0] key, value = string.split(":", 1) key = key.strip() value = value.strip().replace("\n\n", "\n") if key == "fixed effects repetitions": rows = value.split("\n")[1:] reps = [] for row in rows: reps.append([float(vv) for vv in row.split()[1:]]) value = np.array(reps).transpose() elif key == "model converged": value = value == "TRUE" elif value == "NA": value = np.nan else: try: value = float(value) except ValueError: pass ret_dict[key] = value return ret_dict
[docs] def set_options(self, model=None, feature=None): """Set analysis options""" if model is not None: assert model in ["lmer", "glmer+loglink"] self.model = model if feature is not None: assert dfn.scalar_feature_exists(feature) self.feature = feature
[docs] def arr2str(a): """Convert an array to a string""" if isinstance(a.dtype.type, np.integer): return ",".join(str(dd) for dd in a.tolist()) elif a.dtype.type == np.str_: return ",".join(f"'{dd}'" for dd in a.tolist()) else: return ",".join(f"{dd:.16g}" for dd in a.tolist())
[docs] def bootstrapped_median_distributions(a, b, bs_iter=1000, rs=117): """Compute the bootstrapped distributions for two arrays. Parameters ---------- a, b: 1d ndarray of length N Input data bs_iter: int Number of bootstrapping iterations to perform (output size). rs: int Random state seed for random number generator Returns ------- median_dist_a, median_dist_b: 1d arrays of length bs_iter Boostrap distribution of medians for ``a`` and ``b``. See Also -------- `<https://en.wikipedia.org/wiki/Bootstrapping_(statistics)>`_ Notes ----- From a programmatic point of view, it would have been better to implement this method for just one input array (because of redundant code). However, due to historical reasons (testing and comparability to Shape-Out 1), bootstrapping is done interleaved for the two arrays. """ # Seed random numbers that are reproducible on different machines prng_object = np.random.RandomState(rs) # Initialize median arrays median_a = np.zeros(bs_iter) median_b = np.zeros(bs_iter) # If this loop is still too slow, we could get rid of it and # do everything with arrays. Depends on whether we will # eventually run into memory problems with array sizes # of y*bs_iter and yR*bs_iter. lena = len(a) lenb = len(b) for q in range(bs_iter): # Compute random indices and draw from a, b draw_a_idx = prng_object.randint(0, lena, lena) median_a[q] = np.median(a[draw_a_idx]) draw_b_idx = prng_object.randint(0, lenb, lenb) median_b[q] = np.median(b[draw_b_idx]) return median_a, median_b