可以通过不转换为COO格式并直接在矩阵上遍历值来获取每行的内容,这样做通常更快,原因有三个。
- 首先,将CSR格式转换为COO格式会比不做任何转换更慢。
- 其次,在COO循环中,为了识别每行的起始和结束位置,需要比较每一行坐标与前一行坐标的差别。相反,CSR格式本身就提供了这些信息。
- 第三,从数组中切片可以让我们无需实际复制数据就能获得代表该数据的数组。
以下函数利用indptr属性找到每行开始和结束的位置,从而对每行的值和索引调用指定的函数:
def process_matrix_rows(matrix, func):
rows = matrix.shape[0]
for index in range(rows):
indptr_start = matrix.indptr[index]
indptr_end = matrix.indptr[index + 1]
values = matrix.data[indptr_start:indptr_end]
indices = matrix.indices[indptr_start:indptr_end]
func(indices, values)
请注意行为上的一个差异:如果矩阵某行没有元素,此实现会使用空的indices和values数组调用函数,而另一种实现则会跳过没有任何非零值的行。
下面是一个比较两种实现的基准测试脚本。它基于以下假设:
- 矩阵是10,000×5,000大小的,以CSR格式存储,并且由密度为1%的随机值构成。
- COO方法中包含了转换到COO格式的时间成本。
- 每个函数必须找到每行的值和列索引,并调用提供的函数。
代码如下:
import scipy.sparse
matrix = scipy.sparse.random(10000, 5000, format='csr', density=0.01)
# 定义一个空操作函数
def donothing(*args):
pass
# 初始方法(使用getrow)
def get_matrix_original(matrix, func):
for index in range(matrix.shape[0]):
row = matrix.getrow(index)
indices = row.indices
values = row.data
func(indices, values)
# 基于COO的方法
def get_matrix_rows_coo(matrix, func):
coo_matrix = matrix.tocoo()
old_i = None
indices = []
values = []
for i, j, v in zip(coo_matrix.row, coo_matrix.col, coo_matrix.data):
if i != old_i:
if old_i is not None:
func(indices, values)
indices = [j]
values = [v]
else:
indices.append(j)
values.append(v)
old_i = i
# 处理最后一组数据
if indices and values:
func(indices, values)
# 直接使用CSR格式的方法
def get_matrix_rows_njo(matrix, func):
process_matrix_rows(matrix, func) # 使用上面定义的优化函数
%timeit get_matrix_original(matrix, donothing)
%timeit get_matrix_rows_coo(matrix, donothing)
%timeit get_matrix_rows_njo(matrix, donothing)
基准测试结果:
.getrow() 方法:
634 ms ± 16.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
基于COO的迭代方法:
270 ms ± 4.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
直接使用CSR格式的方法:
12.4 ms ± 112 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
需要注意的是,在足够低的密度下(大约约为0.05%的非零值时),COO方法可能比CSR方法更快,因为对于空行,COO方法无需执行任何操作。