Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 83 additions & 5 deletions go/bind/keybase.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"net"
"os"
"path/filepath"
Expand All @@ -16,6 +17,7 @@ import (
"runtime/trace"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/keybase/client/go/chat/globals"
Expand Down Expand Up @@ -59,6 +61,63 @@ var (
connMutex sync.Mutex // Protects conn operations
)

var (
connGeneration atomic.Uint64
resetGeneration atomic.Uint64
jsReadySignalCount atomic.Uint64
writeErrCount atomic.Uint64
readErrCount atomic.Uint64
readErrStreak atomic.Uint64
readErrStreakStart atomic.Int64
)

func describeConn(c net.Conn) string {
if c == nil {
return "<nil>"
}
return fmt.Sprintf("%T@%p", c, c)
}

func describeErr(err error) string {
if err == nil {
return "<nil>"
}
return fmt.Sprintf("%T: %v", err, err)
}

func appStateForLog() string {
if kbCtx == nil || kbCtx.MobileAppState == nil {
return "<unknown>"
}
return fmt.Sprintf("%v", kbCtx.MobileAppState.State())
}

func noteReadSuccess(c net.Conn, n int) {
streak := readErrStreak.Swap(0)
startUnix := readErrStreakStart.Swap(0)
if streak == 0 {
return
}
var dur time.Duration
if startUnix > 0 {
dur = time.Since(time.Unix(0, startUnix))
}
log("Go: ReadArr recovered after streak=%d conn=%s nextReadBytes=%d appState=%s duration=%s",
streak, describeConn(c), n, appStateForLog(), dur)
}

func noteReadError(c net.Conn, err error) {
total := readErrCount.Add(1)
streak := readErrStreak.Add(1)
if streak == 1 {
readErrStreakStart.Store(time.Now().UnixNano())
}
if streak <= 5 || streak == 10 || streak%50 == 0 {
log("Go: ReadArr error streak=%d total=%d conn=%s appState=%s err=%s eof=%v",
streak, total, describeConn(c), appStateForLog(), describeErr(err), errors.Is(err, io.EOF))
}
}

// log writes to kbCtx.Log if available, otherwise falls back to fmt.Printf
func log(format string, args ...interface{}) {
msg := fmt.Sprintf(format, args...)
Expand Down Expand Up @@ -469,9 +528,14 @@ func WriteArr(b []byte) (err error) {

n, err := currentConn.Write(bytes)
if err != nil {
total := writeErrCount.Add(1)
log("Go: WriteArr error total=%d conn=%s len=%d appState=%s err=%s",
total, describeConn(currentConn), len(bytes), appStateForLog(), describeErr(err))
return fmt.Errorf("Write error: %s", err)
}
if n != len(bytes) {
log("Go: WriteArr short write conn=%s wrote=%d expected=%d appState=%s",
describeConn(currentConn), n, len(bytes), appStateForLog())
return errors.New("Did not write all the data")
}
return nil
Expand Down Expand Up @@ -507,10 +571,12 @@ func ReadArr() (data []byte, err error) {

n, err := currentConn.Read(buffer)
if n > 0 && err == nil {
noteReadSuccess(currentConn, n)
return buffer[0:n], nil
}

if err != nil {
noteReadError(currentConn, err)
// Attempt to fix the connection
if ierr := Reset(); ierr != nil {
log("failed to Reset: %v", ierr)
Expand All @@ -535,6 +601,8 @@ func ensureConnection() error {
}

var err error
log("ensureConnection: attempting dial listener=%T@%p existingConn=%s appState=%s",
kbCtx.LoopbackListener, kbCtx.LoopbackListener, describeConn(conn), appStateForLog())
conn, err = kbCtx.LoopbackListener.Dial()
if err != nil {
// The listener was closed (isClosed=true, returns syscall.EINVAL). Recreate it and
Expand All @@ -551,10 +619,14 @@ func ensureConnection() error {
log("ensureConnection: Dial failed after restart: %v", err)
return fmt.Errorf("failed to dial after loopback restart: %s", err)
}
log("ensureConnection: loopback server restarted successfully in %v", time.Since(start))
gen := connGeneration.Add(1)
log("ensureConnection: loopback server restarted successfully in %v gen=%d conn=%s appState=%s",
time.Since(start), gen, describeConn(conn), appStateForLog())
return nil
}
log("Go: Established loopback connection in %v", time.Since(start))
gen := connGeneration.Add(1)
log("Go: Established loopback connection in %v gen=%d conn=%s appState=%s",
time.Since(start), gen, describeConn(conn), appStateForLog())
return nil
}

Expand All @@ -563,31 +635,37 @@ func Reset() error {
connMutex.Lock()
defer connMutex.Unlock()

resetID := resetGeneration.Add(1)
log("Go: Reset #%d start conn=%s appState=%s", resetID, describeConn(conn), appStateForLog())
if conn != nil {
conn.Close()
conn = nil
}
if kbCtx == nil || kbCtx.LoopbackListener == nil {
log("Go: Reset #%d complete without listener appState=%s", resetID, appStateForLog())
return nil
}

// Connection will be re-established lazily on next read/write
log("Go: Connection reset, will reconnect on next operation")
log("Go: Connection reset, will reconnect on next operation (reset=%d appState=%s)", resetID, appStateForLog())
return nil
}

// NotifyJSReady signals that the JavaScript side is ready to send/receive RPCs.
// This unblocks the ReadArr loop and allows bidirectional communication.
// jsReadyCh is closed once and stays closed — repeated calls from engine resets are no-ops.
func NotifyJSReady() {
call := jsReadySignalCount.Add(1)
notified := false
jsReadyOnce.Do(func() {
notified = true
log("Go: JS signaled ready, unblocking RPC communication")
log("Go: JS signaled ready, unblocking RPC communication (call=%d appState=%s conn=%s)",
call, appStateForLog(), describeConn(conn))
close(jsReadyCh)
})
if !notified {
log("Go: NotifyJSReady called again (no-op, channel already closed — engine reset?)")
log("Go: NotifyJSReady called again (no-op, channel already closed — engine reset?) call=%d appState=%s conn=%s",
call, appStateForLog(), describeConn(conn))
}
}

Expand Down
82 changes: 75 additions & 7 deletions rnmodules/react-native-kb/ios/Kb.mm
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#import <UIKit/UIKit.h>
#import <UserNotifications/UserNotifications.h>
#import <cstring>
#include <atomic>
#import <jsi/jsi.h>
#import <sys/utsname.h>
#import <objc/runtime.h>
Expand All @@ -24,12 +25,14 @@
using namespace std;
using namespace kb;

extern void *currentRuntime;

// used to keep track of objects getting destroyed on the js side
class KBTearDown : public jsi::HostObject {
public:
KBTearDown() { Tearup(); }
virtual ~KBTearDown() {
NSLog(@"KBTeardown!!!");
NSLog(@"KBTeardown!!! currentRuntime=%p", currentRuntime);
Teardown();
}
virtual jsi::Value get(jsi::Runtime &, const jsi::PropNameID &name) {
Expand Down Expand Up @@ -101,6 +104,13 @@ @implementation Kb

// sanity check the runtime isn't out of sync due to reload etc
void *currentRuntime = nil;
std::atomic<uint64_t> gKbInstallCount{0};
std::atomic<uint64_t> gNotifyJSReadyCount{0};
std::atomic<uint64_t> gReadLoopGeneration{0};

static const char *KBStringOrNil(NSString *value) {
return value ? value.UTF8String : "<nil>";
}

RCT_EXPORT_MODULE()

Expand Down Expand Up @@ -160,6 +170,8 @@ + (void)handlePastedImages:(NSArray<UIImage *> *)images {
}

- (void)invalidate {
NSLog(@"Kb.invalidate: self=%p bridge=%p readQueue=%p jsRuntime=%p currentRuntime=%p scheduler=%p",
self, self.bridge, self.readQueue, [self javaScriptRuntimePointer], currentRuntime, jsScheduler.get());
[[NSNotificationCenter defaultCenter] removeObserver:self];
currentRuntime = nil;
_jsRuntime = nil;
Expand Down Expand Up @@ -200,6 +212,14 @@ - (void)sendToJS:(NSData *)data {
NSLog(@"Failed to find jsi in sendToJS invokeAsync!!!");
return;
}
if (currentRuntime && currentRuntime != jsRuntimePtr) {
NSLog(@"sendToJS: stored runtime mismatch self=%p bridge=%p storedRuntime=%p currentRuntime=%p callbackRuntime=%p",
strongSelf, strongSelf.bridge, jsRuntimePtr, currentRuntime, &jsiRuntime);
}
if (currentRuntime && currentRuntime != &jsiRuntime) {
NSLog(@"sendToJS: callback runtime mismatch self=%p bridge=%p storedRuntime=%p currentRuntime=%p callbackRuntime=%p",
strongSelf, strongSelf.bridge, jsRuntimePtr, currentRuntime, &jsiRuntime);
}

int size = (int)[data length];
if (size <= 0) {
Expand Down Expand Up @@ -311,7 +331,8 @@ - (NSDictionary *)getConstants {
}

RCT_EXPORT_METHOD(engineReset) {
NSLog(@"engineReset: called (JS hot reload), resetting Go engine");
NSLog(@"engineReset: called (JS hot reload), resetting Go engine self=%p bridge=%p jsRuntime=%p currentRuntime=%p scheduler=%p",
self, self.bridge, [self javaScriptRuntimePointer], currentRuntime, jsScheduler.get());
NSError *error = nil;
KeybaseReset(&error);
[self sendEventWithName:metaEventName body:metaEventEngineReset];
Expand All @@ -322,10 +343,15 @@ - (NSDictionary *)getConstants {

RCT_EXPORT_METHOD(notifyJSReady) {
__weak __typeof__(self) weakSelf = self;
uint64_t notifyCount = gNotifyJSReadyCount.fetch_add(1) + 1;

NSLog(@"notifyJSReady: called from JS, queuing main thread block");
NSLog(@"notifyJSReady[%llu]: called from JS, queuing main thread block self=%p bridge=%p jsRuntime=%p currentRuntime=%p scheduler=%p",
(unsigned long long)notifyCount, self, self.bridge, [self javaScriptRuntimePointer], currentRuntime, jsScheduler.get());
dispatch_async(dispatch_get_main_queue(), ^{
NSLog(@"notifyJSReady: main thread block executing");
uint64_t readLoopGen = gReadLoopGeneration.fetch_add(1) + 1;
NSLog(@"notifyJSReady[%llu]: main thread block executing self=%p bridge=%p jsRuntime=%p currentRuntime=%p nextReadLoop=%llu",
(unsigned long long)notifyCount, self, self.bridge, [self javaScriptRuntimePointer], currentRuntime,
(unsigned long long)readLoopGen);
// Setup infrastructure
[[NSNotificationCenter defaultCenter]
addObserver:self
Expand All @@ -336,27 +362,59 @@ - (NSDictionary *)getConstants {

// Signal to Go that JS is ready
KeybaseNotifyJSReady();
NSLog(@"Notified Go that JS is ready, starting ReadArr loop");
NSLog(@"notifyJSReady[%llu]: Notified Go that JS is ready, starting ReadArr loop=%llu queue=%p",
(unsigned long long)notifyCount, (unsigned long long)readLoopGen, self.readQueue);

// Start the read loop
dispatch_async(self.readQueue, ^{
uint64_t consecutiveErrors = 0;
uint64_t totalErrors = 0;
CFAbsoluteTime errorStreakStart = 0;
NSLog(@"ReadArr loop[%llu] start self=%p bridge=%p readQueue=%p jsRuntime=%p currentRuntime=%p scheduler=%p",
(unsigned long long)readLoopGen, self, self.bridge, self.readQueue, [self javaScriptRuntimePointer],
currentRuntime, jsScheduler.get());
while (true) {
{
__typeof__(self) strongSelf = weakSelf;
if (!strongSelf || !strongSelf.bridge) {
NSLog(@"Bridge dead, bailing from ReadArr loop");
NSLog(@"ReadArr loop[%llu] exit: bridge dead self=%p bridge=%p totalErrors=%llu consecutiveErrors=%llu jsRuntime=%p currentRuntime=%p",
(unsigned long long)readLoopGen, strongSelf, strongSelf.bridge, (unsigned long long)totalErrors,
(unsigned long long)consecutiveErrors,
strongSelf ? [strongSelf javaScriptRuntimePointer] : nil, currentRuntime);
return;
}
}

NSError *error = nil;
NSData *data = KeybaseReadArr(&error);
if (error) {
NSLog(@"Error reading data: %@", error);
totalErrors++;
consecutiveErrors++;
if (errorStreakStart == 0) {
errorStreakStart = CFAbsoluteTimeGetCurrent();
}
if (consecutiveErrors <= 5 || consecutiveErrors % 50 == 0) {
__typeof__(self) strongSelf = weakSelf;
NSLog(@"ReadArr loop[%llu] error streak=%llu total=%llu domain=%s code=%ld desc=%s self=%p bridge=%p jsRuntime=%p currentRuntime=%p",
(unsigned long long)readLoopGen, (unsigned long long)consecutiveErrors,
(unsigned long long)totalErrors, KBStringOrNil(error.domain), (long)error.code,
KBStringOrNil(error.localizedDescription), strongSelf, strongSelf.bridge,
strongSelf ? [strongSelf javaScriptRuntimePointer] : nil, currentRuntime);
}
// Back off on error to avoid spinning at ~35K/sec and starving the main thread CPU
// during foreground re-entry (seen during hang investigation: 419K errors in 12s).
[NSThread sleepForTimeInterval:0.1];
} else if (data) {
if (consecutiveErrors > 0) {
CFAbsoluteTime streakDuration = errorStreakStart > 0 ? CFAbsoluteTimeGetCurrent() - errorStreakStart : 0;
__typeof__(self) strongSelf = weakSelf;
NSLog(@"ReadArr loop[%llu] recovered after streak=%llu total=%llu duration=%.3fs nextBytes=%lu self=%p bridge=%p jsRuntime=%p currentRuntime=%p",
(unsigned long long)readLoopGen, (unsigned long long)consecutiveErrors,
(unsigned long long)totalErrors, streakDuration, (unsigned long)data.length,
strongSelf, strongSelf.bridge, strongSelf ? [strongSelf javaScriptRuntimePointer] : nil, currentRuntime);
consecutiveErrors = 0;
errorStreakStart = 0;
}
__typeof__(self) strongSelf = weakSelf;
if (strongSelf) {
[strongSelf sendToJS:data];
Expand All @@ -371,9 +429,19 @@ - (NSDictionary *)getConstants {

RCT_EXPORT_BLOCKING_SYNCHRONOUS_METHOD(install) {
RCTCxxBridge *cxxBridge = (RCTCxxBridge *)self.bridge;
void *previousRuntime = currentRuntime;
_jsRuntime = (jsi::Runtime *)cxxBridge.runtime;
currentRuntime = cxxBridge.runtime;
auto &rnRuntime = *(jsi::Runtime *)cxxBridge.runtime;
jsScheduler = std::make_shared<KBJSScheduler>(rnRuntime, _callInvoker.callInvoker);
uint64_t installCount = gKbInstallCount.fetch_add(1) + 1;
NSLog(@"install[%llu]: self=%p bridge=%p runtime=%p previousRuntime=%p scheduler=%p callInvoker=%p",
(unsigned long long)installCount, self, self.bridge, cxxBridge.runtime, previousRuntime, jsScheduler.get(),
_callInvoker.callInvoker.get());
if (previousRuntime && previousRuntime != cxxBridge.runtime) {
NSLog(@"install[%llu]: runtime changed without a matching invalidate? previousRuntime=%p newRuntime=%p bridge=%p",
(unsigned long long)installCount, previousRuntime, cxxBridge.runtime, self.bridge);
}

// stash the current runtime to keep in sync
auto rpcOnGoWrap = [](Runtime &runtime, const Value &thisValue, const Value *arguments, size_t count) -> Value {
Expand Down