-
Notifications
You must be signed in to change notification settings - Fork 31
Expand file tree
/
Copy pathbuffer.go
More file actions
113 lines (93 loc) · 2.73 KB
/
buffer.go
File metadata and controls
113 lines (93 loc) · 2.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package py
// #include "utils.h"
import "C"
import (
"unsafe"
)
// BufferProtocol is an Object that can additionally hand out a
// *BufferMethods for itself through its AsBufferMethods method. The
// package-level AsBufferMethods function checks for this interface first
// before falling back to a runtime probe of the Python object.
type BufferProtocol interface {
Object
AsBufferMethods() *BufferMethods
}
// AsBufferMethods returns a *BufferMethods that refers to the same underlying
// Python object as obj, or nil when obj does not support the "Buffer
// Protocol".
//
// Objects that satisfy the BufferProtocol interface are asked directly. Any
// other object is probed with PyObject_CheckBuffer, which makes this function
// more complete than the interface alone: it also works with unknown or
// dynamic types that implement the "Buffer Protocol".
func AsBufferMethods(obj Object) *BufferMethods {
	if bp, isProto := obj.(BufferProtocol); isProto {
		return bp.AsBufferMethods()
	}

	// Not a statically known implementer: ask the Python runtime.
	if C.PyObject_CheckBuffer(c(obj)) <= 0 {
		return nil
	}

	// Reinterpret the object's base pointer as a *BufferMethods.
	return (*BufferMethods)(unsafe.Pointer(obj.Base()))
}
// GetBuffer asks the Python object behind b for a buffer view by calling
// PyObject_GetBuffer with the given flags.
//
// On success the newly filled *Buffer is returned. If PyObject_GetBuffer
// reports a negative result, nil is returned together with the error
// produced by exception().
//
// NOTE(review): CPython documents that a successful PyObject_GetBuffer must
// be matched by PyBuffer_Release; callers presumably do that through
// (*Buffer).Release — confirm at call sites.
func (b *BufferMethods) GetBuffer(flags BufferFlags) (*Buffer, error) {
	view := newBuffer()
	if C.PyObject_GetBuffer(c(b), view.c(), C.int(flags)) < 0 {
		return nil, exception()
	}
	return view, nil
}
// GetBuffer obtains a buffer view of obj using the given flags.
//
// It resolves obj to a *BufferMethods via AsBufferMethods and delegates to
// (*BufferMethods).GetBuffer. If obj does not support the "Buffer Protocol"
// (AsBufferMethods returns nil), a TypeError is returned instead.
func GetBuffer(obj Object, flags BufferFlags) (*Buffer, error) {
	bm := AsBufferMethods(obj)
	if bm == nil {
		// The message previously read "does implement", which stated the
		// opposite of the actual failure.
		return nil, TypeError.Err("%s does not implement Buffer Protocol", obj.Type())
	}
	return bm.GetBuffer(flags)
}
// BufferFlags is the flags value handed (converted to a C int) to
// PyObject_GetBuffer and PyBuffer_FillInfo. No named flag constants are
// defined here yet; see the TODO below.
type BufferFlags int
// TODO(jp3): buffer flags
// BufferOrder is the single-byte order code that is passed (as a C char) to
// the order-aware buffer calls in this file: PyBuffer_IsContiguous,
// PyBuffer_FromContiguous and PyBuffer_ToContiguous.
type BufferOrder byte
// Order codes. The byte values are the characters the CPython buffer API
// uses for C-style ('C'), Fortran-style ('F') and either ('A') ordering.
const (
BufferOrderC BufferOrder = 'C'
BufferOrderFortran BufferOrder = 'F'
BufferOrderAny BufferOrder = 'A'
)
// Buffer wraps a C Py_buffer structure, stored by value. The methods on
// *Buffer pass a pointer to this embedded structure to the PyBuffer_* C
// functions.
type Buffer struct {
// buf is the underlying C view structure.
buf C.Py_buffer
}
// newBuffer allocates a Buffer whose embedded Py_buffer is zero-valued.
func newBuffer() *Buffer {
	return new(Buffer)
}
// c returns a pointer to the embedded C Py_buffer so it can be handed to the
// PyBuffer_* C functions.
func (b *Buffer) c() *C.Py_buffer {
	view := &b.buf
	return view
}
// Release calls PyBuffer_Release on the wrapped Py_buffer.
func (b *Buffer) Release() {
	C.PyBuffer_Release(b.c())
}
// IsContiguous reports whether PyBuffer_IsContiguous returns 1 for the
// wrapped view when queried with the given order code.
func (b *Buffer) IsContiguous(order BufferOrder) bool {
	return C.PyBuffer_IsContiguous(b.c(), C.char(order)) == 1
}
// GetPointer returns the address of the element selected by indicies, as
// computed by PyBuffer_GetPointer.
//
// Exactly one index per dimension of the buffer must be supplied; otherwise
// a ValueError is returned and the pointer is nil. A zero-dimensional buffer
// therefore takes no indices at all.
func (b *Buffer) GetPointer(indicies ...int) (*byte, error) {
	if len(indicies) != int(b.buf.ndim) {
		return nil, ValueError.Err("wrong number of indicies: %d (wanted %d)", len(indicies), b.buf.ndim)
	}

	// Allocate one spare slot so that &ind[0] is always a valid address.
	// Without it, a zero-dimensional buffer (no indices) passed the length
	// check above and then panicked with an index-out-of-range on ind[0].
	// Only the first len(indicies) entries carry meaningful values.
	ind := make([]C.Py_ssize_t, len(indicies)+1)
	for i, index := range indicies {
		ind[i] = C.Py_ssize_t(index)
	}

	ret := C.PyBuffer_GetPointer(b.c(), &ind[0])
	return (*byte)(ret), nil
}
// FromContinguous copies the bytes in buf into the wrapped view by calling
// PyBuffer_FromContiguous with len(buf) as the length and order as the order
// code. The C result is converted to an error with int2Err.
//
// An empty or nil buf is passed through with length 0 instead of panicking.
func (b *Buffer) FromContinguous(buf []byte, order BufferOrder) error {
	// &buf[0] panics on an empty slice. Point at a one-byte scratch value in
	// that case so C still receives a valid, non-NULL address alongside the
	// zero length.
	var scratch [1]byte
	src := unsafe.Pointer(&scratch[0])
	if len(buf) > 0 {
		src = unsafe.Pointer(&buf[0])
	}

	ret := C.PyBuffer_FromContiguous(b.c(), src, C.Py_ssize_t(len(buf)), C.char(order))
	return int2Err(ret)
}
// ToContinguous copies data out of the wrapped view into buf by calling
// PyBuffer_ToContiguous with len(buf) as the length and order as the order
// code. The C result is converted to an error with int2Err.
//
// An empty or nil buf is passed through with length 0 instead of panicking;
// whether that length is acceptable is left for PyBuffer_ToContiguous to
// decide.
func (b *Buffer) ToContinguous(buf []byte, order BufferOrder) error {
	// &buf[0] panics on an empty slice. Point at a one-byte scratch value in
	// that case so C still receives a valid, non-NULL address alongside the
	// zero length.
	var scratch [1]byte
	dst := unsafe.Pointer(&scratch[0])
	if len(buf) > 0 {
		dst = unsafe.Pointer(&buf[0])
	}

	ret := C.PyBuffer_ToContiguous(dst, b.c(), C.Py_ssize_t(len(buf)), C.char(order))
	return int2Err(ret)
}
// TODO(jp3): fill contiguous strides
// FillInfo populates the wrapped view by calling PyBuffer_FillInfo with
// exporter as the owning object, buf as the memory block (address of its
// first byte plus len(buf)), the readonly flag as 0 or 1, and flags. The C
// result is converted to an error with int2Err.
//
// An empty or nil buf is described to C as a NULL pointer with length 0
// instead of panicking.
//
// NOTE(review): for a non-empty buf this stores the address of Go-managed
// memory inside the Py_buffer. Confirm that buf is kept alive (and that this
// is compatible with the cgo pointer-passing rules) for as long as the view
// is in use.
func (b *Buffer) FillInfo(exporter Object, buf []byte, readonly bool, flags BufferFlags) error {
	var ro C.int
	if readonly {
		ro = 1
	}

	// &buf[0] panics on an empty slice; leave the pointer nil in that case.
	var data unsafe.Pointer
	if len(buf) > 0 {
		data = unsafe.Pointer(&buf[0])
	}

	ret := C.PyBuffer_FillInfo(b.c(), c(exporter), data, C.Py_ssize_t(len(buf)), ro, C.int(flags))
	return int2Err(ret)
}