Skip to content
88 changes: 48 additions & 40 deletions element_interface/intan_loader.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import numpy as np
import os, sys, struct
from pathlib import Path
import matplotlib.pyplot as plt


def read_qstring(fid):
def _read_qstring(fid):
Comment thread
tdincer marked this conversation as resolved.
"""Read Qt style QString.

The first 32-bit unsigned number indicates the length of the string (in bytes).
Expand Down Expand Up @@ -37,7 +36,7 @@ def read_qstring(fid):
return a


def read_header(fid):
def _read_header(fid):
"""Reads the Intan File Format header from the given file."""

# Check 'magic number' at beginning of file to make sure this is an Intan
Expand All @@ -53,14 +52,6 @@ def read_header(fid):
(version["major"], version["minor"]) = struct.unpack("<hh", fid.read(4))
header["version"] = version

print("")
print(
"Reading Intan Technologies RHS2000 Data File, Version {}.{}".format(
version["major"], version["minor"]
)
)
print("")

# Read information of sampling rate and amplifier frequency settings.
(header["sample_rate"],) = struct.unpack("<f", fid.read(4))
(
Expand Down Expand Up @@ -129,16 +120,16 @@ def read_header(fid):
header["recovery_target_voltage"],
) = struct.unpack("fff", fid.read(12))

note1 = read_qstring(fid)
note2 = read_qstring(fid)
note3 = read_qstring(fid)
note1 = _read_qstring(fid)
note2 = _read_qstring(fid)
note3 = _read_qstring(fid)
header["notes"] = {"note1": note1, "note2": note2, "note3": note3}

(header["dc_amplifier_data_saved"], header["eval_board_mode"]) = struct.unpack(
"<hh", fid.read(4)
)

header["ref_channel_name"] = read_qstring(fid)
header["ref_channel_name"] = _read_qstring(fid)

# Create structure arrays for each type of data channel.
header["spike_triggers"] = []
Expand All @@ -150,11 +141,11 @@ def read_header(fid):

# Read signal summary from data file header.
(number_of_signal_groups,) = struct.unpack("<h", fid.read(2))
print("n signal groups {}".format(number_of_signal_groups))
# print("n signal groups {}".format(number_of_signal_groups))

for signal_group in range(1, number_of_signal_groups + 1):
signal_group_name = read_qstring(fid)
signal_group_prefix = read_qstring(fid)
signal_group_name = _read_qstring(fid)
signal_group_prefix = _read_qstring(fid)
(
signal_group_enabled,
signal_group_num_channels,
Expand All @@ -168,8 +159,8 @@ def read_header(fid):
"port_prefix": signal_group_prefix,
"port_number": signal_group,
}
new_channel["native_channel_name"] = read_qstring(fid)
new_channel["custom_channel_name"] = read_qstring(fid)
new_channel["native_channel_name"] = _read_qstring(fid)
new_channel["custom_channel_name"] = _read_qstring(fid)
(
new_channel["native_order"],
new_channel["custom_order"],
Expand Down Expand Up @@ -224,22 +215,23 @@ def read_header(fid):
return header


def load_rhs(folder: str, file_expr: str):
def load_rhs(folder: str, file_expr: str = "*"):
"""Load rhs data

Example:
# Read data
>>> rhs_data = load_rhs("/home/inbox/organoids21/032520_US_885kHz_sham", file_expr="amp*dat")

# Plot data
>>> import matplotlib.pyplot as plt
>>> plt.plot(rhs_data["time"], rhs_data["recordings"]["amp-B-000.dat"])
>>> plt.xlabel("Time (s)")
>>> plt.ylabel("Reading")
>>> plt.show()

Args:
folder (str): Folder that contains info.rhs, time.dat, and *.dat files
file_expr (str): regex pattern of the file names to be read.
file_expr (str): pattern matching of file names to be read. Defaults to "*" (read all files).

Returns:
rhs_data (dict): RHS data.
Expand All @@ -248,40 +240,56 @@ def load_rhs(folder: str, file_expr: str):
rhs_data["timestamps"] (np.array_like): Relative timestamps in seconds.
"""

rhs_data = {}

# Get header
header_filepath = next(Path(folder).glob("info.rhs"))
with open(header_filepath, "rb") as fid:
header = read_header(fid)
rhs_data["header"] = _read_header(fid)

# Get timestamps
time_file = next(Path(folder).glob("time.dat"))

timestamps = (
rhs_data["timestamps"] = (
np.memmap(time_file, dtype=np.int32)
/ header["frequency_parameters"]["amplifier_sample_rate"]
/ rhs_data["header"]["frequency_parameters"]["amplifier_sample_rate"]
)

rhs_data = dict(header=header, timestamps=timestamps, recordings={})

# Get data files
file_paths = Path(folder).glob(file_expr)
file_paths = [x for x in file_paths if x.as_posix() != "time.dat"]

for file_path in file_paths:
file_path = file_path.as_posix()
if "amp" in file_path:
exclude_list = ["time", "info", "Zone.Identifier"]

file_paths = [
file
for file in file_paths
if not any(string in file.as_posix() for string in exclude_list)
]

# Get recording data
rhs_data["recordings"] = {}

for file_path in sorted(file_paths):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JaerongA This for loop should be run with the condition of if len(file_paths):. It is to support the file_expr keyword. For instance if file_expr="time.dat", this code will fail because file_paths will be an empty list. Let's fix this and merge the PR.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tdincer I thought file_expre was intended for selecting data channels only. I've made some revisions to fix that but right now, timestamps and info will always be read by default. Please feel free to modify if that wasn't your intention.

signal_type = file_path.stem.split("-")[0]

if signal_type == "amp":
signal = np.memmap(file_path, dtype=np.int16)
signal = signal * 0.195 # Convert to microvolts
elif "board-ANALOG-IN" in file_path or "board-ANALOG-OUT" in file_path:

elif signal_type == "board":
signal = np.memmap(file_path, dtype=np.uint16)
signal = (signal - 32768) * 0.0003125 # Convert to volts
elif "dc-" in file_path:

elif signal_type == "dc":
signal = np.memmap(file_path, dtype=np.uint16)
signal = (signal - 512) * 19.23 # Convert to milivolts
elif "board-DIGITAL-IN" in file_path or "board-DIGITAL-OUT" in file_path:

elif signal_type == "stim":
signal = np.memmap(file_path, dtype=np.uint16)
elif "stim-" in file_path:
data = np.memmap(file_path, dtype=np.uint16)
i = np.bitwise_and(data, 255) * header["stim_step_size"]
sign = (128 - np.bitwise_and(data, 255)) / 128
signal = i * sign
rhs_data["recordings"][Path(file_path).relative_to(folder).stem] = signal
# convert the signal from 9-bit one's complement to standard encoding
current = np.bitwise_and(signal, 255) * rhs_data["header"]["stim_step_size"]
sign = 1 - np.bitwise_and(signal, 256) // 128
signal = current * sign

rhs_data["recordings"][file_path.stem] = signal
return rhs_data