Examples
Stream live signals
from time import sleep
from radiens import AllegoClient
from radiens.utils.enums import SignalType
def main():
    """Stream live signals from Allego, fetching each new cache chunk in a loop."""
    client = AllegoClient()

    # This example assumes Allego is already streaming; to start streaming
    # from within this script instead, uncomment the next line:
    # client.set_streaming('on')

    # Duration of one signal chunk during streaming; divide by 1000 to
    # convert milliseconds to seconds.
    chunk_dur_sec = client.get_stream_loop_dur_ms() / 1000

    # Metadata describing all channels in the stream.
    channel_meta = client.get_channel_metadata()

    # Move the read position to the primary cache head (live signals).
    client.set_time_to_cache_head()

    # Let the cache build up before starting the acquisition loop.
    sleep(chunk_dur_sec)

    # Acquisition loop: each call returns all signals written to the primary
    # cache since the previous call (or since set_time_to_cache_head()).
    while True:
        signals, time_range = client.get_signals()
        # ... process the signals here ...
        # Wait for the cache to build up before getting more signals.
        sleep(chunk_dur_sec)


if __name__ == '__main__':
    main()
Stream and write signal metrics
"""
Sample signal metrics from the Radiens API and save results to CSV.
"""
from time import sleep
import pandas as pd
from radiens import AllegoClient
from radiens.utils.enums import MetricID, MetricMode, MetricName
def main():
    """Sample signal metrics from the Radiens API and save results to CSV.

    Collects ``num_samples`` samples of the desired metrics, one per
    ``sample_window_sec`` window, then writes one CSV file per metric with
    rows indexed by sample time and columns by native channel index.
    """
    # Initialize AllegoClient to interact with the Radiens API, and get a
    # signal-metrics client from it.
    client = AllegoClient()
    metrics_client = client.signal_metrics()

    # Define the metrics to be sampled from the Radiens API.
    desired_metrics = [
        MetricID(MetricMode.BASE, MetricName.NOISE_UV),
        MetricID(MetricMode.BASE, MetricName.EVENT_MAX_MIN_DIFF_ABS),
        MetricID(MetricMode.BASE, MetricName.MAX_ABS),
    ]

    # Number of samples to collect and the sample window length in seconds.
    num_samples = 5
    sample_window_sec = 1

    # One result list per metric, keyed by the lower-cased metric name.
    results = {metric.name.name.lower(): [] for metric in desired_metrics}
    # Timestamp (start of the returned time range) of each sample.
    sample_times = []
    # Native channel indexes, captured from the first response so they are
    # defined after the loop. (The original read `metrics.ntv_idxs` from the
    # loop variable after the loop, which raises NameError when
    # num_samples == 0 and silently depends on the last iteration.)
    ntv_idxs = None

    # Collect the defined number of samples.
    for _ in range(num_samples):
        # Get the latest metrics; tail=True returns only the last packet of
        # each sample window.
        metrics = metrics_client.get_metrics(
            sample_window_sec,
            metrics=desired_metrics,
            tail=True,
        )
        if ntv_idxs is None:
            ntv_idxs = metrics.ntv_idxs
        # Record results and the sample timestamp.
        for i, metric_id in enumerate(metrics.metric_ids):
            results[metric_id.name.name.lower()].append(metrics.val[0, :, i])
        sample_times.append(metrics.time_range[0])
        # Pause for the length of the sample window before the next sample.
        sleep(sample_window_sec)

    # Save each metric's samples to a separate CSV file. Note metric_key is
    # the lower-cased metric-name string used as the results key (the
    # original named it metric_id, shadowing the MetricID concept).
    for metric_key, samples in results.items():
        df = pd.DataFrame(samples)
        # Index rows by sample time, columns by native channel index.
        df.index = pd.Index(sample_times, name='time')
        df.columns = pd.Index(ntv_idxs, name='ntv_idx')
        df.to_csv(f"stream_{num_samples*sample_window_sec}sec_{metric_key}.csv")


if __name__ == '__main__':
    main()
Closed-loop stimulation (X-Series)
from time import sleep
from radiens import AllegoClient
# Before running this, set the system mode to "XDAQ One with XSR Headstage";
# the corresponding Python command would be allego.restart('xdaq-one-stim').

