Examples

Stream live signals

from time import sleep

from radiens import AllegoClient
from radiens.utils.enums import SignalType


def main():
    """Poll live signals from Allego, one stream-loop chunk at a time."""
    client = AllegoClient()

    # NOTE: this script assumes streaming is already running;
    # to start streaming from within this script, uncomment this line:
    # client.set_streaming('on')

    # duration of each signal chunk during streaming,
    # converted from milliseconds to seconds
    chunk_sec = client.get_stream_loop_dur_ms() / 1000

    # metadata for all channels (not used below; shown for reference)
    channel_meta = client.get_channel_metadata()

    # set time to primary cache head (live signals)
    client.set_time_to_cache_head()

    # acquisition loop
    while True:
        # wait for the cache to build up before every read
        # (this includes the very first one)
        sleep(chunk_sec)

        # gets all signals up to the primary cache head since the last call
        # (or since set_time_to_cache_head() on the first pass)
        signals, time_range = client.get_signals()

        # do something with `signals` and `time_range` here


if __name__ == '__main__':
    main()

Stream and write signal metrics

"""
Sample signal metrics from the Radiens API and save results to CSV.
"""

from time import sleep

import pandas as pd
from radiens import AllegoClient
from radiens.utils.enums import MetricID, MetricMode, MetricName


def _metric_key(metric_id):
    """Return the lower-case metric name used as results key and CSV file suffix."""
    return metric_id.name.name.lower()


def main():
    """Sample a few signal metrics from the Radiens API and write one CSV per metric."""
    # AllegoClient is the entry point to the Radiens API;
    # the signal metrics client is obtained from it
    allego = AllegoClient()
    metrics_client = allego.signal_metrics()

    # metrics to be sampled
    wanted = [
        MetricID(MetricMode.BASE, MetricName.NOISE_UV),
        MetricID(MetricMode.BASE, MetricName.EVENT_MAX_MIN_DIFF_ABS),
        MetricID(MetricMode.BASE, MetricName.MAX_ABS),
    ]

    # how many samples to collect and how long (seconds) each sample window is
    num_samples = 5
    sample_window_sec = 1

    # one list of sampled rows per metric, keyed by the lower-case metric name
    rows_by_metric = {_metric_key(metric): [] for metric in wanted}

    # start time of every sample, in collection order
    sample_times = []

    # collection loop
    for _ in range(num_samples):
        # latest metrics for one sample window
        metrics = metrics_client.get_metrics(
            sample_window_sec,
            metrics=wanted,
            tail=True,  # option to get only the last packet of each sample window
        )

        # val is indexed [0, :, col]: the values of metric `col` across all columns
        # of the CSV (one per native index); the leading 0 presumably selects the
        # single packet returned with tail=True -- TODO confirm
        for col, metric_id in enumerate(metrics.metric_ids):
            rows_by_metric[_metric_key(metric_id)].append(metrics.val[0, :, col])
        sample_times.append(metrics.time_range[0])

        # pause for one sample window before collecting the next sample
        sleep(sample_window_sec)

    # row labels are the sample times; column labels are the native indexes
    # reported with the most recently collected metrics
    time_index = pd.Index(sample_times, name='time')
    ntv_columns = pd.Index(metrics.ntv_idxs, name='ntv_idx')
    total_sec = num_samples * sample_window_sec

    # save each metric's samples to a separate CSV file
    for metric_name, rows in rows_by_metric.items():
        frame = pd.DataFrame(rows)
        frame.index = time_index
        frame.columns = ntv_columns
        frame.to_csv(f"stream_{total_sec}sec_{metric_name}.csv")


if __name__ == '__main__':
    main()

Closed loop stimulation (X-Series)

from time import sleep

from radiens import AllegoClient

# before running this, set the system mode to "XDAQ One with XSR Headstage"
# the corresponding python command would be allego.restart('xdaq-one-stim')

# stim trigger source: integer in the range [1,8] (inclusive); arbitrary but must be
# specified and consistent. main() uses it as 'trigger_source_idx' for every stim
# channel and passes TRIGGER_SOURCE-1 to set_manual_stim_trigger()
TRIGGER_SOURCE = 1


