Example use cases

Streaming live signals

from time import sleep

from radiens import AllegoClient
from radiens.utils.enums import SignalType


def main():
    allego = AllegoClient()

    # this script assumes you're already streaming
    # if you wanted to start streaming within this script, uncomment this line:
    # allego.set_streaming('on')

    # duration of each signal chunk during streaming,
    # converted from milliseconds to seconds
    loop_dur = allego.get_stream_loop_dur_ms() / 1000

    # get the metadata for all connected channels
    chan_meta = allego.get_channel_metadata()

    # set time to primary cache head (live signals)
    allego.set_time_to_cache_head()

    # wait for the cache to build up before starting the acquisition loop
    sleep(loop_dur)

    # acquisition loop
    while True:
        # gets all signals up to the primary cache head since the last call (or since set_time_to_cache_head())
        sigs, time_range = allego.get_signals()

        # process the new chunk here (a minimal sketch follows this example)

        # wait for cache to build up before getting more signals
        sleep(loop_dur)


if __name__ == '__main__':
    main()
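
What you do with each chunk is application-specific. Below is a minimal sketch of a per-chunk processing step, the kind of thing that could replace the placeholder in the loop above; it assumes the amplifier samples have been extracted from sigs into a 2-D NumPy array of shape (channels, samples), so adapt the extraction to the actual structure returned by get_signals().

import numpy as np


def process_chunk(chunk, time_range):
    # chunk is assumed to be a (num_channels, num_samples) array extracted from sigs
    # compute the per-channel RMS of this chunk as a simple example metric
    rms = np.sqrt(np.mean(np.square(chunk), axis=1))
    print(f'chunk over {time_range}: mean RMS = {rms.mean():.2f}')
    return rms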

Closed-loop stimulation (X-Series)

from time import sleep

from radiens import AllegoClient

# before running this, set the system mode to "XDAQ One with XSR Headstage"
# the corresponding Python command is allego.restart('xdaq-one-stim')

# trigger source index: an integer in the range [1, 8] (inclusive); the value is arbitrary but must be used consistently
TRIGGER_SOURCE = 1


def main():
    allego = AllegoClient()

    # make sure streaming is off
    allego.set_streaming('off')

    # system channel indices on which to deliver stimulation
    stim_chans_sys_idxs = [0, 1]

    # set some stim params
    for sys_idx in stim_chans_sys_idxs:
        allego.set_stim_params({
            'stim_sys_chan_idx': sys_idx,
            'enabled': True,  # default is False
            'first_phase_amplitude_uA': 10,  # default is 0
            'first_phase_duration_us': 200,  # default is 100
            'trigger_source_idx': TRIGGER_SOURCE,
            'trigger_source_is_keypress': True,  # default is True
        })

    # start streaming AFTER setting the stim params (they cannot be modified while streaming; attempting to do so raises an exception)
    allego.set_streaming('on')

    # trigger stimulation on stim_chans_sys_idxs once per second
    while True:
        # this call takes a zero-based trigger index, so subtract 1
        allego.set_manual_stim_trigger(TRIGGER_SOURCE-1)
        sleep(1)


if __name__ == '__main__':
    main()
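
The trigger loop above runs forever. Below is a minimal sketch of a variant of that loop for the body of main(), assuming you want streaming turned off when the script is stopped with Ctrl-C; it uses only the calls already shown in this example.

    try:
        while True:
            allego.set_manual_stim_trigger(TRIGGER_SOURCE - 1)
            sleep(1)
    except KeyboardInterrupt:
        # stop streaming on exit so the stim params can be modified again on the next run
        allego.set_streaming('off')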

Streaming signal metrics

"""
Sample signal metrics from the Radiens API and save results to CSV.
"""

from time import sleep

import pandas as pd
from radiens import AllegoClient
from radiens.utils.enums import MetricID, MetricMode, MetricName

# initialize an AllegoClient to interact with the Radiens API
client = AllegoClient()

# use the client to get a signal metrics client
metrics_client = client.signal_metrics()

# define the metrics to be sampled from the Radiens API
desired_metrics = [
    MetricID(MetricMode.BASE, MetricName.NOISE_UV),
    MetricID(MetricMode.BASE, MetricName.EVENT_MAX_MIN_DIFF_ABS),
    MetricID(MetricMode.BASE, MetricName.MAX_ABS)
]

# define the number of samples to collect and the sample window length in seconds
num_samples = 5
sample_window_sec = 1

# initialize dictionary to hold the sampled metric results
results = {metric.name.name.lower(): [] for metric in desired_metrics}

# initialize list to hold the timestamps of each sample
sample_times = []

# start a loop to collect the defined number of samples
for _ in range(num_samples):

    # get the latest metrics
    metrics = metrics_client.get_metrics(
        sample_window_sec,
        metrics=desired_metrics,
        tail=True  # option to get only the last packet of each sample window
    )

    # record results and timestamp
    for (i, metric_id) in enumerate(metrics.metric_ids):
        results[metric_id.name.name.lower()].append(metrics.val[0, :, i])
    sample_times.append(metrics.time_range[0])

    # pause for the length of the sample window before collecting the next sample
    sleep(sample_window_sec)

# save each metric's samples to a separate CSV file
for metric_id, samples in results.items():

    # create a DataFrame from the samples
    df = pd.DataFrame(samples)

    # index the DataFrame by sample time and label its columns with the channels' native indices from the metrics
    df.index = pd.Index(sample_times, name='time')
    df.columns = pd.Index(metrics.ntv_idxs, name='ntv_idx')

    # save to a CSV file
    df.to_csv(f"stream_{num_samples*sample_window_sec}sec_{metric_id}.csv")
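
To inspect a saved file later, it can be read back into a DataFrame with pandas. With the defaults above (5 samples of a 1-second window and the NOISE_UV metric), the file name written by the loop would be stream_5sec_noise_uv.csv:

import pandas as pd

# rows are indexed by sample time; columns are the channels' native indices
df = pd.read_csv('stream_5sec_noise_uv.csv', index_col='time')
print(df.head())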