# Stimulation trigger source: an integer in the range [1, 8] (inclusive).
# The value is arbitrary, but it must be specified and used consistently
# (passed to both set_stim_params and set_manual_stim_trigger below).
TRIGGER_SOURCE = 1
def main():
    """Configure stimulation channels, then trigger stimulation once per second."""
    client = AllegoClient()

    # Streaming must be off while stimulation parameters are modified.
    client.set_streaming('off')

    # System channel indexes that should receive stimulation.
    stim_channel_idxs = [0, 1]

    # Configure the stimulation parameters on each target channel.
    for channel_idx in stim_channel_idxs:
        params = {
            'stim_sys_chan_idx': channel_idx,
            'enabled': True,  # default is False
            'first_phase_amplitude_uA': 10,  # default is 0
            'first_phase_duration_us': 200,  # default is 100
            'trigger_source_idx': TRIGGER_SOURCE,
            'trigger_source_is_keypress': True,  # default is True
        }
        client.set_stim_params(params)

    # Turn streaming on only AFTER modifying the parameters; changing them
    # while streaming raises an exception.
    client.set_streaming('on')

    # Repeatedly trigger stimulation on the configured channels every 1 sec.
    while True:
        # set_manual_stim_trigger expects a zero-based index, so subtract 1.
        client.set_manual_stim_trigger(TRIGGER_SOURCE - 1)
        sleep(1)


if __name__ == '__main__':
    main()
Get signals from a data set
from radiens import VidereClient
# Directory containing the data set (note the leading "~" in the path).
dpath = "~/radix/data/"
# Base name of the data set to link (no file extension).
base_name = "DATASET_NAME"
def main():
    """Link a data set and print its signals and channel metadata."""
    from pathlib import Path

    # Create a VidereClient object.
    vc = VidereClient()

    # Expand the leading "~" before handing the path to the client; the
    # original passed dpath+base_name unexpanded, while the spikes example
    # in this file explicitly calls expanduser(), suggesting link_data_file
    # does not expand it itself.
    data_path = Path(dpath).expanduser() / base_name

    # meta (the data set's metadata) is returned by link_data_file.
    meta = vc.link_data_file(data_path)

    # Get the signals, optionally restricted to a time range in seconds.
    sigs = vc.signals().get_signals(
        dataset_metadata=meta,
        # time_range=[10, 20],  # time range (sec), optional
    )

    # sigs.time_range contains the returned time range.
    print(sigs.time_range)
    # sigs.signals contains the raw signals.
    print(sigs.signals.amp)  # amplifier channels
    # print(sigs.signals.gpio_ain)   # analog input channels
    # print(sigs.signals.gpio_din)   # digital input channels
    # print(sigs.signals.gpio_dout)  # digital output channels
    # sigs.channel_metadata contains the channel mapping info.
    print(sigs.channel_metadata)


if __name__ == "__main__":
    main()
Get spikes from a data set
from pathlib import Path
import numpy as np
from radiens import VidereClient
from radiens.utils.enums import SignalType
from radiens.utils.util import dset_to_ntv_dict
from scipy.io import savemat
# Data directory, expanded ("~" -> home) and resolved to an absolute path.
dpath = Path("~/radix/data/").expanduser().resolve()
# Base name of the continuous data set.
base_name = "allego_0__uid1108-14-55-23"
# Base name of the sorted-spikes data set.
spikes_base_name = "allego_0__uid1108-14-55-23_s0"
# File name for the exported .mat file.
spikes_save_name = "allego_0__uid1108-14-55-23_s0.mat"
def main():
    """Export spike timestamps, labels, and native channel indexes to a .mat file."""
    client = VidereClient()

    # Link the client to the continuous data set, including its spikes.
    meta = client.link_data_file(dpath / base_name, link_spikes=True)

    # Build the dataset-index -> native-index mapping for the amp channels.
    amp_chan_idxs = meta.channel_metadata.index(SignalType.AMP)
    dataset_to_native = dset_to_ntv_dict(amp_chan_idxs)

    # Fetch the spike timestamps (optionally limited to a time range).
    spike_data = client.spikes().get_spike_timestamps(
        spikes_base_name,
        # time_range=[0, 10],  # optional
    )

    # Divide by the sampling rate (meta.TR.fs) to convert to seconds.
    ts_sec = spike_data["timestamps"] / meta.TR.fs
    # Sorting labels, one per spike.
    spike_labels = spike_data["labels"]
    # Native channel index for each spike, mapped from its dataset index.
    native_idxs = np.array(
        [dataset_to_native[idx] for idx in spike_data["dset_idxs"]]
    )

    print(f'Saving {len(ts_sec)} spikes to {dpath/spikes_save_name}')

    # Save to a .mat file; oned_as="column" stores 1-D data as column vectors.
    savemat(dpath / spikes_save_name, {
        "timestamps": ts_sec,
        "labels": spike_labels,
        "ntv_idxs": native_idxs,
    }, oned_as="column")


if __name__ == "__main__":
    main()