def main():
    """Configure stimulation on two channels, then trigger it manually once per second."""
    client = AllegoClient()

    # stim params cannot be modified while streaming, so make sure streaming is off
    client.set_streaming('off')

    # settings shared by every stimulation channel
    shared_params = {
        'enabled': True,  # default is False
        'first_phase_amplitude_uA': 10,  # default is 0
        'first_phase_duration_us': 200,  # default is 100
        'trigger_source_idx': TRIGGER_SOURCE,
        'trigger_source_is_keypress': True,  # default is True
    }

    # system indexes of the channels that you want the stimulation on
    stim_chans_sys_idxs = [0, 1]
    for chan_sys_idx in stim_chans_sys_idxs:
        client.set_stim_params({'stim_sys_chan_idx': chan_sys_idx, **shared_params})

    # turn streaming on only AFTER modifying params
    # (modifying them while streaming will throw an exception)
    client.set_streaming('on')

    # the manual trigger call takes the trigger source minus 1
    manual_trigger_idx = TRIGGER_SOURCE - 1

    # repeated stimulation on stim_chans_sys_idxs every 1 sec
    while True:
        client.set_manual_stim_trigger(manual_trigger_idx)
        sleep(1)


if __name__ == '__main__':
    main()

Get signals from a data set


from radiens import VidereClient

# directory that contains the data set
dpath = "~/radix/data/"
# name of the data set to link (presumably a placeholder: replace with your own)
base_name = "DATASET_NAME"


def main():
    """Link a data set with Videre and print its time range, signals and channel metadata."""
    # function-scope import keeps this example self-contained
    from pathlib import Path

    # create a VidereClient object
    vc = VidereClient()

    # build the data set path with pathlib, as the "Get spikes" example does:
    # "~" is expanded to the home directory and the join does not depend on
    # `dpath` ending with a path separator
    dataset_path = Path(dpath).expanduser().resolve() / base_name

    # meta is returned by link_data_file
    meta = vc.link_data_file(dataset_path)

    # get signals
    sigs = vc.signals().get_signals(
        dataset_metadata=meta,
        # time_range=[10, 20],  # time range (sec) optional
    )

    # sigs.time_range contains the returned time range
    print(sigs.time_range)

    # sigs.signals contains the raw signals
    print(sigs.signals.amp)  # amplifier channels
    # print(sigs.signals.gpio_ain)  # analog input channels
    # print(sigs.signals.gpio_din)  # digital input channels
    # print(sigs.signals.gpio_dout)  # digital output channels

    # sigs.channel_metadata contains the channel mapping info
    print(sigs.channel_metadata)


if __name__ == "__main__":
    main()

Get spikes from a data set


from pathlib import Path

import numpy as np
from radiens import VidereClient
from radiens.utils.enums import SignalType
from radiens.utils.util import dset_to_ntv_dict
from scipy.io import savemat

# data directory, with "~" expanded and resolved to an absolute path
dpath = Path("~/radix/data/").expanduser().resolve()
# name (inside dpath) of the continuous data set passed to link_data_file()
base_name = "allego_0__uid1108-14-55-23"
# name passed to get_spike_timestamps() to select the spikes
spikes_base_name = "allego_0__uid1108-14-55-23_s0"
# file name (inside dpath) of the .mat file written by savemat()
spikes_save_name = "allego_0__uid1108-14-55-23_s0.mat"


def main():
    """Read spike timestamps for a linked data set and save them to a .mat file."""
    # create a VidereClient object
    client = VidereClient()

    # link client to continuous data (and its spikes)
    meta = client.link_data_file(dpath / base_name, link_spikes=True)

    # channel mapping: data set index -> native index, for the amplifier channels
    amp_chan_idxs = meta.channel_metadata.index(SignalType.AMP)
    ntv_by_dset = dset_to_ntv_dict(amp_chan_idxs)

    # get spikes
    spikes = client.spikes().get_spike_timestamps(
        spikes_base_name,
        # time_range=[0, 10],  # optional
    )

    # timestamps are divided by the sampling frequency to convert them to seconds
    timestamps_sec = spikes["timestamps"] / meta.TR.fs

    # native index of every spike, looked up from its data set index
    spike_ntv_idxs = np.array([ntv_by_dset[idx] for idx in spikes["dset_idxs"]])

    # everything that goes into the .mat file
    mat_contents = {
        "timestamps": timestamps_sec,
        "labels": spikes["labels"],
        "ntv_idxs": spike_ntv_idxs,
    }

    out_path = dpath / spikes_save_name
    print(f'Saving {len(timestamps_sec)} spikes to {out_path}')

    # save to .mat file (oned_as="column" means to save data as column vectors)
    savemat(out_path, mat_contents, oned_as="column")


if __name__ == "__main__":
    main()