-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpipe.go
143 lines (121 loc) · 3.72 KB
/
pipe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package main
import (
"bytes"
"errors"
"fmt"
"io"
"strconv"
"strings"
)
// pipe copies bytes from the reader to the writer until the reader reports
// io.EOF.
//
// Data is read in chunks of at most DefaultBufferSize bytes. Any bytes
// returned by a Read call are written out before that call's error is
// inspected, because io.Reader permits Read to return n > 0 together with a
// non-nil error (including io.EOF); checking the error first would drop the
// final chunk.
//
// It returns nil once io.EOF is seen, and a wrapped error if either reading
// or writing fails.
func pipe(from io.Reader, to io.Writer) error {
	buf := make([]byte, DefaultBufferSize)
	for {
		n, readErr := from.Read(buf)
		// Skip zero-length writes: they carry no data.
		if n > 0 {
			if _, err := to.Write(buf[:n]); err != nil {
				return fmt.Errorf("failed to write to writer: %w", err)
			}
		}
		if readErr != nil {
			if errors.Is(readErr, io.EOF) {
				return nil
			}
			return fmt.Errorf("failed to read from reader: %w", readErr)
		}
	}
}
// procWindowChangeEvent parses a window-change payload of the form
// "<rows>;<cols>" and passes the parsed size to windowResize.
//
// The payload must contain exactly one ';' separator, and both fields must be
// integers accepted by strconv.Atoi. An error is returned when the payload is
// malformed, when either field fails to parse, or when windowResize itself
// returns an error (which is wrapped).
func procWindowChangeEvent(eventPayload string, windowResize func(h int, w int) error) error {
	// Exactly one separator: there must be a first ';' and no second one.
	sep := strings.IndexByte(eventPayload, ';')
	if sep < 0 || strings.IndexByte(eventPayload[sep+1:], ';') >= 0 {
		return fmt.Errorf("invalid event payload: %s", eventPayload)
	}
	rowField, colField := eventPayload[:sep], eventPayload[sep+1:]

	rows, rowErr := strconv.Atoi(rowField)
	if rowErr != nil {
		return fmt.Errorf("failed to parse rows: %w", rowErr)
	}

	cols, colErr := strconv.Atoi(colField)
	if colErr != nil {
		return fmt.Errorf("failed to parse cols: %w", colErr)
	}

	// Rows are passed as the callback's h argument, cols as its w argument.
	if resizeErr := windowResize(rows, cols); resizeErr != nil {
		return fmt.Errorf("failed to set window size: %w", resizeErr)
	}
	return nil
}
// inPipe copies input from the reader to the writer while intercepting
// window-change escape sequences.
//
// A sequence starts with EscapeWindowChangePrefix and ends with the
// EscapeWindowChangeSuffix byte. The bytes in between are handed to
// procWindowChangeEvent, which reports the new size through windowResize; a
// successfully processed sequence is not forwarded to the writer. All other
// bytes are forwarded unchanged.
//
// Data is read in chunks of at most DefaultBufferSize bytes, and each chunk is
// scanned on its own: a sequence whose prefix and suffix do not arrive in the
// same Read is forwarded as ordinary data.
//
// Bytes returned by a Read call are processed before that call's error is
// inspected, because io.Reader permits Read to return n > 0 together with a
// non-nil error (including io.EOF).
//
// It returns nil once io.EOF is seen, and a wrapped error if reading or
// writing fails.
func inPipe(from io.Reader, to io.Writer, windowResize func(h int, w int) error) error {
	inBuf := make([]byte, DefaultBufferSize)
	for {
		n, readErr := from.Read(inBuf)
		if n > 0 {
			if err := forwardInputChunk(inBuf[:n], to, windowResize); err != nil {
				return err
			}
		}
		if readErr != nil {
			if errors.Is(readErr, io.EOF) {
				return nil
			}
			return fmt.Errorf("failed to read from stdin: %w", readErr)
		}
	}
}

// forwardInputChunk writes one chunk of input to the writer, extracting every
// complete window-change sequence it contains and passing its payload to
// procWindowChangeEvent instead of forwarding it.
func forwardInputChunk(data []byte, to io.Writer, windowResize func(h int, w int) error) error {
	prefixLen := len(EscapeWindowChangePrefix)
	for len(data) > 0 {
		start := bytes.Index(data, EscapeWindowChangePrefix)
		if start == -1 {
			// No sequence left in this chunk: forward the rest verbatim.
			if _, err := to.Write(data); err != nil {
				return fmt.Errorf("failed to pipe to stdin: %w", err)
			}
			return nil
		}
		if start > 0 {
			// Forward the ordinary bytes that precede the sequence.
			if _, err := to.Write(data[:start]); err != nil {
				return fmt.Errorf("failed to pipe to stdin: %w", err)
			}
		}

		// Look for the terminating suffix byte after the prefix.
		suffixOff := bytes.IndexByte(data[start+prefixLen:], EscapeWindowChangeSuffix)
		if suffixOff == -1 {
			// Prefix without a suffix in this chunk: treat it as ordinary data.
			if _, err := to.Write(data[start:]); err != nil {
				return fmt.Errorf("failed to pipe to stdin: %w", err)
			}
			return nil
		}
		end := start + prefixLen + suffixOff

		// The span may contain earlier, unterminated prefixes. Only the text
		// after the last prefix is the actual payload; everything before it is
		// forwarded as raw content. event starts with the prefix, so
		// LastIndex never returns -1 here.
		event := data[start:end] // suffix byte excluded
		last := bytes.LastIndex(event, EscapeWindowChangePrefix)
		if last > 0 {
			if _, err := to.Write(event[:last]); err != nil {
				return fmt.Errorf("failed to pipe to stdin: %w", err)
			}
		}

		payload := string(event[last+prefixLen:])
		if procErr := procWindowChangeEvent(payload, windowResize); procErr != nil {
			// Best effort: the event could not be handled, so procErr is
			// deliberately dropped and the sequence plus everything after it
			// in this chunk is forwarded unprocessed.
			if _, err := to.Write(data[start+last:]); err != nil {
				return fmt.Errorf("failed to pipe to stdin: %w", err)
			}
			return nil
		}

		// Continue with the bytes after the suffix.
		data = data[end+1:]
	}
	return nil
}